#include "CfdpTmFunnel.h" #include "fsfw/ipc/QueueFactory.h" #include "fsfw/tmtcpacket/ccsds/SpacePacketCreator.h" #include "fsfw/tmtcservices/TmTcMessage.h" CfdpTmFunnel::CfdpTmFunnel(TmFunnelBase::FunnelCfg cfg, std::optional fileStoreDest, StorageManagerIF& ramToFileStore, uint16_t cfdpInCcsdsApid) : TmFunnelBase(cfg), fileStoreDest(fileStoreDest), ramToFileStore(ramToFileStore), cfdpInCcsdsApid(cfdpInCcsdsApid) {} const char* CfdpTmFunnel::getName() const { return "CFDP TM Funnel"; } ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) { TmTcMessage currentMessage; ReturnValue_t status; unsigned int count = 0; if (saveSequenceCount) { status = saveSequenceCountToFile(); if (status != returnvalue::OK) { sif::error << "CfdpTmFunnel: Storing sequence count to file has failed" << std::endl; } saveSequenceCount = false; } status = tmQueue->receiveMessage(¤tMessage); uint32_t handledPackets = 0; while (status == returnvalue::OK) { handledPackets++; status = handlePacket(currentMessage); if (status != returnvalue::OK) { sif::warning << "CfdpTmFunnel packet handling failed" << std::endl; break; } count++; if (count == 500) { sif::error << "CfdpTmFunnel: Possible message storm detected" << std::endl; break; } status = tmQueue->receiveMessage(¤tMessage); } if (handledPackets > 0) { // Very useful for profiling and debugging //sif::debug << "CfdpFunnel: Handled " << handledPackets << " packets in one cycle" << std::endl; } if (status == MessageQueueIF::EMPTY) { return returnvalue::OK; } return status; } ReturnValue_t CfdpTmFunnel::initialize() { return returnvalue::OK; } ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) { const uint8_t* cfdpPacket = nullptr; size_t cfdpPacketLen = 0; ReturnValue_t result = tmStore.getData(msg.getStorageId(), &cfdpPacket, &cfdpPacketLen); if (result != returnvalue::OK) { return result; } auto spacePacketHeader = SpacePacketCreator(ccsds::PacketType::TM, false, cfdpInCcsdsApid, ccsds::SequenceFlags::UNSEGMENTED, sourceSequenceCount++, 0); sourceSequenceCount = sourceSequenceCount & ccsds::LIMIT_SEQUENCE_COUNT; spacePacketHeader.setCcsdsLenFromTotalDataFieldLen(cfdpPacketLen); uint8_t* newPacketData = nullptr; store_address_t newStoreId{}; result = tmStore.getFreeElement(&newStoreId, spacePacketHeader.getFullPacketLen(), &newPacketData); if (result != returnvalue::OK) { #if FSFW_CPP_OSTREAM_ENABLED == 1 sif::warning << "CfdpTmFunnel::handlePacket: Error getting TM store element of size " << spacePacketHeader.getFullPacketLen() << std::endl; #endif return result; } size_t packetLen = 0; uint8_t* serPtr = newPacketData; result = spacePacketHeader.serializeBe(&serPtr, &packetLen, spacePacketHeader.getFullPacketLen()); if (result != returnvalue::OK) { #if FSFW_CPP_OSTREAM_ENABLED == 1 sif::error << "CfdpTmFunnel::handlePacket: Error serializing packet" << std::endl; #endif return result; } std::memcpy(serPtr, cfdpPacket, cfdpPacketLen); packetLen += cfdpPacketLen; // Delete old packet tmStore.deleteData(msg.getStorageId()); msg.setStorageId(newStoreId); store_address_t origStoreId = newStoreId; if(fileStoreDest.has_value()) { store_address_t storageId; result = ramToFileStore.addData(&storageId, newPacketData, packetLen); TmTcMessage fileMsg(storageId); if (result == returnvalue::OK) { tmQueue->sendMessage(fileStoreDest.value(), &fileMsg); } else if(result == StorageManagerIF::DATA_STORAGE_FULL) { sif::error << "CfdpTmFunnel::handlePacket: RAM to File Store too full to create data copy" << std::endl; } } return demultiplexLivePackets(origStoreId, newPacketData, packetLen); }