F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
ComQueue.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title ComQueue.cpp
3 // \author vbai
4 // \brief cpp file for ComQueue component implementation class
5 // ======================================================================
6 
7 #include <Fw/Com/ComPacket.hpp>
8 #include <Fw/Types/Assert.hpp>
10 #include <type_traits>
11 #include "Fw/Types/BasicTypes.hpp"
12 
13 namespace Svc {
14 
15 // ----------------------------------------------------------------------
16 // Construction, initialization, and destruction
17 // ----------------------------------------------------------------------
18 
19 using FwUnsignedIndexType = std::make_unsigned<FwIndexType>::type;
20 
21 ComQueue ::QueueConfigurationTable ::QueueConfigurationTable() {
22  static_assert(static_cast<FwUnsignedIndexType>(std::numeric_limits<FwIndexType>::max()) >=
23  FW_NUM_ARRAY_ELEMENTS(this->entries),
24  "Number of entries must fit into FwIndexType");
25  for (FwIndexType i = 0; i < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->entries)); i++) {
26  this->entries[i].priority = 0;
27  this->entries[i].depth = 0;
28  }
29 }
30 
31 ComQueue ::ComQueue(const char* const compName)
32  : ComQueueComponentBase(compName),
33  m_state(WAITING),
34  m_allocationId(static_cast<FwEnumStoreType>(-1)),
35  m_allocator(nullptr),
36  m_allocation(nullptr) {
37  // Initialize throttles to "off"
38  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
39  this->m_throttle[i] = false;
40  }
41 }
42 
44 
46  // Deallocate memory ignoring error conditions
47  if ((this->m_allocator != nullptr) && (this->m_allocation != nullptr)) {
48  this->m_allocator->deallocate(this->m_allocationId, this->m_allocation);
49  }
50 }
51 
53  FwEnumStoreType allocationId,
54  Fw::MemAllocator& allocator) {
55  FwIndexType currentPriorityIndex = 0;
56  FwSizeType totalAllocation = 0;
57 
58  // Store/initialize allocator members
59  this->m_allocator = &allocator;
60  this->m_allocationId = allocationId;
61  this->m_allocation = nullptr;
62 
63  // Initializes the sorted queue metadata list in priority (sorted) order. This is accomplished by walking the
64  // priority values in priority order from 0 to TOTAL_PORT_COUNT. At each priory value, the supplied queue
65  // configuration table is walked and any entry matching the current priority values is used to add queue metadata to
66  // the prioritized list. This results in priority-sorted queue metadata objects that index back into the unsorted
67  // queue data structures.
68  //
69  // The total allocation size is tracked for passing to the allocation call and is a summation of
70  // (depth * message size) for each prioritized metadata object of (depth * message size)
71  for (FwIndexType currentPriority = 0; currentPriority < TOTAL_PORT_COUNT; currentPriority++) {
72  // Walk each queue configuration entry and add them into the prioritized metadata list when matching the current
73  // priority value
74  for (FwIndexType entryIndex = 0;
75  entryIndex < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(queueConfig.entries)); entryIndex++) {
76  // Check for valid configuration entry
77  FW_ASSERT(queueConfig.entries[entryIndex].priority < TOTAL_PORT_COUNT,
78  static_cast<FwAssertArgType>(queueConfig.entries[entryIndex].priority),
79  static_cast<FwAssertArgType>(TOTAL_PORT_COUNT), static_cast<FwAssertArgType>(entryIndex));
80 
81  if (currentPriority == queueConfig.entries[entryIndex].priority) {
82  // Set up the queue metadata object in order to track priority, depth, index into the queue list of the
83  // backing queue object, and message size. Both index and message size are calculated where priority and
84  // depth are copied from the configuration object.
85  QueueMetadata& entry = this->m_prioritizedList[currentPriorityIndex];
86  entry.priority = queueConfig.entries[entryIndex].priority;
87  entry.depth = queueConfig.entries[entryIndex].depth;
88  entry.index = entryIndex;
89  // Message size is determined by the type of object being stored, which in turn is determined by the
90  // index of the entry. Those lower than COM_PORT_COUNT are Fw::ComBuffers and those larger Fw::Buffer.
91  entry.msgSize = (entryIndex < COM_PORT_COUNT) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
92  // Overflow checks
93  FW_ASSERT((std::numeric_limits<FwSizeType>::max() / entry.depth) >= entry.msgSize,
94  static_cast<FwAssertArgType>(entry.depth), static_cast<FwAssertArgType>(entry.msgSize));
95  FW_ASSERT(std::numeric_limits<FwSizeType>::max() - (entry.depth * entry.msgSize) >= totalAllocation);
96  totalAllocation += entry.depth * entry.msgSize;
97  currentPriorityIndex++;
98  }
99  }
100  }
101  // Allocate a single chunk of memory from the memory allocator. Memory recover is neither needed nor used.
102  bool recoverable = false;
103  this->m_allocation = this->m_allocator->allocate(this->m_allocationId, totalAllocation, recoverable);
104 
105  // Each of the backing queue objects must be supplied memory to store the queued messages. These data regions are
106  // sub-portions of the total allocated data. This memory is passed out by looping through each queue in prioritized
107  // order and passing out the memory to each queue's setup method.
108  FwSizeType allocationOffset = 0;
109  for (FwIndexType i = 0; i < TOTAL_PORT_COUNT; i++) {
110  // Get current queue's allocation size and safety check the values
111  FwSizeType allocationSize = this->m_prioritizedList[i].depth * this->m_prioritizedList[i].msgSize;
112  FW_ASSERT(this->m_prioritizedList[i].index < static_cast<FwIndexType>(FW_NUM_ARRAY_ELEMENTS(this->m_queues)),
113  static_cast<FwAssertArgType>(this->m_prioritizedList[i].index));
114  FW_ASSERT((allocationSize + allocationOffset) <= totalAllocation, static_cast<FwAssertArgType>(allocationSize),
115  static_cast<FwAssertArgType>(allocationOffset), static_cast<FwAssertArgType>(totalAllocation));
116 
117  // Setup queue's memory allocation, depth, and message size. Setup is skipped for a depth 0 queue
118  if (allocationSize > 0) {
119  this->m_queues[this->m_prioritizedList[i].index].setup(
120  reinterpret_cast<U8*>(this->m_allocation) + allocationOffset, allocationSize,
121  this->m_prioritizedList[i].depth, this->m_prioritizedList[i].msgSize);
122  }
123  allocationOffset += allocationSize;
124  }
125  // Safety check that all memory was used as expected
126  FW_ASSERT(allocationOffset == totalAllocation, static_cast<FwAssertArgType>(allocationOffset),
127  static_cast<FwAssertArgType>(totalAllocation));
128 }
129 // ----------------------------------------------------------------------
130 // Handler implementations for user-defined typed input ports
131 // ----------------------------------------------------------------------
132 
133 void ComQueue::comPacketQueueIn_handler(const FwIndexType portNum, Fw::ComBuffer& data, U32 context) {
134  // Ensure that the port number of comPacketQueueIn is consistent with the expectation
135  FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
136  (void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
137 }
138 
139 void ComQueue::bufferQueueIn_handler(const FwIndexType portNum, Fw::Buffer& fwBuffer) {
140  FW_ASSERT(std::numeric_limits<FwIndexType>::max() - COM_PORT_COUNT > portNum);
141  const FwIndexType queueNum = static_cast<FwIndexType>(portNum + COM_PORT_COUNT);
142  // Ensure that the port number of bufferQueueIn is consistent with the expectation
143  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
144  FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
145  bool success =
146  this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
147  if (!success) {
148  this->bufferReturnOut_out(portNum, fwBuffer);
149  }
150 }
151 
152 void ComQueue::comStatusIn_handler(const FwIndexType portNum, Fw::Success& condition) {
153  switch (this->m_state) {
154  // On success, the queue should be processed. On failure, the component should still wait.
155  case WAITING:
156  if (condition.e == Fw::Success::SUCCESS) {
157  this->m_state = READY;
158  this->processQueue();
159  // A message may or may not be sent. Thus, READY or WAITING are acceptable final states.
160  FW_ASSERT((this->m_state == WAITING || this->m_state == READY),
161  static_cast<FwAssertArgType>(this->m_state));
162  } else {
163  this->m_state = WAITING;
164  }
165  break;
166  // Both READY and unknown states should not be possible at this point. To receive a status message we must be
167  // one of the WAITING or RETRY states.
168  default:
169  FW_ASSERT(0, static_cast<FwAssertArgType>(this->m_state));
170  break;
171  }
172 }
173 
174 void ComQueue::run_handler(const FwIndexType portNum, U32 context) {
175  // Downlink the high-water marks for the Fw::ComBuffer array types
176  ComQueueDepth comQueueDepth;
177  for (U32 i = 0; i < comQueueDepth.SIZE; i++) {
178  comQueueDepth[i] = static_cast<U32>(this->m_queues[i].get_high_water_mark());
179  this->m_queues[i].clear_high_water_mark();
180  }
181  this->tlmWrite_comQueueDepth(comQueueDepth);
182 
183  // Downlink the high-water marks for the Fw::Buffer array types
184  BuffQueueDepth buffQueueDepth;
185  for (U32 i = 0; i < buffQueueDepth.SIZE; i++) {
186  buffQueueDepth[i] = static_cast<U32>(this->m_queues[i + COM_PORT_COUNT].get_high_water_mark());
187  this->m_queues[i + COM_PORT_COUNT].clear_high_water_mark();
188  }
189  this->tlmWrite_buffQueueDepth(buffQueueDepth);
190 }
191 
192 void ComQueue ::dataReturnIn_handler(FwIndexType portNum, Fw::Buffer& data, const ComCfg::FrameContext& context) {
193  static_assert(std::numeric_limits<FwIndexType>::is_signed, "FwIndexType must be signed");
194  // For the buffer queues, the index of the queue is portNum offset by COM_PORT_COUNT since
195  // the first COM_PORT_COUNT queues are for ComBuffer. So we have for buffer queues:
196  // queueNum = portNum + COM_PORT_COUNT
197  // Since queueNum is used as APID, we can retrieve the original portNum like such:
198  FwIndexType bufferReturnPortNum = static_cast<FwIndexType>(context.get_comQueueIndex() - ComQueue::COM_PORT_COUNT);
199  // Failing this assert means that context.apid was modified since ComQueue set it, which should not happen
200  FW_ASSERT(bufferReturnPortNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(bufferReturnPortNum));
201  if (bufferReturnPortNum >= 0) {
202  // It is a coding error not to connect the associated bufferReturnOut port for each dataReturnIn port
203  FW_ASSERT(this->isConnected_bufferReturnOut_OutputPort(bufferReturnPortNum),
204  static_cast<FwAssertArgType>(bufferReturnPortNum));
205  // If this is a buffer port, return the buffer to the BufferDownlink
206  this->bufferReturnOut_out(bufferReturnPortNum, data);
207  }
208 }
209 
210 // ----------------------------------------------------------------------
211 // Hook implementations for typed async input ports
212 // ----------------------------------------------------------------------
213 
214 void ComQueue::bufferQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
215  FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, static_cast<FwAssertArgType>(portNum));
216  this->bufferReturnOut_out(portNum, fwBuffer);
217 }
218 
219 // ----------------------------------------------------------------------
220 // Private helper methods
221 // ----------------------------------------------------------------------
222 
223 bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
224  // Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
225  // set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
226  const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
227  FW_ASSERT((queueType == QueueType::COM_QUEUE) || (queueNum >= COM_PORT_COUNT),
228  static_cast<FwAssertArgType>(queueType), static_cast<FwAssertArgType>(queueNum));
229  const FwIndexType portNum =
230  static_cast<FwIndexType>(queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT));
231  bool rvStatus = true;
232  FW_ASSERT(expectedSize == size, static_cast<FwAssertArgType>(size), static_cast<FwAssertArgType>(expectedSize));
233  FW_ASSERT(portNum >= 0, static_cast<FwAssertArgType>(portNum));
234  Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
235  if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
236  if (!this->m_throttle[queueNum]) {
237  this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
238  this->m_throttle[queueNum] = true;
239  }
240 
241  rvStatus = false;
242  }
243  // When the component is already in READY state process the queue to send out the next available message immediately
244  if (this->m_state == READY) {
245  this->processQueue();
246  }
247 
248  return rvStatus;
249 }
250 
251 void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer, FwIndexType queueIndex) {
252  FW_ASSERT(this->m_state == READY);
253  Fw::Buffer outBuffer(comBuffer.getBuffAddr(), static_cast<Fw::Buffer::SizeType>(comBuffer.getBuffLength()));
254 
255  // Context value is used to determine what to do when the buffer returns on the dataReturnIn port
256  ComCfg::FrameContext context;
257  FwPacketDescriptorType descriptor;
258  Fw::SerializeStatus status = comBuffer.deserializeTo(descriptor);
259  FW_ASSERT(status == Fw::FW_SERIALIZE_OK, static_cast<FwAssertArgType>(status));
260  context.set_apid(static_cast<ComCfg::APID::T>(descriptor));
261  context.set_comQueueIndex(queueIndex);
262 
263  this->dataOut_out(0, outBuffer, context);
264  // Set state to WAITING for the status to come back
265  this->m_state = WAITING;
266 }
267 
268 void ComQueue::sendBuffer(Fw::Buffer& buffer, FwIndexType queueIndex) {
269  // Retry buffer expected to be cleared as we are either transferring ownership or have already deallocated it.
270  FW_ASSERT(this->m_state == READY);
271 
272  // Context value is used to determine what to do when the buffer returns on the dataReturnIn port
273  ComCfg::FrameContext context;
274  FwPacketDescriptorType descriptor;
275  Fw::SerializeStatus status = buffer.getDeserializer().deserializeTo(descriptor);
276  FW_ASSERT(status == Fw::FW_SERIALIZE_OK, static_cast<FwAssertArgType>(status));
277  context.set_apid(static_cast<ComCfg::APID::T>(descriptor));
278  context.set_comQueueIndex(queueIndex);
279 
280  this->dataOut_out(0, buffer, context);
281  // Set state to WAITING for the status to come back
282  this->m_state = WAITING;
283 }
284 
285 void ComQueue::processQueue() {
286  FwIndexType priorityIndex = 0;
287  FwIndexType sendPriority = 0;
288  // Check that we are in the appropriate state
289  FW_ASSERT(this->m_state == READY);
290 
291  // Walk all the queues in priority order. Send the first message that is available in priority order. No balancing
292  // is done within this loop.
293  for (priorityIndex = 0; priorityIndex < TOTAL_PORT_COUNT; priorityIndex++) {
294  QueueMetadata& entry = this->m_prioritizedList[priorityIndex];
295  Types::Queue& queue = this->m_queues[entry.index];
296 
297  // Continue onto next prioritized queue if there is no items in the current queue
298  if (queue.getQueueSize() == 0) {
299  continue;
300  }
301 
302  // Send out the message based on the type
303  if (entry.index < COM_PORT_COUNT) {
304  Fw::ComBuffer comBuffer;
305  queue.dequeue(reinterpret_cast<U8*>(&comBuffer), sizeof(comBuffer));
306  this->sendComBuffer(comBuffer, entry.index);
307  } else {
308  Fw::Buffer buffer;
309  queue.dequeue(reinterpret_cast<U8*>(&buffer), sizeof(buffer));
310  this->sendBuffer(buffer, entry.index);
311  }
312 
313  // Update the throttle and the index that was just sent
314  this->m_throttle[entry.index] = false;
315 
316  // Priority used in the next loop
317  sendPriority = entry.priority;
318  break;
319  }
320 
321  // Starting on the priority entry after the one dispatched and continuing through the end of the set of entries that
322  // share the same priority, rotate those entries such that the currently dispatched queue is last and the rest are
323  // shifted up by one. This effectively round-robins the queues of the same priority.
324  for (priorityIndex++;
325  priorityIndex < TOTAL_PORT_COUNT && (this->m_prioritizedList[priorityIndex].priority == sendPriority);
326  priorityIndex++) {
327  // Swap the previous entry with this one.
328  QueueMetadata temp = this->m_prioritizedList[priorityIndex];
329  this->m_prioritizedList[priorityIndex] = this->m_prioritizedList[priorityIndex - 1];
330  this->m_prioritizedList[priorityIndex - 1] = temp;
331  }
332 }
333 } // end namespace Svc
Serialization/Deserialization operation was successful.
FwIdType FwPacketDescriptorType
The type of a com packet descriptor.
std::make_unsigned< FwIndexType >::type FwUnsignedIndexType
Definition: ComQueue.cpp:19
Representing success.
PlatformSizeType FwSizeType
void tlmWrite_buffQueueDepth(const Svc::BuffQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
T e
The raw enum value.
I32 FwEnumStoreType
QueueConfigurationEntry entries[TOTAL_PORT_COUNT]
Definition: ComQueue.hpp:63
configuration table for each queue
Definition: ComQueue.hpp:62
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable)=0
Allocate memory.
static const FwIndexType TOTAL_PORT_COUNT
Total count of input buffer ports and thus total queues.
Definition: ComQueue.hpp:35
ComQueue(const char *const compName)
Definition: ComQueue.cpp:31
void dataOut_out(FwIndexType portNum, Fw::Buffer &data, const ComCfg::FrameContext &context)
Invoke output port dataOut.
No room left in the buffer to serialize data.
void clear_high_water_mark()
Definition: Queue.cpp:56
bool isConnected_bufferReturnOut_OutputPort(FwIndexType portNum)
void cleanup()
Definition: ComQueue.cpp:45
static const FwIndexType BUFFER_PORT_COUNT
Definition: ComQueue.hpp:30
static const FwIndexType COM_PORT_COUNT
Count of Fw::Com input ports and thus Fw::Com queues
Definition: ComQueue.hpp:27
void setup(U8 *const storage, const FwSizeType storage_size, const FwSizeType depth, const FwSizeType message_size)
Set up the queue object with its backing storage
Definition: Queue.cpp:17
FwIndexType get_comQueueIndex() const
Get member comQueueIndex.
SerializeStatus
forward declaration for string
ExternalSerializeBufferWithMemberCopy getDeserializer()
Definition: Buffer.cpp:105
Serializable::SizeType getBuffLength() const
returns current buffer size
FwIndexType priority
Priority of the queue [0, TOTAL_PORT_COUNT)
Definition: ComQueue.hpp:50
Fw::SerializeStatus enqueue(const U8 *const message, const FwSizeType size)
pushes a fixed-size message onto the back of the queue
Definition: Queue.cpp:29
FwSizeType depth
Depth of the queue [0, infinity)
Definition: ComQueue.hpp:49
void log_WARNING_HI_QueueOverflow(Svc::QueueType queueType, U32 index) const
U8 * getBuffAddr()
gets buffer address for data filling
Definition: ComBuffer.cpp:38
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:53
void set_comQueueIndex(FwIndexType comQueueIndex)
Set member comQueueIndex.
FwSizeType get_high_water_mark() const
Definition: Queue.cpp:51
PlatformIndexType FwIndexType
FwSizeType SizeType
The size type for a buffer - for backwards compatibility.
Definition: Buffer.hpp:52
C++ header for working with basic fprime types.
#define FW_NUM_ARRAY_ELEMENTS(a)
number of elements in an array
Definition: BasicTypes.h:90
Type used to pass context info between components during framing/deframing.
void tlmWrite_comQueueDepth(const Svc::ComQueueDepth &arg, Fw::Time _tlmTime=Fw::Time()) const
void set_apid(ComCfg::APID::T apid)
Set member apid.
RateGroupDivider component implementation.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
Deallocate memory.
FwSizeType getQueueSize() const
Definition: Queue.cpp:60
Fw::SerializeStatus dequeue(U8 *const message, const FwSizeType size)
pops a fixed-size message off the front of the queue
Definition: Queue.cpp:38
#define FW_ASSERT(...)
Definition: Assert.hpp:14
Success/Failure.
void configure(QueueConfigurationTable queueConfig, FwEnumStoreType allocationId, Fw::MemAllocator &allocator)
Definition: ComQueue.cpp:52
SerializeStatus deserializeTo(U8 &val)
deserialize 8-bit unsigned int
Auto-generated base for ComQueue component.
void bufferReturnOut_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferReturnOut.