F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
ComQueue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title ComQueue.cpp
3 // \author vbai
4 // \brief cpp file for ComQueue component implementation class
5 // ======================================================================
6 
7 #include <Fw/Types/Assert.hpp>
9 #include <Fw/Com/ComPacket.hpp>
10 #include "Fw/Types/BasicTypes.hpp"
11 #include <type_traits>
12 
13 namespace Svc {
14 
15 // ----------------------------------------------------------------------
16 // Construction, initialization, and destruction
17 // ----------------------------------------------------------------------
18 
19 using FwUnsignedIndexType = std::make_unsigned<FwIndexType>::type;
20 
21 ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
22  static_assert(static_cast<FwUnsignedIndexType>(std::numeric_limits<FwIndexType>::max()) >= FW_NUM_ARRAY_ELEMENTS(this->entries),
23  "Number of entries must fit into FwIndexType");
24  for (FwIndexType i = 0; i < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->entries)); i++) {
25  this->entries[i].priority = 0;
26  this->entries[i].depth = 0;
27  }
28 }
29 
30 ComQueue ::ComQueue(const char* const compName)
31  : ComQueueComponentBase(compName),
32  m_state(WAITING),
33  m_allocationId(static_cast<FwEnumStoreType>(-1)),
34  m_allocator(nullptr),
35  m_allocation(nullptr) {
36  // Initialize throttles to "off"
37  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
38  this->m_throttle[i] = false;
39  }
40 }
41 
43 
45  // Deallocate memory ignoring error conditions
46  if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
47  this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
48  }
49 }
50 
52  FwEnumStoreType allocationId,
53  Fw::MemAllocator& allocator) {
54  FwIndexType currentPriorityIndex = 0;
55  FwSizeType totalAllocation = 0;
56 
57  // Store/initialize allocator members
58  this->m_allocator = &allocator;
59  this->m_allocationId = allocationId;
60  this->m_allocation = nullptr;
61 
62  // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
63  // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
64  // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
65  // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
66  // queue data structures.
67  //
68  // The total allocation size is tracked for passing to the allocation call and is a summation of
69  // (depth * message size) for each prioritized metadata object of (depth * message size)
70  for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
71  // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
72  // priority value
73  for (FwIndexType entryIndex = 0; entryIndex < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(queueConfig.entries)); entryIndex++) {
74  // Check for valid configuration entry
75  FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
76  static_cast<FwAssertArgType>(queueConfig.entries[entryIndex].priority),
77  static_cast<FwAssertArgType>(TOTAL_PORT_COUNT),
78  static_cast<FwAssertArgType>(entryIndex));
79 
80  if (currentPriority == queueConfig.entries[entryIndex].priority) {
81  // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
82  // backing queue object, and message size. Both index and message size are calculated where priority and
83  // depth are copied from the configuration object.
84  QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
85  entry.priority = queueConfig.entries[entryIndex].priority;
86  entry.depth = queueConfig.entries[entryIndex].depth;
87  entry.index = entryIndex;
88  // Message size is determined by the type of object being stored, which in turn is determined by the
89  // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
90  entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
91  // Overflow checks
92  FW_ASSERT((std::numeric_limits<FwSizeType>::max()/entry.depth) >= entry.msgSize,
93  static_cast<FwAssertArgType>(entry.depth),
94  static_cast<FwAssertArgType>(entry.msgSize));
95  FW_ASSERT(std::numeric_limits<FwSizeType>::max() - (entry.depth * entry.msgSize) >= totalAllocation);
96  totalAllocation += entry.depth * entry.msgSize;
97  currentPriorityIndex++;
98  }
99  }
100  }
101  // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
102  bool recoverable = false;
103  this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
104 
105  // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
106  // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
107  // order and passing out the memory to each queue's setup method.
108  FwSizeType allocationOffset = 0;
109  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
110  // Get current queue's allocation size and safety check the values
111  FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
112  FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
113  static_cast<FwAssertArgType>(this->m_prioritizedList[i].index));
114  FW_ASSERT(
115  (allocationSize + allocationOffset) <= totalAllocation,
116  static_cast<FwAssertArgType>(allocationSize),
117  static_cast<FwAssertArgType>(allocationOffset),
118  static_cast<FwAssertArgType>(totalAllocation));
119 
120  // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
121  if (allocationSize > 0) {
122  this->m_queues[this->m_prioritizedList[i].index].setup(
123  reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
124  this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
125  }
126  allocationOffset += allocationSize;
127  }
128  // Safety check that all memory was used as expected
129  FW_ASSERT(
130  allocationOffset == totalAllocation,
131  static_cast<FwAssertArgType>(allocationOffset),
132  static_cast<FwAssertArgType>(totalAllocation));
133 }
134 // ----------------------------------------------------------------------
135 // Handler implementations for user-defined typed input ports
136 // ----------------------------------------------------------------------
137 
138 void ComQueue::comPacketQueueIn_handler(const FwIndexType portNum, Fw::ComBuffer& data, U32 context) {
139  // Ensure that the port number of comPacketQueueIn is consistent with the expectation
140  FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
141  (void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
142 }
143 
144 void ComQueue::bufferQueueIn_handler(const FwIndexType portNum, Fw::Buffer& fwBuffer) {
145  FW_ASSERT(std::numeric_limits<FwIndexType>::max() - COM_PORT_COUNT > portNum);
146  const FwIndexType queueNum = static_cast<FwIndexType>(portNum + COM_PORT_COUNT);
147  // Ensure that the port number of bufferQueueIn is consistent with the expectation
148  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
149  FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
150  bool success =
151  this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
152  if (!success) {
153  this->bufferReturnOut_out(portNum, fwBuffer);
154  }
155 }
156 
157 void ComQueue::comStatusIn_handler(const FwIndexType portNum, Fw::Success& condition) {
158  switch (this->m_state) {
159  // On success, the queue should be processed. On failure, the component should still wait.
160  case WAITING:
161  if (condition.e == Fw::Success::SUCCESS) {
162  this->m_state = READY;
163  this->processQueue();
164  // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
165  FW_ASSERT((this->m_state == WAITING || this->m_state == READY), static_cast<FwAssertArgType>(this->m_state));
166  } else {
167  this->m_state = WAITING;
168  }
169  break;
170  // Both READY and unknown states should not be possible at this point. To receive a status message we must be
171  // one of the WAITING or RETRY states.
172  default:
173  FW_ASSERT(0, static_cast<FwAssertArgType>(this->m_state));
174  break;
175  }
176 }
177 
178 void ComQueue::run_handler(const FwIndexType portNum, U32 context) {
179  // Downlink the high-water marks for the Fw::ComBuffer array types
180  ComQueueDepth comQueueDepth;
181  for (U32 i = 0; i < comQueueDepth.SIZE; i++) {
182  comQueueDepth[i] = static_cast<U32>(this->m_queues[i].get_high_water_mark());
183  this->m_queues[i].clear_high_water_mark();
184  }
185  this->tlmWrite_comQueueDepth(comQueueDepth);
186 
187  // Downlink the high-water marks for the Fw::Buffer array types
188  BuffQueueDepth buffQueueDepth;
189  for (U32 i = 0; i < buffQueueDepth.SIZE; i++) {
190  buffQueueDepth[i] = static_cast<U32>(this->m_queues[i + COM_PORT_COUNT].get_high_water_mark());
191  this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
192  }
193  this->tlmWrite_buffQueueDepth(buffQueueDepth);
194 }
195 
196 void ComQueue ::bufferReturnIn_handler(FwIndexType portNum,
197  Fw::Buffer& data,
198  const ComCfg::FrameContext& context) {
199  static_assert(std::numeric_limits<FwIndexType>::is_signed, "FwIndexType must be signed");
200  // For the buffer queues, the index of the queue is portNum offset by COM_PORT_COUNT since
201  // the first COM_PORT_COUNT queues are for ComBuffer. So we have for buffer queues:
202  // queueNum = portNum + COM_PORT_COUNT
203  // Since queueNum is used as APID, we can retrieve the original portNum like such:
204  FwIndexType bufferReturnPortNum = static_cast<FwIndexType>(context.getcomQueueIndex() - ComQueue::COM_PORT_COUNT);
205  // Failing this assert means that context.apid was modified since ComQueue set it, which should not happen
206  FW_ASSERT(bufferReturnPortNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(bufferReturnPortNum));
207  if (bufferReturnPortNum >= 0) {
208  // It is a coding error not to connect the associated bufferReturnOut port for each bufferReturnIn port
209  FW_ASSERT(this->isConnected_bufferReturnOut_OutputPort(bufferReturnPortNum), static_cast<FwAssertArgType>(bufferReturnPortNum));
210  // If this is a buffer port, return the buffer to the BufferDownlink
211  this->bufferReturnOut_out(bufferReturnPortNum, data);
212  }
213 }
214 
215 // ----------------------------------------------------------------------
216 // Hook implementations for typed async input ports
217 // ----------------------------------------------------------------------
218 
219 void ComQueue::bufferQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
220  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
221  this->bufferReturnOut_out(portNum, fwBuffer);
222 }
223 
224 // ----------------------------------------------------------------------
225 // Private helper methods
226 // ----------------------------------------------------------------------
227 
228 bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
229  // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
230  // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
231  const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
232  FW_ASSERT((queueType == QueueType::COM_QUEUE) || (queueNum >= COM_PORT_COUNT),
233  static_cast<FwAssertArgType>(queueType), static_cast<FwAssertArgType>(queueNum));
234  const FwIndexType portNum = static_cast<FwIndexType>(queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT));
235  bool rvStatus = true;
236  FW_ASSERT(
237  expectedSize == size,
238  static_cast<FwAssertArgType>(size),
239  static_cast<FwAssertArgType>(expectedSize));
240  FW_ASSERT(portNum >= 0, static_cast<FwAssertArgType>(portNum));
241  Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
242  if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
243  if (!this->m_throttle[queueNum]) {
244  this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
245  this->m_throttle[queueNum] = true;
246  }
247 
248  rvStatus = false;
249  }
250  // When the component is already in READY state process the queue to send out the next available message immediately
251  if (this->m_state == READY) {
252  this->processQueue();
253  }
254 
255  return rvStatus;
256 }
257 
258 void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer, FwIndexType queueIndex) {
259  FW_ASSERT(this->m_state == READY);
260 
261  Fw::Buffer outBuffer(comBuffer.getBuffAddr(), static_cast<Fw::Buffer::SizeType>(comBuffer.getBuffLength()));
262 
263  // Context APID is set to the queue index for now. A future implementation may want this to be configurable
264  ComCfg::FrameContext context;
265  context.setcomQueueIndex(queueIndex);
266  this->queueSend_out(0, outBuffer, context);
267  // Set state to WAITING for the status to come back
268  this->m_state = WAITING;
269 }
270 
271 void ComQueue::sendBuffer(Fw::Buffer& buffer, FwIndexType queueIndex) {
272  // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
273  FW_ASSERT(this->m_state == READY);
274 
275  // Context APID is set to the queue index for now. A future implementation may want this to be configurable
276  ComCfg::FrameContext context;
277  context.setcomQueueIndex(queueIndex);
278  this->queueSend_out(0, buffer, context);
279 
280  // Set state to WAITING for the status to come back
281  this->m_state = WAITING;
282 }
283 
284 void ComQueue::processQueue() {
285  FwIndexType priorityIndex = 0;
286  FwIndexType sendPriority = 0;
287  // Check that we are in the appropriate state
288  FW_ASSERT(this->m_state == READY);
289 
290  // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
291  // is done within this loop.
292  for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
293  QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
294  Types::Queue& queue = this->m_queues[entry.index];
295 
296  // Continue onto next prioritized queue if there is no items in the current queue
297  if (queue.getQueueSize() == 0) {
298  continue;
299  }
300 
301  // Send out the message based on the type
302  if (entry.index < COM_PORT_COUNT) {
303  Fw::ComBuffer comBuffer;
304  queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
305  this->sendComBuffer(comBuffer, entry.index);
306  } else {
307  Fw::Buffer buffer;
308  queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
309  this->sendBuffer(buffer, entry.index);
310  }
311 
312  // Update the throttle and the index that was just sent
313  this->m_throttle[entry.index] = false;
314 
315  // Priority used in the next loop
316  sendPriority = entry.priority;
317  break;
318  }
319 
320  // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
321  // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
322  // shifted up by one. This effectively round-robins the queues of the same priority.
323  for (priorityIndex++;
324  priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
325  priorityIndex++) {
326  // Swap the previous entry with this one.
327  QueueMetadata temp = this->m_prioritizedList[priorityIndex];
328  this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
329  this->m_prioritizedList[priorityIndex - 1] = temp;
330  }
331 }
332 } // end namespace Svc
std::make_unsigned< FwIndexType >::type FwUnsignedIndexType
Definition: ComQueue.cpp:19
Representing success.
PlatformSizeType FwSizeType
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
T e
The raw enum value.
I32 FwEnumStoreType
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition: ComQueue.hpp:63
configuration table for each queue
Definition: ComQueue.hpp:62
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable)=0
Allocate memory.
static const FwIndexType TOTAL_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition: ComQueue.hpp:35
ComQueue(const char *const compName)
Definition: ComQueue.cpp:30
No room left in the buffer to serialize data.
void clear_high_water_mark()
Definition: Queue.cpp:56
bool isConnected_bufferReturnOut_OutputPort(FwIndexType portNum)
void cleanup()
Definition: ComQueue.cpp:44
static const FwIndexType BUFFER_PORT_COUNT
Definition: ComQueue.hpp:30
static const FwIndexType COM_PORT_COUNT
Count of Fw::Com input ports and thus Fw::Com queues
Definition: ComQueue.hpp:27
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
Set up the queue object with its backing storage
Definition: Queue.cpp:17
SerializeStatus
forward declaration for string
Serializable::SizeType getBuffLength() const
returns current buffer size
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition: ComQueue.hpp:50
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition: Queue.cpp:29
FwSizeType depth
Depth of the queue [0, infinity)
Definition: ComQueue.hpp:49
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index) const
U8 * getBuffAddr()
gets buffer address for data filling
Definition: ComBuffer.cpp:40
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:56
FwSizeType get_high_water_mark() const
Definition: Queue.cpp:51
PlatformIndexType FwIndexType
void queueSend_out(FwIndexType portNum, Fw::Buffer &data, const ComCfg::FrameContext &context)
Invoke output port queueSend.
C++ header for working with basic fprime types.
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition: BasicTypes.h:93
Type used to pass context info between components during framing/deframing.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
RateGroupDivider component implementation.
FwIndexType getcomQueueIndex() const
Get member comQueueIndex.
void setcomQueueIndex(FwIndexType comQueueIndex)
Set member comQueueIndex.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
Deallocate memory.
FwSizeType getQueueSize() const
Definition: Queue.cpp:60
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition: Queue.cpp:38
U32 SizeType
The size type for a buffer.
Definition: Buffer.hpp:47
#define FW_ASSERT(...)
Definition: Assert.hpp:14
Success/Failure.
void configure(QueueConfigurationTable queueConfig, FwEnumStoreType allocationId, Fw::MemAllocator &allocator)
Definition: ComQueue.cpp:51
Auto-generated base for ComQueue component.
void bufferReturnOut_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferReturnOut.