F´ Flight Software - C/C++ Documentation
A framework for building embedded system applications to NASA flight quality standards.
Loading...
Searching...
No Matches
UdpReceiverComponentImpl.cpp
Go to the documentation of this file.
1// ======================================================================
2// \title UdpReceiverComponentImpl.cpp
3// \author tcanham
4// \brief cpp file for UdpReceiver component implementation class
5//
6// \copyright
7// Copyright 2009-2015, by the California Institute of Technology.
8// ALL RIGHTS RESERVED. United States Government Sponsorship
9// acknowledged.
10//
11// ======================================================================
12
13
15#include <FpConfig.hpp>
16#include <sys/types.h>
17#include <cstring>
18#include <cerrno>
19#include <cstdlib>
20#include <unistd.h>
21#include <sys/socket.h>
22#include <arpa/inet.h>
23#include <Os/TaskString.hpp>
24
25//#define DEBUG_PRINT(...) printf(__VA_ARGS__)
26#define DEBUG_PRINT(...)
27
28namespace Svc {
29
30 // ----------------------------------------------------------------------
31 // Construction, initialization, and destruction
32 // ----------------------------------------------------------------------
33
34 UdpReceiverComponentImpl ::
35 UdpReceiverComponentImpl(
36 const char *const compName
37 ) : UdpReceiverComponentBase(compName),
38 m_fd(-1),
39 m_packetsReceived(0),
40 m_bytesReceived(0),
41 m_packetsDropped(0),
42 m_decodeErrors(0),
43 m_firstSeq(true),
44 m_currSeq(0)
45 {
46
47 }
48
49 UdpReceiverComponentImpl ::
50 ~UdpReceiverComponentImpl()
51 {
52 if (this->m_fd != -1) {
53 close(this->m_fd);
54 }
55 }
56
58 const char* port
59 ) {
60
61 //create a UDP socket
62 this->m_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
63 if (-1 == this->m_fd) {
64 Fw::LogStringArg arg(strerror(errno));
65 this->log_WARNING_HI_UR_SocketError(arg);
66 }
67
68 sockaddr_in saddr;
69 // zero out the structure
70 memset(&saddr, 0, sizeof(saddr));
71
72 saddr.sin_family = AF_INET;
73 saddr.sin_port = htons(atoi(port));
74 saddr.sin_addr.s_addr = htonl(INADDR_ANY);
75
76 //bind socket to port
77 NATIVE_INT_TYPE status = bind(this->m_fd , (struct sockaddr*)&saddr, sizeof(saddr));
78 if (-1 == status) {
79 Fw::LogStringArg arg(strerror(errno));
80 this->log_WARNING_HI_UR_BindError(arg);
81 close(this->m_fd);
82 this->m_fd = -1;
83 } else {
84 this->log_ACTIVITY_HI_UR_PortOpened(atoi(port));
85 }
86 }
87
89 NATIVE_UINT_TYPE priority,
90 NATIVE_UINT_TYPE stackSize,
91 NATIVE_UINT_TYPE affinity
92 ) {
93 Os::TaskString name(this->getObjName());
94 Os::Task::TaskStatus stat = this->m_socketTask.start(
95 name,
96 0,
97 priority,
98 stackSize,
99 UdpReceiverComponentImpl::workerTask,
100 this,
101 affinity);
102 FW_ASSERT(Os::Task::TASK_OK == stat,stat);
103 }
104
105
106 // ----------------------------------------------------------------------
107 // Handler implementations for user-defined typed input ports
108 // ----------------------------------------------------------------------
109
110 void UdpReceiverComponentImpl ::
111 Sched_handler(
112 const NATIVE_INT_TYPE portNum,
113 U32 context
114 )
115 {
116 this->tlmWrite_UR_BytesReceived(this->m_bytesReceived);
117 this->tlmWrite_UR_PacketsReceived(this->m_packetsReceived);
118 this->tlmWrite_UR_PacketsDropped(this->m_packetsDropped);
119 }
120
121 void UdpReceiverComponentImpl::workerTask(void* ptr) {
122 UdpReceiverComponentImpl *compPtr = static_cast<UdpReceiverComponentImpl*>(ptr);
123 while (true) {
124 compPtr->doRecv();
125 }
126 }
127
128 void UdpReceiverComponentImpl::doRecv() {
129
130 // wait for data from the socket
131 NATIVE_INT_TYPE psize = recvfrom(
132 this->m_fd,
133 this->m_recvBuff.getBuffAddr(),
134 this->m_recvBuff.getBuffCapacity(),
135 MSG_WAITALL,
136 0,
137 0);
138 if (-1 == psize) {
139 if (errno != EINTR) {
140 Fw::LogStringArg arg(strerror(errno));
141 this->log_WARNING_HI_UR_RecvError(arg);
142 }
143 return;
144 }
145 // reset buffer for deserialization
146 Fw::SerializeStatus stat = this->m_recvBuff.setBuffLen(psize);
147 FW_ASSERT(Fw::FW_SERIALIZE_OK == stat, stat);
148
149 // get sequence number
150 U8 seqNum;
151 stat = this->m_recvBuff.deserialize(seqNum);
152 // check for deserialization error or port number too high
153 if (stat != Fw::FW_SERIALIZE_OK) {
154 this->log_WARNING_HI_UR_DecodeError(DECODE_SEQ,stat);
155 this->m_decodeErrors++;
156 return;
157 }
158
159 // track sequence number
160 if (this->m_firstSeq) {
161 // first time, set tracked sequence number equal to the one received
162 this->m_currSeq = seqNum;
163 this->m_firstSeq = false;
164 } else {
165 // make sure sequence number has gone up by one
166 if (seqNum != ++this->m_currSeq) {
167 // will only be right if it rolls over only once, but better than nothing
168 U8 diff = seqNum - this->m_currSeq;
169 this->m_packetsDropped += diff;
170 // send EVR
171 this->log_WARNING_HI_UR_DroppedPacket(diff);
172 // reset to current sequence
173 this->m_currSeq = seqNum;
174 }
175 }
176
177 // get port number
178 U8 portNum;
179 stat = this->m_recvBuff.deserialize(portNum);
180 // check for deserialization error or port number too high
181 if (stat != Fw::FW_SERIALIZE_OK or portNum > this->getNum_PortsOut_OutputPorts()) {
182 this->log_WARNING_HI_UR_DecodeError(DECODE_PORT,stat);
183 this->m_decodeErrors++;
184 return;
185 }
186 // get buffer for port
187
188 stat = this->m_recvBuff.deserialize(this->m_portBuff);
189 if (stat != Fw::FW_SERIALIZE_OK) {
190 this->log_WARNING_HI_UR_DecodeError(DECODE_BUFFER,stat);
191 this->m_decodeErrors++;
192 return;
193 }
194
195 // call output port
196 DEBUG_PRINT("Calling port %d with %d bytes.\n",portNum,this->m_portBuff.getBuffLength());
197 if (this->isConnected_PortsOut_OutputPort(portNum)) {
198
199 Fw::SerializeStatus stat = this->PortsOut_out(portNum,this->m_portBuff);
200
201 // If had issues deserializing the data, then report it:
202 if (stat != Fw::FW_SERIALIZE_OK) {
203
204 this->log_WARNING_HI_UR_DecodeError(PORT_SEND,stat);
205 this->m_decodeErrors++;
206 }
207
208 this->m_packetsReceived++;
209 this->m_bytesReceived += psize;
210
211 }
212 }
213
214#ifdef BUILD_UT
215 UdpReceiverComponentImpl::UdpSerialBuffer& UdpReceiverComponentImpl::UdpSerialBuffer::operator=(const UdpReceiverComponentImpl::UdpSerialBuffer& other) {
216 this->resetSer();
217 this->serialize(other.getBuffAddr(),other.getBuffLength(),true);
218 return *this;
219 }
220
221 UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
222 const Fw::SerializeBufferBase& other) : Fw::SerializeBufferBase() {
223 FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
224 memcpy(this->m_buff,other.getBuffAddr(),other.getBuffLength());
225 this->setBuffLen(other.getBuffLength());
226 }
227
228 UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(
229 const UdpReceiverComponentImpl::UdpSerialBuffer& other) : Fw::SerializeBufferBase() {
230 FW_ASSERT(sizeof(this->m_buff)>= other.getBuffLength(),sizeof(this->m_buff),other.getBuffLength());
231 memcpy(this->m_buff,other.m_buff,other.getBuffLength());
232 this->setBuffLen(other.getBuffLength());
233 }
234
235 UdpReceiverComponentImpl::UdpSerialBuffer::UdpSerialBuffer(): Fw::SerializeBufferBase() {
236
237 }
238
239#endif
240
241
242
243} // end namespace Svc
#define FW_ASSERT(...)
Definition Assert.hpp:14
PlatformIntType NATIVE_INT_TYPE
Definition BasicTypes.h:55
uint8_t U8
8-bit unsigned integer
Definition BasicTypes.h:30
PlatformUIntType NATIVE_UINT_TYPE
Definition BasicTypes.h:56
C++-compatible configuration header for fprime configuration.
#define DEBUG_PRINT
Serializable::SizeType getBuffLength() const
returns current buffer size
SerializeBufferBase & operator=(const SerializeBufferBase &src)
copy assignment operator
virtual U8 * getBuffAddr()=0
gets buffer address for data filling
Status start(const Arguments &arguments) override
start the task
Definition Task.cpp:82
void open(const char *port)
Open the connection.
void startThread(NATIVE_UINT_TYPE priority, NATIVE_UINT_TYPE stackSize, NATIVE_UINT_TYPE affinity)
start worker thread
SerializeStatus
forward declaration for string
@ FW_SERIALIZE_OK
Serialization/Deserialization operation was successful.