From 9f600a24ff53cd37f2c8436a981531a70f162678 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Fri, 13 Oct 2023 10:57:58 +0200 Subject: [PATCH] refactored throttle handling --- mission/cfdp/CfdpHandler.cpp | 19 ++++++------------- mission/cfdp/CfdpHandler.h | 6 ++++-- mission/com/LiveTmTask.cpp | 24 +++++++++++++++++++----- mission/com/LiveTmTask.h | 6 ++++++ mission/com/VirtualChannel.cpp | 1 + mission/genericFactory.cpp | 2 +- 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/mission/cfdp/CfdpHandler.cpp b/mission/cfdp/CfdpHandler.cpp index 07fbecb1..99afb725 100644 --- a/mission/cfdp/CfdpHandler.cpp +++ b/mission/cfdp/CfdpHandler.cpp @@ -15,7 +15,8 @@ using namespace returnvalue; using namespace cfdp; -CfdpHandler::CfdpHandler(const FsfwHandlerParams& fsfwHandlerParams, const CfdpHandlerCfg& cfdpCfg) +CfdpHandler::CfdpHandler(const FsfwHandlerParams& fsfwHandlerParams, const CfdpHandlerCfg& cfdpCfg, + const std::atomic_bool& throttleSignal) : SystemObject(fsfwHandlerParams.objectId), pduQueue(fsfwHandlerParams.tmtcQueue), cfdpRequestQueue(fsfwHandlerParams.cfdpQueue), @@ -28,7 +29,8 @@ CfdpHandler::CfdpHandler(const FsfwHandlerParams& fsfwHandlerParams, const CfdpH this->fsfwParams), srcHandler(SourceHandlerParams(localCfg, cfdpCfg.userHandler, seqCntProvider), this->fsfwParams), - ipcStore(fsfwHandlerParams.ipcStore) {} + ipcStore(fsfwHandlerParams.ipcStore), + throttleSignal(throttleSignal) {} [[nodiscard]] const char* CfdpHandler::getName() const { return "CFDP Handler"; } @@ -69,20 +71,11 @@ ReturnValue_t CfdpHandler::initialize() { fsmCount++; } fsmCount = 0; - if (signals::CFDP_CHANNEL_THROTTLE_SIGNAL) { - throttlePeriodSourceHandler.resetTimer(); + + if (throttleSignal) { throttlePeriodOngoing = true; - signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false; } - if (throttlePeriodOngoing) { - if (throttlePeriodSourceHandler.hasTimedOut()) { - throttlePeriodOngoing = false; - - } else { - shortDelay = true; - } - } // CFDP can be throttled by the slowest live TM handler to handle back pressure in a sensible // way without requiring huge amounts of memory for large files. if (!throttlePeriodOngoing) { diff --git a/mission/cfdp/CfdpHandler.h b/mission/cfdp/CfdpHandler.h index 46631ac5..82f27b0b 100644 --- a/mission/cfdp/CfdpHandler.h +++ b/mission/cfdp/CfdpHandler.h @@ -62,7 +62,8 @@ struct CfdpHandlerCfg { class CfdpHandler : public SystemObject, public ExecutableObjectIF, public AcceptsTelecommandsIF { public: - explicit CfdpHandler(const FsfwHandlerParams& fsfwParams, const CfdpHandlerCfg& cfdpCfg); + explicit CfdpHandler(const FsfwHandlerParams& fsfwParams, const CfdpHandlerCfg& cfdpCfg, + const std::atomic_bool& throttleSignal); [[nodiscard]] const char* getName() const override; [[nodiscard]] uint32_t getIdentifier() const override; @@ -74,7 +75,6 @@ class CfdpHandler : public SystemObject, public ExecutableObjectIF, public Accep private: MessageQueueIF& pduQueue; MessageQueueIF& cfdpRequestQueue; - Countdown throttlePeriodSourceHandler = Countdown(80); bool throttlePeriodOngoing = false; cfdp::LocalEntityCfg localCfg; @@ -89,6 +89,8 @@ class CfdpHandler : public SystemObject, public ExecutableObjectIF, public Accep StorageManagerIF* tcStore = nullptr; StorageManagerIF* tmStore = nullptr; + const std::atomic_bool& throttleSignal; + ReturnValue_t handlePduPacketMessages(); ReturnValue_t handlePduPacket(TmTcMessage& msg); ReturnValue_t handleCfdpRequest(CommandMessage& msg); diff --git a/mission/com/LiveTmTask.cpp b/mission/com/LiveTmTask.cpp index 833f28c1..f9b01c77 100644 --- a/mission/com/LiveTmTask.cpp +++ b/mission/com/LiveTmTask.cpp @@ -53,11 +53,14 @@ ReturnValue_t LiveTmTask::performOperation(uint8_t opCode) { } } } - if (channel.isBusy() and not signals::CFDP_CHANNEL_THROTTLE_SIGNAL) { + if (channel.isBusy() and !throttlePeriodOngoing) { // Throttle CFDP packet creator. It is by far the most relevant data creator, so throttling - // it is the easiest way to handle back pressure for now in a sensible way. It is cleared - // by the data creator. - signals::CFDP_CHANNEL_THROTTLE_SIGNAL = true; + // it is the easiest way to handle back pressure for now in a sensible way. + throttleCfdp(); + } else if(!channel.isBusy() and throttlePeriodOngoing) { + if(minimumPeriodThrottleCd.hasTimedOut()) { + releaseCfdp(); + } } if (!handledTm) { if (tmFunnelCd.hasTimedOut()) { @@ -164,7 +167,7 @@ ReturnValue_t LiveTmTask::handleGenericTmQueue(MessageQueueIF& queue) { result = channel.write(data, size, partiallyWrittenSize); if (result == DirectTmSinkIF::PARTIALLY_WRITTEN) { // Already throttle CFDP. - signals::CFDP_CHANNEL_THROTTLE_SIGNAL = true; + throttleCfdp(); result = channel.handleLastWriteSynchronously(data, size, partiallyWrittenSize, 200); if (result != returnvalue::OK) { // TODO: Event? Might lead to dangerous spam though.. @@ -180,6 +183,17 @@ ReturnValue_t LiveTmTask::handleGenericTmQueue(MessageQueueIF& queue) { return result; } +void LiveTmTask::throttleCfdp() { + throttlePeriodOngoing = true; + minimumPeriodThrottleCd.resetTimer(); + signals::CFDP_CHANNEL_THROTTLE_SIGNAL = true; +} + +void LiveTmTask::releaseCfdp() { + throttlePeriodOngoing = false; + signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false; +} + ModeTreeChildIF& LiveTmTask::getModeTreeChildIF() { return *this; } ReturnValue_t LiveTmTask::initialize() { diff --git a/mission/com/LiveTmTask.h b/mission/com/LiveTmTask.h index 63831158..73d2bf16 100644 --- a/mission/com/LiveTmTask.h +++ b/mission/com/LiveTmTask.h @@ -39,6 +39,10 @@ class LiveTmTask : public SystemObject, CfdpTmFunnel& cfdpFunnel; VirtualChannel& channel; const std::atomic_bool& ptmeLocked; + // This countdown ensures that the CFDP is always throttled with a minimum period. Only after + // this period, the CFDP can be released if the channel is not busy. + Countdown minimumPeriodThrottleCd = Countdown(40); + bool throttlePeriodOngoing = false; void readCommandQueue(void); @@ -56,6 +60,8 @@ class LiveTmTask : public SystemObject, void startTransition(Mode_t mode, Submode_t submode) override; void announceMode(bool recursive) override; + void throttleCfdp(); + void releaseCfdp(); object_id_t getObjectId() const override; const HasHealthIF* getOptHealthIF() const override; diff --git a/mission/com/VirtualChannel.cpp b/mission/com/VirtualChannel.cpp index 140b23f6..7ada00aa 100644 --- a/mission/com/VirtualChannel.cpp +++ b/mission/com/VirtualChannel.cpp @@ -58,6 +58,7 @@ ReturnValue_t VirtualChannel::handleLastWriteSynchronously(const uint8_t* data, delayMs += 10; continue; } + sif::debug << "last write after" << delayMs << std::endl; return finishWrite(data, start, remLen); } return returnvalue::FAILED; diff --git a/mission/genericFactory.cpp b/mission/genericFactory.cpp index 0dd965eb..d66ef5f7 100644 --- a/mission/genericFactory.cpp +++ b/mission/genericFactory.cpp @@ -292,7 +292,7 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun GROUND_REMOTE_CFG.maxFileSegmentLen = config::CFDP_MAX_FILE_SEGMENT_LEN; CfdpHandlerCfg cfdpCfg(localId, indicationCfg, *eiveUserHandler, EIVE_FAULT_HANDLER, PACKET_LIST, LOST_SEGMENTS, REMOTE_CFG_PROVIDER); - auto* cfdpHandler = new CfdpHandler(params, cfdpCfg); + auto* cfdpHandler = new CfdpHandler(params, cfdpCfg, signals::CFDP_CHANNEL_THROTTLE_SIGNAL); // All CFDP packets arrive wrapped inside CCSDS space packets CcsdsDistributorIF::DestInfo info("CFDP Destination", config::EIVE_CFDP_APID, cfdpHandler->getRequestQueue(), true);