From c1d30aad13753591b57db9e549333a596b606947 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Wed, 5 May 2021 15:59:41 +0200 Subject: [PATCH] TCP server implementation finished A lot of smaller tweaks and smaller refactoring done in UDP TMTC bridge as well --- osal/common/TcpTmTcBridge.cpp | 26 ++++++++-- osal/common/TcpTmTcBridge.h | 25 ++++++++-- osal/common/TcpTmTcServer.cpp | 86 ++++++++++++++++++++-------------- osal/common/TcpTmTcServer.h | 44 ++++++++++++----- osal/common/UdpTcPollingTask.h | 7 +-- osal/common/UdpTmTcBridge.cpp | 2 +- osal/common/UdpTmTcBridge.h | 10 +++- osal/common/tcpipCommon.cpp | 6 +++ osal/common/tcpipCommon.h | 1 + tmtcservices/TmTcBridge.cpp | 7 ++- tmtcservices/TmTcBridge.h | 3 +- 11 files changed, 152 insertions(+), 65 deletions(-) diff --git a/osal/common/TcpTmTcBridge.cpp b/osal/common/TcpTmTcBridge.cpp index a88b68e9..24fab9a9 100644 --- a/osal/common/TcpTmTcBridge.cpp +++ b/osal/common/TcpTmTcBridge.cpp @@ -22,15 +22,18 @@ TcpTmTcBridge::TcpTmTcBridge(object_id_t objectId, object_id_t tcDestination, object_id_t tmStoreId, object_id_t tcStoreId): TmTcBridge(objectId, tcDestination, tmStoreId, tcStoreId) { mutex = MutexFactory::instance()->createMutex(); - communicationLinkUp = false; + // Connection is always up, TM is requested by connecting to server and receiving packets + registerCommConnect(); } ReturnValue_t TcpTmTcBridge::initialize() { ReturnValue_t result = TmTcBridge::initialize(); if(result != HasReturnvaluesIF::RETURN_OK) { #if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "TmTcUdpBridge::initialize: TmTcBridge initialization failed!" + sif::error << "TcpTmTcBridge::initialize: TmTcBridge initialization failed!" << std::endl; +#else + sif::printError("TcpTmTcBridge::initialize: TmTcBridge initialization failed!\n"); #endif return result; } @@ -44,8 +47,25 @@ TcpTmTcBridge::~TcpTmTcBridge() { } } -ReturnValue_t TcpTmTcBridge::sendTm(const uint8_t *data, size_t dataLen) { +ReturnValue_t TcpTmTcBridge::handleTm() { + // Simply store the telemetry in the FIFO, the server will use it to access the TM + MutexGuard guard(mutex, timeoutType, mutexTimeoutMs); + TmTcMessage message; + ReturnValue_t status = HasReturnvaluesIF::RETURN_OK; + for (ReturnValue_t result = tmTcReceptionQueue->receiveMessage(&message); + result == HasReturnvaluesIF::RETURN_OK; + result = tmTcReceptionQueue->receiveMessage(&message)) + { + status = storeDownlinkData(&message); + if(status != HasReturnvaluesIF::RETURN_OK) { + break; + } + } + return HasReturnvaluesIF::RETURN_OK; +} +ReturnValue_t TcpTmTcBridge::sendTm(const uint8_t *data, size_t dataLen) { + // Not used. The Server uses the FIFO to access and send the telemetry. return HasReturnvaluesIF::RETURN_OK; } diff --git a/osal/common/TcpTmTcBridge.h b/osal/common/TcpTmTcBridge.h index db45fca8..6cfacb9f 100644 --- a/osal/common/TcpTmTcBridge.h +++ b/osal/common/TcpTmTcBridge.h @@ -17,16 +17,30 @@ #include /** - * @brief This class should be used with the UdpTcPollingTask to implement a UDP server - * for receiving and sending PUS TMTC. + * @brief This class should be used with the TcpTmTcServer to implement a TCP server + * for receiving and sending PUS telemetry and telecommands (TMTC) + * @details + * This bridge tasks takes care of filling a FIFO which generated telemetry. The TcpTmTcServer + * will take care of sending the telemetry stored in the FIFO if a client connects to the + * server. This bridge will also be the default destination for telecommands, but the telecommands + * will be relayed to a specified tcDestination directly. */ class TcpTmTcBridge: public TmTcBridge { - //friend class UdpTcPollingTask; + friend class TcpTmTcServer; public: /* The ports chosen here should not be used by any other process. */ static const std::string DEFAULT_UDP_SERVER_PORT; + /** + * Constructor + * @param objectId Object ID of the TcpTmTcBridge. + * @param tcDestination Destination for received TC packets. Any received telecommands will + * be sent there directly. The destination object needs to implement + * AcceptsTelecommandsIF. + * @param tmStoreId TM store object ID. It is recommended to the default object ID + * @param tcStoreId TC store object ID. It is recommended to the default object ID + */ TcpTmTcBridge(object_id_t objectId, object_id_t tcDestination, object_id_t tmStoreId = objects::TM_STORE, object_id_t tcStoreId = objects::TC_STORE); @@ -39,12 +53,15 @@ public: ReturnValue_t initialize() override; + protected: + ReturnValue_t handleTm() override; virtual ReturnValue_t sendTm(const uint8_t * data, size_t dataLen) override; private: - //! Access to the client address is mutex protected as it is set by another task. + //! Access to the FIFO needs to be mutex protected because it is used by the bridge and + //! the server. MutexIF::TimeoutType timeoutType = MutexIF::TimeoutType::WAITING; dur_millis_t mutexTimeoutMs = 20; MutexIF* mutex; diff --git a/osal/common/TcpTmTcServer.cpp b/osal/common/TcpTmTcServer.cpp index 8717783e..25403e8b 100644 --- a/osal/common/TcpTmTcServer.cpp +++ b/osal/common/TcpTmTcServer.cpp @@ -4,6 +4,7 @@ #include "../../container/SharedRingBuffer.h" #include "../../ipc/MessageQueueSenderIF.h" +#include "../../ipc/MutexGuard.h" #include "../../objectmanager/ObjectManagerIF.h" #include "../../serviceinterface/ServiceInterface.h" #include "../../tmtcservices/TmTcMessage.h" @@ -25,10 +26,9 @@ const std::string TcpTmTcServer::DEFAULT_TCP_SERVER_PORT = "7301"; TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge, - /*SharedRingBuffer* tcpRingBuffer, */ size_t receptionBufferSize, - std::string customTcpServerPort): - SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), tcpPort(customTcpServerPort), - receptionBuffer(receptionBufferSize) /*, tcpRingBuffer(tcpRingBuffer) */ { + size_t receptionBufferSize, std::string customTcpServerPort): + SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), + tcpPort(customTcpServerPort), receptionBuffer(receptionBufferSize) { if(tcpPort == "") { tcpPort = DEFAULT_TCP_SERVER_PORT; } @@ -37,17 +37,6 @@ TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge, ReturnValue_t TcpTmTcServer::initialize() { using namespace tcpip; - /* - if(tcpRingBuffer == nullptr) { -#if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "TcpTmTcServer::initialize: Invalid ring buffer!" << std::endl; -#else - sif::printError("TcpTmTcServer::initialize: Invalid ring buffer!\n"); -#endif - return ObjectManagerIF::CHILD_INIT_FAILED; - } - */ - ReturnValue_t result = TcpIpBase::initialize(); if(result != HasReturnvaluesIF::RETURN_OK) { return result; @@ -74,13 +63,14 @@ ReturnValue_t TcpTmTcServer::initialize() { hints.ai_protocol = IPPROTO_TCP; hints.ai_flags = AI_PASSIVE; + // Listen to all addresses (0.0.0.0) by using AI_PASSIVE in the hint flags retval = getaddrinfo(nullptr, tcpPort.c_str(), &hints, &addrResult); if (retval != 0) { handleError(Protocol::TCP, ErrorSources::GETADDRINFO_CALL); return HasReturnvaluesIF::RETURN_FAILED; } - /* Open TCP (stream) socket */ + // Open TCP (stream) socket listenerTcpSocket = socket(addrResult->ai_family, addrResult->ai_socktype, addrResult->ai_protocol); if(listenerTcpSocket == INVALID_SOCKET) { @@ -89,6 +79,7 @@ ReturnValue_t TcpTmTcServer::initialize() { return HasReturnvaluesIF::RETURN_FAILED; } + // Bind to the address found by getaddrinfo retval = bind(listenerTcpSocket, addrResult->ai_addr, static_cast(addrResult->ai_addrlen)); if(retval == SOCKET_ERROR) { freeaddrinfo(addrResult); @@ -107,15 +98,15 @@ TcpTmTcServer::~TcpTmTcServer() { ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { using namespace tcpip; - /* If a connection is accepted, the corresponding socket will be assigned to the new socket */ + // If a connection is accepted, the corresponding socket will be assigned to the new socket socket_t connSocket = 0; sockaddr clientSockAddr = {}; socklen_t connectorSockAddrLen = 0; int retval = 0; - /* Listen for connection requests permanently for lifetime of program */ + // Listen for connection requests permanently for lifetime of program while(true) { - retval = listen(listenerTcpSocket, currentBacklog); + retval = listen(listenerTcpSocket, tcpBacklog); if(retval == SOCKET_ERROR) { handleError(Protocol::TCP, ErrorSources::LISTEN_CALL, 500); continue; @@ -131,7 +122,7 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { handleServerOperation(connSocket); - /* Done, shut down connection */ + // Done, shut down connection and go back to listening for client requests retval = shutdown(connSocket, SHUT_SEND); if(retval != 0) { handleError(Protocol::TCP, ErrorSources::SHUTDOWN_CALL); @@ -142,29 +133,21 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { } ReturnValue_t TcpTmTcServer::initializeAfterTaskCreation() { + if(tmtcBridge == nullptr) { + return ObjectManagerIF::CHILD_INIT_FAILED; + } /* Initialize the destination after task creation. This ensures that the destination has already been set in the TMTC bridge. */ targetTcDestination = tmtcBridge->getRequestQueue(); - -// -// if(tcpRingBuffer != nullptr) { -// auto fifoCheck = tcpRingBuffer->getReceiveSizesFIFO(); -// if (fifoCheck == nullptr) { -//#if FSFW_CPP_OSTREAM_ENABLED == 1 -// sif::error << "TcpTmTcServer::initializeAfterTaskCreation: " -// "TCP ring buffer does not have a FIFO!" << std::endl; -//#else -// sif::printError("TcpTmTcServer::initialize: TCP ring buffer does not have a FIFO!\n"); -//#endif /* FSFW_CPP_OSTREAM_ENABLED == 0 */ -// } -// } - + tcStore = tmtcBridge->tcStore; + tmStore = tmtcBridge->tmStore; return HasReturnvaluesIF::RETURN_OK; } void TcpTmTcServer::handleServerOperation(socket_t connSocket) { int retval = 0; do { + // Read all telecommands sent by the client retval = recv(connSocket, reinterpret_cast(receptionBuffer.data()), receptionBuffer.capacity(), @@ -173,10 +156,12 @@ void TcpTmTcServer::handleServerOperation(socket_t connSocket) { handleTcReception(retval); } else if(retval == 0) { - /* Client has finished sending telecommands, send telemetry now */ + // Client has finished sending telecommands, send telemetry now + handleTmSending(connSocket); } else { - /* Should not happen */ + // Should not happen + tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::RECV_CALL); } } while(retval > 0); } @@ -210,3 +195,32 @@ ReturnValue_t TcpTmTcServer::handleTcReception(size_t bytesRecvd) { } return result; } + +void TcpTmTcServer::setTcpBacklog(uint8_t tcpBacklog) { + this->tcpBacklog = tcpBacklog; +} + +ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket) { + // Access to the FIFO is mutex protected because it is filled by the bridge + MutexGuard(tmtcBridge->mutex, tmtcBridge->timeoutType, tmtcBridge->mutexTimeoutMs); + store_address_t storeId; + while((not tmtcBridge->tmFifo->empty()) and + (tmtcBridge->packetSentCounter < tmtcBridge->sentPacketsPerCycle)) { + tmtcBridge->tmFifo->retrieve(&storeId); + + // Using the store accessor will take care of deleting TM from the store automatically + ConstStorageAccessor storeAccessor(storeId); + ReturnValue_t result = tmStore->getData(storeId, storeAccessor); + if(result != HasReturnvaluesIF::RETURN_OK) { + return result; + } + int retval = send(connSocket, + reinterpret_cast(storeAccessor.data()), + storeAccessor.size(), + tcpTmFlags); + if(retval != static_cast(storeAccessor.size())) { + tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::SEND_CALL); + } + } + return HasReturnvaluesIF::RETURN_OK; +} diff --git a/osal/common/TcpTmTcServer.h b/osal/common/TcpTmTcServer.h index 6db6621d..e4328370 100644 --- a/osal/common/TcpTmTcServer.h +++ b/osal/common/TcpTmTcServer.h @@ -1,5 +1,5 @@ -#ifndef FSFW_OSAL_WINDOWS_TCWINTCPSERVER_H_ -#define FSFW_OSAL_WINDOWS_TCWINTCPSERVER_H_ +#ifndef FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_ +#define FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_ #include "TcpIpBase.h" #include "../../ipc/messageQueueDefinitions.h" @@ -17,13 +17,22 @@ #include class TcpTmTcBridge; -//class SharedRingBuffer; - /** - * @brief Windows TCP server used to receive telecommands on a Windows Host + * @brief TCP server implementation * @details - * Based on: https://docs.microsoft.com/en-us/windows/win32/winsock/complete-server-code + * This server will run for the whole program lifetime and will take care of serving client + * requests on a specified TCP server port. This server waas written in a generic way and + * can be used on Unix and on Windows systems. + * + * If a connection is accepted, the server will read all telecommands sent by a client and then + * send all telemetry currently found in the TMTC bridge FIFO. + * + * Reading telemetry without sending telecommands is possible by connecting, shutting down the + * send operation immediately and then reading the telemetry. It is therefore recommended to + * connect to the server regularly, even if no telecommands need to be sent. + * + * The server will listen to a specific port on all addresses (0.0.0.0). */ class TcpTmTcServer: public SystemObject, @@ -35,18 +44,28 @@ public: static const std::string DEFAULT_TCP_CLIENT_PORT; static constexpr size_t ETHERNET_MTU_SIZE = 1500; - TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge /*, SharedRingBuffer* tcpRingBuffer*/, - size_t receptionBufferSize = ETHERNET_MTU_SIZE, + /** + * TCP Server Constructor + * @param objectId Object ID of the TCP Server + * @param tmtcTcpBridge Object ID of the TCP TMTC Bridge object + * @param receptionBufferSize This will be the size of the reception buffer. Default buffer + * size will be the Ethernet MTU size + * @param customTcpServerPort The user can specify another port than the default (7301) here. + */ + TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge, + size_t receptionBufferSize = ETHERNET_MTU_SIZE + 1, std::string customTcpServerPort = ""); virtual~ TcpTmTcServer(); + void setTcpBacklog(uint8_t tcpBacklog); + ReturnValue_t initialize() override; ReturnValue_t performOperation(uint8_t opCode) override; ReturnValue_t initializeAfterTaskCreation() override; protected: StorageManagerIF* tcStore = nullptr; - + StorageManagerIF* tmStore = nullptr; private: //! TMTC bridge is cached. object_id_t tmtcBridgeId = objects::NO_OBJECT; @@ -58,14 +77,15 @@ private: struct sockaddr tcpAddress; MessageQueueId_t targetTcDestination = MessageQueueIF::NO_QUEUE; int tcpAddrLen = sizeof(tcpAddress); - int currentBacklog = 3; + int tcpBacklog = 3; std::vector receptionBuffer; - //SharedRingBuffer* tcpRingBuffer; int tcpSockOpt = 0; + int tcpTmFlags = 0; void handleServerOperation(socket_t connSocket); ReturnValue_t handleTcReception(size_t bytesRecvd); + ReturnValue_t handleTmSending(socket_t connSocket); }; -#endif /* FSFW_OSAL_WINDOWS_TCWINTCPSERVER_H_ */ +#endif /* FSFW_OSAL_COMMON_TCP_TMTC_SERVER_H_ */ diff --git a/osal/common/UdpTcPollingTask.h b/osal/common/UdpTcPollingTask.h index cfe74b34..9a680fb8 100644 --- a/osal/common/UdpTcPollingTask.h +++ b/osal/common/UdpTcPollingTask.h @@ -9,8 +9,11 @@ #include /** - * @brief This class should be used with the UdpTmTcBridge to implement a UDP server + * @brief This class can be used with the UdpTmTcBridge to implement a UDP server * for receiving and sending PUS TMTC. + * @details + * This task is exclusively used to poll telecommands from a given socket and transfer them + * to the FSFW software bus. It used the blocking recvfrom call to do this. */ class UdpTcPollingTask: public TcpIpBase, @@ -45,8 +48,6 @@ private: object_id_t tmtcBridgeId = objects::NO_OBJECT; UdpTmTcBridge* tmtcBridge = nullptr; MessageQueueId_t targetTcDestination = MessageQueueIF::NO_QUEUE; - - //! See: https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-recvfrom int receptionFlags = 0; std::vector receptionBuffer; diff --git a/osal/common/UdpTmTcBridge.cpp b/osal/common/UdpTmTcBridge.cpp index ccbb194e..b07ca7a5 100644 --- a/osal/common/UdpTmTcBridge.cpp +++ b/osal/common/UdpTmTcBridge.cpp @@ -23,7 +23,7 @@ const std::string UdpTmTcBridge::DEFAULT_UDP_SERVER_PORT = tcpip::DEFAULT_SERVER_PORT; UdpTmTcBridge::UdpTmTcBridge(object_id_t objectId, object_id_t tcDestination, - object_id_t tmStoreId, object_id_t tcStoreId, std::string udpServerPort): + std::string udpServerPort, object_id_t tmStoreId, object_id_t tcStoreId): TmTcBridge(objectId, tcDestination, tmStoreId, tcStoreId) { if(udpServerPort == "") { this->udpServerPort = DEFAULT_UDP_SERVER_PORT; diff --git a/osal/common/UdpTmTcBridge.h b/osal/common/UdpTmTcBridge.h index 290f5eb0..74084a9d 100644 --- a/osal/common/UdpTmTcBridge.h +++ b/osal/common/UdpTmTcBridge.h @@ -17,8 +17,13 @@ #include /** - * @brief This class should be used with the UdpTcPollingTask to implement a UDP server + * @brief This class can be used with the UdpTcPollingTask to implement a UDP server * for receiving and sending PUS TMTC. + * @details + * This bridge task will take care of sending telemetry back to a UDP client if a connection + * was established and store them in a FIFO if this was not done yet. It is also be the default + * destination for telecommands, but the telecommands will be relayed to a specified tcDestination + * directly. */ class UdpTmTcBridge: public TmTcBridge, @@ -29,7 +34,8 @@ public: static const std::string DEFAULT_UDP_SERVER_PORT; UdpTmTcBridge(object_id_t objectId, object_id_t tcDestination, - object_id_t tmStoreId, object_id_t tcStoreId, std::string udpServerPort = ""); + std::string udpServerPort = "", object_id_t tmStoreId = objects::TM_STORE, + object_id_t tcStoreId = objects::TC_STORE); virtual~ UdpTmTcBridge(); /** diff --git a/osal/common/tcpipCommon.cpp b/osal/common/tcpipCommon.cpp index d4a3e8e6..ca4dbf18 100644 --- a/osal/common/tcpipCommon.cpp +++ b/osal/common/tcpipCommon.cpp @@ -32,6 +32,12 @@ void tcpip::determineErrorStrings(Protocol protocol, ErrorSources errorSrc, std: else if(errorSrc == ErrorSources::RECVFROM_CALL) { srcString = "recvfrom call"; } + else if(errorSrc == ErrorSources::SEND_CALL) { + srcString = "send call"; + } + else if(errorSrc == ErrorSources::SENDTO_CALL) { + srcString = "sendto call"; + } else if(errorSrc == ErrorSources::GETADDRINFO_CALL) { srcString = "getaddrinfo call"; } diff --git a/osal/common/tcpipCommon.h b/osal/common/tcpipCommon.h index eb1bd910..ce7a90cd 100644 --- a/osal/common/tcpipCommon.h +++ b/osal/common/tcpipCommon.h @@ -29,6 +29,7 @@ enum class ErrorSources { RECVFROM_CALL, LISTEN_CALL, ACCEPT_CALL, + SEND_CALL, SENDTO_CALL, SHUTDOWN_CALL }; diff --git a/tmtcservices/TmTcBridge.cpp b/tmtcservices/TmTcBridge.cpp index dcffac41..1257ef89 100644 --- a/tmtcservices/TmTcBridge.cpp +++ b/tmtcservices/TmTcBridge.cpp @@ -183,8 +183,11 @@ ReturnValue_t TmTcBridge::storeDownlinkData(TmTcMessage *message) { if(tmFifo->full()) { #if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::debug << "TmTcBridge::storeDownlinkData: TM downlink max. number " - << "of stored packet IDs reached! " << std::endl; + sif::warning << "TmTcBridge::storeDownlinkData: TM downlink max. number " + "of stored packet IDs reached!" << std::endl; +#else + sif::printWarning("TmTcBridge::storeDownlinkData: TM downlink max. number " + "of stored packet IDs reached!\n"); #endif if(overwriteOld) { tmFifo->retrieve(&storeId); diff --git a/tmtcservices/TmTcBridge.h b/tmtcservices/TmTcBridge.h index 0177648c..d3e1c547 100644 --- a/tmtcservices/TmTcBridge.h +++ b/tmtcservices/TmTcBridge.h @@ -150,8 +150,7 @@ protected: void printData(uint8_t * data, size_t dataLen); /** - * This fifo can be used to store downlink data - * which can not be sent at the moment. + * This FIFO can be used to store downlink data which can not be sent at the moment. */ DynamicFIFO* tmFifo = nullptr; uint8_t sentPacketsPerCycle = DEFAULT_STORED_DATA_SENT_PER_CYCLE;