39 virtual void Execute(
size_t id)
noexcept = 0;
59 if constexpr (std::is_same_v<ReturnType, void>)
69 mPromise.set_exception(std::current_exception());
78 template <
typename Fn,
typename Arg>
81 if constexpr (std::is_invocable_v<Fn&, Arg&&, size_t>)
84 fn(std::forward<Arg>(
arg));
98 template <
typename Fn>
107 for (
size_t i; (
i =
cursor.fetch_add(1, std::memory_order_relaxed)) <
total;)
109 if (
remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
122 template <
typename Fn>
140 template <
typename Fn>
147 for (
size_t i; (
i =
state->cursor.fetch_add(1, std::memory_order_relaxed)) <
state->total;)
150 if (
state->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
153 state->promise.set_value();
163 using JobQueues = std::array<JobQueue, kJobPriorityCount>;
180 template <
typename T,
typename...
Args>
181 requires std::is_base_of_v<Job, T>
185 throw std::runtime_error(
"ThreadPool shutting down");
187 throw std::runtime_error(
"Invalid job priority");
190 T* ptr =
static_cast<T*
>(
task.get());
192 throw std::runtime_error(
"Jobs full");
193 mTotal.fetch_add(1, std::memory_order_relaxed);
204 auto fut =
job.mPromise.get_future();
206 return std::move(
fut);
225 template <
typename T,
typename...
Args>
226 requires std::is_base_of_v<Job, T>
231 template <
typename T,
typename...
Args>
232 requires std::is_base_of_v<Job, T>
242 template <
typename T,
typename...
Args>
243 requires std::is_base_of_v<Job, T>
248 template <
typename T,
typename...
Args>
249 requires std::is_base_of_v<Job, T>
314 template <
typename Fn>
322 for (
size_t i = 0;
i < count; ++
i)
330 job.remaining.store(
helpers + 1, std::memory_order_relaxed);
333 for (
size_t i; (
i =
job.cursor.fetch_add(1, std::memory_order_relaxed)) < count;)
336 if (
job.remaining.fetch_sub(1, std::memory_order_acq_rel) != 1)
339 while ((
r =
job.remaining.load(std::memory_order_acquire)) != 0)
340 job.remaining.wait(
r, std::memory_order_acquire);
344 template <
typename Fn>
357 template <
typename It,
typename Fn>
358 requires std::random_access_iterator<It>
361 auto const count =
last - first;
368 template <
typename It,
typename Fn>
369 requires std::random_access_iterator<It>
380 return p.get_future();
397 template <
typename Fn>
405 for (
size_t i = 0;
i < count; ++
i)
414 state->remaining.store(
helpers, std::memory_order_relaxed);
431 state->promise.set_value();
439 template <
typename Fn>
449 template <
typename It,
typename Fn>
450 requires std::random_access_iterator<It>
453 auto const count =
last - first;
457 [first, fn = std::forward<Fn>(fn)](
size_t i,
size_t worker)
mutable
461 template <
typename It,
typename Fn>
462 requires std::random_access_iterator<It>
485 return mTotal.load(std::memory_order_relaxed) -
mComplete.load(std::memory_order_relaxed);
493 const static size_t CalcTaskSize(
size_t size) {
return 1ULL <<
static_cast<size_t>(std::ceil(std::log2f(size))); }
General Purpose Allocator (GPA) interface.
Definition Allocator.hpp:24
Atomic, bounded multi-producer multi-consumer FIFO ring buffer with a fixed maximum size.
Definition AtomicQueue.hpp:84
Atomic, lock-free Thread Pool implementation with fixed bounds.
Definition ThreadPool.hpp:168
Future< void > ParallelForAsync(It first, It last, Fn &&fn)
Iterator-range async parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:463
Future< void > ParallelForAsync(ExecutionPolicy policy, size_t count, Fn &&fn)
Non-blocking parallel-for over [0, count): schedules the work across the pool's workers and returns i...
Definition ThreadPool.hpp:398
size_t GetTotalJobCount() const noexcept
Definition ThreadPool.hpp:488
auto Push(JobPriority priority, Lambda &&func, Args const &... args)
Push a lambda job to the thread pool.
Definition ThreadPool.hpp:259
static constexpr size_t PriorityIndex(JobPriority priority) noexcept
Definition ThreadPool.hpp:179
void ParallelFor(ExecutionPolicy policy, size_t count, Fn &&fn)
Parallel-for over [0, count): invokes fn for every index as fn(i) or fn(i, workerId) (the worker id i...
Definition ThreadPool.hpp:315
auto PushLambdaInternal(JobPriority priority, Allocator *jobAllocator, Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:198
size_t GetParallelForConcurrency() const noexcept
Number of distinct worker ids a ParallelFor functor may see (workers + the participating caller)....
Definition ThreadPool.hpp:300
Allocator * mAllocator
Definition ThreadPool.hpp:169
~ThreadPool()
Shutdown, without waiting for pending jobs.
Definition ThreadPool.cpp:45
void ParallelFor(ExecutionPolicy policy, It first, It last, Fn &&fn)
Iterator-range parallel-for, like a (policy-aware) parallel std::for_each: invokes fn for each elemen...
Definition ThreadPool.hpp:359
static const size_t CalcTaskSize(size_t size)
Definition ThreadPool.hpp:493
void ParallelFor(It first, It last, Fn &&fn)
Iterator-range parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:370
void Shutdown()
Shutdown the ThreadPool, potentially cancelling all pending jobs.
Definition ThreadPool.cpp:30
T * PushImpl(Args &&... args)
Definition ThreadPool.hpp:233
T * PushImplAlloc(Allocator *jobAllocator, Args &&... args)
Push a job with an explicit allocator for the job object.
Definition ThreadPool.hpp:244
auto Push(Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:264
Atomic< size_t > mComplete
Definition ThreadPool.hpp:172
T * PushImpl(JobPriority priority, Args &&... args)
Push a job implementing ThreadPoolJob to the thread pool.
Definition ThreadPool.hpp:227
String mName
Definition ThreadPool.hpp:170
Atomic< size_t > mTotal
Definition ThreadPool.hpp:173
Atomic< bool > mShutdown
Definition ThreadPool.hpp:171
void ParallelFor(size_t count, Fn &&fn)
Index parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:345
void CoInvoke(Job &job, size_t count, JobPriority priority=JobPriority::Normal)
Enqueues count non-owning references to a single, externally-owned job so that up to count workers co...
Definition ThreadPool.cpp:14
size_t GetWorkerCount() const noexcept
Number of worker threads. Worker ids passed to Execute are in [0, this).
Definition ThreadPool.hpp:294
size_t GetCompletedJobCount() const noexcept
Definition ThreadPool.hpp:487
Vector< Thread > mThreads
Definition ThreadPool.hpp:177
T * PushImplAlloc(JobPriority priority, Allocator *jobAllocator, Args &&... args)
Definition ThreadPool.hpp:250
auto PushAlloc(JobPriority priority, Allocator *jobAllocator, Lambda &&func, Args const &... args)
Definition ThreadPool.hpp:278
void Join()
Wait for all scheduled jobs to complete.
Definition ThreadPool.cpp:36
T * PushImplInternal(JobPriority priority, Allocator *jobAllocator, Args &&... args)
Definition ThreadPool.hpp:182
void ThreadPoolWorker(size_t id)
Definition ThreadPool.cpp:49
auto PushAlloc(Allocator *jobAllocator, Lambda &&func, Args const &... args)
Push a lambda job with an explicit allocator for the job object.
Definition ThreadPool.hpp:273
static Future< void > MakeReadyFuture()
Returns an already-satisfied Future<void> (for trivial/inline async paths).
Definition ThreadPool.hpp:376
Future< void > ParallelForAsync(ExecutionPolicy policy, It first, It last, Fn &&fn)
Iterator-range overload of ParallelForAsync (random-access iterators only).
Definition ThreadPool.hpp:451
size_t GetPendingJobCount() const noexcept
Definition ThreadPool.hpp:483
JobQueues mJobs
Definition ThreadPool.hpp:175
Future< void > ParallelForAsync(size_t count, Fn &&fn)
Index async parallel-for defaulting to ExecutionPolicy::Par.
Definition ThreadPool.hpp:440
Lock-free atomic primitives and implementations of data structures.
Definition Allocator.hpp:5
std::vector< T, StlAllocator< T > > Vector
std::vector with explicit Foundation::Core::StlAllocator constructor
Definition Container.hpp:130
std::basic_string< char > String
Alias for std::basic_string<char>, without an explicit allocator constructor.
Definition Container.hpp:112
void ParallelForInvoke(Fn &fn, Arg &&arg, size_t workerId)
Invokes a ThreadPool::ParallelFor body for one item, passing the worker id only if the functor accept...
Definition ThreadPool.hpp:79
JobPriority
Definition ThreadPool.hpp:12
std::array< JobQueue, kJobPriorityCount > JobQueues
Definition ThreadPool.hpp:163
std::future< T > Future
Definition Thread.hpp:7
void Destruct(Allocator *resource, T *obj)
Convenience destructor for objects allocated with Construct or ConstructBase.
Definition Allocator.hpp:160
std::atomic< T > Atomic
Alias of std::atomic<T>.
Definition Atomic.hpp:26
ExecutionPolicy
Runtime execution policy for ThreadPool::ParallelFor, mirroring std::execution's seq/par — but chosen...
Definition ThreadPool.hpp:25
constexpr size_t kJobPriorityCount
Definition ThreadPool.hpp:17
std::basic_string_view< char > StringView
Alias for std::basic_string_view<char>
Definition Container.hpp:55
T * Construct(Allocator *resource, Args &&...args)
Convenience placement new with object of type T using a Foundation::Core::Allocator.
Definition Allocator.hpp:153
std::promise< T > Promise
Definition Thread.hpp:6
Job interface for use with ThreadPool.
Definition ThreadPool.hpp:37
virtual void Execute(size_t id) noexcept=0
State-carrying lambda job for use with ThreadPool.
Definition ThreadPool.hpp:51
void Execute(size_t) noexcept override
Definition ThreadPool.hpp:55
LambdaJob(Lambda &&func)
Definition ThreadPool.hpp:54
Lambda mFunc
Definition ThreadPool.hpp:52
Promise< ReturnType > mPromise
Definition ThreadPool.hpp:53
One worker of a ParallelForAsyncState: drains the shared cursor and, when it is the last running work...
Definition ThreadPool.hpp:142
void Execute(size_t workerId) noexcept override
Definition ThreadPool.hpp:145
ParallelForAsyncState< Fn > * state
Definition ThreadPool.hpp:143
ParallelForAsyncJob(ParallelForAsyncState< Fn > *s)
Definition ThreadPool.hpp:144
Heap-resident shared state for a non-blocking ThreadPool::ParallelForAsync.
Definition ThreadPool.hpp:124
Allocator * alloc
Definition ThreadPool.hpp:130
Atomic< size_t > cursor
Definition ThreadPool.hpp:127
ParallelForAsyncState(Fn &&f, size_t t, Allocator *a)
Definition ThreadPool.hpp:131
size_t total
Definition ThreadPool.hpp:126
Fn fn
Definition ThreadPool.hpp:125
Atomic< size_t > remaining
Definition ThreadPool.hpp:128
Promise< void > promise
Definition ThreadPool.hpp:129
Self-draining for-loop job used by ThreadPool::ParallelFor.
Definition ThreadPool.hpp:100
void Execute(size_t workerId) noexcept override
Definition ThreadPool.hpp:105
Atomic< size_t > cursor
Definition ThreadPool.hpp:103
Fn * fn
Definition ThreadPool.hpp:101
Atomic< size_t > remaining
Definition ThreadPool.hpp:104
size_t total
Definition ThreadPool.hpp:102