AtomicQueue.hpp
#pragma once
#include <bit>
#include <stdexcept>
#include <utility>
#include "Allocator.hpp"
#include "Atomic.hpp"
#include "Container.hpp"
namespace Foundation::Core
{
    /// Atomic, bounded single-producer single-consumer FIFO ring buffer with a fixed maximum size.
    template <typename T>
    class SPSCQueue
    {
        const size_t mModulo;
        Vector<T> mBuffer;
        Atomic<size_t> mWrite{}, mRead{};
        // Only used in the reader thread
        size_t mReadCached{};
        // Only used in the writer thread
        size_t mWriteCached{};
    public:
        /// Construct the SPSC queue.
        /// @param size  Capacity; must be a power of two.
        /// @param alloc Allocator backing the ring storage.
        SPSCQueue(size_t size, Allocator* alloc) :
            mModulo(size - 1), mBuffer(size, alloc) {
            if ((size & mModulo) != 0)
                throw std::runtime_error("Size must be a power of two");
        }
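        // Example of the power-of-two check and mask: with size == 8,
        // mModulo == 7, so index 9 wraps to slot (9 & 7) == 1. A size such
        // as 6 fails the check above because (6 & 5) == 4 != 0.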
        /// Try to push data into the queue.
        /// @return false if the queue is full.
        template<typename U>
        bool Push(U&& data)
        {
            size_t write = mWrite.load(std::memory_order_relaxed);
            // Amortize atomic reads: the read head only needs refreshing
            // when the cached value makes us look full.
            if (write - mReadCached == mBuffer.size()) [[unlikely]]
            {
                // Wrapped. The next cycle can begin only once there is enough space.
                mReadCached = mRead.load(std::memory_order_acquire);
                if (write - mReadCached == mBuffer.size())
                    return false; // full
            }
            mBuffer[write & mModulo] = std::forward<U>(data);
            mWrite.store(write + 1, std::memory_order_release);
            return true;
        }
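        // The cache is safe to trust on the fast path: the consumer only ever
        // advances mRead, so a stale mReadCached can understate free space but
        // never overstate it; the acquire reload above covers the "looks full"
        // case.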
        /// Try to pop data from the queue.
        /// @return false if the queue is empty.
        bool Pop(T& out)
        {
            size_t read = mRead.load(std::memory_order_relaxed);
            // Same amortization as in Push
            if (read == mWriteCached) [[unlikely]]
            {
                mWriteCached = mWrite.load(std::memory_order_acquire);
                if (read == mWriteCached)
                    return false; // empty
            }
            // Move the element out before publishing the new read index;
            // publishing first would let the producer overwrite the slot
            // while we are still reading it.
            out = std::move(mBuffer[read & mModulo]);
            mRead.store(read + 1, std::memory_order_release);
            return true;
        }
    };
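    // Usage sketch (illustrative): exactly one producer thread and one
    // consumer thread; `gpa` stands in for whatever Allocator* the
    // application owns.
    //
    //   SPSCQueue<int> queue(1024, gpa);
    //   // producer thread:
    //   bool pushed = queue.Push(42);   // false when full
    //   // consumer thread:
    //   int value;
    //   bool popped = queue.Pop(value); // false when empty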
    /// Atomic, bounded multi-producer multi-consumer FIFO ring buffer with a fixed maximum size.
    template<typename T>
    class MPMCQueue
    {
        struct Data
        {
            T data;
            Atomic<size_t> writeCycle{} /* when to write */, readCycle{} /* when to read */;
        };
        const size_t mModulo, mShift;
        Vector<Data> mBuffer;
        Atomic<size_t> mWrite{}, mRead{};
        size_t mWriteCached{};
    public:
        /// Construct the MPMC queue.
        /// @param size  Capacity; must be a power of two.
        /// @param alloc Allocator backing the ring storage.
        MPMCQueue(size_t size, Allocator* alloc) :
            mModulo(size - 1), mShift(std::countr_zero(size)), mBuffer(size, alloc) {
            if ((size & mModulo) != 0)
                throw std::runtime_error("Size must be a power of two");
        }
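        // Example of the index arithmetic: with size == 8, mShift == 3 and
        // mModulo == 7, index 11 lands in slot (11 & 7) == 3 during cycle
        // (11 >> 3) == 1, i.e. the second pass over the ring.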
        /// Try to push data into the queue.
        /// @return false if the queue is full.
        template<typename U>
        bool Push(U&& data) {
            size_t write = mWrite.load(std::memory_order_relaxed);
            while (true)
            {
                // Stick with the current write index until we succeed
                auto& elem = mBuffer[write & mModulo];
                size_t read_cycle = elem.readCycle.load(std::memory_order_acquire);
                size_t write_cycle = elem.writeCycle.load(std::memory_order_acquire);
                if (write_cycle > read_cycle) [[unlikely]] // Still not consumed
                    return false; // full
                size_t cycle = write >> mShift;
                if (write_cycle == cycle) // Ready to write
                {
                    // Bump the write index if we can, claiming the old index; retry otherwise.
                    if (mWrite.compare_exchange_weak(write, write + 1, std::memory_order_relaxed))
                    {
                        elem.data = std::forward<U>(data);
                        elem.writeCycle.store(cycle + 1, std::memory_order_release);
                        return true;
                    }
                    // A failed CAS reloads `write` for us; just go around again.
                }
                else
                {
                    // Not our turn, so our write index must be stale. The CAS
                    // never ran on this path, so refresh the index by hand.
                    write = mWrite.load(std::memory_order_relaxed);
                }
            }
        }
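        // Walkthrough: with size == 8, a producer holding write == 11 targets
        // slot 3 and may store only once writeCycle there reaches 1; after the
        // store it publishes writeCycle == 2, signalling cycle-1 readers that
        // the slot now holds data.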
        /// Try to pop data from the queue.
        /// @return false if the queue is empty.
        bool Pop(T& out)
        {
            size_t read = mRead.load(std::memory_order_relaxed);
            while (true)
            {
                // Same scheme as Push
                auto& elem = mBuffer[read & mModulo];
                size_t read_cycle = elem.readCycle.load(std::memory_order_acquire);
                // Acquire pairs with the producer's release store on writeCycle,
                // making elem.data visible before we move from it.
                size_t write_cycle = elem.writeCycle.load(std::memory_order_acquire);
                if (read_cycle >= write_cycle) [[unlikely]] // Still not written
                    return false; // empty
                size_t cycle = read >> mShift;
                if (read_cycle == cycle) // Ready to read
                {
                    // Bump the read index if we can, claiming the old index; retry otherwise.
                    if (mRead.compare_exchange_weak(read, read + 1, std::memory_order_relaxed))
                    {
                        out = std::move(elem.data);
                        elem.readCycle.store(cycle + 1, std::memory_order_release);
                        return true;
                    }
                }
                else
                {
                    // Stale read index; the CAS never ran, so refresh it by hand.
                    read = mRead.load(std::memory_order_relaxed);
                }
            }
        }
    };
}
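A minimal usage sketch for the MPMC queue, assuming an Allocator* named gpa obtained elsewhere (the name is illustrative). Any number of producer and consumer threads may call Push and Pop concurrently; both report failure instead of blocking, so callers decide whether to spin, yield, or drop:

    Foundation::Core::MPMCQueue<int> queue(256, gpa);
    // any producer thread:
    if (!queue.Push(7)) { /* queue was full */ }
    // any consumer thread:
    int value;
    if (!queue.Pop(value)) { /* queue was empty */ }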