F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
SpscQueue.hpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title SpscQueue.hpp
3 // \brief Lightweight wait-free non-allocating single producer single consumer queue
4 //
5 // This algorithm is lock-free, wait-free, thread-safe, and ISR-safe, but
6 // it relies on two restrictions to achieve these properties:
7 //
8 // 1. There may only be one producer thread, which is the thread
9 // that may call produce.
10 // 2. There may only be one consumer thread, which is the thread
11 // that may call consume and peek.
12 //
13 // For the purposes of this algorithm, an ISR can be considered to be a
14 // thread. In addition, multiple threads could share the responsibility of
15 // being a producer or the responsibility of being a consumer if another
16 // higher-level concurrency mechanism (like a Mutex) is used to ensure that
17 // there is only a single thread acting as producer or a single thread acting
18 // as consumer at any time.
19 //
20 // Attempting to produce from multiple threads or consume/peek from multiple
21 // threads without higher-level synchronization can lead to memory corruption.
22 //
23 // The isFull() and isEmpty() operations may be used from either the
24 // producer or consumer thread, but beware that the answer could potentially
25 // get outdated, depending on which thread calls it. Here are the valid
26 // uses:
27 //
28 // 1. If isEmpty() returns false when called by the consumer, then the
29 // next consume or peek operation is guaranteed to succeed.
30 // 2. If isFull() returns false when called by the producer, then the
31 // next produce operation is guaranteed to succeed.
32 //
33 // In addition, this algorithm does not dynamically allocate memory, making
34 // it robust for hard-real-time environments.
35 //
36 // ======================================================================
37 
38 #ifndef UTILS_TYPES_SPSC_QUEUE_HPP
39 #define UTILS_TYPES_SPSC_QUEUE_HPP
40 
41 #include <atomic>
42 
43 namespace Types {
44 
45 // Note: FwSizeType is probably generally larger than we need,
46 // but it should still be an efficient size to manipulate,
47 // and it's guaranteed to be unsigned, which is crucial.
48 template <class E, FwSizeType CAPACITY>
49 class SpscQueue {
50  public:
51  static_assert(CAPACITY * 2 <= std::numeric_limits<FwSizeType>::max(),
52  "This implementation distinguishes full and empty queues by using indices modulo CAPACITY * 2, "
53  "so CAPACITY * 2 must fit in the index type");
54 
55  SpscQueue() : m_elements{}, m_nextProduceIdx(0), m_nextConsumeIdx(0) {
56  FW_ASSERT(this->m_nextProduceIdx.is_lock_free() && this->m_nextConsumeIdx.is_lock_free());
57  }
58 
59  bool isFull() const {
60  return countElements(this->m_nextProduceIdx.load(), this->m_nextConsumeIdx.load()) == CAPACITY;
61  }
62 
63  bool isEmpty() const { return countElements(this->m_nextProduceIdx.load(), this->m_nextConsumeIdx.load()) == 0; }
64 
65  // May only be called by the single producer thread.
66  bool produce(const E& element) {
67  FwSizeType nextProduceIdx = this->m_nextProduceIdx.load();
68  FwSizeType nextConsumeIdx = this->m_nextConsumeIdx.load();
69 
70  if (countElements(nextProduceIdx, nextConsumeIdx) == CAPACITY) {
71  return false;
72  }
73 
74  this->m_elements[nextProduceIdx % CAPACITY] = element;
75  this->m_nextProduceIdx.store((nextProduceIdx + 1) % (CAPACITY * 2));
76  return true;
77  }
78 
79  // May only be called by the single consumer thread.
80  bool consume(E& elementOut) {
81  FwSizeType nextProduceIdx = this->m_nextProduceIdx.load();
82  FwSizeType nextConsumeIdx = this->m_nextConsumeIdx.load();
83 
84  if (countElements(nextProduceIdx, nextConsumeIdx) == 0) {
85  return false;
86  }
87 
88  elementOut = this->m_elements[nextConsumeIdx % CAPACITY];
89  this->m_nextConsumeIdx.store((nextConsumeIdx + 1) % (CAPACITY * 2));
90  return true;
91  }
92 
93  // May only be called by the single consumer thread.
94  bool peek(E& elementOut) const {
95  FwSizeType nextProduceIdx = this->m_nextProduceIdx.load();
96  FwSizeType nextConsumeIdx = this->m_nextConsumeIdx.load();
97 
98  if (countElements(nextProduceIdx, nextConsumeIdx) == 0) {
99  return false;
100  }
101 
102  elementOut = this->m_elements[nextConsumeIdx % CAPACITY];
103  return true;
104  }
105 
106  // May only be called by the single consumer thread.
107  bool consume() {
108  E ignored;
109  return consume(ignored);
110  }
111 
112  private:
113  E m_elements[CAPACITY];
114  std::atomic<FwSizeType> m_nextProduceIdx;
115  std::atomic<FwSizeType> m_nextConsumeIdx;
116 
117  static FwSizeType countElements(FwSizeType nextProduceIdx, FwSizeType nextConsumeIdx) {
118  FwSizeType count = (nextProduceIdx - nextConsumeIdx + CAPACITY * 2) % (CAPACITY * 2);
119  FW_ASSERT(count <= CAPACITY, nextProduceIdx, nextConsumeIdx, count, CAPACITY);
120  return count;
121  }
122 };
123 
124 } // namespace Types
125 
126 #endif
PlatformSizeType FwSizeType
bool peek(E &elementOut) const
Definition: SpscQueue.hpp:94
bool isFull() const
Definition: SpscQueue.hpp:59
bool produce(const E &element)
Definition: SpscQueue.hpp:66
bool consume(E &elementOut)
Definition: SpscQueue.hpp:80
#define FW_ASSERT(...)
Definition: Assert.hpp:14
bool isEmpty() const
Definition: SpscQueue.hpp:63