Foundation
Loading...
Searching...
No Matches
ThreadPool.hpp
Go to the documentation of this file.
1#pragma once
2#include "AtomicQueue.hpp"
3#include "Thread.hpp"
4#include <algorithm>
5#include <array>
6#include <cmath>
7#include <iterator>
8#include <type_traits>
9namespace Foundation::Core
10{
/// Priority level a job is scheduled with.
/// The underlying values are consecutive from zero so they can be used as array indices
/// (see PriorityIndex).
enum class JobPriority : size_t
{
    Low = 0,
    Normal = 1,
    High = 2,
};
17 inline constexpr size_t kJobPriorityCount = static_cast<size_t>(JobPriority::High) + 1;
18
/// Selects how a parallel-for call runs its iterations.
enum class ExecutionPolicy
{
    /// Run inline on the calling thread, in index order.
    Seq = 0,
    /// Fork-join across the pool (default).
    Par = 1,
};
29
/// Abstract unit of work: concrete jobs derive from this and override Execute.
struct Job
{
    /// Virtual so that a derived job can be destroyed through a Job pointer.
    virtual ~Job() = default;

    /// Runs the job. Declared noexcept, so implementations must not let exceptions escape.
    /// @param id Identifier supplied by whoever invokes the job (the pool passes a worker id).
    virtual void Execute(size_t id) noexcept = 0;
};
49 template <typename Lambda, typename ReturnType, typename... Args>
50 struct LambdaJob final : public Job
51 {
55 void Execute(size_t) noexcept override
56 {
57 try
58 {
59 if constexpr (std::is_same_v<ReturnType, void>)
60 {
61 mFunc();
62 mPromise.set_value();
63 }
64 else
65 mPromise.set_value(mFunc());
66 }
67 catch (...)
68 {
69 mPromise.set_exception(std::current_exception());
70 }
71 }
72 };
/// Calls a parallel-for body for a single item.
///
/// The worker id is passed along only when `fn` is invocable as `fn(arg, workerId)`;
/// otherwise the body is called as `fn(arg)` and the id is dropped.
/// @param fn       Callable to invoke (taken by lvalue reference).
/// @param arg      Item to hand to the callable; perfectly forwarded.
/// @param workerId Id of the worker performing the call.
template <typename Fn, typename Arg>
inline void ParallelForInvoke(Fn& fn, Arg&& arg, size_t workerId)
{
    constexpr bool kAcceptsWorkerId = std::is_invocable_v<Fn&, Arg&&, size_t>;
    if constexpr (!kAcceptsWorkerId)
    {
        fn(std::forward<Arg>(arg));
    }
    else
    {
        fn(std::forward<Arg>(arg), workerId);
    }
}
86
98 template <typename Fn>
100 {
101 Fn* fn{nullptr};
102 size_t total{0};
104 Atomic<size_t> remaining{0}; // co-invocations still running (fork-join latch)
105 void Execute(size_t workerId) noexcept override
106 {
107 for (size_t i; (i = cursor.fetch_add(1, std::memory_order_relaxed)) < total;)
109 if (remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
110 remaining.notify_all();
111 }
112 };
113
122 template <typename Fn>
124 {
126 size_t total{0};
128 Atomic<size_t> remaining{0}; // pushed worker jobs still running (fork-join latch)
130 Allocator* alloc{nullptr};
131 ParallelForAsyncState(Fn&& f, size_t t, Allocator* a) : fn(std::move(f)), total(t), alloc(a) {}
132 };
133
140 template <typename Fn>
142 {
145 void Execute(size_t workerId) noexcept override
146 {
147 for (size_t i; (i = state->cursor.fetch_add(1, std::memory_order_relaxed)) < state->total;)
149 // The worker that observes the latch reach zero is the last one touching `state`.
150 if (state->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
151 {
152 Allocator* alloc = state->alloc;
153 state->promise.set_value();
154 Destruct(alloc, state);
155 }
156 }
157 };
158
// Fixed-size array holding one JobQueue per JobPriority level (kJobPriorityCount entries).
163 using JobQueues = std::array<JobQueue, kJobPriorityCount>;
168 {
174
176 // Ensure threads are joined first on destruction
178 void ThreadPoolWorker(size_t id);
179 static constexpr size_t PriorityIndex(JobPriority priority) noexcept { return static_cast<size_t>(priority); }
180 template <typename T, typename... Args>
181 requires std::is_base_of_v<Job, T>
183 {
184 if (mShutdown)
185 throw std::runtime_error("ThreadPool shutting down");
186 if (PriorityIndex(priority) >= kJobPriorityCount)
187 throw std::runtime_error("Invalid job priority");
189 auto task = ConstructUniqueBase<Job, T>(allocator, std::forward<Args>(args)...);
190 T* ptr = static_cast<T*>(task.get());
191 if (!mJobs[PriorityIndex(priority)].Push(std::move(task)))
192 throw std::runtime_error("Jobs full");
193 mTotal.fetch_add(1, std::memory_order_relaxed);
194 mTotal.notify_one();
195 return ptr;
196 }
197 template <typename Lambda, typename... Args>
199 {
200 auto LambdaFn = [func = std::forward<Lambda>(func), ... args = args] { return func(args...); };
201 using LambdaType = decltype(LambdaFn);
202 using ReturnType = decltype(LambdaFn());
203 LambdaJob<LambdaType, ReturnType> job(std::forward<LambdaType>(LambdaFn));
204 auto fut = job.mPromise.get_future();
206 return std::move(fut);
207 }
208
209 public:
217 ThreadPool(size_t numThreads, size_t maxTasks, Allocator* alloc, StringView name = "ThreadPool");
225 template <typename T, typename... Args>
226 requires std::is_base_of_v<Job, T>
227 T* PushImpl(JobPriority priority, Args&&... args)
228 {
229 return PushImplInternal<T>(priority, nullptr, std::forward<Args>(args)...);
230 }
231 template <typename T, typename... Args>
232 requires std::is_base_of_v<Job, T>
234 {
235 return PushImplInternal<T>(JobPriority::Normal, nullptr, std::forward<Args>(args)...);
236 }
242 template <typename T, typename... Args>
243 requires std::is_base_of_v<Job, T>
245 {
246 return PushImplInternal<T>(JobPriority::Normal, jobAllocator, std::forward<Args>(args)...);
247 }
248 template <typename T, typename... Args>
249 requires std::is_base_of_v<Job, T>
251 {
252 return PushImplInternal<T>(priority, jobAllocator, std::forward<Args>(args)...);
253 }
258 template <typename Lambda, typename... Args>
259 auto Push(JobPriority priority, Lambda&& func, Args const&... args)
260 {
261 return PushLambdaInternal(priority, nullptr, std::forward<Lambda>(func), args...);
262 }
263 template <typename Lambda, typename... Args>
264 auto Push(Lambda&& func, Args const&... args)
265 {
266 return PushLambdaInternal(JobPriority::Normal, nullptr, std::forward<Lambda>(func), args...);
267 }
272 template <typename Lambda, typename... Args>
274 {
275 return PushLambdaInternal(JobPriority::Normal, jobAllocator, std::forward<Lambda>(func), args...);
276 }
277 template <typename Lambda, typename... Args>
279 {
280 return PushLambdaInternal(priority, jobAllocator, std::forward<Lambda>(func), args...);
281 }
291 void CoInvoke(Job& job, size_t count, JobPriority priority = JobPriority::Normal);
292
294 [[nodiscard]] size_t GetWorkerCount() const noexcept { return mThreads.size(); }
295
300 [[nodiscard]] size_t GetParallelForConcurrency() const noexcept { return mThreads.size() + 1; }
301
314 template <typename Fn>
315 void ParallelFor(ExecutionPolicy policy, size_t count, Fn&& fn)
316 {
317 if (count == 0)
318 return;
319 size_t const workers = mThreads.size();
320 if (policy == ExecutionPolicy::Seq || workers == 0)
321 {
322 for (size_t i = 0; i < count; ++i)
323 ParallelForInvoke(fn, i, size_t{0}); // inline: caller is worker 0
324 return;
325 }
327 job.fn = &fn;
328 job.total = count;
329 size_t const helpers = std::min(workers, count);
330 job.remaining.store(helpers + 1, std::memory_order_relaxed); // + the participating caller
332 // Participate on the calling thread with id == workers (its own scratch slot).
333 for (size_t i; (i = job.cursor.fetch_add(1, std::memory_order_relaxed)) < count;)
335 // Fork-join: every co-invocation must return before the stack job dies.
336 if (job.remaining.fetch_sub(1, std::memory_order_acq_rel) != 1)
337 {
338 size_t r;
339 while ((r = job.remaining.load(std::memory_order_acquire)) != 0)
340 job.remaining.wait(r, std::memory_order_acquire);
341 }
342 }
344 template <typename Fn>
345 void ParallelFor(size_t count, Fn&& fn)
346 {
347 ParallelFor(ExecutionPolicy::Par, count, std::forward<Fn>(fn));
348 }
349
357 template <typename It, typename Fn>
358 requires std::random_access_iterator<It>
359 void ParallelFor(ExecutionPolicy policy, It first, It last, Fn&& fn)
360 {
361 auto const count = last - first;
362 if (count <= 0)
363 return;
364 ParallelFor(policy, static_cast<size_t>(count), [&](size_t i, size_t worker)
365 { ParallelForInvoke(fn, first[static_cast<std::iter_difference_t<It>>(i)], worker); });
366 }
368 template <typename It, typename Fn>
369 requires std::random_access_iterator<It>
370 void ParallelFor(It first, It last, Fn&& fn)
371 {
372 ParallelFor(ExecutionPolicy::Par, first, last, std::forward<Fn>(fn));
373 }
374
377 {
379 p.set_value();
380 return p.get_future();
381 }
382
397 template <typename Fn>
399 {
400 size_t const workers = mThreads.size();
401 if (count == 0)
402 return MakeReadyFuture();
403 if (policy == ExecutionPolicy::Seq || workers == 0)
404 {
405 for (size_t i = 0; i < count; ++i)
406 ParallelForInvoke(fn, i, size_t{0}); // inline: caller is worker 0
407 return MakeReadyFuture();
408 }
410 State* state = Construct<State>(mAllocator, std::forward<Fn>(fn), count, mAllocator);
411 size_t const helpers = std::min(workers, count);
412 // Arm the latch and grab the future BEFORE pushing: a pushed worker may run (and free
413 // the state) before this call returns.
414 state->remaining.store(helpers, std::memory_order_relaxed);
415 Future<void> fut = state->promise.get_future();
416 size_t pushed = 0;
417 try
418 {
419 for (; pushed < helpers; ++pushed)
420 PushImpl<ParallelForAsyncJob<std::remove_reference_t<Fn>>>(JobPriority::Normal, state);
421 }
422 catch (...)
423 {
424 // Some worker jobs failed to enqueue: retire their latch slots so the remaining
425 // (successfully pushed) workers can still satisfy the promise. If none were pushed
426 // or they have all already finished, finish the promise here.
427 size_t const missing = helpers - pushed;
428 if (missing != 0 && state->remaining.fetch_sub(missing, std::memory_order_acq_rel) == missing)
429 {
430 Allocator* alloc = state->alloc;
431 state->promise.set_value();
432 Destruct(alloc, state);
433 }
434 throw;
435 }
436 return fut;
437 }
439 template <typename Fn>
441 {
442 return ParallelForAsync(ExecutionPolicy::Par, count, std::forward<Fn>(fn));
443 }
449 template <typename It, typename Fn>
450 requires std::random_access_iterator<It>
452 {
453 auto const count = last - first;
454 if (count <= 0)
455 return MakeReadyFuture();
456 return ParallelForAsync(policy, static_cast<size_t>(count),
457 [first, fn = std::forward<Fn>(fn)](size_t i, size_t worker) mutable
458 { ParallelForInvoke(fn, first[static_cast<std::iter_difference_t<It>>(i)], worker); });
459 }
461 template <typename It, typename Fn>
462 requires std::random_access_iterator<It>
464 {
465 return ParallelForAsync(ExecutionPolicy::Par, first, last, std::forward<Fn>(fn));
466 }
467
472 void Shutdown();
477 void Join();
481 ~ThreadPool();
482
484 {
485 return mTotal.load(std::memory_order_relaxed) - mComplete.load(std::memory_order_relaxed);
486 }
487 [[nodiscard]] size_t GetCompletedJobCount() const noexcept { return mComplete.load(std::memory_order_relaxed); }
488 [[nodiscard]] size_t GetTotalJobCount() const noexcept { return mTotal.load(std::memory_order_relaxed); }
489
/// Rounds a requested task capacity up to a power of two.
///
/// Integer arithmetic only: the previous float-based computation (`std::log2f`) lost
/// precision above 2^24 and could round DOWN (e.g. 16777217 -> 16777216), and had
/// undefined behaviour for a size of 0 (conversion of -infinity to size_t).
/// @param size Requested capacity.
/// @return The smallest power of two that is >= size; 1 when size is 0. Requests larger
///         than the greatest power of two representable in size_t are clamped to it.
static constexpr size_t CalcTaskSize(size_t size) noexcept
{
    // Highest power of two that fits in size_t (only the top bit set).
    constexpr size_t kLargestPowerOfTwo = ~(~size_t{0} >> 1);
    if (size >= kLargestPowerOfTwo)
        return kLargestPowerOfTwo; // doubling any further would overflow
    size_t capacity = 1;
    while (capacity < size)
        capacity <<= 1;
    return capacity;
}
494 };
495} // namespace Foundation::Core
General Purpose Allocator (GPA) interface.
Definition Allocator.hpp:24
Atomic, bounded multi-producer multi-consumer FIFO ring buffer with a fixed maximum size.
Definition AtomicQueue.hpp:84
Atomic, lock-free Thread Pool implementation with fixed bounds.
Definition ThreadPool.hpp:168
Future< void > ParallelForAsync(It first, It last, Fn &&fn)
Iterator-range async parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:463
Future< void > ParallelForAsync(ExecutionPolicy policy, size_t count, Fn &&fn)
Non-blocking parallel-for over [0, count): schedules the work across the pool's workers and returns i...
Definition ThreadPool.hpp:398
size_t GetTotalJobCount() const noexcept
Definition ThreadPool.hpp:488
auto Push(JobPriority priority, Lambda &&func, Args const &... args)
Push a lambda job to the thread pool.
Definition ThreadPool.hpp:259
static constexpr size_t PriorityIndex(JobPriority priority) noexcept
Definition ThreadPool.hpp:179
void ParallelFor(ExecutionPolicy policy, size_t count, Fn &&fn)
Parallel-for over [0, count): invokes fn for every index as fn(i) or fn(i, workerId) (the worker id i...
Definition ThreadPool.hpp:315
auto PushLambdaInternal(JobPriority priority, Allocator *jobAllocator, Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:198
size_t GetParallelForConcurrency() const noexcept
Number of distinct worker ids a ParallelFor functor may see (workers + the participating caller)....
Definition ThreadPool.hpp:300
Allocator * mAllocator
Definition ThreadPool.hpp:169
~ThreadPool()
Shutdown, without waiting for pending jobs.
Definition ThreadPool.cpp:45
void ParallelFor(ExecutionPolicy policy, It first, It last, Fn &&fn)
Iterator-range parallel-for, like a (policy-aware) parallel std::for_each: invokes fn for each elemen...
Definition ThreadPool.hpp:359
static const size_t CalcTaskSize(size_t size)
Definition ThreadPool.hpp:493
void ParallelFor(It first, It last, Fn &&fn)
Iterator-range parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:370
void Shutdown()
Shutdown the ThreadPool, potentially cancelling all pending jobs.
Definition ThreadPool.cpp:30
T * PushImpl(Args &&... args)
Definition ThreadPool.hpp:233
T * PushImplAlloc(Allocator *jobAllocator, Args &&... args)
Push a job with an explicit allocator for the job object.
Definition ThreadPool.hpp:244
auto Push(Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:264
Atomic< size_t > mComplete
Definition ThreadPool.hpp:172
T * PushImpl(JobPriority priority, Args &&... args)
Push a job implementing the Job interface to the thread pool.
Definition ThreadPool.hpp:227
String mName
Definition ThreadPool.hpp:170
Atomic< size_t > mTotal
Definition ThreadPool.hpp:173
Atomic< bool > mShutdown
Definition ThreadPool.hpp:171
void ParallelFor(size_t count, Fn &&fn)
Index parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:345
void CoInvoke(Job &job, size_t count, JobPriority priority=JobPriority::Normal)
Enqueues count non-owning references to a single, externally-owned job so that up to count workers co...
Definition ThreadPool.cpp:14
size_t GetWorkerCount() const noexcept
Number of worker threads. Worker ids passed to Execute are in [0, GetWorkerCount()).
Definition ThreadPool.hpp:294
size_t GetCompletedJobCount() const noexcept
Definition ThreadPool.hpp:487
Vector< Thread > mThreads
Definition ThreadPool.hpp:177
T * PushImplAlloc(JobPriority priority, Allocator *jobAllocator, Args &&... args)
Definition ThreadPool.hpp:250
auto PushAlloc(JobPriority priority, Allocator *jobAllocator, Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:278
void Join()
Wait for all scheduled jobs to complete.
Definition ThreadPool.cpp:36
T * PushImplInternal(JobPriority priority, Allocator *jobAllocator, Args &&... args)
Definition ThreadPool.hpp:182
void ThreadPoolWorker(size_t id)
Definition ThreadPool.cpp:49
auto PushAlloc(Allocator *jobAllocator, Lambda &&func, Args const &... args)
Push a lambda job with an explicit allocator for the job object.
Definition ThreadPool.hpp:273
static Future< void > MakeReadyFuture()
Returns an already-satisfied Future<void> (for trivial/inline async paths).
Definition ThreadPool.hpp:376
Future< void > ParallelForAsync(ExecutionPolicy policy, It first, It last, Fn &&fn)
Iterator-range overload of ParallelForAsync (random-access iterators only).
Definition ThreadPool.hpp:451
size_t GetPendingJobCount() const noexcept
Definition ThreadPool.hpp:483
JobQueues mJobs
Definition ThreadPool.hpp:175
Future< void > ParallelForAsync(size_t count, Fn &&fn)
Index async parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:440
Lock-free atomic primitives and implementations of data structures.
Definition Allocator.hpp:5
std::vector< T, StlAllocator< T > > Vector
std::vector with explicit Foundation::Core::StlAllocator constructor
Definition Container.hpp:130
std::basic_string< char > String
Alias for std::basic_string<char>, without an explicit allocator constructor.
Definition Container.hpp:112
void ParallelForInvoke(Fn &fn, Arg &&arg, size_t workerId)
Invokes a ThreadPool::ParallelFor body for one item, passing the worker id only if the functor accept...
Definition ThreadPool.hpp:79
JobPriority
Definition ThreadPool.hpp:12
std::array< JobQueue, kJobPriorityCount > JobQueues
Definition ThreadPool.hpp:163
std::future< T > Future
Definition Thread.hpp:7
void Destruct(Allocator *resource, T *obj)
Convenience destructor for objects allocated with Construct or ConstructBase.
Definition Allocator.hpp:160
std::atomic< T > Atomic
Alias of std::atomic<T>.
Definition Atomic.hpp:26
ExecutionPolicy
Runtime execution policy for ThreadPool::ParallelFor, mirroring std::execution's seq/par — but chosen...
Definition ThreadPool.hpp:25
constexpr size_t kJobPriorityCount
Definition ThreadPool.hpp:17
std::basic_string_view< char > StringView
Alias for std::basic_string_view<char>
Definition Container.hpp:55
T * Construct(Allocator *resource, Args &&...args)
Convenience placement new with object of type T using a Foundation::Core::Allocator.
Definition Allocator.hpp:153
std::promise< T > Promise
Definition Thread.hpp:6
Job interface for use with ThreadPool.
Definition ThreadPool.hpp:37
virtual ~Job()=default
virtual void Execute(size_t id) noexcept=0
State-carrying lambda job for use with ThreadPool.
Definition ThreadPool.hpp:51
void Execute(size_t) noexcept override
Definition ThreadPool.hpp:55
LambdaJob(Lambda &&func)
Definition ThreadPool.hpp:54
Lambda mFunc
Definition ThreadPool.hpp:52
Promise< ReturnType > mPromise
Definition ThreadPool.hpp:53
One worker of a ParallelForAsyncState: drains the shared cursor and, when it is the last running work...
Definition ThreadPool.hpp:142
void Execute(size_t workerId) noexcept override
Definition ThreadPool.hpp:145
ParallelForAsyncState< Fn > * state
Definition ThreadPool.hpp:143
ParallelForAsyncJob(ParallelForAsyncState< Fn > *s)
Definition ThreadPool.hpp:144
Heap-resident shared state for a non-blocking ThreadPool::ParallelForAsync.
Definition ThreadPool.hpp:124
Allocator * alloc
Definition ThreadPool.hpp:130
Atomic< size_t > cursor
Definition ThreadPool.hpp:127
ParallelForAsyncState(Fn &&f, size_t t, Allocator *a)
Definition ThreadPool.hpp:131
size_t total
Definition ThreadPool.hpp:126
Fn fn
Definition ThreadPool.hpp:125
Atomic< size_t > remaining
Definition ThreadPool.hpp:128
Promise< void > promise
Definition ThreadPool.hpp:129
Self-draining for-loop job used by ThreadPool::ParallelFor.
Definition ThreadPool.hpp:100
void Execute(size_t workerId) noexcept override
Definition ThreadPool.hpp:105
Atomic< size_t > cursor
Definition ThreadPool.hpp:103
Fn * fn
Definition ThreadPool.hpp:101
Atomic< size_t > remaining
Definition ThreadPool.hpp:104
size_t total
Definition ThreadPool.hpp:102