diff --git a/src/fsfw/osal/common/TcpTmTcServer.cpp b/src/fsfw/osal/common/TcpTmTcServer.cpp index ee1b0f23..1de0816d 100644 --- a/src/fsfw/osal/common/TcpTmTcServer.cpp +++ b/src/fsfw/osal/common/TcpTmTcServer.cpp @@ -30,8 +30,8 @@ const std::string TcpTmTcServer::DEFAULT_SERVER_PORT = tcpip::DEFAULT_SERVER_POR TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge, size_t receptionBufferSize, std::string customTcpServerPort): - SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), - tcpPort(customTcpServerPort), receptionBuffer(receptionBufferSize) { + SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), + tcpPort(customTcpServerPort), receptionBuffer(receptionBufferSize) { if(tcpPort == "") { tcpPort = DEFAULT_SERVER_PORT; } @@ -127,7 +127,7 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { handleServerOperation(connSocket); // Done, shut down connection and go back to listening for client requests - retval = shutdown(connSocket, SHUT_SEND); + retval = shutdown(connSocket, SHUT_BOTH); if(retval != 0) { handleError(Protocol::TCP, ErrorSources::SHUTDOWN_CALL); } @@ -163,8 +163,8 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) { FD_SET(connSocket, &efds); timeval tv; - tv.tv_sec = 1; - tv.tv_usec = 0; + tv.tv_sec = selectTimeoutMs / 1000; + tv.tv_usec = (selectTimeoutMs % 1000) * 1000; int nfds = connSocket + 1; @@ -192,11 +192,19 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) { uint32_t index = 0; int retval = select(nfds, &rfds, nullptr, &efds, &tv); if(retval < 0) { - // client might have shut down connection? + // client might have shut down connection + return; } else if(retval > 0) { if(FD_ISSET(connSocket, &rfds)) { // data available + int retval = recv( + connSocket, + reinterpret_cast(receptionBuffer.data()), + receptionBuffer.capacity(), + tcpFlags + ); + handleTcReception(retval); //int result = receiveData(); //if(result == 0) { // break; @@ -207,8 +215,9 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) { } } else { - // no data available - TaskFactory::delayTask(500); + // no data available. Send back telemetry now + handleTmSending(connSocket); + //TaskFactory::delayTask(500); } } } diff --git a/src/fsfw/osal/common/TcpTmTcServer.h b/src/fsfw/osal/common/TcpTmTcServer.h index c6916080..a33a0427 100644 --- a/src/fsfw/osal/common/TcpTmTcServer.h +++ b/src/fsfw/osal/common/TcpTmTcServer.h @@ -45,6 +45,7 @@ public: static const std::string DEFAULT_SERVER_PORT; static constexpr size_t ETHERNET_MTU_SIZE = 1500; + static constexpr uint32_t DEFAULT_SELECT_TIMEOUT_MS = 200; /** * TCP Server Constructor @@ -59,6 +60,7 @@ public: std::string customTcpServerPort = ""); virtual~ TcpTmTcServer(); + void setSelectTimeout(uint32_t timeout); void setTcpBacklog(uint8_t tcpBacklog); ReturnValue_t initialize() override; @@ -77,6 +79,7 @@ private: std::string tcpPort; int tcpFlags = 0; + uint32_t selectTimeoutMs = DEFAULT_SELECT_TIMEOUT_MS; socket_t listenerTcpSocket = 0; struct sockaddr tcpAddress; MessageQueueId_t targetTcDestination = MessageQueueIF::NO_QUEUE; diff --git a/src/fsfw/tmtcservices/PusParser.cpp b/src/fsfw/tmtcservices/PusParser.cpp new file mode 100644 index 00000000..42044da3 --- /dev/null +++ b/src/fsfw/tmtcservices/PusParser.cpp @@ -0,0 +1,141 @@ +#include "PusParser.h" +#include + +PusParser::PusParser(uint16_t maxExpectedPusPackets, + bool storeSplitPackets): indexSizePairFIFO(maxExpectedPusPackets) { +} + +ReturnValue_t PusParser::parsePusPackets(const uint8_t *frame, + size_t frameSize) { + if(frame == nullptr or frameSize < 5) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "PusParser::parsePusPackets: Frame invalid!" << std::endl; +#else + sif::printError("PusParser::parsePusPackets: Frame invalid!\n"); +#endif + return HasReturnvaluesIF::RETURN_FAILED; + } + + if(indexSizePairFIFO.full()) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "PusParser::parsePusPackets: FIFO is full!" << std::endl; +#else + sif::printError("PusParser::parsePusPackets: FIFO is full!\n"); +#endif + return HasReturnvaluesIF::RETURN_FAILED; + } + + size_t lengthField = frame[4] << 8 | frame[5]; + + if(lengthField == 0) { + return NO_PACKET_FOUND; + } + + size_t packetSize = lengthField + 7; + // sif::debug << frameSize << std::endl; + // Size of a pus packet is the value in the packet length field plus 7. + if(packetSize > frameSize) { + if(storeSplitPackets) { + indexSizePairFIFO.insert(indexSizePair(0, frameSize)); + } + else { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::debug << "TcSerialPollingTask::readNextPacket: Next packet " + << "larger than remaining frame," << std::endl; + sif::debug << "Throwing away packet. Detected packet size: " + << packetSize << std::endl; +#else + sif::printDebug("TcSerialPollingTask::readNextPacket: Next packet " + "larger than remaining frame.\n"); + sif::printDebug("Throwing away packet. Detected packet size: %lu", + static_cast(packetSize)); +#endif + } + return SPLIT_PACKET; + } + else { + indexSizePairFIFO.insert(indexSizePair(0, packetSize)); + if(packetSize == frameSize) { + return HasReturnvaluesIF::RETURN_OK; + } + } + + // packet size is smaller than frame size, parse for more packets. + return readMultiplePackets(frame, frameSize, packetSize); +} + +ReturnValue_t PusParser::readMultiplePackets(const uint8_t *frame, + size_t frameSize, size_t startIndex) { + while (startIndex < frameSize) { + ReturnValue_t result = readNextPacket(frame, frameSize, startIndex); + if(result != HasReturnvaluesIF::RETURN_OK) { + return result; + } + } + return HasReturnvaluesIF::RETURN_OK; +} + +DynamicFIFO* PusParser::fifo(){ + return &indexSizePairFIFO; +} + +PusParser::indexSizePair PusParser::getNextFifoPair() { + indexSizePair nextIndexSizePair; + indexSizePairFIFO.retrieve(&nextIndexSizePair); + return nextIndexSizePair; +} + +ReturnValue_t PusParser::readNextPacket(const uint8_t *frame, + size_t frameSize, size_t& currentIndex) { + // sif::debug << startIndex << std::endl; + if(currentIndex + 5 > frameSize) { + currentIndex = frameSize; + return HasReturnvaluesIF::RETURN_OK; + } + + uint16_t lengthField = frame[currentIndex + 4] << 8 | + frame[currentIndex + 5]; + if(lengthField == 0) { + // It is assumed that no packet follows. + currentIndex = frameSize; + return HasReturnvaluesIF::RETURN_OK; + } + size_t nextPacketSize = lengthField + 7; + size_t remainingSize = frameSize - currentIndex; + if(nextPacketSize > remainingSize) + { + if(storeSplitPackets) { + indexSizePairFIFO.insert(indexSizePair(currentIndex, remainingSize)); + } + else { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::debug << "TcSerialPollingTask::readNextPacket: Next packet " + << "larger than remaining frame." << std::endl; + sif::debug << "Throwing away packet. Detected packet size: " + << nextPacketSize << std::endl; +#else + sif::printDebug("TcSerialPollingTask::readNextPacket: Next packet " + "larger than remaining frame.\n"); + sif::printDebug("Throwing away packet. Detected packet size: %lu\n", + static_cast(nextPacketSize)); +#endif + } + return SPLIT_PACKET; + } + + ReturnValue_t result = indexSizePairFIFO.insert(indexSizePair(currentIndex, + nextPacketSize)); + if (result != HasReturnvaluesIF::RETURN_OK) { + // FIFO full. +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::debug << "PusParser: Issue inserting into start index size " + << "FIFO, it is full!" << std::endl; +#else + sif::printDebug("PusParser: Issue inserting into start index size " + "FIFO, it is full!\n"); +#endif + } + currentIndex += nextPacketSize; + + return result; +} diff --git a/src/fsfw/tmtcservices/PusParser.h b/src/fsfw/tmtcservices/PusParser.h new file mode 100644 index 00000000..0efbdcc5 --- /dev/null +++ b/src/fsfw/tmtcservices/PusParser.h @@ -0,0 +1,83 @@ +#ifndef FRAMEWORK_TMTCSERVICES_PUSPARSER_H_ +#define FRAMEWORK_TMTCSERVICES_PUSPARSER_H_ + +#include +#include +#include + +/** + * @brief This small helper class scans a given buffer for PUS packets. + * Can be used if PUS packets are serialized in a tightly packed frame. + * @details + * The parser uses the payload length field of PUS packets to find + * the respective PUS packet sizes. + * + * The parser parses a buffer by taking a pointer and the maximum size to scan. + * If PUS packets are found, they are stored in a FIFO which stores pairs + * consisting of the index in the buffer and the respective packet sizes. + * + * If the parser detects split packets (which means that the size of the + * next packet is larger than the remaining size to scan), it can either + * store that split packet or throw away the packet. + * @author R. Mueller + */ +class PusParser { +public: + //! The first entry is the index inside the buffer while the second index + //! is the size of the PUS packet starting at that index. + using indexSizePair = std::pair; + + static constexpr uint8_t INTERFACE_ID = CLASS_ID::PUS_PARSER; + static constexpr ReturnValue_t NO_PACKET_FOUND = MAKE_RETURN_CODE(0x00); + static constexpr ReturnValue_t SPLIT_PACKET = MAKE_RETURN_CODE(0x01); + /** + * Parser constructor. + * @param maxExpectedPusPackets + * Maximum expected number of PUS packets. A good estimate is to divide + * the frame size by the minimum size of a PUS packet (12 bytes) + * @param storeSplitPackets + * Specifies whether split packets are also stored inside the FIFO, + * with the size being the remaining frame size. + */ + PusParser(uint16_t maxExpectedPusPackets, bool storeSplitPackets); + + /** + * Parse a given frame for PUS packets + * @param frame + * @param frameSize + * @return -@c NO_PACKET_FOUND if no packet was found. + */ + ReturnValue_t parsePusPackets(const uint8_t* frame, size_t frameSize); + + /** + * Accessor function to get a reference to the internal FIFO which + * stores pairs of indexi and packet sizes. This FIFO is filled + * by the parsePusPackets() function. + * @return + */ + DynamicFIFO* fifo(); + + /** + * Retrieve the next index and packet size pair from the FIFO. + * This also removed it from the FIFO. Please note that if the FIFO + * is empty, an empty pair will be returned. + * @return + */ + indexSizePair getNextFifoPair(); + +private: + /** A FIFO is used to store information about multiple PUS packets + * inside the receive buffer. The maximum number of entries is defined + * by the first constructor argument. + */ + DynamicFIFO indexSizePairFIFO; + + bool storeSplitPackets = false; + + ReturnValue_t readMultiplePackets(const uint8_t *frame, size_t frameSize, + size_t startIndex); + ReturnValue_t readNextPacket(const uint8_t *frame, + size_t frameSize, size_t& startIndex); +}; + +#endif /* FRAMEWORK_TMTCSERVICES_PUSPARSER_H_ */