F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include "Fw/Types/BasicTypes.hpp"
18 #include <limits>
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
27  BufferAccumulator(const char* const compName)
28  : BufferAccumulatorComponentBase(compName),
29  m_mode(BufferAccumulator_OpState::ACCUMULATE),
30  m_bufferMemory(nullptr),
31  m_bufferQueue(),
32  m_send(false),
33  m_waitForBuffer(false),
34  m_numWarnings(0u),
35  m_numDrained(0u),
36  m_numToDrain(0u),
37  m_opCode(),
38  m_cmdSeq(0u),
39  m_allocatorId(0) {
40 }
41 
43 
44 // ----------------------------------------------------------------------
45 // Public methods
46 // ----------------------------------------------------------------------
47 
49  FwEnumStoreType identifier, Fw::MemAllocator& allocator,
50  FwSizeType maxNumBuffers
51 ) {
52 
53  this->m_allocatorId = identifier;
54  // Overflow protection
55  FW_ASSERT(
56  (std::numeric_limits<FwSizeType>::max() / maxNumBuffers) >= sizeof(Fw::Buffer)
57  );
58  FwSizeType memSize = static_cast<FwSizeType>(sizeof(Fw::Buffer) * maxNumBuffers);
59  bool recoverable = false;
60  this->m_bufferMemory = static_cast<Fw::Buffer*>(
61  allocator.allocate(identifier, memSize, recoverable));
62  //TODO: Fail gracefully here
63  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
64 }
65 
67  allocator.deallocate(static_cast<FwEnumStoreType>(this->m_allocatorId), this->m_bufferMemory);
68 }
69 
70 // ----------------------------------------------------------------------
71 // Handler implementations for user-defined typed input ports
72 // ----------------------------------------------------------------------
73 
74 void BufferAccumulator ::bufferSendInFill_handler(const FwIndexType portNum,
75  Fw::Buffer& buffer) {
76 
77  const bool status = this->m_bufferQueue.enqueue(buffer);
78  if (status) {
79  if (this->m_numWarnings > 0) {
81  }
82  this->m_numWarnings = 0;
83  } else {
84  if (this->m_numWarnings == 0) {
86  }
87  m_numWarnings++;
88  }
89  if (this->m_send) {
90  this->sendStoredBuffer();
91  }
92 
93  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
94 }
95 
96 void BufferAccumulator ::bufferSendInReturn_handler(
97  const FwIndexType portNum, Fw::Buffer& buffer) {
98 
99  this->bufferSendOutReturn_out(0, buffer);
100  this->m_waitForBuffer = false;
101  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
102  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
103  // in a partial drain
104  this->m_send = true;
105  this->sendStoredBuffer();
106  }
107 }
108 
109 void BufferAccumulator ::pingIn_handler(const FwIndexType portNum,
110  U32 key) {
111  this->pingOut_out(0, key);
112 }
113 
114 // ----------------------------------------------------------------------
115 // Command handler implementations
116 // ----------------------------------------------------------------------
117 
118 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
119  const U32 cmdSeq,
120  BufferAccumulator_OpState mode) {
121 
122  // cancel an in-progress partial drain
123  if (this->m_numToDrain > 0) {
124  // reset counters for partial buffer drain
125  this->m_numToDrain = 0;
126  this->m_numDrained = 0;
127  // respond to the original command
128  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
129  }
130 
131  this->m_mode = mode;
132  if (mode == BufferAccumulator_OpState::DRAIN) {
133  if (!this->m_waitForBuffer) {
134  this->m_send = true;
135  this->sendStoredBuffer();
136  }
137  } else {
138  this->m_send = false;
139  }
140  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
141 }
142 
143 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(
144  const FwOpcodeType opCode, const U32 cmdSeq, U32 numToDrain,
145  BufferAccumulator_BlockMode blockMode) {
146 
147  if (this->m_numDrained < this->m_numToDrain) {
148  this->log_WARNING_HI_BA_StillDraining(static_cast<U32>(this->m_numDrained), static_cast<U32>(this->m_numToDrain));
149  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
150  return;
151  }
152 
153  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
155  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
156  return;
157  }
158 
159  if (numToDrain == 0) {
161  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
162  return;
163  }
164 
165  this->m_opCode = opCode;
166  this->m_cmdSeq = cmdSeq;
167  this->m_numDrained = 0;
168  this->m_numToDrain = static_cast<FwSizeType>(numToDrain);
169 
170  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
171  FwSizeType numBuffers = this->m_bufferQueue.getSize();
172 
173  if (numBuffers < static_cast<FwSizeType>(numToDrain)) {
174  this->m_numToDrain = numBuffers;
175  this->log_WARNING_LO_BA_NonBlockDrain(static_cast<U32>(this->m_numToDrain), numToDrain);
176  }
177 
178  /* OK if there were 0 buffers queued, and we
179  * end up setting numToDrain to 0
180  */
181  if (0 == this->m_numToDrain) {
183  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
184  return;
185  }
186  }
187 
188  // We are still waiting for a buffer from last time
189  if (!this->m_waitForBuffer) {
190  this->m_send = true;
191  this->sendStoredBuffer(); // kick off the draining;
192  }
193 }
194 
195 // ----------------------------------------------------------------------
196 // Private helper methods
197 // ----------------------------------------------------------------------
198 
199 void BufferAccumulator ::sendStoredBuffer() {
200 
201  FW_ASSERT(this->m_send);
202  Fw::Buffer buffer;
203  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
204  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
205  // partial drain
206  const bool status = this->m_bufferQueue.dequeue(buffer);
207  if (status) { // a buffer was dequeued
208  this->m_numDrained++;
209  this->bufferSendOutDrain_out(0, buffer);
210  this->m_waitForBuffer = true;
211  this->m_send = false;
212  } else if (this->m_numToDrain > 0) {
213  this->log_WARNING_HI_BA_DrainStalled(static_cast<U32>(this->m_numDrained), static_cast<U32>(this->m_numToDrain));
214  }
215  }
216 
217  /* This used to be "else if", but then you wait for all
218  * drained buffers in a partial drain to be RETURNED before returning OK.
219  * Correct thing is to return OK once they are SENT
220  */
221  if ((this->m_numToDrain > 0) && // we are doing a partial drain
222  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
223  //
224  this->log_ACTIVITY_HI_BA_PartialDrainDone(static_cast<U32>(this->m_numDrained));
225  // reset counters for partial buffer drain
226  this->m_numToDrain = 0;
227  this->m_numDrained = 0;
228  this->m_send = false;
229  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
230  }
231 
232  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
233 }
234 
235 } // namespace Svc
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutReturn.
PlatformSizeType FwSizeType
I32 FwEnumStoreType
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable)=0
Allocate memory.
void log_ACTIVITY_HI_BA_PartialDrainDone(U32 numDrained) const
void pingOut_out(FwIndexType portNum, U32 key)
Invoke output port pingOut.
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain) const
BufferAccumulator(const char *const compName)
U32 FwOpcodeType
The type of a command opcode.
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain) const
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time()) const
void allocateQueue(FwEnumStoreType identifier, Fw::MemAllocator &allocator, FwSizeType maxNumBuffers)
Command successfully executed.
PlatformIndexType FwIndexType
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
C++ header for working with basic fprime types.
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain) const
Command failed validation.
RateGroupDivider component implementation.
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutDrain.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
Deallocate memory.
Auto-generated base for BufferAccumulator component.
#define FW_ASSERT(...)
Definition: Assert.hpp:14
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.