F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
TlmChan.cpp
Go to the documentation of this file.
1 
12 #include <Fw/Com/ComBuffer.hpp>
13 #include <Fw/FPrimeBasicTypes.hpp>
14 #include <Fw/Time/Time.hpp>
15 #include <Fw/Types/Assert.hpp>
16 #include <Os/RawTime.hpp>
17 #include <Svc/TlmChan/TlmChan.hpp>
18 
19 namespace Svc {
20 
21 // Definition of TLMCHAN_HASH_BUCKETS is >= number of telemetry ids
22 static_assert(std::numeric_limits<FwChanIdType>::max() >= TLMCHAN_HASH_BUCKETS,
23  "Cannot have more hash buckets than maximum telemetry ids in the system");
24 // TLMCHAN_HASH_BUCKETS >= TLMCHAN_NUM_TLM_HASH_SLOTS >= 0
25 static_assert(std::numeric_limits<FwChanIdType>::max() >= TLMCHAN_NUM_TLM_HASH_SLOTS,
26  "Cannot have more hash slots than maximum telemetry ids in the system");
27 
28 // TLMCHAN_MAX_ENTRIES_PER_RUN must be defined in TlmChanImplCfg.hpp.
29 // It caps the number of updated telemetry entries that Run_handler will
30 // serialize and downlink in a single invocation. Any entries beyond this
31 // limit are skipped (deferred) for the current cycle and will be cleared
32 // by the next buffer swap, so they are effectively dropped rather than
33 // queued. Choose a value that keeps Run_handler's worst-case execution
34 // time within its rate-group budget.
35 static_assert(TLMCHAN_MAX_ENTRIES_PER_RUN > 0, "TLMCHAN_MAX_ENTRIES_PER_RUN must be greater than zero");
36 static_assert(TLMCHAN_MAX_ENTRIES_PER_RUN <= TLMCHAN_HASH_BUCKETS,
37  "TLMCHAN_MAX_ENTRIES_PER_RUN cannot exceed TLMCHAN_HASH_BUCKETS");
38 
39 TlmChan::TlmChan(const char* name)
40  : TlmChanComponentBase(name), m_procCapCount(0), m_activeBuffer(ActiveBuffer::Buffer_0) {
41  FW_ASSERT(name != nullptr);
42 
43  // clear slot pointers
44  for (FwChanIdType entry = 0; entry < TLMCHAN_NUM_TLM_HASH_SLOTS; entry++) {
45  this->m_tlmEntries[0].slots[entry] = nullptr;
46  this->m_tlmEntries[1].slots[entry] = nullptr;
47  }
48  // clear buckets
49  for (FwChanIdType entry = 0; entry < TLMCHAN_HASH_BUCKETS; entry++) {
50  this->m_tlmEntries[0].buckets[entry].used = false;
51  this->m_tlmEntries[0].buckets[entry].updated = false;
52  this->m_tlmEntries[0].buckets[entry].bucketNo = entry;
53  this->m_tlmEntries[0].buckets[entry].next = nullptr;
54  this->m_tlmEntries[0].buckets[entry].id = 0;
55  this->m_tlmEntries[1].buckets[entry].used = false;
56  this->m_tlmEntries[1].buckets[entry].updated = false;
57  this->m_tlmEntries[1].buckets[entry].bucketNo = entry;
58  this->m_tlmEntries[1].buckets[entry].next = nullptr;
59  this->m_tlmEntries[1].buckets[entry].id = 0;
60  }
61  // clear free index
62  this->m_tlmEntries[0].free = 0;
63  this->m_tlmEntries[1].free = 0;
64 
65  // determine deployed channel size
66  this->m_chanIdSize = static_cast<U32>(sizeof(FwChanIdType));
67 
68  // ------- Set random telemetry hash seed -------
69  U32 seed = 0;
70 
71  // get current time and use as non-deterministic source for seed
72  Os::RawTime rawTime;
73  rawTime.now();
75  Fw::ExternalSerializeBuffer serBuf(timeBuf, sizeof(timeBuf));
76  (void)rawTime.serializeTo(serBuf);
77 
78  U32 foldedTime = 0;
79  for (U32 i = 0; i < static_cast<U32>(serBuf.getSize()); i++) {
80  // Rotate-and-XOR each byte to avoid cancellation when bytes are equal
81  foldedTime = (foldedTime << 8) | (foldedTime >> 24);
82  foldedTime ^= static_cast<U32>(timeBuf[i]);
83  }
84 
85  // read stack-address - address varies per boot
86  const U64 raw = reinterpret_cast<U64>(&seed);
87  const U32 foldedStack = static_cast<U32>(raw ^ (raw >> 32));
88 
89  seed = foldedTime ^ foldedStack;
90 
91  // Force a non-zero seed. Of the three hash paths, only the narrow
92  // (<16-bit) path actually loses its keying at seed == 0: it reverts to the
93  // original linear (id % MOD) % SLOTS reduction, re-exposing the predictable
94  // collision pattern this change removes. The Murmur3 and Wang paths still
95  // diffuse a zero seed correctly, so this guard is conservative for them.
96  // Substituting a known non-zero constant keeps every branch keyed and the
97  // seed uniform to reason about across platforms.
98  if (seed == 0) {
99  seed = 0xDEADBEEFU;
100  }
101 
102  this->m_hashSeed = seed;
103 }
104 
106 
108  // Validate input before use.
109  static_assert(std::is_unsigned<FwChanIdType>::value, "FwChanIdType must be unsigned");
110  static_assert(sizeof(FwChanIdType) <= sizeof(U32), "FwChanIdType must fit within U32 for safe hash cast");
111  static_assert(TLMCHAN_NUM_TLM_HASH_SLOTS > 0, "TLMCHAN_NUM_TLM_HASH_SLOTS must be greater than zero");
112 
113  FwChanIdType result;
114 
115  if (this->m_chanIdSize >= 4) {
116  // Verify id fits in U16 before narrowing cast
117  FW_ASSERT(id <= static_cast<FwChanIdType>(std::numeric_limits<U32>::max()), static_cast<FwAssertArgType>(id));
118 
119  U32 h = static_cast<U32>(id) ^ static_cast<U32>(this->m_hashSeed);
120 
121  // Murmur3 32-bit
122  h ^= (h >> 16);
123  h *= MURMUR3_C1;
124  h ^= (h >> 13);
125  h *= MURMUR3_C2;
126  h ^= (h >> 16);
127 
128  result = static_cast<FwChanIdType>(h % TLMCHAN_NUM_TLM_HASH_SLOTS);
129  } else if (this->m_chanIdSize == 2) {
130  // Verify id fits in U16 before narrowing cast
131  FW_ASSERT(id <= static_cast<FwChanIdType>(std::numeric_limits<U16>::max()), static_cast<FwAssertArgType>(id));
132 
133  U16 h = (static_cast<U16>(id)) ^ (static_cast<U16>(this->m_hashSeed) & static_cast<U16>(0xFFFFU));
134 
135  // Wang 16-bit
136  h = static_cast<U16>(h ^ (h >> 7));
137  h = static_cast<U16>(h * WANG16_C1);
138  h = static_cast<U16>(h ^ (h >> 5));
139  h = static_cast<U16>(h * WANG16_C2);
140  h = static_cast<U16>(h ^ (h >> 3));
141 
142  result = static_cast<FwChanIdType>(h % TLMCHAN_NUM_TLM_HASH_SLOTS);
143  } else {
144  // Verify id fits in U8 before narrowing cast
145  FW_ASSERT(id <= static_cast<FwChanIdType>(std::numeric_limits<U8>::max()), static_cast<FwAssertArgType>(id));
146 
147  // FwChanIdType is smaller than 16 bits (at most 255 distinct channel IDs).
148  // XOR with the low byte of the seed before reduction to maintain consistency
149  const U8 h = static_cast<U8>(id) ^ (static_cast<U8>(this->m_hashSeed) & static_cast<U8>(0xFFU));
150  result = static_cast<FwChanIdType>((h % TLMCHAN_HASH_MOD_VALUE) % TLMCHAN_NUM_TLM_HASH_SLOTS);
151  }
152  return result;
153 }
154 
155 void TlmChan::pingIn_handler(const FwIndexType portNum, U32 key) {
156  static_assert(NUM_PINGIN_INPUT_PORTS == 1, "pingIn_handler expects exactly one input port");
157  // return key
158  this->pingOut_out(0, key);
159 }
160 
161 Fw::TlmValid TlmChan::TlmGet_handler(FwIndexType portNum, FwChanIdType id, Fw::Time& timeTag, Fw::TlmBuffer& val) {
162  static_assert(NUM_TLMGET_INPUT_PORTS == 1, "TlmGet_handler expects exactly one input port");
163  FwChanIdType index = this->doHash(id);
164 
165  // Search to see if channel has been stored
166  // check both buffers
167  // don't need to lock because this port is guarded
168  TlmEntry* activeEntry = this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].slots[index];
169  for (FwChanIdType bucket = 0; bucket < TLMCHAN_HASH_BUCKETS; bucket++) {
170  if (activeEntry) {
171  if (activeEntry->id == id) {
172  break;
173  } else {
174  activeEntry = activeEntry->next;
175  }
176  } else {
177  break;
178  }
179  }
180 
181  TlmEntry* inactiveEntry = this->m_tlmEntries[1 - static_cast<U8>(this->m_activeBuffer)].slots[index];
182  for (FwChanIdType bucket = 0; bucket < TLMCHAN_HASH_BUCKETS; bucket++) {
183  if (inactiveEntry) {
184  if (inactiveEntry->id == id) {
185  break;
186  } else {
187  inactiveEntry = inactiveEntry->next;
188  }
189  } else {
190  break;
191  }
192  }
193 
194  if (activeEntry && inactiveEntry) {
195  Fw::TimeComparison cmp = Fw::Time::compare(inactiveEntry->lastUpdate, activeEntry->lastUpdate);
196  if (cmp == Fw::TimeComparison::GT) {
197  val = inactiveEntry->buffer;
198  timeTag = inactiveEntry->lastUpdate;
199  return Fw::TlmValid::VALID;
200  } else if (cmp != Fw::TimeComparison::INCOMPARABLE) {
201  val = activeEntry->buffer;
202  timeTag = activeEntry->lastUpdate;
203  return Fw::TlmValid::VALID;
204  } else {
205  if (inactiveEntry->updated) {
206  val = inactiveEntry->buffer;
207  timeTag = inactiveEntry->lastUpdate;
208  return Fw::TlmValid::VALID;
209  } else {
210  val = activeEntry->buffer;
211  timeTag = activeEntry->lastUpdate;
212  return Fw::TlmValid::VALID;
213  }
214  }
215  } else if (activeEntry) {
216  val = activeEntry->buffer;
217  timeTag = activeEntry->lastUpdate;
218  return Fw::TlmValid::VALID;
219  } else if (inactiveEntry) {
220  val = inactiveEntry->buffer;
221  timeTag = inactiveEntry->lastUpdate;
222  return Fw::TlmValid::VALID;
223  } else {
224  val.resetSer();
225  }
226  return Fw::TlmValid::INVALID;
227 }
228 
229 void TlmChan::TlmRecv_handler(FwIndexType portNum, FwChanIdType id, Fw::Time& timeTag, Fw::TlmBuffer& val) {
230  static_assert(NUM_TLMRECV_INPUT_PORTS == 1, "TlmRecv_handler expects exactly one input port");
231  FwChanIdType index = this->doHash(id);
232  TlmEntry* entryToUse = nullptr;
233  TlmEntry* prevEntry = nullptr;
234 
235  // Search to see if channel has already been stored or a bucket needs to be added
236  if (this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].slots[index]) {
237  entryToUse = this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].slots[index];
238  // Loop one extra time so that we don't inadvertently fall through the end of the loop early.
239  for (FwChanIdType bucket = 0; bucket < TLMCHAN_HASH_BUCKETS + 1; bucket++) {
240  if (entryToUse) {
241  if (entryToUse->id == id) {
242  break;
243  } else {
244  prevEntry = entryToUse;
245  entryToUse = entryToUse->next;
246  }
247  } else {
248  // Make sure that we haven't run out of buckets
249  FW_ASSERT(this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].free < TLMCHAN_HASH_BUCKETS);
250  // add new bucket from free list
251  entryToUse = &this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)]
252  .buckets[this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].free++];
253  FW_ASSERT(prevEntry);
254  prevEntry->next = entryToUse;
255  entryToUse->next = nullptr;
256  break;
257  }
258  }
259  } else {
260  FW_ASSERT(this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].free < TLMCHAN_HASH_BUCKETS);
261  this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].slots[index] =
262  &this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)]
263  .buckets[this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].free++];
264  entryToUse = this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].slots[index];
265  entryToUse->next = nullptr;
266  }
267 
268  FW_ASSERT(entryToUse);
269  entryToUse->used = true;
270  entryToUse->id = id;
271  entryToUse->updated = true;
272  entryToUse->lastUpdate = timeTag;
273  entryToUse->buffer = val;
274 }
275 
276 void TlmChan::Run_handler(FwIndexType portNum, U32 context) {
277  static_assert(NUM_RUN_INPUT_PORTS == 1, "Run_handler expects exactly one input port");
278  // Only write packets if connected
279  if (not this->isConnected_PktSend_OutputPort(0)) {
280  return;
281  }
282 
283  // Lock mutex long enough to swap the active buffer so the inactive buffer
284  // can be read without worrying about concurrent updates.
285  this->lock();
286  this->m_activeBuffer =
287  (this->m_activeBuffer == ActiveBuffer::Buffer_0) ? ActiveBuffer::Buffer_1 : ActiveBuffer::Buffer_0;
288  // Clear the new active buffer's updated flags so it is clean for incoming
289  // writes. Any entries that were deferred (skipped) in the previous cycle
290  // and still carry updated=true in this buffer are also cleared here.
291  // This is intentional: deferred entries are dropped rather than re-queued,
292  // which preserves Run_handler's bounded execution-time guarantee.
293  for (U32 entry = 0; entry < TLMCHAN_HASH_BUCKETS; entry++) {
294  this->m_tlmEntries[static_cast<U8>(this->m_activeBuffer)].buckets[entry].updated = false;
295  }
296  this->unLock();
297 
298  // -----------------------------------------------------------------------
299  // CPU processing guard
300  //
301  // entriesProcessed — updated entries serialized into downlink packets this
302  // invocation. Hard-capped at TLMCHAN_MAX_ENTRIES_PER_RUN.
303  // entriesDeferred — updated entries skipped because the cap was already
304  // reached. These samples are dropped for this cycle.
305  // A non-zero value means the system is producing
306  // telemetry faster than Run_handler can drain it.
307  // -----------------------------------------------------------------------
308  U32 entriesProcessed = 0;
309  U32 entriesDeferred = 0;
310 
311  Fw::TlmPacket pkt;
312  pkt.resetPktSer();
313 
314  for (U32 entry = 0; entry < TLMCHAN_HASH_BUCKETS; entry++) {
315  TlmEntry* p_entry = &this->m_tlmEntries[1 - static_cast<U8>(this->m_activeBuffer)].buckets[entry];
316  if ((p_entry->updated) && (p_entry->used)) {
317  // ------------------------------------------------------------------
318  // CPU guard check: once the per-run cap is reached, count this entry
319  // as deferred and skip serialization. The entry's updated flag will
320  // be cleared by the next buffer swap (see lock section above), so
321  // the sample is intentionally dropped for this cycle. This bounds
322  // Run_handler's worst-case execution time and prevents it from
323  // starving higher-priority tasks during a telemetry burst caused by
324  // a hardware anomaly, runaway component, software fault, or
325  // cyber-attack.
326  // ------------------------------------------------------------------
327  if (entriesProcessed >= TLMCHAN_MAX_ENTRIES_PER_RUN) {
328  entriesDeferred++;
329  continue;
330  }
331 
332  Fw::SerializeStatus stat = pkt.addValue(p_entry->id, p_entry->lastUpdate, p_entry->buffer);
333 
334  if (Fw::FW_SERIALIZE_NO_ROOM_LEFT == stat) {
335  this->PktSend_out(0, pkt.getBuffer(), 0);
336  pkt.resetPktSer();
337  stat = pkt.addValue(p_entry->id, p_entry->lastUpdate, p_entry->buffer);
338  // If a single channel doesn't fit in an empty packet the packet
339  // is misconfigured; assert so the error is visible immediately.
340  FW_ASSERT(Fw::FW_SERIALIZE_OK == stat, static_cast<FwAssertArgType>(stat));
341  } else if (Fw::FW_SERIALIZE_OK == stat) {
342  // room available, continue filling packet
343  } else {
344  FW_ASSERT(0, static_cast<FwAssertArgType>(stat));
345  }
346 
347  p_entry->updated = false;
348  entriesProcessed++;
349  }
350  }
351 
352  // send remnant entries
353  if (pkt.getNumEntries() > 0) {
354  this->PktSend_out(0, pkt.getBuffer(), 0);
355  }
356 
357  // Emit a WARNING_HI event when the processing cap was reached this cycle.
358  // Using an event rather than injecting a reserved telemetry channel into
359  // the downlink stream is the correct F-Prime anomaly reporting mechanism.
360  if (entriesDeferred > 0) {
361  this->m_procCapCount++;
362  this->log_WARNING_HI_TlmChanEpochProcessingCapReached(entriesDeferred, this->m_procCapCount);
363  }
364 }
365 
366 } // namespace Svc
Serialization/Deserialization operation was successful.
Serializable::SizeType getSize() const override
Get current buffer size.
TlmChan(const char *compName)
Definition: TlmChan.cpp:39
No room left in the buffer to serialize data.
bool isConnected_PktSend_OutputPort(FwIndexType portNum) const
Status now() override
Get the current time.
Definition: RawTime.cpp:36
Fw::ComBuffer & getBuffer()
get buffer to send to the ground
Definition: TlmPacket.cpp:55
Auto-generated base for TlmChan component.
Component that stores telemetry channel values.
FwSizeType getNumEntries()
get the number of packets added via addValue()
Definition: TlmPacket.cpp:51
virtual ~TlmChan()
Definition: TlmChan.cpp:105
SerializeStatus
forward declaration for string
SerializeStatus addValue(FwChanIdType id, Time &timeTag, TlmBuffer &buffer)
Add telemetry value to buffer.
Definition: TlmPacket.cpp:63
void PktSend_out(FwIndexType portNum, Fw::ComBuffer &data, U32 context) const
Invoke output port PktSend.
virtual void lock()
Lock the guarded mutex.
void pingOut_out(FwIndexType portNum, U32 key) const
Invoke output port pingOut.
External serialize buffer with no copy semantics.
FwIdType FwChanIdType
The type of a telemetry channel identifier.
SerializeStatus resetPktSer()
Reset serialization of values. This should be done when starting to accumulate a new set of values...
Definition: TlmPacket.cpp:20
void log_WARNING_HI_TlmChanEpochProcessingCapReached(U32 numDeferred, U32 numTimesDeferredCountReached) const
void resetSer() override
Reset serialization pointer to beginning of buffer.
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:54
static TimeComparison compare(const Time &time1, const Time &time2)
Definition: Time.cpp:139
Fw::SerializeStatus serializeTo(Fw::SerialBufferBase &buffer, Fw::Endianness mode=Fw::Endianness::BIG) const override
Serialize the contents of the RawTimeInterface object into a buffer.
Definition: RawTime.cpp:46
PlatformIndexType FwIndexType
virtual void unLock()
Unlock the guarded mutex.
RateGroupDivider component implementation.
#define FW_ASSERT(...)
Definition: Assert.hpp:14
FwChanIdType doHash(FwChanIdType id) const
Definition: TlmChan.cpp:107
#define U64(C)
Definition: sha.h:181