Foundation
Loading...
Searching...
No Matches
ThreadPool.hpp
Go to the documentation of this file.
1#pragma once
2#include "AtomicQueue.hpp"
3#include "Thread.hpp"
4#include <cmath>
5namespace Foundation::Core
6{
14 {
15 virtual ~ThreadPoolJob() = default;
16 virtual void Execute(size_t id) noexcept = 0;
17 };
26 template <typename Lambda, typename ReturnType, typename... Args>
28 {
32 void Execute(size_t) noexcept override
33 {
34 try
35 {
36 if constexpr (std::is_same_v<ReturnType, void>)
37 {
38 mFunc();
39 mPromise.set_value();
40 }
41 else
42 mPromise.set_value(mFunc());
43 }
44 catch (...)
45 {
46 mPromise.set_exception(std::current_exception());
47 }
48 }
49 };
58 {
64
66 // Ensure threads are joined first on destruction
68 void ThreadPoolWorker(size_t id);
69
70 public:
78 ThreadPool(size_t numThreads, size_t maxTasks, Allocator* alloc, StringView name = "ThreadPool");
86 template <typename T, typename... Args>
87 requires std::is_base_of_v<ThreadPoolJob, T>
89 {
90 if (mShutdown)
91 throw std::runtime_error("ThreadPool shutting down");
92 auto task = ConstructUniqueBase<ThreadPoolJob, T>(mAllocator, std::forward<Args>(args)...);
93 T* ptr = static_cast<T*>(task.get());
94 if (!mJobs.Push(std::move(task)))
95 throw std::runtime_error("Jobs full");
96 mTotal.fetch_add(1, std::memory_order_relaxed);
97 mTotal.notify_one();
98 return ptr;
99 }
104 template <typename Lambda, typename... Args>
105 auto Push(Lambda&& func, Args const&... args)
106 {
107 auto LambdaFn = [func = std::forward<Lambda>(func), ... args = args] { return func(args...); };
108 using LambdaType = decltype(LambdaFn);
109 using ReturnType = decltype(LambdaFn());
111 auto fut = job.mPromise.get_future();
113 return std::move(fut);
114 }
119 void Shutdown();
124 void Join();
128 ~ThreadPool();
129
131 {
132 return mTotal.load(std::memory_order_relaxed) - mComplete.load(std::memory_order_relaxed);
133 }
134 [[nodiscard]] size_t GetCompletedJobCount() const noexcept { return mComplete.load(std::memory_order_relaxed); }
135 [[nodiscard]] size_t GetTotalJobCount() const noexcept { return mTotal.load(std::memory_order_relaxed); }
136
140 constexpr static size_t getTaskSize(size_t size) { return 1ULL << static_cast<size_t>(std::ceil(std::log2f(size))); }
141 };
142} // namespace Foundation::Core
General Purpose Allocator (GPA) interface.
Definition Allocator.hpp:24
bool Push(U &&data)
Definition AtomicQueue.hpp:101
Atomic, lock-free Thread Pool implementation with fixed bounds.
Definition ThreadPool.hpp:58
size_t GetTotalJobCount() const noexcept
Definition ThreadPool.hpp:135
Allocator * mAllocator
Definition ThreadPool.hpp:59
~ThreadPool()
Shutdown, without waiting for pending jobs.
Definition ThreadPool.cpp:25
T * PushImpl(Args &&... args)
Push a job implementing ThreadPoolJob to the thread pool.
Definition ThreadPool.hpp:88
void Shutdown()
Shutdown the ThreadPool, potentially cancelling all pending jobs.
Definition ThreadPool.cpp:10
JobQueue mJobs
Definition ThreadPool.hpp:65
auto Push(Lambda &&func, Args const &... args)
Push a lambda job to the thread pool.
Definition ThreadPool.hpp:105
Atomic< size_t > mComplete
Definition ThreadPool.hpp:62
String mName
Definition ThreadPool.hpp:60
Atomic< size_t > mTotal
Definition ThreadPool.hpp:63
Atomic< bool > mShutdown
Definition ThreadPool.hpp:61
static constexpr size_t getTaskSize(size_t size)
Definition ThreadPool.hpp:140
size_t GetCompletedJobCount() const noexcept
Definition ThreadPool.hpp:134
Vector< Thread > mThreads
Definition ThreadPool.hpp:67
void Join()
Wait for all scheduled jobs to complete.
Definition ThreadPool.cpp:16
void ThreadPoolWorker(size_t id)
Definition ThreadPool.cpp:29
size_t GetPendingJobCount() const noexcept
Definition ThreadPool.hpp:130
Lock-free atomic primitives and implementations of data structures.
Definition Allocator.hpp:5
std::vector< T, StlAllocator< T > > Vector
std::vector with explicit Foundation::Core::StlAllocator constructor
Definition Container.hpp:130
std::basic_string< char > String
Alias for std::basic_string<char>, without an explicit allocator constructor.
Definition Container.hpp:112
std::atomic< T > Atomic
Alias of std::atomic<T>.
Definition Atomic.hpp:26
std::basic_string_view< char > StringView
Alias for std::basic_string_view<char>
Definition Container.hpp:55
T * Construct(Allocator *resource, Args &&...args)
Convenience placement new with object of type T using a Foundation::Core::Allocator.
Definition Allocator.hpp:149
std::promise< T > Promise
Definition Thread.hpp:6
Job interface for use with ThreadPool.
Definition ThreadPool.hpp:14
virtual ~ThreadPoolJob()=default
virtual void Execute(size_t id) noexcept=0
State-carrying lambda job for use with ThreadPool.
Definition ThreadPool.hpp:28
Promise< ReturnType > mPromise
Definition ThreadPool.hpp:30
Lambda mFunc
Definition ThreadPool.hpp:29
ThreadPoolLambdaJob(Lambda &&func)
Definition ThreadPool.hpp:31
void Execute(size_t) noexcept override
Definition ThreadPool.hpp:32