F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
BufferAccumulator.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title BufferAccumulator.cpp
3 // \author bocchino
4 // \brief BufferAccumulator implementation
5 //
6 // \copyright
7 // Copyright (C) 2017 California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
14 
15 #include <sys/time.h>
16 
17 #include <limits>
18 #include "Fw/Types/BasicTypes.hpp"
19 
20 namespace Svc {
21 
22 // ----------------------------------------------------------------------
23 // Construction, initialization, and destruction
24 // ----------------------------------------------------------------------
25 
26 BufferAccumulator ::BufferAccumulator(const char* const compName)
27  : BufferAccumulatorComponentBase(compName),
28  m_mode(BufferAccumulator_OpState::ACCUMULATE),
29  m_bufferMemory(nullptr),
30  m_bufferQueue(),
31  m_send(false),
32  m_waitForBuffer(false),
33  m_numWarnings(0u),
34  m_numDrained(0u),
35  m_numToDrain(0u),
36  m_opCode(),
37  m_cmdSeq(0u),
38  m_allocatorId(0) {}
39 
41 
42 // ----------------------------------------------------------------------
43 // Public methods
44 // ----------------------------------------------------------------------
45 
47  Fw::MemAllocator& allocator,
48  FwSizeType maxNumBuffers,
49  BufferAccumulator_OpState initialMode
51 ) {
52  this->m_allocatorId = identifier;
53  // Overflow protection
54  FW_ASSERT((std::numeric_limits<FwSizeType>::max() / maxNumBuffers) >= sizeof(Fw::Buffer));
55  FwSizeType memSize = static_cast<FwSizeType>(sizeof(Fw::Buffer) * maxNumBuffers);
56  bool recoverable = false;
57  this->m_bufferMemory = static_cast<Fw::Buffer*>(allocator.allocate(identifier, memSize, recoverable));
58  // TODO: Fail gracefully here
59  m_bufferQueue.init(this->m_bufferMemory, maxNumBuffers);
60  this->m_mode = initialMode;
61  this->m_send = this->m_mode == BufferAccumulator_OpState::DRAIN;
62 }
63 
65  allocator.deallocate(static_cast<FwEnumStoreType>(this->m_allocatorId), this->m_bufferMemory);
66 }
67 
68 // ----------------------------------------------------------------------
69 // Handler implementations for user-defined typed input ports
70 // ----------------------------------------------------------------------
71 
// Handler for the bufferSendInFill input port.
//
// Tries to enqueue the incoming buffer, keeps a count of consecutive enqueue
// failures in m_numWarnings (reset to zero on the next success), sends one
// stored buffer if sending is currently enabled, and finally reports the
// queue size in telemetry. portNum is not used by this handler.
72 void BufferAccumulator ::bufferSendInFill_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
73  const bool status = this->m_bufferQueue.enqueue(buffer);
74  if (status) {
// NOTE(review): this branch is empty in this listing (listing line 76 is
// absent) -- presumably an event is logged here when an enqueue succeeds
// after earlier failures; confirm against the original source file.
75  if (this->m_numWarnings > 0) {
77  }
// A successful enqueue clears the failure count
78  this->m_numWarnings = 0;
79  } else {
// NOTE(review): this branch is empty in this listing (listing line 81 is
// absent) -- presumably an event is logged here on the first failed enqueue
// only; confirm against the original source file.
80  if (this->m_numWarnings == 0) {
82  }
// Count this failed enqueue
83  m_numWarnings++;
84  }
// Sending is enabled: try to forward one stored buffer right away
85  if (this->m_send) {
86  this->sendStoredBuffer();
87  }
88 
// Report the current number of queued buffers
89  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
90 }
91 
92 void BufferAccumulator ::bufferSendInReturn_handler(const FwIndexType portNum, Fw::Buffer& buffer) {
93  this->bufferSendOutReturn_out(0, buffer);
94  this->m_waitForBuffer = false;
95  if ((this->m_mode == BufferAccumulator_OpState::DRAIN) || // we are draining ALL buffers
96  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers
97  // in a partial drain
98  this->m_send = true;
99  this->sendStoredBuffer();
100  }
101 }
102 
103 void BufferAccumulator ::pingIn_handler(const FwIndexType portNum, U32 key) {
104  this->pingOut_out(0, key);
105 }
106 
107 // ----------------------------------------------------------------------
108 // Command handler implementations
109 // ----------------------------------------------------------------------
110 
111 void BufferAccumulator ::BA_SetMode_cmdHandler(const FwOpcodeType opCode,
112  const U32 cmdSeq,
113  BufferAccumulator_OpState mode) {
114  // cancel an in-progress partial drain
115  if (this->m_numToDrain > 0) {
116  // reset counters for partial buffer drain
117  this->m_numToDrain = 0;
118  this->m_numDrained = 0;
119  // respond to the original command
120  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
121  }
122 
123  this->m_mode = mode;
124  if (mode == BufferAccumulator_OpState::DRAIN) {
125  if (!this->m_waitForBuffer) {
126  this->m_send = true;
127  this->sendStoredBuffer();
128  }
129  } else {
130  this->m_send = false;
131  }
132  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
133 }
134 
// Command handler for BA_DrainBuffers: start a partial drain of up to
// numToDrain queued buffers.
//
// The command is answered immediately with BUSY if an earlier partial drain is
// unfinished, with VALIDATION_ERROR if the component is in DRAIN mode, and
// with OK if there is nothing to drain. Otherwise the opcode and sequence
// number are saved so the response can be sent later, when the drain
// completes (see sendStoredBuffer) or is cancelled (see BA_SetMode_cmdHandler).
//
// opCode:     opcode of this command
// cmdSeq:     sequence number of this command
// numToDrain: number of buffers requested to be drained
// blockMode:  with NOBLOCK the request is clamped to the number of buffers
//             currently queued; other values leave the request unchanged
135 void BufferAccumulator ::BA_DrainBuffers_cmdHandler(const FwOpcodeType opCode,
136  const U32 cmdSeq,
137  U32 numToDrain,
138  BufferAccumulator_BlockMode blockMode) {
// Reject the command while a previous partial drain is still unfinished
139  if (this->m_numDrained < this->m_numToDrain) {
140  this->log_WARNING_HI_BA_StillDraining(static_cast<U32>(this->m_numDrained),
141  static_cast<U32>(this->m_numToDrain));
142  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::BUSY);
143  return;
144  }
145 
// A partial drain is rejected while the component is in DRAIN mode.
// NOTE(review): listing line 147 is absent here -- presumably an event log
// call; confirm against the original source file.
146  if (this->m_mode == BufferAccumulator_OpState::DRAIN) {
148  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::VALIDATION_ERROR);
149  return;
150  }
151 
// Nothing requested: succeed immediately.
// NOTE(review): listing line 153 is absent here -- presumably an event log
// call; confirm against the original source file.
152  if (numToDrain == 0) {
154  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
155  return;
156  }
157 
// Save the command identity for the deferred response and reset the counters
158  this->m_opCode = opCode;
159  this->m_cmdSeq = cmdSeq;
160  this->m_numDrained = 0;
161  this->m_numToDrain = static_cast<FwSizeType>(numToDrain);
162 
163  if (blockMode == BufferAccumulator_BlockMode::NOBLOCK) {
164  FwSizeType numBuffers = this->m_bufferQueue.getSize();
165 
// Fewer buffers queued than requested: drain only what is available
166  if (numBuffers < static_cast<FwSizeType>(numToDrain)) {
167  this->m_numToDrain = numBuffers;
168  this->log_WARNING_LO_BA_NonBlockDrain(static_cast<U32>(this->m_numToDrain), numToDrain);
169  }
170 
171  /* OK if there were 0 buffers queued, and we
172  * end up setting numToDrain to 0
173  */
// NOTE(review): listing line 175 is absent here -- presumably an event log
// call; confirm against the original source file.
174  if (0 == this->m_numToDrain) {
176  this->cmdResponse_out(opCode, cmdSeq, Fw::CmdResponse::OK);
177  return;
178  }
179  }
180 
181  // Start sending now only if no previously sent buffer is still outstanding;
// otherwise bufferSendInReturn_handler continues the drain once that buffer
// has been returned (it sends while m_numDrained < m_numToDrain)
182  if (!this->m_waitForBuffer) {
183  this->m_send = true;
184  this->sendStoredBuffer(); // kick off the draining;
185  }
186 }
187 
188 // ----------------------------------------------------------------------
189 // Private helper methods
190 // ----------------------------------------------------------------------
191 
192 void BufferAccumulator ::sendStoredBuffer() {
193  FW_ASSERT(this->m_send);
194  Fw::Buffer buffer;
195  if ((this->m_numToDrain == 0) || // we are draining ALL buffers
196  (this->m_numDrained < this->m_numToDrain)) { // OR we aren't done draining some buffers in a
197  // partial drain
198  const bool status = this->m_bufferQueue.dequeue(buffer);
199  if (status) { // a buffer was dequeued
200  this->m_numDrained++;
201  this->bufferSendOutDrain_out(0, buffer);
202  this->m_waitForBuffer = true;
203  this->m_send = false;
204  } else if (this->m_numToDrain > 0) {
205  this->log_WARNING_HI_BA_DrainStalled(static_cast<U32>(this->m_numDrained),
206  static_cast<U32>(this->m_numToDrain));
207  }
208  }
209 
210  /* This used to be "else if", but then you wait for all
211  * drained buffers in a partial drain to be RETURNED before returning OK.
212  * Correct thing is to return OK once they are SENT
213  */
214  if ((this->m_numToDrain > 0) && // we are doing a partial drain
215  (this->m_numDrained == this->m_numToDrain)) { // AND we just finished draining
216  //
217  this->log_ACTIVITY_HI_BA_PartialDrainDone(static_cast<U32>(this->m_numDrained));
218  // reset counters for partial buffer drain
219  this->m_numToDrain = 0;
220  this->m_numDrained = 0;
221  this->m_send = false;
222  this->cmdResponse_out(this->m_opCode, this->m_cmdSeq, Fw::CmdResponse::OK);
223  }
224 
225  this->tlmWrite_BA_NumQueuedBuffers(static_cast<U32>(this->m_bufferQueue.getSize()));
226 }
227 
228 } // namespace Svc
virtual void * allocate(const FwEnumStoreType identifier, FwSizeType &size, bool &recoverable, FwSizeType alignment=alignof(std::max_align_t))=0
FwIdType FwOpcodeType
The type of a command opcode.
PlatformSizeType FwSizeType
I32 FwEnumStoreType
void bufferSendOutReturn_out(FwIndexType portNum, Fw::Buffer &fwBuffer) const
Invoke output port bufferSendOutReturn.
void log_ACTIVITY_HI_BA_PartialDrainDone(U32 numDrained) const
void log_WARNING_LO_BA_NonBlockDrain(U32 numWillDrain, U32 numReqDrain) const
BufferAccumulator(const char *const compName)
void log_WARNING_HI_BA_StillDraining(U32 numDrained, U32 numToDrain) const
void tlmWrite_BA_NumQueuedBuffers(U32 arg, Fw::Time _tlmTime=Fw::Time()) const
void pingOut_out(FwIndexType portNum, U32 key) const
Invoke output port pingOut.
Command successfully executed.
Memory Allocation base class.
PlatformIndexType FwIndexType
void bufferSendOutDrain_out(FwIndexType portNum, Fw::Buffer &fwBuffer) const
Invoke output port bufferSendOutDrain.
void deallocateQueue(Fw::MemAllocator &allocator)
Return allocated queue. Should be done during shutdown.
C++ header for working with basic fprime types.
void log_WARNING_HI_BA_DrainStalled(U32 numDrained, U32 numToDrain) const
Command failed validation.
RateGroupDivider component implementation.
virtual void deallocate(const FwEnumStoreType identifier, void *ptr)=0
Auto-generated base for BufferAccumulator component.
void allocateQueue(FwEnumStoreType identifier, Fw::MemAllocator &allocator, FwSizeType maxNumBuffers, BufferAccumulator_OpState initialMode=BufferAccumulator_OpState::ACCUMULATE)
#define FW_ASSERT(...)
Definition: Assert.hpp:14
void cmdResponse_out(FwOpcodeType opCode, U32 cmdSeq, Fw::CmdResponse response)
Emit command response.