From 96865c1dd2990370ebe331d54c790223d9c70d3b Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Thu, 9 Mar 2023 17:44:05 +0100 Subject: [PATCH] continue TM handling refactoring --- bsp_q7s/core/ObjectFactory.cpp | 49 ++++----- linux/ipcore/PapbVcInterface.h | 4 +- linux/ipcore/Ptme.cpp | 2 +- linux/ipcore/Ptme.h | 6 +- .../{VcInterfaceIF.h => VirtualChannelIF.h} | 4 +- mission/tmtc/CMakeLists.txt | 5 + mission/tmtc/CcsdsIpCoreHandler.cpp | 102 +++++++----------- mission/tmtc/CcsdsIpCoreHandler.h | 34 +++--- mission/tmtc/LiveTmTask.cpp | 16 +++ mission/tmtc/LiveTmTask.h | 18 ++++ mission/tmtc/PersistentLogTmStoreTask.cpp | 1 + mission/tmtc/PersistentLogTmStoreTask.h | 26 +++++ mission/tmtc/PersistentSingleTmStoreTask.cpp | 37 +++++++ mission/tmtc/PersistentSingleTmStoreTask.h | 23 ++++ mission/tmtc/PersistentTmStore.cpp | 13 +-- mission/tmtc/PersistentTmStore.h | 8 +- mission/tmtc/PersistentTmStoreWithTmQueue.cpp | 28 +++++ mission/tmtc/PersistentTmStoreWithTmQueue.h | 19 ++++ mission/tmtc/VirtualChannel.cpp | 81 +++----------- mission/tmtc/VirtualChannel.h | 47 +++----- mission/tmtc/VirtualChannelWithQueue.cpp | 48 +++++++++ mission/tmtc/VirtualChannelWithQueue.h | 45 ++++++++ 22 files changed, 396 insertions(+), 220 deletions(-) rename linux/ipcore/{VcInterfaceIF.h => VirtualChannelIF.h} (86%) create mode 100644 mission/tmtc/LiveTmTask.cpp create mode 100644 mission/tmtc/LiveTmTask.h create mode 100644 mission/tmtc/PersistentLogTmStoreTask.cpp create mode 100644 mission/tmtc/PersistentLogTmStoreTask.h create mode 100644 mission/tmtc/PersistentSingleTmStoreTask.cpp create mode 100644 mission/tmtc/PersistentSingleTmStoreTask.h create mode 100644 mission/tmtc/PersistentTmStoreWithTmQueue.cpp create mode 100644 mission/tmtc/PersistentTmStoreWithTmQueue.h create mode 100644 mission/tmtc/VirtualChannelWithQueue.cpp create mode 100644 mission/tmtc/VirtualChannelWithQueue.h diff --git a/bsp_q7s/core/ObjectFactory.cpp b/bsp_q7s/core/ObjectFactory.cpp index ee58b109..62aecde4 100644 --- a/bsp_q7s/core/ObjectFactory.cpp +++ b/bsp_q7s/core/ObjectFactory.cpp @@ -73,6 +73,7 @@ #include #include #include +#include #include @@ -113,9 +114,9 @@ #include "mission/system/objects/AcsBoardAssembly.h" #include "mission/tmtc/CcsdsIpCoreHandler.h" #include "mission/tmtc/TmFunnelHandler.h" -#include "mission/tmtc/VirtualChannel.h" ResetArgs RESET_ARGS_GNSS; +std::atomic_bool LINK_STATE = CcsdsIpCoreHandler::LINK_DOWN; void Factory::setStaticFrameworkObjectIds() { PusServiceBase::PUS_DISTRIBUTOR = objects::PUS_PACKET_DISTRIBUTOR; @@ -740,16 +741,16 @@ ReturnValue_t ObjectFactory::createCcsdsComponents(LinuxLibgpioIF* gpioComIF, gpioCookiePtmeIp->addGpio(gpioIds::VC3_PAPB_EMPTY, gpio); gpioChecker(gpioComIF->addGpios(gpioCookiePtmeIp), "PTME PAPB VCs"); // Creating virtual channel interfaces - VcInterfaceIF* vc0 = + VirtualChannelIF* vc0 = new PapbVcInterface(gpioComIF, gpioIds::VC0_PAPB_BUSY, gpioIds::VC0_PAPB_EMPTY, q7s::UIO_PTME, q7s::uiomapids::PTME_VC0); - VcInterfaceIF* vc1 = + VirtualChannelIF* vc1 = new PapbVcInterface(gpioComIF, gpioIds::VC1_PAPB_BUSY, gpioIds::VC1_PAPB_EMPTY, q7s::UIO_PTME, q7s::uiomapids::PTME_VC1); - VcInterfaceIF* vc2 = + VirtualChannelIF* vc2 = new PapbVcInterface(gpioComIF, gpioIds::VC2_PAPB_BUSY, gpioIds::VC2_PAPB_EMPTY, q7s::UIO_PTME, q7s::uiomapids::PTME_VC2); - VcInterfaceIF* vc3 = + VirtualChannelIF* vc3 = new PapbVcInterface(gpioComIF, gpioIds::VC3_PAPB_BUSY, gpioIds::VC3_PAPB_EMPTY, q7s::UIO_PTME, q7s::uiomapids::PTME_VC3); // Creating ptme object and adding virtual channel interfaces @@ -763,26 +764,26 @@ ReturnValue_t ObjectFactory::createCcsdsComponents(LinuxLibgpioIF* gpioComIF, PtmeConfig* ptmeConfig = new PtmeConfig(objects::PTME_CONFIG, axiPtmeConfig); *ipCoreHandler = new CcsdsIpCoreHandler(objects::CCSDS_HANDLER, objects::CCSDS_PACKET_DISTRIBUTOR, - *ptme, *ptmeConfig, gpioComIF, gpioIds::RS485_EN_TX_CLOCK, - gpioIds::RS485_EN_TX_DATA); - VirtualChannel* vc = nullptr; - vc = new VirtualChannel(objects::PTME_VC0_LIVE_TM, ccsds::VC0, "PTME VC0 LIVE TM", - config::VC0_QUEUE_SIZE); - (*ipCoreHandler)->addVirtualChannel(ccsds::VC0, vc); - vc = new VirtualChannel(objects::PTME_VC1_LOG_TM, ccsds::VC1, "PTME VC1 LOG TM", - config::VC1_QUEUE_SIZE); - pusFunnel.addPersistentTmStoreRouting(filters::okFilter(), vc->getReportReceptionQueue()); - pusFunnel.addPersistentTmStoreRouting(filters::notOkFilter(), vc->getReportReceptionQueue()); - pusFunnel.addPersistentTmStoreRouting(filters::miscFilter(), vc->getReportReceptionQueue()); - (*ipCoreHandler)->addVirtualChannel(ccsds::VC1, vc); - vc = new VirtualChannel(objects::PTME_VC2_HK_TM, ccsds::VC2, "PTME VC2 HK TM", - config::VC2_QUEUE_SIZE); - pusFunnel.addPersistentTmStoreRouting(filters::hkFilter(), vc->getReportReceptionQueue()); - (*ipCoreHandler)->addVirtualChannel(ccsds::VC2, vc); - vc = new VirtualChannel(objects::PTME_VC3_CFDP_TM, ccsds::VC3, "PTME VC3 CFDP TM", - config::VC3_QUEUE_SIZE); + *ptmeConfig, LINK_STATE, gpioComIF, + gpioIds::RS485_EN_TX_CLOCK, gpioIds::RS485_EN_TX_DATA); + VirtualChannel* vc = new VirtualChannel(objects::PTME_VC0_LIVE_TM, ccsds::VC0, "PTME VC0 LIVE TM", + *ptme, LINK_STATE); + //(*ipCoreHandler)->addVirtualChannel(ccsds::VC0, vc); + vc = new VirtualChannel(objects::PTME_VC1_LOG_TM, ccsds::VC1, "PTME VC1 LOG TM", *ptme, + LINK_STATE); + + // pusFunnel.addPersistentTmStoreRouting(filters::okFilter(), vc->getReportReceptionQueue()); + // pusFunnel.addPersistentTmStoreRouting(filters::notOkFilter(), vc->getReportReceptionQueue()); + // pusFunnel.addPersistentTmStoreRouting(filters::miscFilter(), vc->getReportReceptionQueue()); + //(*ipCoreHandler)->addVirtualChannel(ccsds::VC1, vc); + vc = new VirtualChannel(objects::PTME_VC2_HK_TM, ccsds::VC2, "PTME VC2 HK TM", *ptme, LINK_STATE); + // auto hkTmStoreTask = new PersistentSingleTmStoreTask(); + // pusFunnel.addPersistentTmStoreRouting(filters::hkFilter(), vc->getReportReceptionQueue()); + //(*ipCoreHandler)->addVirtualChannel(ccsds::VC2, vc); + vc = new VirtualChannel(objects::PTME_VC3_CFDP_TM, ccsds::VC3, "PTME VC3 CFDP TM", *ptme, + LINK_STATE); // TODO: Set VC destination in CFDP funnel. - (*ipCoreHandler)->addVirtualChannel(ccsds::VC3, vc); + //(*ipCoreHandler)->addVirtualChannel(ccsds::VC3, vc); ReturnValue_t result = (*ipCoreHandler)->connectModeTreeParent(satsystem::com::SUBSYSTEM); if (result != returnvalue::OK) { diff --git a/linux/ipcore/PapbVcInterface.h b/linux/ipcore/PapbVcInterface.h index 83081d9d..5fb71340 100644 --- a/linux/ipcore/PapbVcInterface.h +++ b/linux/ipcore/PapbVcInterface.h @@ -3,10 +3,10 @@ #include #include +#include #include "OBSWConfig.h" #include "fsfw/returnvalues/returnvalue.h" -#include "linux/ipcore/VcInterfaceIF.h" /** * @brief This class handles the transmission of data to a virtual channel of the PTME IP Core @@ -14,7 +14,7 @@ * * @author J. Meier */ -class PapbVcInterface : public VcInterfaceIF { +class PapbVcInterface : public VirtualChannelIF { public: /** * @brief Constructor diff --git a/linux/ipcore/Ptme.cpp b/linux/ipcore/Ptme.cpp index 50b1a37c..714c71be 100644 --- a/linux/ipcore/Ptme.cpp +++ b/linux/ipcore/Ptme.cpp @@ -32,7 +32,7 @@ ReturnValue_t Ptme::writeToVc(uint8_t vcId, const uint8_t* data, size_t size) { return result; } -void Ptme::addVcInterface(VcId_t vcId, VcInterfaceIF* vc) { +void Ptme::addVcInterface(VcId_t vcId, VirtualChannelIF* vc) { if (vcId > config::NUMBER_OF_VIRTUAL_CHANNELS) { sif::warning << "Ptme::addVcInterface: Invalid virtual channel ID" << std::endl; return; diff --git a/linux/ipcore/Ptme.h b/linux/ipcore/Ptme.h index d826ac57..aec7bcb4 100644 --- a/linux/ipcore/Ptme.h +++ b/linux/ipcore/Ptme.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,7 +11,6 @@ #include "OBSWConfig.h" #include "fsfw/returnvalues/returnvalue.h" #include "linux/ipcore/PtmeIF.h" -#include "linux/ipcore/VcInterfaceIF.h" /** * @brief This class handles the interfacing to the telemetry (PTME) IP core. @@ -40,7 +40,7 @@ class Ptme : public PtmeIF, public SystemObject { * @brief This function adds the reference to a virtual channel interface to the vcInterface * map. */ - void addVcInterface(VcId_t vcId, VcInterfaceIF* vc); + void addVcInterface(VcId_t vcId, VirtualChannelIF* vc); private: static const uint8_t INTERFACE_ID = CLASS_ID::PTME; @@ -73,7 +73,7 @@ class Ptme : public PtmeIF, public SystemObject { uint32_t* ptmeBaseAddress = nullptr; - using VcInterfaceMap = std::unordered_map; + using VcInterfaceMap = std::unordered_map; using VcInterfaceMapIter = VcInterfaceMap::iterator; VcInterfaceMap vcInterfaceMap; diff --git a/linux/ipcore/VcInterfaceIF.h b/linux/ipcore/VirtualChannelIF.h similarity index 86% rename from linux/ipcore/VcInterfaceIF.h rename to linux/ipcore/VirtualChannelIF.h index 3b2b7c6c..266a56c3 100644 --- a/linux/ipcore/VcInterfaceIF.h +++ b/linux/ipcore/VirtualChannelIF.h @@ -13,9 +13,9 @@ * Also implements @DirectTmSinkIF to allow wiriting to the VC directly. * @author J. Meier */ -class VcInterfaceIF : public DirectTmSinkIF { +class VirtualChannelIF : public DirectTmSinkIF { public: - virtual ~VcInterfaceIF(){}; + virtual ~VirtualChannelIF(){}; virtual ReturnValue_t initialize() = 0; }; diff --git a/mission/tmtc/CMakeLists.txt b/mission/tmtc/CMakeLists.txt index 213f4ef0..ab5cf85d 100644 --- a/mission/tmtc/CMakeLists.txt +++ b/mission/tmtc/CMakeLists.txt @@ -1,12 +1,17 @@ target_sources( ${LIB_EIVE_MISSION} PRIVATE CcsdsIpCoreHandler.cpp + VirtualChannelWithQueue.cpp + PersistentTmStoreWithTmQueue.cpp + LiveTmTask.cpp VirtualChannel.cpp TmFunnelHandler.cpp TmFunnelBase.cpp CfdpTmFunnel.cpp tmFilters.cpp PusLiveDemux.cpp + PersistentSingleTmStoreTask.cpp + PersistentLogTmStoreTask.cpp PusPacketFilter.cpp PusTmRouteByFilterHelper.cpp Service15TmStorage.cpp diff --git a/mission/tmtc/CcsdsIpCoreHandler.cpp b/mission/tmtc/CcsdsIpCoreHandler.cpp index a0cb8d28..9a892977 100644 --- a/mission/tmtc/CcsdsIpCoreHandler.cpp +++ b/mission/tmtc/CcsdsIpCoreHandler.cpp @@ -13,10 +13,10 @@ #include "mission/devices/devicedefinitions/SyrlinksDefinitions.h" CcsdsIpCoreHandler::CcsdsIpCoreHandler(object_id_t objectId, object_id_t tcDestination, - PtmeIF& ptme, PtmeConfig& ptmeConfig, GpioIF* gpioIF, - gpioId_t enTxClock, gpioId_t enTxData) + PtmeConfig& ptmeConfig, std::atomic_bool& linkState, + GpioIF* gpioIF, gpioId_t enTxClock, gpioId_t enTxData) : SystemObject(objectId), - ptme(ptme), + linkState(linkState), tcDestination(tcDestination), parameterHelper(this), actionHelper(this, nullptr), @@ -31,22 +31,13 @@ CcsdsIpCoreHandler::CcsdsIpCoreHandler(object_id_t objectId, object_id_t tcDesti QueueFactory::instance()->createMessageQueue(10, EventMessage::EVENT_MESSAGE_SIZE, &mqArgs); } -CcsdsIpCoreHandler::~CcsdsIpCoreHandler() {} +CcsdsIpCoreHandler::~CcsdsIpCoreHandler() = default; ReturnValue_t CcsdsIpCoreHandler::performOperation(uint8_t operationCode) { readCommandQueue(); - // handleTelemetry(); return returnvalue::OK; } -// TODO: TM is sent to the respective VCs directly. -// void CcsdsIpCoreHandler::handleTelemetry() { -// VirtualChannelMapIter iter; -// for (iter = virtualChannelMap.begin(); iter != virtualChannelMap.end(); iter++) { -// iter->second->performOperation(); -// } -//} - ReturnValue_t CcsdsIpCoreHandler::initialize() { ReturnValue_t result = returnvalue::OK; AcceptsTelecommandsIF* tcDistributor = @@ -75,15 +66,6 @@ ReturnValue_t CcsdsIpCoreHandler::initialize() { return result; } - VirtualChannelMapIter iter; - for (iter = virtualChannelMap.begin(); iter != virtualChannelMap.end(); iter++) { - result = iter->second->initialize(); - if (result != returnvalue::OK) { - return result; - } - iter->second->setPtmeObject(&ptme); - } - result = ptmeConfig.initialize(); if (result != returnvalue::OK) { return ObjectManagerIF::CHILD_INIT_FAILED; @@ -92,7 +74,7 @@ ReturnValue_t CcsdsIpCoreHandler::initialize() { #if OBSW_SYRLINKS_SIMULATED == 1 // Update data on rising edge ptmeConfig->invertTxClock(false); - linkState = UP; + linkState = LINK_UP; forwardLinkstate(); #endif /* OBSW_SYRLINKS_SIMULATED == 1*/ @@ -126,41 +108,40 @@ void CcsdsIpCoreHandler::readCommandQueue(void) { MessageQueueId_t CcsdsIpCoreHandler::getCommandQueue() const { return commandQueue->getId(); } -void CcsdsIpCoreHandler::addVirtualChannel(VcId_t vcId, VirtualChannel* virtualChannel) { - if (vcId > config::NUMBER_OF_VIRTUAL_CHANNELS) { - sif::warning << "CcsdsHandler::addVirtualChannel: Invalid virtual channel ID" << std::endl; - return; - } - - if (virtualChannel == nullptr) { - sif::warning << "CcsdsHandler::addVirtualChannel: Invalid virtual channel interface" - << std::endl; - return; - } - - auto status = virtualChannelMap.emplace(vcId, virtualChannel); - if (status.second == false) { - sif::warning << "CcsdsHandler::addVirtualChannel: Failed to add virtual channel to " - "virtual channel map" - << std::endl; - return; - } -} +// void CcsdsIpCoreHandler::addVirtualChannel(VcId_t vcId, VirtualChannelWithQueue* virtualChannel) +// { +// if (vcId > config::NUMBER_OF_VIRTUAL_CHANNELS) { +// sif::warning << "CcsdsHandler::addVirtualChannel: Invalid virtual channel ID" << std::endl; +// return; +// } +// +// if (virtualChannel == nullptr) { +// sif::warning << "CcsdsHandler::addVirtualChannel: Invalid virtual channel interface" +// << std::endl; +// return; +// } +// +// auto status = virtualChannelMap.emplace(vcId, virtualChannel); +// if (status.second == false) { +// sif::warning << "CcsdsHandler::addVirtualChannel: Failed to add virtual channel to " +// "virtual channel map" +// << std::endl; +// return; +// } +// } // MessageQueueId_t CcsdsIpCoreHandler::getReportReceptionQueue(uint8_t virtualChannel) const { -// if (virtualChannel < config::NUMBER_OF_VIRTUAL_CHANNELS) { -// auto iter = virtualChannelMap.find(virtualChannel); -// if (iter != virtualChannelMap.end()) { -// return iter->second->getReportReceptionQueue(); -// } else { -// sif::warning << "CcsdsHandler::getReportReceptionQueue: Virtual channel with ID " -// << static_cast(virtualChannel) << " not in virtual channel map" -// << std::endl; -// return MessageQueueIF::NO_QUEUE; -// } -// } else { +// if (virtualChannel > config::NUMBER_OF_VIRTUAL_CHANNELS) { // sif::debug << "CcsdsHandler::getReportReceptionQueue: Invalid virtual channel requested"; +// return MessageQueueIF::NO_QUEUE; // } +// auto iter = virtualChannelMap.find(virtualChannel); +// if (iter != virtualChannelMap.end()) { +// return iter->second->getReportReceptionQueue(); +// } +// sif::warning << "CcsdsHandler::getReportReceptionQueue: Virtual channel with ID " +// << static_cast(virtualChannel) << " not in virtual channel map" +// << std::endl; // return MessageQueueIF::NO_QUEUE; // } @@ -237,20 +218,14 @@ ReturnValue_t CcsdsIpCoreHandler::executeAction(ActionId_t actionId, MessageQueu return EXECUTION_FINISHED; } -void CcsdsIpCoreHandler::forwardLinkstate() { - VirtualChannelMapIter iter; - for (iter = virtualChannelMap.begin(); iter != virtualChannelMap.end(); iter++) { - iter->second->setLinkState(linkState); - } -} +void CcsdsIpCoreHandler::updateLinkState() { linkState = LINK_UP; } void CcsdsIpCoreHandler::enableTransmit() { #ifndef TE0720_1CFA gpioIF->pullHigh(enTxClock); gpioIF->pullHigh(enTxData); #endif - linkState = UP; - forwardLinkstate(); + linkState = LINK_UP; } void CcsdsIpCoreHandler::getMode(Mode_t* mode, Submode_t* submode) { @@ -317,8 +292,7 @@ void CcsdsIpCoreHandler::disableTransmit() { gpioIF->pullLow(enTxClock); gpioIF->pullLow(enTxData); #endif - linkState = DOWN; - forwardLinkstate(); + linkState = LINK_DOWN; } const char* CcsdsIpCoreHandler::getName() const { return "CCSDS Handler"; } diff --git a/mission/tmtc/CcsdsIpCoreHandler.h b/mission/tmtc/CcsdsIpCoreHandler.h index f82e28fd..4a501ab5 100644 --- a/mission/tmtc/CcsdsIpCoreHandler.h +++ b/mission/tmtc/CcsdsIpCoreHandler.h @@ -2,12 +2,12 @@ #define CCSDSHANDLER_H_ #include +#include #include #include #include "OBSWConfig.h" -#include "VirtualChannel.h" #include "eive/definitions.h" #include "fsfw/action/ActionHelper.h" #include "fsfw/action/HasActionsIF.h" @@ -44,6 +44,8 @@ class CcsdsIpCoreHandler : public SystemObject, public ReceivesParameterMessagesIF, public HasActionsIF { public: + static const bool LINK_UP = true; + static const bool LINK_DOWN = false; using VcId_t = uint8_t; /** @@ -58,14 +60,22 @@ class CcsdsIpCoreHandler : public SystemObject, * @param enTxClock GPIO ID of RS485 tx clock enable * @param enTxData GPIO ID of RS485 tx data enable */ - CcsdsIpCoreHandler(object_id_t objectId, object_id_t tcDestination, PtmeIF& ptme, - PtmeConfig& ptmeConfig, GpioIF* gpioIF, gpioId_t enTxClock, gpioId_t enTxData); + CcsdsIpCoreHandler(object_id_t objectId, object_id_t tcDestination, PtmeConfig& ptmeConfig, + std::atomic_bool& linkState, GpioIF* gpioIF, gpioId_t enTxClock, + gpioId_t enTxData); ~CcsdsIpCoreHandler(); ReturnValue_t performOperation(uint8_t operationCode = 0) override; ReturnValue_t initialize(); MessageQueueId_t getCommandQueue() const override; + /** + * Currently directly forwards requests to the virtual channels which might live + * in different threads. + * @param virtualChannel + * @return + */ + // MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override; // ModesIF void getMode(Mode_t* mode, Submode_t* submode) override; @@ -80,9 +90,8 @@ class CcsdsIpCoreHandler : public SystemObject, * @param virtualChannelId ID of the virtual channel to add * @param virtualChannel Pointer to virtual channel object */ - void addVirtualChannel(VcId_t virtualChannelId, VirtualChannel* virtualChannel); + // void addVirtualChannel(VcId_t virtualChannelId, VirtualChannelWithQueue* virtualChannel); - // MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override; ReturnValue_t getParameter(uint8_t domainId, uint8_t uniqueIdentifier, ParameterWrapper* parameterWrapper, const ParameterWrapper* newValues, uint16_t startAtIndex); @@ -126,14 +135,9 @@ class CcsdsIpCoreHandler : public SystemObject, //! [EXPORT] : [COMMENT] Received action message with unknown action id static const ReturnValue_t COMMAND_NOT_IMPLEMENTED = MAKE_RETURN_CODE(0xA0); - static const bool UP = true; - static const bool DOWN = false; - - using VirtualChannelMap = std::unordered_map; - using VirtualChannelMapIter = VirtualChannelMap::iterator; - - PtmeIF& ptme; - VirtualChannelMap virtualChannelMap; + // using VirtualChannelMap = std::unordered_map; + // VirtualChannelMap virtualChannelMap; + std::atomic_bool& linkState; object_id_t tcDestination; @@ -156,15 +160,13 @@ class CcsdsIpCoreHandler : public SystemObject, // GPIO to enable RS485 transceiver for TX data signal gpioId_t enTxData = gpio::NO_GPIO; - bool linkState = DOWN; - void readCommandQueue(void); void handleTelemetry(); /** * @brief Forward link state to virtual channels. */ - void forwardLinkstate(); + void updateLinkState(); /** * @brief Starts transmit timer and enables transmitter. diff --git a/mission/tmtc/LiveTmTask.cpp b/mission/tmtc/LiveTmTask.cpp new file mode 100644 index 00000000..723ed26d --- /dev/null +++ b/mission/tmtc/LiveTmTask.cpp @@ -0,0 +1,16 @@ +#include "LiveTmTask.h" + +#include + +LiveTmTask::LiveTmTask(object_id_t objectId, VirtualChannelWithQueue& channel) + : SystemObject(objectId), channel(channel) {} + +ReturnValue_t LiveTmTask::performOperation(uint8_t opCode) { + while (true) { + ReturnValue_t result = channel.sendNextTm(); + if (result == MessageQueueIF::EMPTY) { + // 5 ms IDLE delay. Might tweak this in the future. + TaskFactory::delayTask(5); + } + } +} diff --git a/mission/tmtc/LiveTmTask.h b/mission/tmtc/LiveTmTask.h new file mode 100644 index 00000000..598b1379 --- /dev/null +++ b/mission/tmtc/LiveTmTask.h @@ -0,0 +1,18 @@ +#ifndef MISSION_TMTC_LIVETMTASK_H_ +#define MISSION_TMTC_LIVETMTASK_H_ + +#include +#include +#include + +class LiveTmTask : public SystemObject, public ExecutableObjectIF { + public: + LiveTmTask(object_id_t objectId, VirtualChannelWithQueue& channel); + + ReturnValue_t performOperation(uint8_t opCode) override; + + private: + VirtualChannelWithQueue& channel; +}; + +#endif /* MISSION_TMTC_LIVETMTASK_H_ */ diff --git a/mission/tmtc/PersistentLogTmStoreTask.cpp b/mission/tmtc/PersistentLogTmStoreTask.cpp new file mode 100644 index 00000000..2102bcb7 --- /dev/null +++ b/mission/tmtc/PersistentLogTmStoreTask.cpp @@ -0,0 +1 @@ +#include "PersistentLogTmStoreTask.h" diff --git a/mission/tmtc/PersistentLogTmStoreTask.h b/mission/tmtc/PersistentLogTmStoreTask.h new file mode 100644 index 00000000..d119fd33 --- /dev/null +++ b/mission/tmtc/PersistentLogTmStoreTask.h @@ -0,0 +1,26 @@ +#ifndef MISSION_TMTC_PERSISTENTLOGTMSTORETASK_H_ +#define MISSION_TMTC_PERSISTENTLOGTMSTORETASK_H_ + +#include +#include +#include +#include +#include +#include +#include + +struct LogStores { + PersistentTmStoreWithTmQueue& okStore; + PersistentTmStoreWithTmQueue& notOkStore; + PersistentTmStoreWithTmQueue& miscStore; +}; + +class PersistentLogTmStoreTask : public SystemObject, public ExecutableObjectIF { + public: + PersistentLogTmStoreTask(object_id_t objectId, StorageManagerIF& ipcStore, LogStores tmStore, + VirtualChannelWithQueue& channel); + + private: +}; + +#endif /* MISSION_TMTC_PERSISTENTLOGTMSTORETASK_H_ */ diff --git a/mission/tmtc/PersistentSingleTmStoreTask.cpp b/mission/tmtc/PersistentSingleTmStoreTask.cpp new file mode 100644 index 00000000..3b7f919c --- /dev/null +++ b/mission/tmtc/PersistentSingleTmStoreTask.cpp @@ -0,0 +1,37 @@ +#include +#include + +PersistentSingleTmStoreTask::PersistentSingleTmStoreTask(object_id_t objectId, + StorageManagerIF& ipcStore, + PersistentTmStoreWithTmQueue& tmStore, + VirtualChannel& channel) + : SystemObject(objectId), ipcStore(ipcStore), storeWithQueue(tmStore), channel(channel) {} + +ReturnValue_t PersistentSingleTmStoreTask::performOperation(uint8_t opCode) { + ReturnValue_t result; + auto& store = storeWithQueue.getTmStore(); + bool noTmToStoreReceived = false; + bool noTcRequestReceived = false; + while (true) { + // Store TM persistently + result = storeWithQueue.handleNextTm(); + if (result == MessageQueueIF::NO_QUEUE) { + noTmToStoreReceived = true; + } + // Handle TC requests, for example deletion or retrieval requests. + result = store.handleCommandQueue(ipcStore); + if (result == MessageQueueIF::NO_QUEUE) { + noTcRequestReceived = true; + } + // Dump TMs when applicable + if (store.getState() == PersistentTmStore::State::DUMPING) { + size_t dumpedLen; + // TODO: Maybe do a bit of a delay every 100-200 packets? + // TODO: handle returnvalue? + store.dumpNextPacket(channel, dumpedLen); + } else if (noTcRequestReceived and noTmToStoreReceived) { + // Nothng to do, so sleep for a bit. + TaskFactory::delayTask(5); + } + } +} diff --git a/mission/tmtc/PersistentSingleTmStoreTask.h b/mission/tmtc/PersistentSingleTmStoreTask.h new file mode 100644 index 00000000..4ec9f03f --- /dev/null +++ b/mission/tmtc/PersistentSingleTmStoreTask.h @@ -0,0 +1,23 @@ +#ifndef MISSION_TMTC_PERSISTENTSINGLETMSTORETASK_H_ +#define MISSION_TMTC_PERSISTENTSINGLETMSTORETASK_H_ + +#include +#include +#include +#include + +class PersistentSingleTmStoreTask : public SystemObject, public ExecutableObjectIF { + public: + PersistentSingleTmStoreTask(object_id_t objectId, StorageManagerIF& ipcStore, + PersistentTmStoreWithTmQueue& storeWithQueue, + VirtualChannel& channel); + + ReturnValue_t performOperation(uint8_t opCode) override; + + private: + StorageManagerIF& ipcStore; + PersistentTmStoreWithTmQueue& storeWithQueue; + VirtualChannel& channel; +}; + +#endif /* MISSION_TMTC_PERSISTENTSINGLETMSTORETASK_H_ */ diff --git a/mission/tmtc/PersistentTmStore.cpp b/mission/tmtc/PersistentTmStore.cpp index 5784fdba..8bcf59bd 100644 --- a/mission/tmtc/PersistentTmStore.cpp +++ b/mission/tmtc/PersistentTmStore.cpp @@ -35,13 +35,9 @@ ReturnValue_t PersistentTmStore::assignAndOrCreateMostRecentFile() { return returnvalue::OK; } -ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore, - TmFunnelBase& tmFunnel) { +ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore) { CommandMessage cmdMessage; ReturnValue_t result = tcQueue->receiveMessage(&cmdMessage); - if (result == MessageQueueIF::EMPTY) { - return returnvalue::OK; - } if (result != returnvalue::OK) { return result; } @@ -70,7 +66,10 @@ ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore, SerializeIF::Endianness::NETWORK); SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data() + 4, &size, SerializeIF::Endianness::NETWORK); - startDumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds); + result = startDumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds); + if (result != returnvalue::OK and result == BUSY_DUMPING) { + triggerEvent(BUSY_DUMPING_EVENT); + } } } return returnvalue::OK; @@ -344,3 +343,5 @@ ReturnValue_t PersistentTmStore::initializeTmStore() { updateBaseDir(); return assignAndOrCreateMostRecentFile(); } + +PersistentTmStore::State PersistentTmStore::getState() const { return state; } diff --git a/mission/tmtc/PersistentTmStore.h b/mission/tmtc/PersistentTmStore.h index 88c79443..795c1fcd 100644 --- a/mission/tmtc/PersistentTmStore.h +++ b/mission/tmtc/PersistentTmStore.h @@ -2,6 +2,7 @@ #define MISSION_TMTC_TMSTOREBACKEND_H_ #include +#include #include #include #include @@ -11,7 +12,6 @@ #include -#include "TmFunnelBase.h" #include "eive/eventSubsystemIds.h" #include "eive/resultClassIds.h" @@ -22,6 +22,7 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject { enum class State { IDLE, DUMPING }; static constexpr uint8_t INTERFACE_ID = CLASS_ID::PERSISTENT_TM_STORE; static constexpr ReturnValue_t DUMP_DONE = returnvalue::makeCode(INTERFACE_ID, 0); + static constexpr ReturnValue_t BUSY_DUMPING = returnvalue::makeCode(INTERFACE_ID, 1); static constexpr uint8_t SUBSYSTEM_ID = SUBSYSTEM_ID::PERSISTENT_TM_STORE; @@ -33,12 +34,15 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject { //! [EXPORT] : [COMMENT] File in store too large. P1: Detected file size //! P2: Allowed file size static constexpr Event FILE_TOO_LARGE = event::makeEvent(SUBSYSTEM_ID, 1, severity::LOW); + static constexpr Event BUSY_DUMPING_EVENT = event::makeEvent(SUBSYSTEM_ID, 2, severity::INFO); + PersistentTmStore(object_id_t objectId, const char* baseDir, std::string baseName, RolloverInterval intervalUnit, uint32_t intervalCount, StorageManagerIF& tmStore, SdCardMountedIF& sdcMan); ReturnValue_t initializeTmStore(); - ReturnValue_t handleCommandQueue(StorageManagerIF& ipcStore, TmFunnelBase& tmFunnel); + State getState() const; + ReturnValue_t handleCommandQueue(StorageManagerIF& ipcStore); void deleteUpTo(uint32_t unixSeconds); ReturnValue_t startDumpFrom(uint32_t fromUnixSeconds); diff --git a/mission/tmtc/PersistentTmStoreWithTmQueue.cpp b/mission/tmtc/PersistentTmStoreWithTmQueue.cpp new file mode 100644 index 00000000..b8d40adf --- /dev/null +++ b/mission/tmtc/PersistentTmStoreWithTmQueue.cpp @@ -0,0 +1,28 @@ +#include "PersistentTmStoreWithTmQueue.h" + +#include +#include + +PersistentTmStoreWithTmQueue::PersistentTmStoreWithTmQueue(StorageManagerIF& tmStore, + PersistentTmStore& persistentTmStore, + uint32_t tmQueueDepth) + : tmStore(tmStore), persistentTmStore(persistentTmStore) { + tmQueue = QueueFactory::instance()->createMessageQueue(tmQueueDepth); +} + +ReturnValue_t PersistentTmStoreWithTmQueue::handleNextTm() { + TmTcMessage msg; + ReturnValue_t result = tmQueue->receiveMessage(&msg); + if (result == MessageQueueIF::EMPTY) { + return result; + } + auto accessor = tmStore.getData(msg.getStorageId()); + if (accessor.first != returnvalue::OK) { + return result; + } + PusTmReader reader(accessor.second.data(), accessor.second.size()); + persistentTmStore.storePacket(reader); + return returnvalue::OK; +} + +PersistentTmStore& PersistentTmStoreWithTmQueue::getTmStore() { return persistentTmStore; } diff --git a/mission/tmtc/PersistentTmStoreWithTmQueue.h b/mission/tmtc/PersistentTmStoreWithTmQueue.h new file mode 100644 index 00000000..05cc4d2d --- /dev/null +++ b/mission/tmtc/PersistentTmStoreWithTmQueue.h @@ -0,0 +1,19 @@ +#ifndef MISSION_TMTC_PERSISTENTTMSTOREWITHTMQUEUE_H_ +#define MISSION_TMTC_PERSISTENTTMSTOREWITHTMQUEUE_H_ +#include + +class PersistentTmStoreWithTmQueue : public AcceptsTelemetryIF { + public: + PersistentTmStoreWithTmQueue(StorageManagerIF& tmStore, PersistentTmStore& persistentTmStore, + uint32_t tmQueueDepth); + + ReturnValue_t handleNextTm(); + PersistentTmStore& getTmStore(); + + private: + StorageManagerIF& tmStore; + MessageQueueIF* tmQueue; + PersistentTmStore& persistentTmStore; +}; + +#endif /* MISSION_TMTC_PERSISTENTTMSTOREWITHTMQUEUE_H_ */ diff --git a/mission/tmtc/VirtualChannel.cpp b/mission/tmtc/VirtualChannel.cpp index a6e7732c..fba64049 100644 --- a/mission/tmtc/VirtualChannel.cpp +++ b/mission/tmtc/VirtualChannel.cpp @@ -1,77 +1,26 @@ #include "VirtualChannel.h" -#include "CcsdsIpCoreHandler.h" -#include "OBSWConfig.h" -#include "fsfw/ipc/QueueFactory.h" -#include "fsfw/objectmanager/ObjectManager.h" -#include "fsfw/serviceinterface/ServiceInterfaceStream.h" -#include "fsfw/tmtcservices/TmTcMessage.h" +VirtualChannel::VirtualChannel(object_id_t objectId, uint8_t vcId, const char* vcName, PtmeIF& ptme, + const std::atomic_bool& linkStateProvider) + : SystemObject(objectId), + vcId(vcId), + vcName(vcName), + ptme(ptme), + linkStateProvider(linkStateProvider) {} -VirtualChannel::VirtualChannel(object_id_t objectId, uint8_t vcId, const char* vcName, - uint32_t tmQueueDepth) - : SystemObject(objectId), vcId(vcId), vcName(vcName) { - auto mqArgs = MqArgs(objectId, reinterpret_cast(vcId)); - tmQueue = QueueFactory::instance()->createMessageQueue( - tmQueueDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); +ReturnValue_t VirtualChannel::initialize() { return returnvalue::OK; } + +ReturnValue_t VirtualChannel::sendNextTm(const uint8_t* data, size_t size) { + return write(data, size); } -ReturnValue_t VirtualChannel::initialize() { - tmStore = ObjectManager::instance()->get(objects::TM_STORE); - if (tmStore == nullptr) { - sif::error << "VirtualChannel::initialize: Failed to get tm store" << std::endl; - return returnvalue::FAILED; +ReturnValue_t VirtualChannel::write(const uint8_t* data, size_t size) { + if (linkStateProvider.load()) { + return ptme.writeToVc(vcId, data, size); } return returnvalue::OK; } -ReturnValue_t VirtualChannel::performOperation(uint8_t opCode) { - ReturnValue_t result = returnvalue::OK; - TmTcMessage message; - // To be able to push high datarates, we use a custom permanent loop. - while (true) { - unsigned int count = 0; - while (tmQueue->receiveMessage(&message) == returnvalue::OK) { - store_address_t storeId = message.getStorageId(); - const uint8_t* data = nullptr; - size_t size = 0; - result = tmStore->getData(storeId, &data, &size); - if (result != returnvalue::OK) { - sif::warning << "VirtualChannel::performOperation: Failed to read data from TM store" - << std::endl; - tmStore->deleteData(storeId); - return result; - } - - if (linkIsUp) { - result = ptme->writeToVc(vcId, data, size); - } - tmStore->deleteData(storeId); - if (result != returnvalue::OK) { - return result; - } - - count++; - if (count == 500) { - sif::error << "VirtualChannel: Possible message storm detected" << std::endl; - break; - } - } - } - return returnvalue::FAILED; -} - -MessageQueueId_t VirtualChannel::getReportReceptionQueue(uint8_t virtualChannel) const { - return tmQueue->getId(); -} - -void VirtualChannel::setPtmeObject(PtmeIF* ptme_) { - if (ptme_ == nullptr) { - sif::warning << "VirtualChannel::setPtmeObject: Invalid ptme object" << std::endl; - return; - } - ptme = ptme_; -} - -void VirtualChannel::setLinkState(bool linkIsUp_) { linkIsUp = linkIsUp_; } +uint8_t VirtualChannel::getVcid() const { return vcId; } const char* VirtualChannel::getName() const { return vcName.c_str(); } diff --git a/mission/tmtc/VirtualChannel.h b/mission/tmtc/VirtualChannel.h index 98caa004..b46c099e 100644 --- a/mission/tmtc/VirtualChannel.h +++ b/mission/tmtc/VirtualChannel.h @@ -1,16 +1,11 @@ -#ifndef VIRTUALCHANNEL_H_ -#define VIRTUALCHANNEL_H_ +#pragma once -#include #include -#include #include +#include -#include "OBSWConfig.h" -#include "fsfw/returnvalues/returnvalue.h" -#include "fsfw/tmtcservices/AcceptsTelemetryIF.h" - -class StorageManagerIF; +#include +#include /** * @brief This class represents a virtual channel. Sending a tm message to an object of this class @@ -18,7 +13,7 @@ class StorageManagerIF; * * @author J. Meier */ -class VirtualChannel : public SystemObject, public ExecutableObjectIF, public AcceptsTelemetryIF { +class VirtualChannel : public SystemObject, public VirtualChannelIF { public: /** * @brief Constructor @@ -26,35 +21,19 @@ class VirtualChannel : public SystemObject, public ExecutableObjectIF, public Ac * @param vcId The virtual channel id assigned to this object * @param tmQueueDepth Queue depth of queue receiving telemetry from other objects */ - VirtualChannel(object_id_t objectId, uint8_t vcId, const char* vcName, uint32_t tmQueueDepth); + VirtualChannel(object_id_t objectId, uint8_t vcId, const char* vcName, PtmeIF& ptme, + const std::atomic_bool& linkStateProvider); ReturnValue_t initialize() override; - MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override; - ReturnValue_t performOperation(uint8_t opCode) override; + ReturnValue_t sendNextTm(const uint8_t* data, size_t size); + ReturnValue_t write(const uint8_t* data, size_t size) override; + uint8_t getVcid() const; - /** - * @brief Sets the PTME object which handles access to the PTME IP Core. - * - * @param ptme Pointer to ptme object - */ - void setPtmeObject(PtmeIF* ptme_); - - /** - * @brief Can be used by the owner to set the link state. Packets will be discarded if link - * to ground station is down. - */ - void setLinkState(bool linkIsUp_); - const char* getName() const override; + const char* getName() const; private: - PtmeIF* ptme = nullptr; - MessageQueueIF* tmQueue = nullptr; + PtmeIF& ptme; uint8_t vcId = 0; std::string vcName; - - bool linkIsUp = false; - - StorageManagerIF* tmStore = nullptr; + const std::atomic_bool& linkStateProvider; }; - -#endif /* VIRTUALCHANNEL_H_ */ diff --git a/mission/tmtc/VirtualChannelWithQueue.cpp b/mission/tmtc/VirtualChannelWithQueue.cpp new file mode 100644 index 00000000..62557973 --- /dev/null +++ b/mission/tmtc/VirtualChannelWithQueue.cpp @@ -0,0 +1,48 @@ +#include + +#include "CcsdsIpCoreHandler.h" +#include "OBSWConfig.h" +#include "fsfw/ipc/QueueFactory.h" +#include "fsfw/objectmanager/ObjectManager.h" +#include "fsfw/serviceinterface/ServiceInterfaceStream.h" +#include "fsfw/tmtcservices/TmTcMessage.h" + +VirtualChannelWithQueue::VirtualChannelWithQueue(VirtualChannel& channel, StorageManagerIF& tmStore, + uint32_t tmQueueDepth, + const std::atomic_bool& linkStateProvider) + : channel(channel) { + auto mqArgs = MqArgs(channel.getObjectId(), reinterpret_cast(channel.getVcid())); + tmQueue = QueueFactory::instance()->createMessageQueue( + tmQueueDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); +} + +ReturnValue_t VirtualChannelWithQueue::sendNextTm() { + TmTcMessage message; + ReturnValue_t result = tmQueue->receiveMessage(&message); + if (result == MessageQueueIF::EMPTY) { + return result; + } + store_address_t storeId = message.getStorageId(); + const uint8_t* data = nullptr; + size_t size = 0; + result = tmStore->getData(storeId, &data, &size); + if (result != returnvalue::OK) { + sif::warning << "VirtualChannel::performOperation: Failed to read data from TM store" + << std::endl; + tmStore->deleteData(storeId); + return result; + } + + channel.write(data, size); + tmStore->deleteData(storeId); + if (result != returnvalue::OK) { + return result; + } + return returnvalue::OK; +} + +MessageQueueId_t VirtualChannelWithQueue::getReportReceptionQueue(uint8_t virtualChannel) const { + return tmQueue->getId(); +} + +VirtualChannel& VirtualChannelWithQueue::vc() { return channel; } diff --git a/mission/tmtc/VirtualChannelWithQueue.h b/mission/tmtc/VirtualChannelWithQueue.h new file mode 100644 index 00000000..f7a2bef9 --- /dev/null +++ b/mission/tmtc/VirtualChannelWithQueue.h @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include + +#include "OBSWConfig.h" +#include "fsfw/returnvalues/returnvalue.h" +#include "fsfw/tmtcservices/AcceptsTelemetryIF.h" + +class StorageManagerIF; + +/** + * @brief This class represents a virtual channel. Sending a tm message to an object of this class + * will forward the tm packet to the respective virtual channel of the PTME IP Core. + * + * @author J. Meier + */ +class VirtualChannelWithQueue : public AcceptsTelemetryIF { + public: + /** + * @brief Constructor + * + * @param vcId The virtual channel id assigned to this object + * @param tmQueueDepth Queue depth of queue receiving telemetry from other objects + */ + VirtualChannelWithQueue(VirtualChannel& channel, StorageManagerIF& tmStore, uint32_t tmQueueDepth, + const std::atomic_bool& linkStateProvider); + + MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override; + ReturnValue_t sendNextTm(); + + VirtualChannel& vc(); + + const char* getName() const override; + + private: + VirtualChannel& channel; + MessageQueueIF* tmQueue = nullptr; + StorageManagerIF* tmStore = nullptr; +};