F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include <limits>
18 #include "Fw/Types/BasicTypes.hpp"
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
26 BufferAccumulator ::BufferAccumulator(const char* const compName)
27  : BufferAccumulatorComponentBase(compName),
28  m_mode(BufferAccumulator_OpState::ACCUMULATE),
29  m_bufferMemory(nullptr),
30  m_bufferQueue(),
31  m_send(false),
32  m_waitForBuffer(false),
33  m_numWarnings(0u),
34  m_numDrained(0u),
35  m_numToDrain(0u),
36  m_opCode(),
37  m_cmdSeq(0u),
38  m_allocatorId(0) {}
39 
41 
42 // ----------------------------------------------------------------------
43 // Public methods
44 // ----------------------------------------------------------------------
45 
47  Fw::MemAllocator& allocator,
48  FwSizeType maxNumBuffers
49 ) {
    // Obtains storage for maxNumBuffers Fw::Buffer objects from the supplied
    // allocator and hands that storage to the internal buffer queue.
    // NOTE(review): the line carrying the function name and first parameter
    // is missing from this listing extract.

    // Remember the identifier so the same value can be used on deallocation.
50  this->m_allocatorId = identifier;
51  // Overflow protection: assert that sizeof(Fw::Buffer) * maxNumBuffers fits in FwSizeType.
    // NOTE(review): the check divides by maxNumBuffers, so a value of zero is
    // not handled before the division.
52  FW_ASSERT((std::numeric_limits<FwSizeType>::max() / maxNumBuffers) >= sizeof(Fw::Buffer));
53  FwSizeType memSize = static_cast<FwSizeType>(sizeof(Fw::Buffer) * maxNumBuffers);
    // recoverable is passed to allocate() by reference; its value is not used afterwards.
54  bool recoverable = false;
    // NOTE(review): memSize is also passed by reference and is not re-read
    // after the call — confirm the allocator cannot grant less than requested.
55  this->m_bufferMemory = static_cast<Fw::Buffer*>(allocator.allocate(identifier, memSize, recoverable));
56  // TODO: Fail gracefully here. The pointer returned by allocate() is passed to init() unchecked.
57  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
58 }
59 
// Hand the queue storage (m_bufferMemory) back to the supplied allocator,
// tagged with the identifier stored in m_allocatorId.
// NOTE(review): m_bufferMemory is not reset after the call; the line carrying
// the function signature is missing from this listing extract.
61  allocator.deallocate(static_cast<FwEnumStoreType>(this->m_allocatorId), this->m_bufferMemory);
62 }
63 
64 // ----------------------------------------------------------------------
65 // Handler implementations for user-defined typed input ports
66 // ----------------------------------------------------------------------
67 
// Handler for the bufferSendInFill input port.
//
// Tries to enqueue the incoming buffer, tracks consecutive enqueue failures
// in m_numWarnings, sends a stored buffer if a send is pending, and finally
// reports the current queue size as telemetry.
//
// portNum: the input port number (not used in the visible code)
// buffer:  the buffer to store in the queue
68 void BufferAccumulator ::bufferSendInFill_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
69  const bool status = this->m_bufferQueue.enqueue(buffer);
70  if (status) {
    // Enqueue succeeded after at least one failure.
    // NOTE(review): listing line 72 is absent from this extract, so the body
    // of this branch is not visible — confirm against the original file.
71  if (this->m_numWarnings > 0) {
73  }
    // A successful enqueue clears the failure count.
74  this->m_numWarnings = 0;
75  } else {
    // First failure since the last success.
    // NOTE(review): listing line 77 is absent from this extract, so the body
    // of this branch is not visible — confirm against the original file.
76  if (this->m_numWarnings == 0) {
78  }
    // Count the failure; the incoming buffer is not stored on this path.
79  m_numWarnings++;
80  }
    // If a send has been requested, try to send a stored buffer now.
