#include "CfdpTmFunnel.h" #include "fsfw/ipc/QueueFactory.h" #include "fsfw/tmtcpacket/ccsds/SpacePacketCreator.h" #include "fsfw/tmtcservices/TmTcMessage.h" CfdpTmFunnel::CfdpTmFunnel(TmFunnelBase::FunnelCfg cfg, MessageQueueId_t fileStoreDest, StorageManagerIF& ramToFileStore, uint16_t cfdpInCcsdsApid) : TmFunnelBase(cfg), fileStoreDest(fileStoreDest), ramToFileStore(ramToFileStore), cfdpInCcsdsApid(cfdpInCcsdsApid) {} const char* CfdpTmFunnel::getName() const { return "CFDP TM Funnel"; } ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) { TmTcMessage currentMessage; unsigned int count = 0; ReturnValue_t status = tmQueue->receiveMessage(¤tMessage); while (status == returnvalue::OK) { status = handlePacket(currentMessage); if (status != returnvalue::OK) { sif::warning << "CfdpTmFunnel packet handling failed" << std::endl; break; } count++; if (count == 500) { sif::error << "CfdpTmFunnel: Possible message storm detected" << std::endl; break; } status = tmQueue->receiveMessage(¤tMessage); } if (status == MessageQueueIF::EMPTY) { return returnvalue::OK; } return status; } ReturnValue_t CfdpTmFunnel::initialize() { return returnvalue::OK; } ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) { const uint8_t* cfdpPacket = nullptr; size_t cfdpPacketLen = 0; ReturnValue_t result = tmStore.getData(msg.getStorageId(), &cfdpPacket, &cfdpPacketLen); if (result != returnvalue::OK) { return result; } auto spacePacketHeader = SpacePacketCreator(ccsds::PacketType::TM, false, cfdpInCcsdsApid, ccsds::SequenceFlags::UNSEGMENTED, sourceSequenceCount++, 0); sourceSequenceCount = sourceSequenceCount & ccsds::LIMIT_SEQUENCE_COUNT; spacePacketHeader.setCcsdsLenFromTotalDataFieldLen(cfdpPacketLen); uint8_t* newPacketData = nullptr; store_address_t newStoreId{}; result = tmStore.getFreeElement(&newStoreId, spacePacketHeader.getFullPacketLen(), &newPacketData); if (result != returnvalue::OK) { #if FSFW_CPP_OSTREAM_ENABLED == 1 sif::warning << "CfdpTmFunnel::handlePacket: Error getting TM store element of size " << spacePacketHeader.getFullPacketLen() << std::endl; #endif return result; } size_t packetLen = 0; uint8_t* serPtr = newPacketData; result = spacePacketHeader.serializeBe(&serPtr, &packetLen, spacePacketHeader.getFullPacketLen()); if (result != returnvalue::OK) { #if FSFW_CPP_OSTREAM_ENABLED == 1 sif::error << "CfdpTmFunnel::handlePacket: Error serializing packet" << std::endl; #endif return result; } std::memcpy(serPtr, cfdpPacket, cfdpPacketLen); packetLen += cfdpPacketLen; // Delete old packet tmStore.deleteData(msg.getStorageId()); msg.setStorageId(newStoreId); store_address_t origStoreId = newStoreId; store_address_t storageId; result = ramToFileStore.addData(&storageId, newPacketData, packetLen); TmTcMessage fileMsg(storageId); if (result != returnvalue::OK) { sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy" << std::endl; } else { tmQueue->sendMessage(fileStoreDest, &fileMsg); } return demultiplexLivePackets(origStoreId, newPacketData, packetLen); }