F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
UdpReceiverComponentImpl.cpp
Go to the documentation of this file.
1 // ======================================================================
2 // \title UdpReceiverImpl.cpp
3 // \author tcanham
4 // \brief cpp file for UdpReceiver component implementation class
5 //
6 // \copyright
7 // Copyright 2009-2015, by the California Institute of Technology.
8 // ALL RIGHTS RESERVED. United States Government Sponsorship
9 // acknowledged.
10 //
11 // ======================================================================
12 
13 
15 #include <FpConfig.hpp>
16 #include <sys/types.h>
17 #include <cstring>
18 #include <cerrno>
19 #include <cstdlib>
20 #include <unistd.h>
21 #include <sys/socket.h>
22 #include <arpa/inet.h>
23 #include <Os/TaskString.hpp>
24 
25 namespace Svc {
26 
27  // ----------------------------------------------------------------------
28  // Construction, initialization, and destruction
29  // ----------------------------------------------------------------------
30 
33  const char *const compName
34  ) : UdpReceiverComponentBase(compName),
35  m_fd(-1),
36  m_packetsReceived(0),
37  m_bytesReceived(0),
38  m_packetsDropped(0),
39  m_decodeErrors(0),
40  m_firstSeq(true),
41  m_currSeq(0)
42  {
43 
44  }
45 
48  {
49  if (this->m_fd != -1) {
50  close(this->m_fd);
51  }
52  }
53 
55  const char* port
56  ) {
57 
58  //create a UDP socket
59  this->m_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
60  if (-1 == this->m_fd) {
61  Fw::LogStringArg arg(strerror(errno));
62  this->log_WARNING_HI_UR_SocketError(arg);
63  }
64 
65  sockaddr_in saddr;
66  // zero out the structure
67  memset(&saddr, 0, sizeof(saddr));
68 
69  saddr.sin_family = AF_INET;
70  saddr.sin_port = htons(atoi(port));
71  saddr.sin_addr.s_addr = htonl(INADDR_ANY);
72 
73  //bind socket to port
74  NATIVE_INT_TYPE status = bind(this->m_fd , (struct sockaddr*)&saddr, sizeof(saddr));
75  if (-1 == status) {
76  Fw::LogStringArg arg(strerror(errno));
77  this->log_WARNING_HI_UR_BindError(arg);
78  close(this->m_fd);
79  this->m_fd = -1;
80  } else {
81  this->log_ACTIVITY_HI_UR_PortOpened(atoi(port));
82  }
83  }
84 
86  NATIVE_UINT_TYPE priority,
87  NATIVE_UINT_TYPE stackSize,
88  NATIVE_UINT_TYPE affinity
89  ) {
90  Os::TaskString name(this->getObjName());
91  Os::Task::TaskStatus stat = this->m_socketTask.start(
92  name,
93  0,
94  priority,
95  stackSize,
96  UdpReceiverComponentImpl::workerTask,
97  this,
98  affinity);
99  FW_ASSERT(Os::Task::TASK_OK == stat,stat);
100  }
101 
102 
103  // ----------------------------------------------------------------------
104  // Handler implementations for user-defined typed input ports
105  // ----------------------------------------------------------------------
106 
107  void UdpReceiverComponentImpl ::
108  Sched_handler(
109  const NATIVE_INT_TYPE portNum,
110  U32 context
111  )
112  {
113  this->tlmWrite_UR_BytesReceived(this->m_bytesReceived);
114  this->tlmWrite_UR_PacketsReceived(this->m_packetsReceived);
115  this->tlmWrite_UR_PacketsDropped(this->m_packetsDropped);
116  }
117 
118  void UdpReceiverComponentImpl::workerTask(void* ptr) {
119  UdpReceiverComponentImpl *compPtr = static_cast<UdpReceiverComponentImpl*>(ptr);
120  while (true) {
121  compPtr->doRecv();
122  }
123  }
124 
125  void UdpReceiverComponentImpl::doRecv() {
126 
127  // wait for data from the socket
128  NATIVE_INT_TYPE psize = recvfrom(
129  this->m_fd,
130  this->m_recvBuff.getBuffAddr(),
131  this->m_recvBuff.getBuffCapacity(),
132  MSG_WAITALL,
133  0,
134  0);
135  if (-1 == psize) {
136  if (errno != EINTR) {
137  Fw::LogStringArg arg(strerror(errno));
138  this->log_WARNING_HI_UR_RecvError(arg);
139  }
140  return;
141  }
142  // reset buffer for deserialization
143  Fw::SerializeStatus stat = this->m_recvBuff.setBuffLen(psize);
144  FW_ASSERT(Fw::FW_SERIALIZE_OK == stat, stat);
145 
146  // get sequence number
147  U8 seqNum;
148  stat = this->m_recvBuff.deserialize(seqNum);
149  // check for deserialization error or port number too high
150  if (stat != Fw::FW_SERIALIZE_OK) {
151  this->log_WARNING_HI_UR_DecodeError(DECODE_SEQ,stat);
152  this->m_decodeErrors++;
153  return;
154  }
155 
156  // track sequence number
157  if (this->m_firstSeq) {
158  // first time, set tracked sequence number equal to the one received
159  this->m_currSeq = seqNum;
160  this->m_firstSeq = false;
161  } else {
162  // make sure sequence number has gone up by one
163  if (seqNum != ++this->m_currSeq) {
164  // will only be right if it rolls over only once, but better than nothing
165  U8 diff = seqNum - this->m_currSeq;
166  this->m_packetsDropped += diff;
167  // send EVR
168  this->log_WARNING_HI_UR_DroppedPacket(diff);
169  // reset to current sequence
170  this->m_currSeq = seqNum;
171  }
172  }
173 
174  // get port number
175  U8 portNum;
176  stat = this->m_recvBuff.deserialize(portNum);
177  // check for deserialization error or port number too high
178  if (stat != Fw::FW_SERIALIZE_OK or portNum > this->getNum_PortsOut_OutputPorts()) {
179  this->log_WARNING_HI_UR_DecodeError(DECODE_PORT,stat);
180  this->m_decodeErrors++;
181  return;
182  }
183  // get buffer for port
184 
185  stat = this->m_recvBuff.deserialize(this->m_portBuff);
186  if (stat != Fw::FW_SERIALIZE_OK) {
187  this->log_WARNING_HI_UR_DecodeError(DECODE_BUFFER,stat);
188  this->m_decodeErrors++;
189  return;
190  }
191 
192  // call output port
193  if (this->isConnected_PortsOut_OutputPort(portNum)) {
194 
195  Fw::SerializeStatus stat = this->PortsOut_out(portNum,this->m_portBuff);
196 
197  // If had issues deserializing the data, then report it:
198  if (stat != Fw::FW_SERIALIZE_OK) {
199 
200  this->log_WARNING_HI_UR_DecodeError(PORT_SEND,stat);
201  this->m_decodeErrors++;
202  }
203 
204  this->m_packetsReceived++;
205  this->m_bytesReceived += psize;
206 
207  }
208  }
209 
210 #ifdef BUILD_UT
211  UdpReceiverComponentImpl::UdpSerialBuffer& UdpReceiverComponentImpl::UdpSerialBuffer::operator=(const UdpReceiverComponentImpl::UdpSerialBuffer& other) {
212  this->resetSer();
213  this->serialize(other.getBuffAddr(),other.getBuffLength(),true);
214  return *this;
215  }
216 
217  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
218  const Fw::SerializeBufferBase& other) : Fw::SerializeBufferBase() {
219  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
220  memcpy(this->m_buff,other.getBuffAddr(),other.getBuffLength());
221  this->setBuffLen(other.getBuffLength());
222  }
223 
224  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
225  const UdpReceiverComponentImpl::UdpSerialBuffer& other) : Fw::SerializeBufferBase() {
226  FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
227  memcpy(this->m_buff,other.m_buff,other.getBuffLength());
228  this->setBuffLen(other.getBuffLength());
229  }
230 
231  UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(): Fw::SerializeBufferBase() {
232 
233  }
234 
235 #endif
236 
237 
238 
239 } // end namespace Svc
PlatformUIntType NATIVE_UINT_TYPE
Definition: BasicTypes.h:56
Serialization/Deserialization operation was successful.
PlatformIntType NATIVE_INT_TYPE
Definition: BasicTypes.h:55
Status start(const Arguments &arguments) override
start the task
Definition: Task.cpp:82
UdpReceiverComponentImpl(const char *const compName)
SerializeStatus
forward declaration for string
Serializable::SizeType getBuffLength() const
returns current buffer size
void open(const char *port)
Open the connection.
C++-compatible configuration header for fprime configuration.
uint8_t U8
8-bit unsigned integer
Definition: BasicTypes.h:30
void startThread(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE affinity)
start worker thread
virtual U8 * getBuffAddr()=0
gets buffer address for data filling
#define FW_ASSERT(...)
Definition: Assert.hpp:14