81  if (this->m_send) {
82  this->sendStoredBuffer();
83  }
84 
85  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
86 }
87 
88 void BufferAccumulator ::bufferSendInReturn_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
89  this->bufferSendOutReturn_out(0, buffer);
90  this->m_waitForBuffer = false;
91  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
92  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
93  // in a partial drain
94  this->m_send = true;
95  this->sendStoredBuffer();
96  }
97 }
98 
99 void BufferAccumulator ::pingIn_handler(const FwIndexType portNum, U32 key) {
100  this->pingOut_out(0, key);
101 }
102 
103 // ----------------------------------------------------------------------
104 // Command handler implementations
105 // ----------------------------------------------------------------------
106 
107 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
108  const U32 cmdSeq,
109  BufferAccumulator_OpState mode) {
110  // cancel an in-progress partial drain
111  if (this->m_numToDrain > 0) {
112  // reset counters for partial buffer drain
113  this->m_numToDrain = 0;
114  this->m_numDrained = 0;
115  // respond to the original command
116  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
117  }
118 
119  this->m_mode = mode;
120  if (mode == BufferAccumulator_OpState::DRAIN) {
121  if (!this->m_waitForBuffer) {
122  this->m_send = true;
123  this->sendStoredBuffer();
124  }
125  } else {
126  this->m_send = false;
127  }
128  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
129 }
130 
// Command handler for BA_DrainBuffers: start a partial drain of numToDrain
// queued buffers.
//
// The command is answered immediately with BUSY, VALIDATION_ERROR, or OK on
// the early-exit paths below. Otherwise opCode and cmdSeq are saved in
// m_opCode / m_cmdSeq and no response is sent from this handler.
//
// opCode:     opcode of this command
// cmdSeq:     sequence number of this command
// numToDrain: number of buffers requested to be drained
// blockMode:  when NOBLOCK, the request is capped at the number of buffers
//             currently queued
131 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(const FwOpcodeType opCode,
132  const U32 cmdSeq,
133  U32 numToDrain,
134  BufferAccumulator_BlockMode blockMode) {
    // A previous partial drain has not sent all of its buffers yet: reject.
135  if (this->m_numDrained < this->m_numToDrain) {
136  this->log_WARNING_HI_BA_StillDraining(static_cast<U32>(this->m_numDrained),
137  static_cast<U32>(this->m_numToDrain));
138  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
139  return;
140  }
141 
    // The component is already in DRAIN mode: reject the partial drain.
    // NOTE(review): listing line 143 is absent from this extract — confirm
    // against the original file what else happens on this path.
142  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
144  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
145  return;
146  }
147 
    // Nothing requested: answer OK without touching the drain state.
    // NOTE(review): listing line 149 is absent from this extract.
148  if (numToDrain == 0) {
150  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
151  return;
152  }
153 
    // Save the command identity for the deferred response and reset the
    // partial-drain counters.
154  this->m_opCode = opCode;
155  this->m_cmdSeq = cmdSeq;
156  this->m_numDrained = 0;
157  this->m_numToDrain = static_cast<FwSizeType>(numToDrain);
158 
159  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
160  FwSizeType numBuffers = this->m_bufferQueue.getSize();
161 
    // Fewer buffers are queued than requested: drain only what is queued.
162  if (numBuffers < static_cast<FwSizeType>(numToDrain)) {
163  this->m_numToDrain = numBuffers;
164  this->log_WARNING_LO_BA_NonBlockDrain(static_cast<U32>(this->m_numToDrain), numToDrain);
165  }
166 
167  /* OK if there were 0 buffers queued, and we
168  * end up setting numToDrain to 0
169  */
    // NOTE(review): listing line 171 is absent from this extract.
170  if (0 == this->m_numToDrain) {
172  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
173  return;
174  }
175  }
176 
177  // Start sending only if we are not still waiting for a buffer from last time
178  if (!this->m_waitForBuffer) {
179  this->m_send = true;
180  this->sendStoredBuffer(); // kick off the draining;
181  }
182 }
183 
184 // ----------------------------------------------------------------------
185 // Private helper methods
186 // ----------------------------------------------------------------------
187 
188 void BufferAccumulator ::sendStoredBuffer() {
189  FW_ASSERT(this->m_send);
190  Fw::Buffer buffer;
191  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
192  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
193  // partial drain
194  const bool status = this->m_bufferQueue.dequeue(buffer);
195  if (status) { // a buffer was dequeued
196  this->m_numDrained++;
197  this->bufferSendOutDrain_out(0, buffer);
198  this->m_waitForBuffer = true;
199  this->m_send = false;
200  } else if (this->m_numToDrain > 0) {
201  this->log_WARNING_HI_BA_DrainStalled(static_cast<U32>(this->m_numDrained),
202  static_cast<U32>(this->m_numToDrain));
203  }
204  }
205 
206  /* This used to be "else if", but then you wait for all
207  * drained buffers in a partial drain to be RETURNED before returning OK.
208  * Correct thing is to return OK once they are SENT
209  */
210  if ((this->m_numToDrain > 0) && // we are doing a partial drain
211  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
212  //
213  this->log_ACTIVITY_HI_BA_PartialDrainDone(static_cast<U32>(this->m_numDrained));
214  // reset counters for partial buffer drain
215  this->m_numToDrain = 0;
216  this->m_numDrained = 0;
217  this->m_send = false;
218  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
219  }
220 
221  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
222 }
223 
224 } // namespace Svc
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutReturn.
FwIdType FwOpcodeType
The type of a command opcode.
PlatformSizeType FwSizeType
I32 FwEnumStoreType
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable)=0
Allocate memory.
void log_ACTIVITY_HI_BA_PartialDrainDone(U32 numDrained) const
void pingOut_out(FwIndexType portNum, U32 key)
Invoke output port pingOut.
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain) const
BufferAccumulator(const char *const compName)
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain) const
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time()) const
void allocateQueue(FwEnumStoreType identifier, Fw::MemAllocator &allocator, FwSizeType maxNumBuffers)
Command successfully executed.
PlatformIndexType FwIndexType
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
C++ header for working with basic fprime types.
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain) const
Command failed validation.
RateGroupDivider component implementation.
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer)
Invoke output port bufferSendOutDrain.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
Deallocate memory.
Auto-generated base for BufferAccumulator component.
#define FW_ASSERT(...)
Definition: Assert.hpp:14
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.