diff --git a/bsp_q7s/core/ObjectFactory.h b/bsp_q7s/core/ObjectFactory.h index 0a43b336..d71ed359 100644 --- a/bsp_q7s/core/ObjectFactory.h +++ b/bsp_q7s/core/ObjectFactory.h @@ -8,7 +8,6 @@ #include #include #include -#include #include diff --git a/mission/core/GenericFactory.cpp b/mission/core/GenericFactory.cpp index 01871ff2..e2e5c0c7 100644 --- a/mission/core/GenericFactory.cpp +++ b/mission/core/GenericFactory.cpp @@ -36,8 +36,8 @@ #include #include #include +#include #include -#include #include "OBSWConfig.h" #include "devices/gpioIds.h" @@ -160,12 +160,12 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun *pusFunnel = new PusTmFunnel(pusFunnelCfg, *timeStamper, sdcMan); #if OBSW_ADD_TCPIP_SERVERS == 1 #if OBSW_ADD_TMTC_UDP_SERVER == 1 - (*cfdpFunnel)->addDestination("UDP Server", *udpBridge, 0); - (*pusFunnel)->addDestination("UDP Server", *udpBridge, 0); + (*cfdpFunnel)->addLiveDestination("UDP Server", *udpBridge, 0); + (*pusFunnel)->addLiveDestination("UDP Server", *udpBridge, 0); #endif #if OBSW_ADD_TMTC_TCP_SERVER == 1 - (*cfdpFunnel)->addDestination("TCP Server", *tcpBridge, 0); - (*pusFunnel)->addDestination("TCP Server", *tcpBridge, 0); + (*cfdpFunnel)->addLiveDestination("TCP Server", *tcpBridge, 0); + (*pusFunnel)->addLiveDestination("TCP Server", *tcpBridge, 0); #endif #endif // Every TM packet goes through this funnel diff --git a/mission/core/GenericFactory.h b/mission/core/GenericFactory.h index 8663d696..5902ff7b 100644 --- a/mission/core/GenericFactory.h +++ b/mission/core/GenericFactory.h @@ -3,7 +3,6 @@ #include #include -#include #include "fsfw/objectmanager/SystemObjectIF.h" #include "fsfw/power/PowerSwitchIF.h" diff --git a/mission/tmtc/CMakeLists.txt b/mission/tmtc/CMakeLists.txt index ef97002c..213f4ef0 100644 --- a/mission/tmtc/CMakeLists.txt +++ b/mission/tmtc/CMakeLists.txt @@ -6,8 +6,9 @@ target_sources( TmFunnelBase.cpp CfdpTmFunnel.cpp tmFilters.cpp + PusLiveDemux.cpp PusPacketFilter.cpp - TmStoreRouter.cpp + PusTmRouteByFilterHelper.cpp Service15TmStorage.cpp PersistentTmStore.cpp PusTmFunnel.cpp) diff --git a/mission/tmtc/CfdpTmFunnel.cpp b/mission/tmtc/CfdpTmFunnel.cpp index 89d7c105..26b308da 100644 --- a/mission/tmtc/CfdpTmFunnel.cpp +++ b/mission/tmtc/CfdpTmFunnel.cpp @@ -75,36 +75,6 @@ ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) { tmStore.deleteData(msg.getStorageId()); msg.setStorageId(newStoreId); store_address_t origStoreId = newStoreId; - for (unsigned int idx = 0; idx < destinations.size(); idx++) { - const auto& dest = destinations[idx]; - if (destinations.size() > 1) { - if (idx < destinations.size() - 1) { - // Create copy of data to ensure each TM recipient has its own copy. That way, we don't need - // to bother with send order and where the data is deleted. - store_address_t storeId; - result = tmStore.addData(&storeId, newPacketData, packetLen); - if (result == returnvalue::OK) { - msg.setStorageId(storeId); - } else { -#if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "PusTmFunnel::handlePacket: Store too full to create data copy or store " - "error" - << std::endl; -#endif - break; - } - } else { - msg.setStorageId(origStoreId); - } - } - result = tmQueue->sendMessage(dest.queueId, &msg); - if (result != returnvalue::OK) { -#if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler " << dest.name - << " failed" << std::endl; -#endif - tmStore.deleteData(msg.getStorageId()); - } - } - return result; + // TODO: Also route to persistent TM store + return demultiplexLivePackets(origStoreId, newPacketData, packetLen); } diff --git a/mission/tmtc/PersistentTmStore.cpp b/mission/tmtc/PersistentTmStore.cpp index 694a50b7..0324a649 100644 --- a/mission/tmtc/PersistentTmStore.cpp +++ b/mission/tmtc/PersistentTmStore.cpp @@ -69,14 +69,14 @@ ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore, SerializeIF::Endianness::NETWORK); SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data() + 4, &size, SerializeIF::Endianness::NETWORK); - dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, tmFunnel); + dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds); } } return returnvalue::OK; } -void PersistentTmStore::dumpFrom(uint32_t fromUnixSeconds, TmFunnelBase& tmFunnel) { - return dumpFromUpTo(fromUnixSeconds, currentTv.tv_sec, tmFunnel); +void PersistentTmStore::dumpFrom(uint32_t fromUnixSeconds) { + return dumpFromUpTo(fromUnixSeconds, currentTv.tv_sec); } ReturnValue_t PersistentTmStore::storePacket(PusTmReader& reader) { @@ -166,8 +166,7 @@ void PersistentTmStore::deleteUpTo(uint32_t unixSeconds) { } } -void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds, - TmFunnelBase& funnel) { +void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds) { using namespace std::filesystem; for (auto const& file : directory_iterator(basePath)) { if (file.is_directory()) { @@ -180,7 +179,7 @@ void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnix } auto fileEpoch = static_cast(timegm(&fileTime)); if ((fileEpoch > fromUnixSeconds) and (fileEpoch + rolloverDiffSeconds <= upToUnixSeconds)) { - fileToPackets(file, fileEpoch, funnel); + fileToPackets(file, fileEpoch); } } } @@ -195,8 +194,7 @@ ReturnValue_t PersistentTmStore::pathToTm(const std::filesystem::path& path, str return returnvalue::OK; } -void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStamp, - TmFunnelBase& funnel) { +void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStamp) { store_address_t storeId; TmTcMessage message; size_t size = std::filesystem::file_size(path); @@ -212,12 +210,13 @@ void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_ // CRC check to fully ensure this is a valid TM ReturnValue_t result = reader.parseDataWithCrcCheck(); if (result == returnvalue::OK) { - result = tmStore.addData(&storeId, fileBuf.data() + currentIdx, reader.getFullPacketLen()); - if (result != returnvalue::OK) { - continue; - } - funnel.sendPacketToDestinations(storeId, message, fileBuf.data() + currentIdx, - reader.getFullPacketLen()); + // TODO: Blow the data out to the VC directly. Use IF function to do this. + // result = tmStore.addData(&storeId, fileBuf.data() + currentIdx, + // reader.getFullPacketLen()); if (result != returnvalue::OK) { + // continue; + // } + // funnel.sendPacketToLiveDestinations(storeId, message, fileBuf.data() + currentIdx, + // reader.getFullPacketLen()); currentIdx += reader.getFullPacketLen(); } else { sif::error << "Parsing of PUS TM failed with code " << result << std::endl; diff --git a/mission/tmtc/PersistentTmStore.h b/mission/tmtc/PersistentTmStore.h index 280fa4ac..3b72cb2d 100644 --- a/mission/tmtc/PersistentTmStore.h +++ b/mission/tmtc/PersistentTmStore.h @@ -32,8 +32,8 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject { ReturnValue_t handleCommandQueue(StorageManagerIF& ipcStore, TmFunnelBase& tmFunnel); void deleteUpTo(uint32_t unixSeconds); - void dumpFrom(uint32_t fromUnixSeconds, TmFunnelBase& tmFunnel); - void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds, TmFunnelBase& tmFunnel); + void dumpFrom(uint32_t fromUnixSeconds); + void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds); // ReturnValue_t passPacket(PusTmReader& reader); ReturnValue_t storePacket(PusTmReader& reader); @@ -69,7 +69,7 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject { void calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount); ReturnValue_t createMostRecentFile(std::optional suffix); static ReturnValue_t pathToTm(const std::filesystem::path& path, struct tm& time); - void fileToPackets(const std::filesystem::path& path, uint32_t unixStamp, TmFunnelBase& funnel); + void fileToPackets(const std::filesystem::path& path, uint32_t unixStamp); bool updateBaseDir(); ReturnValue_t assignAndOrCreateMostRecentFile(); }; diff --git a/mission/tmtc/PusLiveDemux.cpp b/mission/tmtc/PusLiveDemux.cpp new file mode 100644 index 00000000..d8ae1126 --- /dev/null +++ b/mission/tmtc/PusLiveDemux.cpp @@ -0,0 +1,48 @@ +#include "PusLiveDemux.h" + +#include +#include + +PusLiveDemux::PusLiveDemux(MessageQueueIF& ownerQueue) : ownerQueue(ownerQueue) {} + +ReturnValue_t PusLiveDemux::demultiplexPackets(StorageManagerIF& tmStore, + store_address_t origStoreId, const uint8_t* tmData, + size_t tmSize) { + ReturnValue_t result = returnvalue::OK; + for (unsigned int idx = 0; idx < destinations.size(); idx++) { + const auto& dest = destinations[idx]; + if (destinations.size() > 1) { + if (idx < destinations.size() - 1) { + // Create copy of data to ensure each TM recipient has its own copy. That way, we don't need + // to bother with send order and where the data is deleted. + store_address_t storeId; + result = tmStore.addData(&storeId, tmData, tmSize); + if (result == returnvalue::OK) { + message.setStorageId(storeId); + } else { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy" + << std::endl; +#endif + } + } else { + message.setStorageId(origStoreId); + } + } + result = ownerQueue.sendMessage(dest.queueId, &message); + if (result != returnvalue::OK) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "PusLiveDemux::handlePacket: Error sending TM to downlink handler " << dest.name + << std::endl; +#endif + tmStore.deleteData(message.getStorageId()); + } + } + return result; +} + +void PusLiveDemux::addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination, + uint8_t vcid) { + auto queueId = downlinkDestination.getReportReceptionQueue(vcid); + destinations.emplace_back(name, queueId, vcid); +} diff --git a/mission/tmtc/PusLiveDemux.h b/mission/tmtc/PusLiveDemux.h new file mode 100644 index 00000000..1288b173 --- /dev/null +++ b/mission/tmtc/PusLiveDemux.h @@ -0,0 +1,34 @@ +#ifndef MISSION_TMTC_PUSLIVEDEMUX_H_ +#define MISSION_TMTC_PUSLIVEDEMUX_H_ + +#include +#include +#include + +#include + +class PusLiveDemux { + public: + PusLiveDemux(MessageQueueIF& ownerQueue); + ReturnValue_t demultiplexPackets(StorageManagerIF& tmStore, store_address_t origStoreId, + const uint8_t* tmData, size_t tmSize); + + void addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination, + uint8_t vcid = 0); + + private: + struct Destination { + Destination(const char* name, MessageQueueId_t queueId, uint8_t virtualChannel) + : name(name), queueId(queueId), virtualChannel(virtualChannel) {} + + const char* name; + MessageQueueId_t queueId; + uint8_t virtualChannel = 0; + }; + + MessageQueueIF& ownerQueue; + TmTcMessage message; + std::vector destinations; +}; + +#endif /* MISSION_TMTC_PUSLIVEDEMUX_H_ */ diff --git a/mission/tmtc/PusTmFunnel.cpp b/mission/tmtc/PusTmFunnel.cpp index ad763ff5..691ccd46 100644 --- a/mission/tmtc/PusTmFunnel.cpp +++ b/mission/tmtc/PusTmFunnel.cpp @@ -86,18 +86,21 @@ ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) { sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT; packet.updateErrorControl(); - // TODO: 1. Send packet to persistent TM store when applicable. - // 2. Send packet to live TM VC - // 3. Send packet to TCP/IP destination - return sendPacketToDestinations(origStoreId, message, packetData, size); - // timeval currentUptime{}; - // Clock::getUptime(¤tUptime); - // if (currentUptime.tv_sec - lastTvUpdate.tv_sec > - // static_cast(TV_UPDATE_INTERVAL_SECS)) { - // Clock::getClock_timeval(¤tTv); - // lastTvUpdate = currentUptime; - // } + // Send to persistent TM store if the packet matches some filter. + MessageQueueId_t destination; + if (persistentTmMap.packetMatches(packet, destination)) { + store_address_t storageId; + TmTcMessage msg(storageId); + result = tmStore.addData(&storageId, packetData, size); + if (result != returnvalue::OK) { + sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy" << std::endl; + } else { + tmQueue->sendMessage(destination, &msg); + } + } + return demultiplexLivePackets(origStoreId, packetData, size); + // TODO: Will be moved to own thread. // bool sdcUsable = sdcMan.isSdCardUsable(std::nullopt); // initStoresIfPossible(sdcUsable); // if (sdcUsable) { @@ -111,6 +114,7 @@ ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) { const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; } void PusTmFunnel::initStoresIfPossible(bool sdCardUsable) { + // TODO: Those will be moved to own thread. if (not storesInitialized and sdCardUsable and sdcMan.getCurrentMountPrefix() != nullptr) { // miscStore.initializeTmStore(); // okStore.initializeTmStore(); @@ -126,5 +130,5 @@ ReturnValue_t PusTmFunnel::initialize() { } void PusTmFunnel::addPersistentTmStoreRouting(PusPacketFilter filter, MessageQueueId_t dest) { - router.addRouting(filter, dest); + persistentTmMap.addRouting(filter, dest); } diff --git a/mission/tmtc/PusTmFunnel.h b/mission/tmtc/PusTmFunnel.h index 774737dc..3c22afa2 100644 --- a/mission/tmtc/PusTmFunnel.h +++ b/mission/tmtc/PusTmFunnel.h @@ -6,8 +6,8 @@ #include #include #include +#include #include -#include #include @@ -39,7 +39,7 @@ class PusTmFunnel : public TmFunnelBase { TimeReaderIF &timeReader; bool storesInitialized = false; SdCardMountedIF &sdcMan; - PersistentTmStoreRouter router; + PusTmRouteByFilterHelper persistentTmMap; ReturnValue_t handleTmPacket(TmTcMessage &message); void initStoresIfPossible(bool sdCardUsable); diff --git a/mission/tmtc/PusTmRouteByFilterHelper.cpp b/mission/tmtc/PusTmRouteByFilterHelper.cpp new file mode 100644 index 00000000..63b95733 --- /dev/null +++ b/mission/tmtc/PusTmRouteByFilterHelper.cpp @@ -0,0 +1,19 @@ +#include "PusTmRouteByFilterHelper.h" + +#include + +PusTmRouteByFilterHelper::PusTmRouteByFilterHelper() = default; + +bool PusTmRouteByFilterHelper::packetMatches(PusTmReader& reader, MessageQueueId_t& destination) { + for (const auto filterAndDest : routerMap) { + if (filterAndDest.first.packetMatches(reader)) { + destination = filterAndDest.second; + return true; + } + } + return false; +} + +void PusTmRouteByFilterHelper::addRouting(PusPacketFilter filter, MessageQueueId_t destination) { + routerMap.emplace_back(std::move(filter), destination); +} diff --git a/mission/tmtc/TmStoreRouter.h b/mission/tmtc/PusTmRouteByFilterHelper.h similarity index 53% rename from mission/tmtc/TmStoreRouter.h rename to mission/tmtc/PusTmRouteByFilterHelper.h index 9daa4889..92bb0f6f 100644 --- a/mission/tmtc/TmStoreRouter.h +++ b/mission/tmtc/PusTmRouteByFilterHelper.h @@ -4,10 +4,21 @@ #include #include -class PersistentTmStoreRouter { +/** + * Simple composition of concrecte @PusPacketFilters which also maps them to + * a destination ID. + */ +class PusTmRouteByFilterHelper { public: - PersistentTmStoreRouter(); + PusTmRouteByFilterHelper(); + /** + * Checks whether the packet matches any of the inserted filters and returns + * the destination if it does. Currently only supports one destination. + * @param reader + * @param destination + * @return + */ bool packetMatches(PusTmReader& reader, MessageQueueId_t& destination); void addRouting(PusPacketFilter filter, MessageQueueId_t destination); diff --git a/mission/tmtc/TmFunnelBase.cpp b/mission/tmtc/TmFunnelBase.cpp index 78a13fab..eb480b03 100644 --- a/mission/tmtc/TmFunnelBase.cpp +++ b/mission/tmtc/TmFunnelBase.cpp @@ -5,8 +5,16 @@ #include "fsfw/ipc/QueueFactory.h" TmFunnelBase::TmFunnelBase(FunnelCfg cfg) - : SystemObject(cfg.objectId), name(cfg.name), tmStore(cfg.tmStore), ipcStore(cfg.ipcStore) { - tmQueue = QueueFactory::instance()->createMessageQueue(cfg.tmMsgDepth); + : SystemObject(cfg.objectId), + name(cfg.name), + tmStore(cfg.tmStore), + ipcStore(cfg.ipcStore), + tmQueue(QueueFactory::instance()->createMessageQueue(cfg.tmMsgDepth)), + liveDemux(*tmQueue) {} + +ReturnValue_t TmFunnelBase::demultiplexLivePackets(store_address_t origStoreId, + const uint8_t *tmData, size_t tmSize) { + return liveDemux.demultiplexPackets(tmStore, origStoreId, tmData, tmSize); } TmFunnelBase::~TmFunnelBase() { QueueFactory::instance()->deleteMessageQueue(tmQueue); } @@ -15,43 +23,7 @@ MessageQueueId_t TmFunnelBase::getReportReceptionQueue(uint8_t virtualChannel) c return tmQueue->getId(); } -void TmFunnelBase::addDestination(const char *name, const AcceptsTelemetryIF &downlinkDestination, - uint8_t vcid) { - auto queueId = downlinkDestination.getReportReceptionQueue(vcid); - destinations.emplace_back(name, queueId, vcid); -} - -ReturnValue_t TmFunnelBase::sendPacketToDestinations(store_address_t origStoreId, - TmTcMessage &message, - const uint8_t *packetData, size_t size) { - ReturnValue_t result = returnvalue::OK; - for (unsigned int idx = 0; idx < destinations.size(); idx++) { - const auto &dest = destinations[idx]; - if (destinations.size() > 1) { - if (idx < destinations.size() - 1) { - // Create copy of data to ensure each TM recipient has its own copy. That way, we don't need - // to bother with send order and where the data is deleted. - store_address_t storeId; - result = tmStore.addData(&storeId, packetData, size); - if (result == returnvalue::OK) { - message.setStorageId(storeId); - } else { -#if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << name << "::handlePacket: Store too full to create data copy" << std::endl; -#endif - } - } else { - message.setStorageId(origStoreId); - } - } - result = tmQueue->sendMessage(dest.queueId, &message); - if (result != returnvalue::OK) { -#if FSFW_CPP_OSTREAM_ENABLED == 1 - sif::error << name << "::handlePacket: Error sending TM to downlink handler " << dest.name - << std::endl; -#endif - tmStore.deleteData(message.getStorageId()); - } - } - return result; +void TmFunnelBase::addLiveDestination(const char *name, + const AcceptsTelemetryIF &downlinkDestination, uint8_t vcid) { + liveDemux.addDestination(name, downlinkDestination, vcid); } diff --git a/mission/tmtc/TmFunnelBase.h b/mission/tmtc/TmFunnelBase.h index ef3c6c6a..51d16626 100644 --- a/mission/tmtc/TmFunnelBase.h +++ b/mission/tmtc/TmFunnelBase.h @@ -6,6 +6,7 @@ #include #include #include +#include #include @@ -26,11 +27,11 @@ class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject { uint32_t tmMsgDepth; }; explicit TmFunnelBase(FunnelCfg cfg); - void addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination, - uint8_t vcid = 0); - ReturnValue_t sendPacketToDestinations(store_address_t origStoreId, TmTcMessage& message, - const uint8_t* packetData, size_t size); [[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override; + void addLiveDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination, + uint8_t vcid = 0); + ReturnValue_t demultiplexLivePackets(store_address_t origStoreId, const uint8_t* tmData, + size_t tmSize); ~TmFunnelBase() override; @@ -38,18 +39,8 @@ class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject { const char* name; StorageManagerIF& tmStore; StorageManagerIF& ipcStore; - - struct Destination { - Destination(const char* name, MessageQueueId_t queueId, uint8_t virtualChannel) - : name(name), queueId(queueId), virtualChannel(virtualChannel) {} - - const char* name; - MessageQueueId_t queueId; - uint8_t virtualChannel = 0; - }; - - std::vector destinations; MessageQueueIF* tmQueue = nullptr; + PusLiveDemux liveDemux; }; #endif /* MISSION_TMTC_TMFUNNELBASE_H_ */ diff --git a/mission/tmtc/TmStoreRouter.cpp b/mission/tmtc/TmStoreRouter.cpp deleted file mode 100644 index d958574e..00000000 --- a/mission/tmtc/TmStoreRouter.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include "TmStoreRouter.h" - -#include - -PersistentTmStoreRouter::PersistentTmStoreRouter() = default; - -bool PersistentTmStoreRouter::packetMatches(PusTmReader& reader, MessageQueueId_t& destination) { - for (const auto filterAndDest : routerMap) { - if (filterAndDest.first.packetMatches(reader)) { - destination = filterAndDest.second; - return true; - } - } - return false; -} - -void PersistentTmStoreRouter::addRouting(PusPacketFilter filter, MessageQueueId_t destination) { - routerMap.emplace_back(std::move(filter), destination); -}