CFDP source handler #776

Merged
muellerr merged 96 commits from cfdp-source-handler into main 2023-10-19 11:11:31 +02:00
12 changed files with 113 additions and 31 deletions
Showing only changes of commit 6771d656bb - Show all commits

View File

@ -163,9 +163,11 @@ void ObjectFactory::produce(void* args) {
&ipCoreHandler);
createCcsdsComponents(ccsdsArgs);
#if OBSW_TM_TO_PTME == 1
if (ccsdsArgs.liveDestination != nullptr) {
pusFunnel->addLiveDestination("VC0 LIVE TM", *ccsdsArgs.liveDestination, 0);
cfdpFunnel->addLiveDestination("VC0 LIVE TM", *ccsdsArgs.liveDestination, 0);
if (ccsdsArgs.normalLiveTmDest != MessageQueueIF::NO_QUEUE) {
pusFunnel->addLiveDestinationByRawId("VC0 NORMAL LIVE TM", ccsdsArgs.normalLiveTmDest, 0);
}
if (ccsdsArgs.cfdpLiveTmDest != MessageQueueIF::NO_QUEUE) {
cfdpFunnel->addLiveDestinationByRawId("VC0 CFDP LIVE TM", ccsdsArgs.cfdpLiveTmDest, 0);
}
#endif
#endif /* OBSW_ADD_CCSDS_IP_CORES == 1 */

View File

@ -777,12 +777,13 @@ ReturnValue_t ObjectFactory::createCcsdsComponents(CcsdsComponentArgs& args) {
new CcsdsIpCoreHandler(objects::CCSDS_HANDLER, objects::CCSDS_PACKET_DISTRIBUTOR, *ptmeConfig,
LINK_STATE, &args.gpioComIF, gpios, PTME_LOCKED);
// This VC will receive all live TM
auto* vcWithQueue =
new VirtualChannelWithQueue(objects::PTME_VC0_LIVE_TM, ccsds::VC0, "PTME VC0 LIVE TM", *ptme,
LINK_STATE, args.tmStore, 500);
args.liveDestination = vcWithQueue;
auto* vcWithQueue = new VirtualChannel(objects::PTME_VC0_LIVE_TM, ccsds::VC0, "PTME VC0 LIVE TM",
*ptme, LINK_STATE);
auto* liveTask = new LiveTmTask(objects::LIVE_TM_TASK, args.pusFunnel, args.cfdpFunnel,
*vcWithQueue, PTME_LOCKED);
*vcWithQueue, PTME_LOCKED, config::LIVE_CHANNEL_NORMAL_QUEUE_SIZE,
config::LIVE_CHANNEL_CFDP_QUEUE_SIZE);
args.normalLiveTmDest = liveTask->getNormalLiveQueueId();
args.cfdpLiveTmDest = liveTask->getCfdpLiveQueueId();
liveTask->connectModeTreeParent(satsystem::com::SUBSYSTEM);
// Set up log store.

View File

@ -46,7 +46,8 @@ struct CcsdsComponentArgs {
PusTmFunnel& pusFunnel;
CfdpTmFunnel& cfdpFunnel;
CcsdsIpCoreHandler** ipCoreHandler;
AcceptsTelemetryIF* liveDestination = nullptr;
MessageQueueId_t normalLiveTmDest = MessageQueueIF::NO_QUEUE;
MessageQueueId_t cfdpLiveTmDest = MessageQueueIF::NO_QUEUE;
};
void setStatics();

View File

@ -58,6 +58,9 @@ static constexpr uint32_t NOK_STORE_QUEUE_SIZE = 350;
static constexpr uint32_t HK_STORE_QUEUE_SIZE = 300;
static constexpr uint32_t CFDP_STORE_QUEUE_SIZE = 300;
static constexpr uint32_t LIVE_CHANNEL_NORMAL_QUEUE_SIZE = 300;
static constexpr uint32_t LIVE_CHANNEL_CFDP_QUEUE_SIZE = 300;
static constexpr uint32_t CFDP_MAX_FSM_CALL_COUNT_SRC_HANDLER = 50;
static constexpr uint32_t CFDP_MAX_FSM_CALL_COUNT_DEST_HANDLER = 300;
static constexpr uint32_t CFDP_SHORT_DELAY_MS = 50;

View File

@ -10,6 +10,7 @@
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tasks/TaskFactory.h"
#include "fsfw/tmtcservices/TmTcMessage.h"
#include "mission/sysDefs.h"
using namespace returnvalue;
using namespace cfdp;
@ -68,6 +69,23 @@ ReturnValue_t CfdpHandler::initialize() {
fsmCount++;
}
fsmCount = 0;
if (signals::CFDP_CHANNEL_THROTTLE_SIGNAL) {
throttlePeriodSourceHandler.resetTimer();
throttlePeriodOngoing = true;
signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false;
}
if (throttlePeriodOngoing) {
if (throttlePeriodSourceHandler.hasTimedOut()) {
throttlePeriodOngoing = false;
} else {
shortDelay = true;
}
}
// CFDP can be throttled by the slowest live TM handler to handle back pressure in a sensible
// way without requiring huge amounts of memory for large files.
if (!throttlePeriodOngoing) {
const SourceHandler::FsmResult& srcResult = srcHandler.stateMachine();
while (srcResult.callStatus == CallStatus::CALL_AGAIN) {
// Limit number of messages.
@ -83,6 +101,7 @@ ReturnValue_t CfdpHandler::initialize() {
}
fsmCount++;
}
}
if (shortDelay) {
TaskFactory::delayTask(config::CFDP_SHORT_DELAY_MS);
continue;

View File

@ -4,6 +4,7 @@
#include <etl/queue.h>
#include <fsfw/cfdp/handler/SourceHandler.h>
#include <fsfw/ipc/CommandMessage.h>
#include <fsfw/timemanager/Countdown.h>
#include <utility>
@ -73,6 +74,9 @@ class CfdpHandler : public SystemObject, public ExecutableObjectIF, public Accep
private:
MessageQueueIF& pduQueue;
MessageQueueIF& cfdpRequestQueue;
Countdown throttlePeriodSourceHandler = Countdown(80);
bool throttlePeriodOngoing = false;
cfdp::LocalEntityCfg localCfg;
cfdp::RemoteConfigTableIF& remoteCfgProvider;
cfdp::FsfwParams fsfwParams;

View File

@ -5,8 +5,13 @@
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>
#include "mission/sysDefs.h"
std::atomic_bool signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false;
LiveTmTask::LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
VirtualChannelWithQueue& channel, const std::atomic_bool& ptmeLocked)
VirtualChannel& channel, const std::atomic_bool& ptmeLocked,
uint32_t regularTmQueueDepth, uint32_t cfdpQueueDepth)
: SystemObject(objectId),
modeHelper(this),
pusFunnel(pusFunnel),
@ -14,17 +19,34 @@ LiveTmTask::LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunne
channel(channel),
ptmeLocked(ptmeLocked) {
requestQueue = QueueFactory::instance()->createMessageQueue();
cfdpTmQueue = QueueFactory::instance()->createMessageQueue(cfdpQueueDepth);
regularTmQueue = QueueFactory::instance()->createMessageQueue(regularTmQueueDepth);
}
ReturnValue_t LiveTmTask::performOperation(uint8_t opCode) {
readCommandQueue();
bool handledTm;
ReturnValue_t result;
while (true) {
// The funnel tasks are scheduled here directly as well.
ReturnValue_t result = channel.handleNextTm(!ptmeLocked);
if (result == DirectTmSinkIF::IS_BUSY) {
sif::error << "Lost live TM, PAPB busy" << std::endl;
}
// TODO: Must read CFDP TM queue and regular TM queue and forward them. Handle regular queue
// first.
handledTm = false;
if (!channel.isBusy()) {
result = handleRegularTmQueue();
if (result == MessageQueueIF::EMPTY) {
result = handleCfdpTmQueue();
}
if (result == returnvalue::OK) {
handledTm = true;
}
}
if (channel.isBusy()) {
// Throttle CFDP packet creator. It is by far the most relevant data creator, so throttling
// it is the easiest way to handle back pressure for now in a sensible way. It is cleared
// by the data creator.
signals::CFDP_CHANNEL_THROTTLE_SIGNAL = true;
}
if (!handledTm) {
if (tmFunnelCd.hasTimedOut()) {
pusFunnel.performOperation(0);
cfdpFunnel.performOperation(0);
@ -94,9 +116,17 @@ void LiveTmTask::readCommandQueue(void) {
}
}
ReturnValue_t LiveTmTask::handleRegularTmQueue() { return returnvalue::OK; }
ReturnValue_t LiveTmTask::handleCfdpTmQueue() { return returnvalue::OK; }
ModeTreeChildIF& LiveTmTask::getModeTreeChildIF() { return *this; }
ReturnValue_t LiveTmTask::initialize() {
modeHelper.initialize();
return returnvalue::OK;
}
MessageQueueId_t LiveTmTask::getNormalLiveQueueId() const { return regularTmQueue->getId(); }
MessageQueueId_t LiveTmTask::getCfdpLiveQueueId() const { return cfdpTmQueue->getId(); }

View File

@ -18,25 +18,33 @@ class LiveTmTask : public SystemObject,
public ModeTreeConnectionIF {
public:
LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
VirtualChannelWithQueue& channel, const std::atomic_bool& ptmeLocked);
VirtualChannel& channel, const std::atomic_bool& ptmeLocked,
uint32_t regularTmQueueDepth, uint32_t cfdpQueueDepth);
MessageQueueId_t getNormalLiveQueueId() const;
MessageQueueId_t getCfdpLiveQueueId() const;
ReturnValue_t performOperation(uint8_t opCode) override;
ReturnValue_t initialize() override;
ReturnValue_t connectModeTreeParent(HasModeTreeChildrenIF& parent) override;
private:
MessageQueueIF* requestQueue;
MessageQueueIF* cfdpTmQueue;
MessageQueueIF* regularTmQueue;
ModeHelper modeHelper;
Mode_t mode = HasModesIF::MODE_OFF;
Countdown tmFunnelCd = Countdown(100);
PusTmFunnel& pusFunnel;
CfdpTmFunnel& cfdpFunnel;
VirtualChannelWithQueue& channel;
VirtualChannel& channel;
uint32_t packetCounter = 0;
const std::atomic_bool& ptmeLocked;
void readCommandQueue(void);
ReturnValue_t handleRegularTmQueue();
ReturnValue_t handleCfdpTmQueue();
MessageQueueId_t getCommandQueue() const override;
void getMode(Mode_t* mode, Submode_t* submode) override;

View File

@ -57,9 +57,7 @@ ReturnValue_t PusLiveDemux::demultiplexPackets(StorageManagerIF& tmStore,
uint32_t PusLiveDemux::addDestination(const char* name,
const AcceptsTelemetryIF& downlinkDestination, uint8_t vcid) {
auto queueId = downlinkDestination.getReportReceptionQueue(vcid);
destinations.emplace_back(name, queueId, vcid);
return destinations.size() - 1;
return addDestinationByRawId(name, downlinkDestination.getReportReceptionQueue(vcid), vcid);
}
void PusLiveDemux::setDestFull(uint32_t listIndex) {
@ -73,3 +71,9 @@ void PusLiveDemux::setDestAvailable(uint32_t listIndex) {
destinations[listIndex].isFull = false;
}
}
uint32_t PusLiveDemux::addDestinationByRawId(const char* name, MessageQueueId_t downlinkDestination,
uint8_t vcid) {
destinations.emplace_back(name, downlinkDestination, vcid);
return destinations.size() - 1;
}

View File

@ -14,6 +14,8 @@ class PusLiveDemux {
ReturnValue_t demultiplexPackets(StorageManagerIF& tmStore, store_address_t origStoreId,
const uint8_t* tmData, size_t tmSize);
uint32_t addDestinationByRawId(const char* name, MessageQueueId_t downlinkDestination,
uint8_t vcid = 0);
uint32_t addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);
void setDestFull(uint32_t listIndex);

View File

@ -70,3 +70,9 @@ ReturnValue_t TmFunnelBase::saveSequenceCountToFile() {
ofile << sourceSequenceCount << "\n";
return returnvalue::OK;
}
uint32_t TmFunnelBase::addLiveDestinationByRawId(const char *name,
MessageQueueId_t downlinkDestination,
uint8_t vcid) {
return liveDemux.addDestinationByRawId(name, downlinkDestination, vcid);
}

View File

@ -37,6 +37,8 @@ class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject {
};
explicit TmFunnelBase(FunnelCfg cfg);
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
virtual uint32_t addLiveDestinationByRawId(const char* name, MessageQueueId_t downlinkDestination,
uint8_t vcid = 0);
virtual uint32_t addLiveDestination(const char* name,
const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);