diff --git a/src/fsfw/osal/common/TcpTmTcServer.cpp b/src/fsfw/osal/common/TcpTmTcServer.cpp index 03145e20..2e65429f 100644 --- a/src/fsfw/osal/common/TcpTmTcServer.cpp +++ b/src/fsfw/osal/common/TcpTmTcServer.cpp @@ -31,9 +31,9 @@ const std::string TcpTmTcServer::DEFAULT_SERVER_PORT = tcpip::DEFAULT_SERVER_POR TcpTmTcServer::TcpTmTcServer(object_id_t objectId, object_id_t tmtcTcpBridge, size_t receptionBufferSize, size_t ringBufferSize, std::string customTcpServerPort, ReceptionModes receptionMode): - SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), receptionMode(receptionMode), - tcpConfig(customTcpServerPort), receptionBuffer(receptionBufferSize), - ringBuffer(ringBufferSize, true) { + SystemObject(objectId), tmtcBridgeId(tmtcTcpBridge), receptionMode(receptionMode), + tcpConfig(customTcpServerPort), receptionBuffer(receptionBufferSize), + ringBuffer(ringBufferSize, true) { } ReturnValue_t TcpTmTcServer::initialize() { @@ -51,6 +51,7 @@ ReturnValue_t TcpTmTcServer::initialize() { if(spacePacketParser == nullptr) { return HasReturnvaluesIF::RETURN_FAILED; } + tcpConfig.tcpFlags |= MSG_DONTWAIT; } } @@ -127,7 +128,6 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { //connSocket = accept(listenerTcpSocket, &clientSockAddr, &connectorSockAddrLen); connSocket = accept(listenerTcpSocket, nullptr, nullptr); - sif::debug << "accepted new conn socket " << connSocket << std::endl; if(connSocket == INVALID_SOCKET) { handleError(Protocol::TCP, ErrorSources::ACCEPT_CALL, 500); closeSocket(connSocket); @@ -136,7 +136,6 @@ ReturnValue_t TcpTmTcServer::performOperation(uint8_t opCode) { handleServerOperation(connSocket); - sif::debug << "Shutting down" << std::endl; // Done, shut down connection and go back to listening for client requests retval = shutdown(connSocket, SHUT_BOTH); if(retval != 0) { @@ -161,81 +160,30 @@ ReturnValue_t TcpTmTcServer::initializeAfterTaskCreation() { } void TcpTmTcServer::handleServerOperation(socket_t& connSocket) { - //int retval = 0; - // using namespace std::chrono_literals; - // - // // Receive until the peer shuts down the connection, use select to do this - // fd_set rfds; - // fd_set efds; - // - // FD_ZERO(&rfds); - // FD_SET(connSocket, &rfds); - // - // FD_ZERO(&efds); - // FD_SET(connSocket, &efds); - // - // timeval tv = {}; - // tv.tv_sec = 0;//tcpConfig.selectTimeoutMs / 1000; - // tv.tv_usec = 0;// DEFAULT_LOOP_DELAY_MS * 1000;//(tcpConfig.selectTimeoutMs % 1000) * 1000; - // - // int nfds = connSocket + 1; - //setsockopt(connSocket, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv); - // do { - // // Read all telecommands sent by the client - // retval = recv( - // connSocket, - // reinterpret_cast(receptionBuffer.data()), - // receptionBuffer.capacity(), - // tcpFlags - // ); - // if (retval > 0) { - // handleTcReception(retval); - // } - // else if(retval == 0) { - // // Client has finished sending telecommands, send telemetry now - // handleTmSending(connSocket); - // } - // else { - // // Should not happen - // tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::RECV_CALL); - // } - // } while(retval > 0); - // data available - // int test = recv( - // connSocket, - // reinterpret_cast(receptionBuffer.data()), - // receptionBuffer.capacity(), - // tcpConfig.tcpFlags - // ); - // sif::debug << "Received " << test << " bytes" << std::endl; while (true) { int retval = recv( connSocket, reinterpret_cast(receptionBuffer.data()), receptionBuffer.capacity(), - MSG_DONTWAIT//tcpConfig.tcpFlags + tcpConfig.tcpFlags ); - if(retval == 0) { // Client closed connection return; } else if(retval > 0) { - sif::debug << "Received " << retval << " bytes" << std::endl; + // The ring buffer was configured for overwrite, so the returnvalue does not need to + // be checked for now ringBuffer.writeData(receptionBuffer.data(), retval); - sif::debug << "select retval: " << retval << std::endl; } else if(retval < 0) { if(errno == EAGAIN) { - // no data available. Check whether any packets have been read, then send back - // telemetry now + // No data available. Check whether any packets have been read, then send back + // telemetry if available bool tcAvailable = false; bool tmSent = false; size_t availableReadData = ringBuffer.getAvailableReadData(); - //sif::debug << "ring buffer data: " << availableReadData << std::endl; - //sif::debug << "last buf size: " << lastRingBufferSize << std::endl; if(availableReadData > lastRingBufferSize) { - sif::debug << "ring buffer size changed" << std::endl; tcAvailable = true; handleRingBufferData(availableReadData); } @@ -247,26 +195,10 @@ void TcpTmTcServer::handleServerOperation(socket_t& connSocket) { TaskFactory::delayTask(DEFAULT_LOOP_DELAY_MS); } } - if(errno == ETIMEDOUT) { - - retval = 0; + else { + tcpip::handleError(tcpip::Protocol::TCP, tcpip::ErrorSources::RECV_CALL); } } - // data available - // int retval = recv( - // connSocket, - // reinterpret_cast(receptionBuffer.data()), - // receptionBuffer.capacity(), - // tcpConfig.tcpFlags - // ); - //sif::debug << "recv retval: " << retval << std::endl; - // data available - // int test = recv( - // connSocket, - // reinterpret_cast(receptionBuffer.data()), - // receptionBuffer.capacity(), - // tcpConfig.tcpFlags - // ); } } @@ -328,31 +260,28 @@ ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket, bool& tmSent) while((not tmtcBridge->tmFifo->empty()) and (tmtcBridge->packetSentCounter < tmtcBridge->sentPacketsPerCycle)) { // Send can fail, so only peek from the FIFO - sif::debug << "sending TM" << std::endl; tmtcBridge->tmFifo->peek(&storeId); // Using the store accessor will take care of deleting TM from the store automatically ConstStorageAccessor storeAccessor(storeId); ReturnValue_t result = tmStore->getData(storeId, storeAccessor); if(result != HasReturnvaluesIF::RETURN_OK) { - sif::debug << "oh no" << std::endl; return result; } int retval = send(connSocket, reinterpret_cast(storeAccessor.data()), storeAccessor.size(), tcpConfig.tcpTmFlags); - if(retval != static_cast(storeAccessor.size())) { - // Assume that the client has closed the connection here for now - sif::debug << "conn broken?" << std::endl; - handleSocketError(storeAccessor); - return CONN_BROKEN; - } - else { + if(retval == static_cast(storeAccessor.size())) { // Packet sent, clear FIFO entry tmtcBridge->tmFifo->pop(); - sif::debug << "fifo size: " << tmtcBridge->tmFifo->size() << std::endl; tmSent = true; + + } + else if(retval <= 0) { + // Assume that the client has closed the connection here for now + handleSocketError(storeAccessor); + return CONN_BROKEN; } } return HasReturnvaluesIF::RETURN_OK; @@ -361,8 +290,9 @@ ReturnValue_t TcpTmTcServer::handleTmSending(socket_t connSocket, bool& tmSent) ReturnValue_t TcpTmTcServer::handleRingBufferData(size_t availableReadData) { ReturnValue_t status = HasReturnvaluesIF::RETURN_OK; ReturnValue_t result = HasReturnvaluesIF::RETURN_OK; + size_t readAmount = availableReadData; lastRingBufferSize = availableReadData; - if(availableReadData == ringBuffer.getMaxSize()) { + if(readAmount >= ringBuffer.getMaxSize()) { #if FSFW_VERBOSE_LEVEL >= 1 #if FSFW_CPP_OSTREAM_ENABLED == 1 // Possible configuration error, too much data or/and data coming in too fast, @@ -375,32 +305,42 @@ ReturnValue_t TcpTmTcServer::handleRingBufferData(size_t availableReadData) { #endif #endif } - ringBuffer.readData(receptionBuffer.data(), availableReadData, true); - result = spacePacketParser->parsePusPackets(receptionBuffer.data(), - receptionBuffer.size()); + if(readAmount >= receptionBuffer.size()) { +#if FSFW_VERBOSE_LEVEL >= 1 +#if FSFW_CPP_OSTREAM_ENABLED == 1 + // Possible configuration error, too much data or/and data coming in too fast, + // requiring larger buffers + sif::warning << "TcpTmTcServer::handleServerOperation: " + "Reception buffer too small " << std::endl; +#else + sif::printWarning("TcpTmTcServer::handleServerOperation: Reception buffer too small\n"); +#endif +#endif + readAmount = receptionBuffer.size(); + } + ringBuffer.readData(receptionBuffer.data(), readAmount, true); + uint32_t readPackets = 0; + result = spacePacketParser->parseSpacePackets(receptionBuffer.data(), readAmount, readPackets); if(result == SpacePacketParser::NO_PACKET_FOUND) { ringBuffer.deleteData(availableReadData); lastRingBufferSize = ringBuffer.getAvailableReadData(); - sif::debug << lastRingBufferSize << std::endl; } else if(result == HasReturnvaluesIF::RETURN_OK) { // Space Packets were found. Handle them here - auto fifo = spacePacketParser->fifo(); + auto& fifo = spacePacketParser->fifo(); SpacePacketParser::IndexSizePair idxSizePair; while(not fifo.empty()) { fifo.retrieve(&idxSizePair); - sif::debug << "handle tc" << std::endl; result = handleTcReception(receptionBuffer.data() + idxSizePair.first, idxSizePair.second); + std::memset(receptionBuffer.data() + idxSizePair.first, 0, idxSizePair.second); ringBuffer.deleteData(idxSizePair.second); if(result != HasReturnvaluesIF::RETURN_OK) { status = result; } lastRingBufferSize = ringBuffer.getAvailableReadData(); - sif::debug << lastRingBufferSize << std::endl; } } - std::memset(receptionBuffer.data(), 0, receptionBuffer.size()); return status; } diff --git a/src/fsfw/tcdistribution/PUSDistributor.cpp b/src/fsfw/tcdistribution/PUSDistributor.cpp index eec02429..1a5f713d 100644 --- a/src/fsfw/tcdistribution/PUSDistributor.cpp +++ b/src/fsfw/tcdistribution/PUSDistributor.cpp @@ -29,12 +29,31 @@ PUSDistributor::TcMqMapIter PUSDistributor::selectDestination() { tcStatus = checker.checkPacket(currentPacket); if(tcStatus != HasReturnvaluesIF::RETURN_OK) { #if FSFW_VERBOSE_LEVEL >= 1 + std::string keyword; + if(tcStatus == TcPacketCheck::INCORRECT_CHECKSUM) { + keyword = "checksum"; + } + else if(tcStatus == TcPacketCheck::INCORRECT_PRIMARY_HEADER) { + keyword = "incorrect primary header"; + } + else if(tcStatus == TcPacketCheck::ILLEGAL_APID) { + keyword = "illegal APID"; + } + else if(tcStatus == TcPacketCheck::INCORRECT_SECONDARY_HEADER) { + keyword = "incorrect secondary header"; + } + else if(tcStatus == TcPacketCheck::INCOMPLETE_PACKET) { + keyword = "incomplete packet"; + } + else { + keyword = "unnamed error"; + } #if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::debug << "PUSDistributor::handlePacket: Packet format invalid, code " << - static_cast(tcStatus) << std::endl; + sif::warning << "PUSDistributor::handlePacket: Packet format invalid, " + << keyword << " error" << std::endl; #else - sif::printDebug("PUSDistributor::handlePacket: Packet format invalid, code %d\n", - static_cast(tcStatus)); + sif::printWarning("PUSDistributor::handlePacket: Packet format invalid, " + "%s error\n", keyword); #endif #endif } diff --git a/src/fsfw/tmtcservices/SpacePacketParser.cpp b/src/fsfw/tmtcservices/SpacePacketParser.cpp index 7e510900..e48fe525 100644 --- a/src/fsfw/tmtcservices/SpacePacketParser.cpp +++ b/src/fsfw/tmtcservices/SpacePacketParser.cpp @@ -5,22 +5,22 @@ SpacePacketParser::SpacePacketParser(uint16_t maxExpectedPusPackets, bool storeS indexSizePairFIFO(maxExpectedPusPackets) { } -ReturnValue_t SpacePacketParser::parsePusPackets(const uint8_t *frame, - size_t frameSize) { +ReturnValue_t SpacePacketParser::parseSpacePackets(const uint8_t *frame, + size_t frameSize, uint32_t& foundPackets) { if(frame == nullptr or frameSize < 5) { #if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "PusParser::parsePusPackets: Frame invalid!" << std::endl; + sif::error << "PusParser::parsePusPackets: Frame invalid" << std::endl; #else - sif::printError("PusParser::parsePusPackets: Frame invalid!\n"); + sif::printError("PusParser::parsePusPackets: Frame invalid\n"); #endif return HasReturnvaluesIF::RETURN_FAILED; } if(indexSizePairFIFO.full()) { #if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "PusParser::parsePusPackets: FIFO is full!" << std::endl; + sif::error << "PusParser::parsePusPackets: FIFO is full" << std::endl; #else - sif::printError("PusParser::parsePusPackets: FIFO is full!\n"); + sif::printError("PusParser::parsePusPackets: FIFO is full\n"); #endif return HasReturnvaluesIF::RETURN_FAILED; } @@ -31,12 +31,12 @@ ReturnValue_t SpacePacketParser::parsePusPackets(const uint8_t *frame, return NO_PACKET_FOUND; } + // Size of a space packet is the value in the packet length field plus 7 size_t packetSize = lengthField + 7; - // sif::debug << frameSize << std::endl; - // Size of a pus packet is the value in the packet length field plus 7. if(packetSize > frameSize) { if(storeSplitPackets) { indexSizePairFIFO.insert(IndexSizePair(0, frameSize)); + foundPackets = 1; } else { #if FSFW_VERBOSE_LEVEL >= 1 @@ -57,19 +57,20 @@ ReturnValue_t SpacePacketParser::parsePusPackets(const uint8_t *frame, } else { indexSizePairFIFO.insert(IndexSizePair(0, packetSize)); - if(packetSize == frameSize) { + if(packetSize >= frameSize) { + foundPackets = 1; return HasReturnvaluesIF::RETURN_OK; } } // packet size is smaller than frame size, parse for more packets. - return readMultiplePackets(frame, frameSize, packetSize); + return readMultiplePackets(frame, frameSize, packetSize, foundPackets); } ReturnValue_t SpacePacketParser::readMultiplePackets(const uint8_t *frame, - size_t frameSize, size_t startIndex) { + size_t frameSize, size_t startIndex, uint32_t& foundPackets) { while (startIndex < frameSize) { - ReturnValue_t result = readNextPacket(frame, frameSize, startIndex); + ReturnValue_t result = readNextPacket(frame, frameSize, startIndex, foundPackets); if(result != HasReturnvaluesIF::RETURN_OK) { return result; } @@ -88,8 +89,7 @@ SpacePacketParser::IndexSizePair SpacePacketParser::getNextFifoPair() { } ReturnValue_t SpacePacketParser::readNextPacket(const uint8_t *frame, - size_t frameSize, size_t& currentIndex) { - // sif::debug << startIndex << std::endl; + size_t frameSize, size_t& currentIndex, uint32_t& foundPackets) { if(currentIndex + 5 > frameSize) { currentIndex = frameSize; return HasReturnvaluesIF::RETURN_OK; @@ -108,6 +108,7 @@ ReturnValue_t SpacePacketParser::readNextPacket(const uint8_t *frame, { if(storeSplitPackets) { indexSizePairFIFO.insert(IndexSizePair(currentIndex, remainingSize)); + foundPackets += 1; } else { #if FSFW_VERBOSE_LEVEL >= 1 @@ -139,6 +140,7 @@ ReturnValue_t SpacePacketParser::readNextPacket(const uint8_t *frame, "FIFO, it is full!\n"); #endif } + foundPackets += 1; currentIndex += nextPacketSize; return result; diff --git a/src/fsfw/tmtcservices/SpacePacketParser.h b/src/fsfw/tmtcservices/SpacePacketParser.h index d0d7bc4d..5cd093dd 100644 --- a/src/fsfw/tmtcservices/SpacePacketParser.h +++ b/src/fsfw/tmtcservices/SpacePacketParser.h @@ -53,7 +53,7 @@ public: * -@c SPLIT_PACKET if splitting is enabled and a split packet was found * -@c RETURN_OK if a packet was found. The index and sizes are stored in the internal FIFO */ - ReturnValue_t parsePusPackets(const uint8_t* frame, size_t frameSize); + ReturnValue_t parseSpacePackets(const uint8_t* frame, size_t frameSize, uint32_t& foundPackets); /** * Accessor function to get a reference to the internal FIFO which @@ -82,9 +82,9 @@ private: bool storeSplitPackets = false; ReturnValue_t readMultiplePackets(const uint8_t *frame, size_t frameSize, - size_t startIndex); + size_t startIndex, uint32_t& foundPackets); ReturnValue_t readNextPacket(const uint8_t *frame, - size_t frameSize, size_t& startIndex); + size_t frameSize, size_t& startIndex, uint32_t& foundPackets); }; #endif /* FRAMEWORK_TMTCSERVICES_PUSPARSER_H_ */