Foundation
Loading...
Searching...
No Matches
Queue.hpp
Go to the documentation of this file.
1#pragma once
2#include <Core/Core.hpp>
3#include <Core/Allocator.hpp>
4#include "Atomic.hpp"
6{
7 using namespace Foundation::Core;
12 template <typename T>
14 {
15 const size_t mModulo;
18 // Only used in reader thread
19 size_t mReadCached{};
20 // Only used in writer thread
21 size_t mWriteCached{};
22 public:
28 SPSCQueue(size_t size, Allocator* alloc) :
29 mModulo(size - 1), mBuffer(size, alloc) {
30 CHECK_MSG(size > 0 && (size & mModulo) == 0, "Size must be a power of two");
31 }
38 template<typename U>
39 bool Push(U&& data)
40 {
41 size_t write = mWrite.load(std::memory_order_relaxed);
42 // Amortize atomic reads
43 // We don't need to update the read head everytime. Only when we think we're full.
44 if (write - mReadCached == mBuffer.size()) [[unlikely]]
45 {
46 // Wrapped. Next cycle could begin only when there's enough space
47 mReadCached = mRead.load(std::memory_order_acquire);
48 if (write - mReadCached == mBuffer.size())
49 return false; // full
50 }
51 mBuffer[write & mModulo] = std::forward<U>(data);
52 mWrite.store(write + 1, std::memory_order_release);
53 return true;
54 }
62 bool Pop(T& out)
63 {
64 size_t read = mRead.load(std::memory_order_relaxed);
65 // Same as above
66 if (read == mWriteCached) [[unlikely]]
67 {
68 mWriteCached = mWrite.load(std::memory_order_acquire);
69 if (read == mWriteCached)
70 return false; // empty
71 }
72 mRead.store(read + 1, std::memory_order_release);
73 out = std::move(mBuffer[read & mModulo]);
74 return true;
75 }
76 };
81 template<typename T>
83 {
84 struct Data
85 {
87 Atomic<size_t> writeCycle{} /* when to write */ , readCycle{} /* when to read */;
88 };
89 const size_t mModulo, mShift;
92 size_t mWriteCached{};
93 public:
94 MPMCQueue(size_t size, Allocator* alloc) :
95 mModulo(size - 1), mShift(std::countr_zero(size)), mBuffer(size, alloc) {
96 CHECK_MSG(size > 0 && (size & mModulo) == 0, "Size must be a power of two");
97 }
98 class Writer
99 {
101 public:
/// Try to push one element. Safe for concurrent producers; each producer
/// thread should use its own Writer handle.
/// @param data Value to enqueue (perfect-forwarded into the slot).
/// @return false when the queue appears full, true on success.
template<typename U>
bool Push(U&& data) {
    size_t write = queue->mWrite.load(std::memory_order_relaxed);
    while (true)
    {
        // Stick with the current write index until we succeed
        auto& elem = queue->mBuffer[write & queue->mModulo];
        // Acquire on readCycle pairs with the reader's release store after it
        // moved the element out, so overwriting elem.data below cannot race
        // with a reader still holding the slot.
        size_t read_cycle = elem.readCycle.load(std::memory_order_acquire);
        size_t write_cycle = elem.writeCycle.load(std::memory_order_acquire);
        // writeCycle > readCycle means this slot still holds unconsumed data.
        // NOTE(review): if `write` is stale (another producer already claimed
        // and filled this index), this returns "full" even though free slots
        // may exist — confirm whether a spurious false is acceptable here.
        if (write_cycle > read_cycle) [[unlikely]] // Still not consumed
            return false; // full
        // cycle = lap number of this write index (index divided by capacity).
        size_t cycle = write >> queue->mShift;
        if (write_cycle == cycle) // Ready to write
        {
            // Bump the write index if we can, claiming the old index. Try later otherwise.
            if (queue->mWrite.compare_exchange_weak(write, write + 1, std::memory_order_relaxed))
            {
                elem.data = std::forward<U>(data);
                // Release store publishes elem.data to the reader that
                // acquire-loads writeCycle.
                elem.writeCycle.store(cycle + 1, std::memory_order_release);
                return true;
            }
            // CAS failure reloaded `write` in place; the loop retries with it.
        } else // Not our turn yet? So we must be an old write. Update and try again.
            write = queue->mWrite.load(std::memory_order_relaxed);
    }
}
134 };
138 Writer CreateWriter() { return Writer(this); }
139 class Reader
140 {
142 public:
151 bool Pop(T& out)
152 {
153 size_t read = queue->mRead.load(std::memory_order_relaxed);
154 while (true)
155 {
156 // Same as above
157 auto& elem = queue->mBuffer[read & queue->mModulo];
158 size_t read_cycle = elem.readCycle.load(std::memory_order_acquire);
159 size_t write_cycle = elem.writeCycle.load(std::memory_order_relaxed);
160 if (read_cycle >= write_cycle) [[unlikely]] // Still not written
161 return false; // empty
162 size_t cycle = read >> queue->mShift;
163 if (read_cycle == cycle) // Ready to read
164 {
165 // Same as above for reading
166 if (queue->mRead.compare_exchange_weak(read, read + 1, std::memory_order_relaxed))
167 {
168 out = std::move(elem.data);
169 elem.readCycle.store(cycle + 1, std::memory_order_release);
170 return true;
171 }
172 } else // Old write
173 read = queue->mRead.load(std::memory_order_relaxed);
174 }
175 };
176 };
180 Reader CreateReader() { return Reader(this); }
181 };
182}
#define CHECK_MSG(expr, format_str,...)
Definition Logging.hpp:31
bool Pop(T &out)
Try to pop data from the queue.
Definition Queue.hpp:151
MPMCQueue *const queue
Definition Queue.hpp:141
Reader(MPMCQueue *queue)
Definition Queue.hpp:143
Writer(MPMCQueue *queue)
Definition Queue.hpp:102
bool Push(U &&data)
Try to push data into the queue.
Definition Queue.hpp:110
MPMCQueue *const queue
Definition Queue.hpp:100
Atomic, bounded multi-producer multi-consumer FIFO ring buffer with a fixed maximum size.
Definition Queue.hpp:83
const size_t mModulo
Definition Queue.hpp:89
const size_t mShift
Definition Queue.hpp:89
Reader CreateReader()
Create a Reader for concurrent popping.
Definition Queue.hpp:180
Atomic< size_t > mRead
Definition Queue.hpp:91
size_t mWriteCached
Definition Queue.hpp:92
MPMCQueue(size_t size, Allocator *alloc)
Definition Queue.hpp:94
Writer CreateWriter()
Create a Writer for concurrent pushing.
Definition Queue.hpp:138
Vector< Data > mBuffer
Definition Queue.hpp:90
Atomic< size_t > mWrite
Definition Queue.hpp:91
Atomic, bounded single-producer single-consumer FIFO ring buffer with a fixed maximum size.
Definition Queue.hpp:14
const size_t mModulo
Definition Queue.hpp:15
SPSCQueue(size_t size, Allocator *alloc)
Construct the SPSC Queue.
Definition Queue.hpp:28
Atomic< size_t > mWrite
Definition Queue.hpp:17
size_t mReadCached
Definition Queue.hpp:19
Vector< T > mBuffer
Definition Queue.hpp:16
bool Pop(T &out)
Try to pop data from the queue.
Definition Queue.hpp:62
size_t mWriteCached
Definition Queue.hpp:21
bool Push(U &&data)
Try to push data into the queue.
Definition Queue.hpp:39
Atomic< size_t > mRead
Definition Queue.hpp:17
General Purpose Allocator (GPA) interface.
Definition Allocator.hpp:24
Lock-free atomic primitives and implementations of data structures.
Definition Atomic.hpp:22
Allocators, Data Structures and introspection implementations.
Definition Allocator.hpp:5
std::unique_ptr< T, StlDeleter< T > > UniquePtr
std::unique_ptr with custom deleter that uses a Foundation::Core::Allocator to deallocate memory.
Definition Allocator.hpp:161
Definition Filesystem.hpp:36
Atomic< size_t > writeCycle
Definition Queue.hpp:87
Atomic< size_t > readCycle
Definition Queue.hpp:87
T data
Definition Queue.hpp:86