Foundation
Loading...
Searching...
No Matches
JobGraph.hpp
Go to the documentation of this file.
1#pragma once
2#include "Logging.hpp"
3#include "ThreadPool.hpp"
4#include <condition_variable>
5#include <mutex>
6namespace Foundation::Core
7{
12 struct JobHandle
13 {
14 static constexpr size_t kInvalid = ~static_cast<size_t>(0);
15 size_t id{kInvalid};
16 [[nodiscard]] bool Valid() const noexcept { return id != kInvalid; }
17 };
18
45 {
46 public:
47 enum class NodeKind
48 {
49 Worker, // single body, runs once on a pool worker (or main if no workers)
50 ParallelFor, // body invoked per index, fanned out across the pool
51 Main, // single body, runs on the Wait/Pump thread
52 Barrier, // no body; completes when its producers complete
53 };
54
55 private:
56 // Type-erased node body. Single-shot nodes use RunSingle; parallel-for nodes use RunIndex.
57 struct IJobWork
58 {
59 virtual ~IJobWork() = default;
60 virtual void RunSingle(size_t /*workerId*/) {}
61 virtual void RunIndex(size_t /*index*/, size_t /*workerId*/) {}
62 };
63 template <typename Fn>
64 struct SingleWork final : IJobWork
65 {
66 Fn fn;
67 explicit SingleWork(Fn&& f) : fn(std::move(f)) {}
68 void RunSingle(size_t workerId) override
69 {
70 if constexpr (std::is_invocable_v<Fn&, size_t>)
71 fn(workerId);
72 else
73 fn();
74 }
75 };
76 template <typename Fn>
77 struct IndexWork final : IJobWork
78 {
79 Fn fn;
80 explicit IndexWork(Fn&& f) : fn(std::move(f)) {}
81 void RunIndex(size_t index, size_t workerId) override { ParallelForInvoke(fn, index, workerId); }
82 };
83
84 struct Node
85 {
89 size_t total{0}; // parallel-for element count
91 // Scheduling state guarded by JobGraph::mMutex.
92 size_t pending{0}; // unmet producer dependencies
93 bool finished{false};
95 // Parallel-for hot state (touched lock-free by the fanned-out workers).
98 explicit Node(Allocator* alloc) : dependents(alloc) {}
99 };
100
101 // Pool jobs that drive node bodies; defined in the .cpp (need the full JobGraph).
102 struct WorkerJob;
103 struct ParallelForJob;
104
110 Queue<size_t> mMainReady; // node ids ready to run on the Wait/Pump thread
112 size_t mMainWorkerId{0};
113 bool mSubmitted{false};
114
115 size_t AddNode(NodeKind kind, StringView name, UniquePtr<IJobWork> work, size_t total,
116 ExecutionPolicy policy);
117 void AddEdge(size_t producer, size_t consumer);
118 void Schedule(size_t id);
119 void DispatchWorker(size_t id);
120 void DispatchParallelFor(size_t id);
121 void EnqueueMain(size_t id);
122 void RunMainNode(size_t id);
123 void OnNodeFinished(size_t id);
124
125 public:
126 JobGraph(ThreadPool& pool, Allocator* allocator);
127 JobGraph(JobGraph const&) = delete;
128 JobGraph& operator=(JobGraph const&) = delete;
129 ~JobGraph();
130
132 template <typename Fn>
134 {
135 auto work = ConstructUniqueBase<IJobWork, SingleWork<std::remove_reference_t<Fn>>>(
136 mAllocator, std::forward<Fn>(fn));
137 return {AddNode(NodeKind::Worker, name, std::move(work), 0, policy)};
138 }
140 template <typename Fn>
142 {
143 auto work = ConstructUniqueBase<IJobWork, SingleWork<std::remove_reference_t<Fn>>>(
144 mAllocator, std::forward<Fn>(fn));
145 return {AddNode(NodeKind::Main, name, std::move(work), 0, ExecutionPolicy::Seq)};
146 }
148 template <typename Fn>
149 JobHandle AddParallelFor(StringView name, ExecutionPolicy policy, size_t count, Fn&& fn)
150 {
151 auto work = ConstructUniqueBase<IJobWork, IndexWork<std::remove_reference_t<Fn>>>(
152 mAllocator, std::forward<Fn>(fn));
153 return {AddNode(NodeKind::ParallelFor, name, std::move(work), count, policy)};
154 }
156 template <typename It, typename Fn>
157 requires std::random_access_iterator<It>
158 JobHandle AddParallelFor(StringView name, ExecutionPolicy policy, It first, It last, Fn&& fn)
159 {
160 auto const count = last - first;
161 size_t const total = count <= 0 ? 0 : static_cast<size_t>(count);
162 return AddParallelFor(name, policy, total,
163 [first, fn = std::forward<Fn>(fn)](size_t i, size_t worker) mutable
164 { ParallelForInvoke(fn, first[static_cast<std::iter_difference_t<It>>(i)], worker); });
165 }
168
173 template <typename... Producers>
174 void DependsOn(JobHandle consumer, Producers... producers)
175 {
176 (AddEdge(producers.id, consumer.id), ...);
177 }
178
180 void Submit();
182 void Wait(JobHandle target);
184 void Wait() { Wait(JobHandle{}); }
186 void PumpMainThread();
187
189 [[nodiscard]] size_t MainWorkerId() const noexcept { return mMainWorkerId; }
190 };
191} // namespace Foundation::Core
General Purpose Allocator (GPA) interface.
Definition Allocator.hpp:24
A small, transient CPU job graph layered on top of ThreadPool.
Definition JobGraph.hpp:45
void Wait()
Blocks until the whole graph completes (pumping main-thread nodes on the caller).
Definition JobGraph.hpp:184
void Submit()
Arms the graph and schedules all dependency-free nodes. Call exactly once.
Definition JobGraph.cpp:162
void DependsOn(JobHandle consumer, Producers... producers)
Declares that consumer may only run after every producers has completed.
Definition JobGraph.hpp:174
Mutex mMutex
Definition JobGraph.hpp:108
CondVar mCond
Definition JobGraph.hpp:109
size_t MainWorkerId() const noexcept
Worker id handed to main-thread node bodies (== ThreadPool::GetWorkerCount).
Definition JobGraph.hpp:189
JobGraph(JobGraph const &)=delete
void EnqueueMain(size_t id)
Definition JobGraph.cpp:114
JobHandle AddParallelFor(StringView name, ExecutionPolicy policy, It first, It last, Fn &&fn)
Iterator-range parallel-for (random-access iterators), like the ThreadPool overload.
Definition JobGraph.hpp:158
JobHandle AddJob(StringView name, ExecutionPolicy policy, Fn &&fn)
A single body that runs once on a pool worker (or the main thread under Seq / no workers).
Definition JobGraph.hpp:133
void AddEdge(size_t producer, size_t consumer)
Definition JobGraph.cpp:63
Allocator * mAllocator
Definition JobGraph.hpp:106
Vector< UniquePtr< Node > > mNodes
Definition JobGraph.hpp:107
Queue< size_t > mMainReady
Definition JobGraph.hpp:110
void Schedule(size_t id)
Definition JobGraph.cpp:77
size_t AddNode(NodeKind kind, StringView name, UniquePtr< IJobWork > work, size_t total, ExecutionPolicy policy)
Definition JobGraph.cpp:49
~JobGraph()
Definition JobGraph.cpp:41
void DispatchWorker(size_t id)
Definition JobGraph.cpp:96
ThreadPool & mPool
Definition JobGraph.hpp:105
JobHandle AddParallelFor(StringView name, ExecutionPolicy policy, size_t count, Fn &&fn)
Index parallel-for body invoked as fn(i) or fn(i, workerId) for each index.
Definition JobGraph.hpp:149
void RunMainNode(size_t id)
Definition JobGraph.cpp:123
JobHandle AddBarrier(StringView name)
A work-less join node that completes once all its producers complete.
Definition JobGraph.cpp:71
NodeKind
Definition JobGraph.hpp:48
void OnNodeFinished(size_t id)
Definition JobGraph.cpp:138
size_t mMainWorkerId
Definition JobGraph.hpp:112
size_t mNodesRemaining
Definition JobGraph.hpp:111
bool mSubmitted
Definition JobGraph.hpp:113
JobHandle AddMain(StringView name, Fn &&fn)
A single body that always runs on the thread calling Wait / PumpMainThread.
Definition JobGraph.hpp:141
JobGraph & operator=(JobGraph const &)=delete
void PumpMainThread()
Runs every currently-ready main-thread node on the caller without blocking.
Definition JobGraph.cpp:179
void DispatchParallelFor(size_t id)
Definition JobGraph.cpp:98
Atomic, lock-free Thread Pool implementation with fixed bounds.
Definition ThreadPool.hpp:168
Lock-free atomic primitives and implementations of data structures.
Definition Allocator.hpp:5
std::vector< T, StlAllocator< T > > Vector
std::vector with explicit Foundation::Core::StlAllocator constructor
Definition Container.hpp:130
std::mutex Mutex
Definition Thread.hpp:10
void ParallelForInvoke(Fn &fn, Arg &&arg, size_t workerId)
Invokes a ThreadPool::ParallelFor body for one item, passing the worker id only if the functor accept...
Definition ThreadPool.hpp:79
std::queue< T, Container > Queue
std::queue with explicit Foundation::Core::StlAllocator constructor
Definition Container.hpp:197
std::condition_variable CondVar
Definition Thread.hpp:9
std::atomic< T > Atomic
Alias of std::atomic<T>.
Definition Atomic.hpp:26
ExecutionPolicy
Runtime execution policy for ThreadPool::ParallelFor, mirroring std::execution's seq/par — but chosen...
Definition ThreadPool.hpp:25
std::basic_string_view< char > StringView
Alias for std::basic_string_view<char>
Definition Container.hpp:55
std::unique_ptr< T, Deleter > UniquePtr
std::unique_ptr with custom deleter that uses a Foundation::Core::Allocator to deallocate memory.
Definition Allocator.hpp:170
Definition JobGraph.hpp:58
virtual void RunSingle(size_t)
Definition JobGraph.hpp:60
virtual void RunIndex(size_t, size_t)
Definition JobGraph.hpp:61
Definition JobGraph.hpp:78
Fn fn
Definition JobGraph.hpp:79
IndexWork(Fn &&f)
Definition JobGraph.hpp:80
void RunIndex(size_t index, size_t workerId) override
Definition JobGraph.hpp:81
Definition JobGraph.hpp:85
size_t pending
Definition JobGraph.hpp:92
Vector< size_t > dependents
Definition JobGraph.hpp:94
StringView name
Definition JobGraph.hpp:90
UniquePtr< IJobWork > work
Definition JobGraph.hpp:88
size_t total
Definition JobGraph.hpp:89
NodeKind kind
Definition JobGraph.hpp:86
Node(Allocator *alloc)
Definition JobGraph.hpp:98
Atomic< size_t > cursor
Definition JobGraph.hpp:96
bool finished
Definition JobGraph.hpp:93
Atomic< size_t > remaining
Definition JobGraph.hpp:97
ExecutionPolicy policy
Definition JobGraph.hpp:87
Definition JobGraph.hpp:65
SingleWork(Fn &&f)
Definition JobGraph.hpp:67
Fn fn
Definition JobGraph.hpp:66
void RunSingle(size_t workerId) override
Definition JobGraph.hpp:68
Opaque handle to a node in a JobGraph. Returned by the Add* builders and passed to JobGraph::DependsO...
Definition JobGraph.hpp:13
size_t id
Definition JobGraph.hpp:15
bool Valid() const noexcept
Definition JobGraph.hpp:16
static constexpr size_t kInvalid
Definition JobGraph.hpp:14
Self-draining for-loop job used by ThreadPool::ParallelFor.
Definition ThreadPool.hpp:100
Custom deleter for Foundation::Core::UniquePtr and Foundation::Core::SharedPtr that uses a Foundation...
Definition Allocator.hpp:118