#include "CfdpTmFunnel.h"

#include <cstring>

#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tmtcpacket/ccsds/SpacePacketCreator.h"
#include "fsfw/tmtcservices/TmTcMessage.h"

/**
 * @brief Construct the CFDP TM funnel.
 * @param objectId         Object ID forwarded to TmFunnelBase.
 * @param cfdpInCcsdsApid  APID written into the space packet header that is prepended to every
 *                         CFDP packet.
 * @param tmStore          Store holding the incoming CFDP packets and the wrapped output packets.
 * @param messageDepth     Queue depth forwarded to TmFunnelBase.
 */
CfdpTmFunnel::CfdpTmFunnel(object_id_t objectId, uint16_t cfdpInCcsdsApid,
                           StorageManagerIF& tmStore, uint32_t messageDepth)
    : TmFunnelBase(objectId, tmStore, messageDepth), cfdpInCcsdsApid(cfdpInCcsdsApid) {}

/** @brief Human readable name of this funnel. */
const char* CfdpTmFunnel::getName() const { return "CFDP TM Funnel"; }

/**
 * @brief Drain the TM queue and forward every received packet via handlePacket().
 *
 * Processing stops when the queue reports anything other than OK, when handlePacket() fails, or
 * after a fixed number of packets have been handled in one call.
 *
 * @return returnvalue::OK if the queue ran empty or the per-call packet limit was reached,
 *         otherwise the failing status of the queue or of handlePacket().
 */
ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) {
  // Upper bound of packets handled per call, so one call can not loop forever on a message storm.
  constexpr unsigned int MAX_PACKETS_PER_CALL = 500;

  TmTcMessage currentMessage;
  unsigned int count = 0;
  ReturnValue_t status = tmQueue->receiveMessage(&currentMessage);
  while (status == returnvalue::OK) {
    status = handlePacket(currentMessage);
    if (status != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
      sif::warning << "CfdpTmFunnel packet handling failed" << std::endl;
#endif
      break;
    }
    count++;
    if (count == MAX_PACKETS_PER_CALL) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
      sif::error << "CfdpTmFunnel: Possible message storm detected" << std::endl;
#endif
      break;
    }
    status = tmQueue->receiveMessage(&currentMessage);
  }

  // An empty queue is the regular way for the loop above to terminate.
  if (status == MessageQueueIF::EMPTY) {
    return returnvalue::OK;
  }
  return status;
}

/** @brief Nothing to set up; always returns returnvalue::OK. */
ReturnValue_t CfdpTmFunnel::initialize() { return returnvalue::OK; }

/**
 * @brief Wrap one CFDP packet into a CCSDS space packet and send it to all destinations.
 *
 * The CFDP packet referenced by @p msg is copied behind a newly serialized space packet header
 * into a new store element. The original store element is deleted afterwards. Every destination
 * except the last one receives its own copy of the wrapped packet; the last destination receives
 * the wrapped packet itself.
 *
 * Store elements which can not be handed over to a recipient are deleted here, including on all
 * error paths after the original packet was read successfully.
 *
 * @param msg  Message whose storage ID references the CFDP packet. Its storage ID is overwritten
 *             while the packet is distributed.
 * @return Failing status of the store or of the header serialization, otherwise the status of
 *         the last send attempt. A failed send to an earlier destination does not abort the
 *         distribution and is overwritten by the status of later sends.
 */
ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) {
  const store_address_t cfdpStoreId = msg.getStorageId();
  const uint8_t* cfdpPacket = nullptr;
  size_t cfdpPacketLen = 0;
  ReturnValue_t result = tmStore.getData(cfdpStoreId, &cfdpPacket, &cfdpPacketLen);
  if (result != returnvalue::OK) {
    return result;
  }

  auto spacePacketHeader =
      SpacePacketCreator(ccsds::PacketType::TM, false, cfdpInCcsdsApid,
                         ccsds::SequenceFlags::UNSEGMENTED, sourceSequenceCount++, 0);
  // Wrap the counter around at the limit. A bitwise AND with the limit would reset the counter
  // to 0 after every packet if the limit is a power of two.
  // NOTE(review): assumes ccsds::LIMIT_SEQUENCE_COUNT is the exclusive upper bound (2^14) and
  // not a bit mask - confirm against the fsfw ccsds definitions.
  sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT;
  spacePacketHeader.setCcsdsLenFromTotalDataFieldLen(cfdpPacketLen);

  uint8_t* newPacketData = nullptr;
  store_address_t newStoreId{};
  result =
      tmStore.getFreeElement(&newStoreId, spacePacketHeader.getFullPacketLen(), &newPacketData);
  if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
    sif::warning << "CfdpTmFunnel::handlePacket: Error getting TM store element of size "
                 << spacePacketHeader.getFullPacketLen() << std::endl;
#endif
    // The message was already taken from the queue, so nobody else will free the CFDP packet.
    tmStore.deleteData(cfdpStoreId);
    return result;
  }

  size_t packetLen = 0;
  uint8_t* serPtr = newPacketData;
  result =
      spacePacketHeader.serializeBe(&serPtr, &packetLen, spacePacketHeader.getFullPacketLen());
  if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
    sif::error << "CfdpTmFunnel::handlePacket: Error serializing packet" << std::endl;
#endif
    // Release both the half-built output packet and the CFDP packet which is dropped.
    tmStore.deleteData(newStoreId);
    tmStore.deleteData(cfdpStoreId);
    return result;
  }
  // serPtr was advanced behind the header by serializeBe, the CFDP packet follows directly.
  std::memcpy(serPtr, cfdpPacket, cfdpPacketLen);
  packetLen += cfdpPacketLen;

  // Delete old packet
  tmStore.deleteData(cfdpStoreId);
  msg.setStorageId(newStoreId);

  const size_t numDestinations = destinations.size();
  if (numDestinations == 0) {
    // Without a recipient nobody would ever free the wrapped packet.
    tmStore.deleteData(newStoreId);
    return result;
  }

  for (size_t idx = 0; idx < numDestinations; idx++) {
    const auto& dest = destinations[idx];
    if (idx == numDestinations - 1) {
      // The last recipient gets the wrapped packet itself.
      msg.setStorageId(newStoreId);
    } else {
      // Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
      // to bother with send order and where the data is deleted.
      store_address_t copyStoreId;
      result = tmStore.addData(&copyStoreId, newPacketData, packetLen);
      if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
        sif::error << "CfdpTmFunnel::handlePacket: Store too full to create data copy or store "
                      "error"
                   << std::endl;
#endif
        // Distribution is aborted, so the wrapped packet is never handed to the last recipient.
        tmStore.deleteData(newStoreId);
        break;
      }
      msg.setStorageId(copyStoreId);
    }

    result = tmQueue->sendMessage(dest.queueId, &msg);
    if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
      sif::error << "CfdpTmFunnel::handlePacket: Error sending TM to downlink handler "
                 << dest.name << " failed" << std::endl;
#endif
      // The recipient never received the message, so its store element is freed here.
      tmStore.deleteData(msg.getStorageId());
    }
  }
  return result;
}