F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
ComQueue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title ComQueue.cpp
3 // \author vbai
4 // \brief cpp file for ComQueue component implementation class
5 // ======================================================================
6 
7 #include <Fw/Com/ComPacket.hpp>
8 #include <Fw/Types/Assert.hpp>
10 #include <type_traits>
11 #include "Fw/Types/BasicTypes.hpp"
12 
13 namespace Svc {
14 
15 // ----------------------------------------------------------------------
16 // Construction, initialization, and destruction
17 // ----------------------------------------------------------------------
18 
19 using FwUnsignedIndexType = std::make_unsigned<FwIndexType>::type;
20 
21 ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
22  static_assert(static_cast<FwUnsignedIndexType>(std::numeric_limits<FwIndexType>::max()) >=
23  FW_NUM_ARRAY_ELEMENTS(this->entries),
24  "Number of entries must fit into FwIndexType");
25  for (FwIndexType i = 0; i < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->entries)); i++) {
26  this->entries[i].priority = 0;
27  this->entries[i].depth = 0;
28  }
29 }
30 
31 ComQueue ::ComQueue(const char* const compName)
32  : ComQueueComponentBase(compName),
33  m_state(WAITING),
34  m_buffer_state(OWNED),
35  m_allocationId(static_cast<FwEnumStoreType>(-1)),
36  m_allocator(nullptr),
37  m_allocation(nullptr) {
38  // Initialize throttles to "off"
39  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
40  this->m_throttle[i] = false;
41  }
42 }
43 
45 
47  // Deallocate memory ignoring error conditions
48  if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
49  this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
50  }
51 }
52 
54  FwEnumStoreType allocationId,
55  Fw::MemAllocator& allocator) {
56  FwIndexType currentPriorityIndex = 0;
57  FwSizeType totalAllocation = 0;
58 
59  // Store/initialize allocator members
60  this->m_allocator = &allocator;
61  this->m_allocationId = allocationId;
62  this->m_allocation = nullptr;
63 
64  // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
65  // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
66  // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
67  // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
68  // queue data structures.
69  //
70  // The total allocation size is tracked for passing to the allocation call and is a summation of
71  // (depth * message size) for each prioritized metadata object of (depth * message size)
72  for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
73  // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
74  // priority value
75  for (FwIndexType entryIndex = 0;
76  entryIndex < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(queueConfig.entries)); entryIndex++) {
77  // Check for valid configuration entry
78  FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
79  static_cast<FwAssertArgType>(queueConfig.entries[entryIndex].priority),
80  static_cast<FwAssertArgType>(TOTAL_PORT_COUNT), static_cast<FwAssertArgType>(entryIndex));
81 
82  if (currentPriority == queueConfig.entries[entryIndex].priority) {
83  // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
84  // backing queue object, and message size. Both index and message size are calculated where priority and
85  // depth are copied from the configuration object.
86  QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
87  entry.priority = queueConfig.entries[entryIndex].priority;
88  entry.depth = queueConfig.entries[entryIndex].depth;
89  entry.index = entryIndex;
90  // Message size is determined by the type of object being stored, which in turn is determined by the
91  // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
92  entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
93  // Overflow checks
94  FW_ASSERT((std::numeric_limits<FwSizeType>::max() / entry.depth) >= entry.msgSize,
95  static_cast<FwAssertArgType>(entry.depth), static_cast<FwAssertArgType>(entry.msgSize));
96  FW_ASSERT(std::numeric_limits<FwSizeType>::max() - (entry.depth * entry.msgSize) >= totalAllocation);
97  totalAllocation += entry.depth * entry.msgSize;
98  currentPriorityIndex++;
99  }
100  }
101  }
102  // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
103  bool recoverable = false;
104  this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
105 
106  // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
107  // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
108  // order and passing out the memory to each queue's setup method.
109  FwSizeType allocationOffset = 0;
110  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
111  // Get current queue's allocation size and safety check the values
112  FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
113  FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
114  static_cast<FwAssertArgType>(this->m_prioritizedList[i].index));
115  FW_ASSERT((allocationSize + allocationOffset) <= totalAllocation, static_cast<FwAssertArgType>(allocationSize),
116  static_cast<FwAssertArgType>(allocationOffset), static_cast<FwAssertArgType>(totalAllocation));
117 
118  // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
119  if (allocationSize > 0) {
120  this->m_queues[this->m_prioritizedList[i].index].setup(
121  reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
122  this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
123  }
124  allocationOffset += allocationSize;
125  }
126  // Safety check that all memory was used as expected
127  FW_ASSERT(allocationOffset == totalAllocation, static_cast<FwAssertArgType>(allocationOffset),
128  static_cast<FwAssertArgType>(totalAllocation));
129 }
130 // ----------------------------------------------------------------------
131 // Handler implementations for user-defined typed input ports
132 // ----------------------------------------------------------------------
133 
134 void ComQueue::comPacketQueueIn_handler(const FwIndexType portNum, Fw::ComBuffer& data, U32 context) {
135  // Ensure that the port number of comPacketQueueIn is consistent with the expectation
136  FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
137  (void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
138 }
139 
140 void ComQueue::bufferQueueIn_handler(const FwIndexType portNum, Fw::Buffer& fwBuffer) {
141  FW_ASSERT(std::numeric_limits<FwIndexType>::max() - COM_PORT_COUNT > portNum);
142  const FwIndexType queueNum = static_cast<FwIndexType>(portNum + COM_PORT_COUNT);
143  // Ensure that the port number of bufferQueueIn is consistent with the expectation
144  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
145  FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
146  bool success =
147  this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
148  if (!success) {
149  this->bufferReturnOut_out(portNum, fwBuffer);
150  }
151 }
152 
153 void ComQueue::comStatusIn_handler(const FwIndexType portNum, Fw::Success& condition) {
154  switch (this->m_state) {
155  // On success, the queue should be processed. On failure, the component should still wait.
156  case WAITING:
157  if (condition.e == Fw::Success::SUCCESS) {
158  this->m_state = READY;
159  this->processQueue();
160  // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
161  FW_ASSERT((this->m_state == WAITING || this->m_state == READY),
162  static_cast<FwAssertArgType>(this->m_state));
163  } else {
164  this->m_state = WAITING;
165  }
166  break;
167  // Both READY and unknown states should not be possible at this point. To receive a status message we must be
168  // one of the WAITING or RETRY states.
169  default:
170  FW_ASSERT(0, static_cast<FwAssertArgType>(this->m_state));
171  break;
172  }
173 }
174 
175 void ComQueue::run_handler(const FwIndexType portNum, U32 context) {
176  // Downlink the high-water marks for the Fw::ComBuffer array types
177  ComQueueDepth comQueueDepth;
178  for (U32 i = 0; i < comQueueDepth.SIZE; i++) {
179  comQueueDepth[i] = static_cast<U32>(this->m_queues[i].get_high_water_mark());
180  this->m_queues[i].clear_high_water_mark();
181  }
182  this->tlmWrite_comQueueDepth(comQueueDepth);
183 
184  // Downlink the high-water marks for the Fw::Buffer array types
185  BuffQueueDepth buffQueueDepth;
186  for (U32 i = 0; i < buffQueueDepth.SIZE; i++) {
187  buffQueueDepth[i] = static_cast<U32>(this->m_queues[i + COM_PORT_COUNT].get_high_water_mark());
188  this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
189  }
190  this->tlmWrite_buffQueueDepth(buffQueueDepth);
191 }
192 
193 void ComQueue ::dataReturnIn_handler(FwIndexType portNum, Fw::Buffer& data, const ComCfg::FrameContext& context) {
194  static_assert(std::numeric_limits<FwIndexType>::is_signed, "FwIndexType must be signed");
195  FW_ASSERT(this->m_buffer_state == UNOWNED);
196  this->m_buffer_state = OWNED;
197  // For the buffer queues, the index of the queue is portNum offset by COM_PORT_COUNT since
198  // the first COM_PORT_COUNT queues are for ComBuffer. So we have for buffer queues:
199  // queueNum = portNum + COM_PORT_COUNT
200  // Since queueNum is used as APID, we can retrieve the original portNum like such:
201  FwIndexType bufferReturnPortNum = static_cast<FwIndexType>(context.get_comQueueIndex() - ComQueue::COM_PORT_COUNT);
202  // Failing this assert means that context.apid was modified since ComQueue set it, which should not happen
203  FW_ASSERT(bufferReturnPortNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(bufferReturnPortNum));
204  if (bufferReturnPortNum >= 0) {
205  // It is a coding error not to connect the associated bufferReturnOut port for each dataReturnIn port
206  FW_ASSERT(this->isConnected_bufferReturnOut_OutputPort(bufferReturnPortNum),
207  static_cast<FwAssertArgType>(bufferReturnPortNum));
208  // If this is a buffer port, return the buffer to the BufferDownlink
209  this->bufferReturnOut_out(bufferReturnPortNum, data);
210  }
211 }
212 
213 // ----------------------------------------------------------------------
214 // Hook implementations for typed async input ports
215 // ----------------------------------------------------------------------
216 
217 void ComQueue::bufferQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
218  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
219  this->bufferReturnOut_out(portNum, fwBuffer);
220 }
221 
222 // ----------------------------------------------------------------------
223 // Private helper methods
224 // ----------------------------------------------------------------------
225 
226 bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
227  // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
228  // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
229  const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
230  FW_ASSERT((queueType == QueueType::COM_QUEUE) || (queueNum >= COM_PORT_COUNT),
231  static_cast<FwAssertArgType>(queueType), static_cast<FwAssertArgType>(queueNum));
232  const FwIndexType portNum =
233  static_cast<FwIndexType>(queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT));
234  bool rvStatus = true;
235  FW_ASSERT(expectedSize == size, static_cast<FwAssertArgType>(size), static_cast<FwAssertArgType>(expectedSize));
236  FW_ASSERT(portNum >= 0, static_cast<FwAssertArgType>(portNum));
237  Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
238  if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
239  if (!this->m_throttle[queueNum]) {
240  this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
241  this->m_throttle[queueNum] = true;
242  }
243 
244  rvStatus = false;
245  }
246  // When the component is already in READY state process the queue to send out the next available message immediately
247  if (this->m_state == READY) {
248  this->processQueue();
249  }
250 
251  return rvStatus;
252 }
253 
254 void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer, FwIndexType queueIndex) {
255  FW_ASSERT(this->m_state == READY);
256  Fw::Buffer outBuffer(comBuffer.getBuffAddr(), static_cast<Fw::Buffer::SizeType>(comBuffer.getBuffLength()));
257 
258  // Context value is used to determine what to do when the buffer returns on the dataReturnIn port
259  ComCfg::FrameContext context;
260  FwPacketDescriptorType descriptor;
261  Fw::SerializeStatus status = comBuffer.deserializeTo(descriptor);
262  FW_ASSERT(status == Fw::FW_SERIALIZE_OK, static_cast<FwAssertArgType>(status));
263  context.set_apid(static_cast<ComCfg::Apid::T>(descriptor));
264  context.set_comQueueIndex(queueIndex);
265  FW_ASSERT(this->m_buffer_state == OWNED);
266  this->m_buffer_state = UNOWNED;
267  this->dataOut_out(0, outBuffer, context);
268  // Set state to WAITING for the status to come back
269  this->m_state = WAITING;
270 }
271 
272 void ComQueue::sendBuffer(Fw::Buffer& buffer, FwIndexType queueIndex) {
273  // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
274  FW_ASSERT(this->m_state == READY);
275 
276  // Context value is used to determine what to do when the buffer returns on the dataReturnIn port
277  ComCfg::FrameContext context;
278  FwPacketDescriptorType descriptor;
279  Fw::SerializeStatus status = buffer.getDeserializer().deserializeTo(descriptor);
280  FW_ASSERT(status == Fw::FW_SERIALIZE_OK, static_cast<FwAssertArgType>(status));
281  context.set_apid(static_cast<ComCfg::Apid::T>(descriptor));
282  context.set_comQueueIndex(queueIndex);
283  FW_ASSERT(this->m_buffer_state == OWNED);
284  this->m_buffer_state = UNOWNED;
285  this->dataOut_out(0, buffer, context);
286  // Set state to WAITING for the status to come back
287  this->m_state = WAITING;
288 }
289 
290 void ComQueue::processQueue() {
291  FwIndexType priorityIndex = 0;
292  FwIndexType sendPriority = 0;
293  // Check that we are in the appropriate state
294  FW_ASSERT(this->m_state == READY);
295 
296  // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
297  // is done within this loop.
298  for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
299  QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
300  Types::Queue& queue = this->m_queues[entry.index];
301 
302  // Continue onto next prioritized queue if there is no items in the current queue
303  if (queue.getQueueSize() == 0) {
304  continue;
305  }
306 
307  // Send out the message based on the type
308  if (entry.index < COM_PORT_COUNT) {
309  // Dequeue is reading the whole persisted Fw::ComBuffer object from the queue's storage.
310  // thus it takes an address to the object to fill and the size of the actual object.
311  FW_ASSERT(this->m_buffer_state == OWNED);
312  auto dequeue_status =
313  queue.dequeue(reinterpret_cast<U8*>(&this->m_dequeued_com_buffer), sizeof(this->m_dequeued_com_buffer));
315  static_cast<FwAssertArgType>(dequeue_status));
316  this->sendComBuffer(this->m_dequeued_com_buffer, entry.index);
317  } else {
318  Fw::Buffer buffer;
319  auto dequeue_status = queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
321  static_cast<FwAssertArgType>(dequeue_status));
322  this->sendBuffer(buffer, entry.index);
323  }
324 
325  // Update the throttle and the index that was just sent
326  this->m_throttle[entry.index] = false;
327 
328  // Priority used in the next loop
329  sendPriority = entry.priority;
330  break;
331  }
332 
333  // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
334  // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
335  // shifted up by one. This effectively round-robins the queues of the same priority.
336  for (priorityIndex++;
337  priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
338  priorityIndex++) {
339  // Swap the previous entry with this one.
340  QueueMetadata temp = this->m_prioritizedList[priorityIndex];
341  this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
342  this->m_prioritizedList[priorityIndex - 1] = temp;
343  }
344 }
345 } // end namespace Svc
Serialization/Deserialization operation was successful.
U16 FwPacketDescriptorType
The width of packet descriptors when they are serialized by the framework.
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable, FwSizeType alignment=alignof(std::max_align_t))=0
std::make_unsigned< FwIndexType >::type FwUnsignedIndexType
Definition: ComQueue.cpp:19
Representing success.
PlatformSizeType FwSizeType
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
T e
The raw enum value.
I32 FwEnumStoreType
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition: ComQueue.hpp:66
configuration table for each queue
Definition: ComQueue.hpp:65
static const FwIndexType TOTAL_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition: ComQueue.hpp:38
ComQueue(const char *const compName)
Definition: ComQueue.cpp:31
void dataOut_out(FwIndexType portNum, Fw::Buffer &data, const ComCfg::FrameContext &context)
Invoke output port dataOut.
SerializeStatus deserializeTo(U8 &val, Endianness mode=Endianness::BIG)
deserialize 8-bit unsigned int
No room left in the buffer to serialize data.
void set_apid(ComCfg::Apid::T apid)
Set member apid.
void clear_high_water_mark()
Definition: Queue.cpp:52
bool isConnected_bufferReturnOut_OutputPort(FwIndexType portNum)
void cleanup()
Definition: ComQueue.cpp:46
static const FwIndexType BUFFER_PORT_COUNT
Definition: ComQueue.hpp:33
static const FwIndexType COM_PORT_COUNT
< Count of Fw::Com input ports and thus Fw::Com queues
Definition: ComQueue.hpp:30
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
Set up the queue object with its backing storage, depth, and message size
Definition: Queue.cpp:17
FwIndexType get_comQueueIndex() const
Get member comQueueIndex.
SerializeStatus
forward declaration for string
ExternalSerializeBufferWithMemberCopy getDeserializer()
Definition: Buffer.cpp:105
Serializable::SizeType getBuffLength() const
returns current buffer size
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition: ComQueue.hpp:53
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition: Queue.cpp:29
FwSizeType depth
Depth of the queue [0, infinity)
Definition: ComQueue.hpp:52
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index) const
U8 * getBuffAddr()
gets buffer address for data filling
Definition: ComBuffer.cpp:38
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:53
Memory Allocation base class.
void set_comQueueIndex(FwIndexType comQueueIndex)
Set member comQueueIndex.
FwSizeType get_high_water_mark() const
Definition: Queue.cpp:47
PlatformIndexType FwIndexType
FwSizeType SizeType
The size type for a buffer - for backwards compatibility.
Definition: Buffer.hpp:61
C++ header for working with basic fprime types.
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition: BasicTypes.h:90
Type used to pass context info between components during framing/deframing.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
RateGroupDivider component implementation.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
FwSizeType getQueueSize() const
Definition: Queue.cpp:56
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition: Queue.cpp:36
#define FW_ASSERT(...)
Definition: Assert.hpp:14
Success/Failure.
void configure(QueueConfigurationTable queueConfig, FwEnumStoreType allocationId, Fw::MemAllocator &allocator)
Definition: ComQueue.cpp:53
Auto-generated base for ComQueue component.
void bufferReturnOut_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferReturnOut.