#include "LiveTmTask.h"

// NOTE(review): the targets of the four angle-bracket includes below were missing in the reviewed
// copy of this file. They were restored from the framework symbols used in this translation unit
// (QueueFactory, modetree::connectModeTreeParent, TaskFactory) - confirm against version control.
#include <fsfw/ipc/QueueFactory.h>
#include <fsfw/subsystem/helper.h>
#include <fsfw/tasks/TaskFactory.h>
#include <fsfw/timemanager/Stopwatch.h>

#include <iomanip>

#include "mission/sysDefs.h"

// Compile-time switch: when true, the task prints how many TM packets of each kind were handled
// back-to-back before the task went idle.
static constexpr bool DEBUG_TM_QUEUE_SPEED = false;

// Definitions of the signals used for the CFDP flow control implemented in this file.
std::atomic_bool signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false;
std::atomic_uint32_t signals::CFDP_MSG_COUNTER = 0;

// Creates the three message queues used by the task: the request (command) queue with the
// factory default depth, the CFDP TM queue with cfdpQueueDepth entries and the regular TM queue
// with regularTmQueueDepth entries.
LiveTmTask::LiveTmTask(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel,
                       VirtualChannel& channel, const std::atomic_bool& ptmeLocked,
                       uint32_t regularTmQueueDepth, uint32_t cfdpQueueDepth)
    : SystemObject(objectId),
      modeHelper(this),
      pusFunnel(pusFunnel),
      cfdpFunnel(cfdpFunnel),
      channel(channel),
      ptmeLocked(ptmeLocked) {
  requestQueue = QueueFactory::instance()->createMessageQueue();
  cfdpTmQueue = QueueFactory::instance()->createMessageQueue(cfdpQueueDepth);
  regularTmQueue = QueueFactory::instance()->createMessageQueue(regularTmQueueDepth);
}

// Main task loop. This function does not return.
//
// Each iteration handles at most one TM message. The regular TM queue has priority, the CFDP TM
// queue is only polled when the regular queue is empty. Nothing is read from the queues while the
// channel is busy. When no message was handled, the task services the funnels (rate limited by
// tmFunnelCd), reads the command queue and then sleeps.
//
// @param opCode Not used.
ReturnValue_t LiveTmTask::performOperation(uint8_t opCode) {
  readCommandQueue();
  // Only maintained and printed if DEBUG_TM_QUEUE_SPEED is set.
  uint32_t consecutiveRegularCounter = 0;
  uint32_t consecutiveCfdpCounter = 0;
  while (true) {
    bool isCfdp = false;
    // TODO: Must read CFDP TM queue and regular TM queue and forward them. Handle regular queue
    // first.
    bool handledTm = false;
    updateBusyFlag();
    if (!channelIsBusy) {
      ReturnValue_t result = handleRegularTmQueue();
      if (result == MessageQueueIF::EMPTY) {
        result = handleCfdpTmQueue();
        isCfdp = true;
      }
      if (result == returnvalue::OK) {
        handledTm = true;
        if (DEBUG_TM_QUEUE_SPEED) {
          if (isCfdp) {
            consecutiveCfdpCounter++;
          } else {
            consecutiveRegularCounter++;
          }
        }
      } else if (result != MessageQueueIF::EMPTY) {
        // A real failure, as opposed to both queues simply being empty.
        const char* contextStr = "Regular TM queue";
        if (isCfdp) {
          contextStr = "CFDP TM queue";
        }
        sif::warning << "LiveTmTask: " << contextStr << " handling failure, returncode 0x"
                     << std::setfill('0') << std::hex << std::setw(4) << result << std::dec
                     << std::endl;
      }
    }

    cfdpBackpressureHandling();

    if (!handledTm) {
      if (tmFunnelCd.hasTimedOut()) {
        pusFunnel.performOperation(0);
        cfdpFunnel.performOperation(0);
        tmFunnelCd.resetTimer();
      }
      // Read command queue during idle times.
      readCommandQueue();
      if (DEBUG_TM_QUEUE_SPEED) {
        if (consecutiveCfdpCounter > 0) {
          sif::debug << "Consecutive CFDP TM handled: " << consecutiveCfdpCounter << std::endl;
        }
        if (consecutiveRegularCounter > 0) {
          sif::debug << "Consecutive regular TM handled: " << consecutiveRegularCounter
                     << std::endl;
        }
        consecutiveRegularCounter = 0;
        consecutiveCfdpCounter = 0;
      }
      // 40 ms IDLE delay. Might tweak this in the future.
      TaskFactory::delayTask(40);
    }
  }
}

// Returns the ID of the request queue, on which mode commands are received.
MessageQueueId_t LiveTmTask::getCommandQueue() const { return requestQueue->getId(); }

// Writes the current mode to *mode and SUBMODE_NONE to *submode. Null pointers are skipped.
void LiveTmTask::getMode(Mode_t* mode, Submode_t* submode) {
  if (mode != nullptr) {
    *mode = this->mode;
  }
  if (submode != nullptr) {
    *submode = SUBMODE_NONE;
  }
}

// Accepts MODE_ON and MODE_OFF and rejects every other mode. The submode and msToReachTheMode
// arguments are not evaluated.
ReturnValue_t LiveTmTask::checkModeCommand(Mode_t mode, Submode_t submode,
                                           uint32_t* msToReachTheMode) {
  if (mode == MODE_ON or mode == MODE_OFF) {
    return returnvalue::OK;
  }
  return returnvalue::FAILED;
}

// Takes over the commanded mode immediately, notifies the mode helper and announces the new mode.
void LiveTmTask::startTransition(Mode_t mode, Submode_t submode) {
  this->mode = mode;
  modeHelper.modeChanged(mode, submode);
  announceMode(false);
}

// Triggers a MODE_INFO event with the current mode and SUBMODE_NONE. The recursive flag is not
// evaluated.
void LiveTmTask::announceMode(bool recursive) { triggerEvent(MODE_INFO, mode, SUBMODE_NONE); }

object_id_t LiveTmTask::getObjectId() const { return SystemObject::getObjectId(); }

// This object does not provide a health interface.
const HasHealthIF* LiveTmTask::getOptHealthIF() const { return nullptr; }

const HasModesIF& LiveTmTask::getModeIF() const { return *this; }

// Connects this object to the given mode tree parent, without a health helper.
ReturnValue_t LiveTmTask::connectModeTreeParent(HasModeTreeChildrenIF& parent) {
  return modetree::connectModeTreeParent(parent, *this, nullptr, modeHelper);
}

// Handles at most one message from the request queue. Messages the mode helper does not handle
// are answered with an UNKNOWN_COMMAND rejection.
void LiveTmTask::readCommandQueue(void) {
  CommandMessage commandMessage;
  ReturnValue_t result = requestQueue->receiveMessage(&commandMessage);
  if (result != returnvalue::OK) {
    return;
  }
  result = modeHelper.handleModeCommand(&commandMessage);
  if (result == returnvalue::OK) {
    return;
  }
  CommandMessage reply;
  reply.setReplyRejected(CommandMessage::UNKNOWN_COMMAND, commandMessage.getCommand());
  requestQueue->reply(&reply);
}

// Handles one message of the regular TM queue, see handleGenericTmQueue.
ReturnValue_t LiveTmTask::handleRegularTmQueue() {
  return handleGenericTmQueue(*regularTmQueue, false);
}

// Handles one message of the CFDP TM queue, see handleGenericTmQueue.
ReturnValue_t LiveTmTask::handleCfdpTmQueue() { return handleGenericTmQueue(*cfdpTmQueue, true); }

// Receives one TM message from the given queue, writes the referenced packet to the channel
// (unless the PTME is locked) and deletes the packet from the TM store.
//
// @param queue  Queue to take the message from.
// @param isCfdp True if the queue is the CFDP TM queue. In that case the global CFDP message
//               counter is decremented for every received message.
// @return MessageQueueIF::EMPTY if the queue was empty, any other receive error code if the
//         message could not be received, the TM store error code if the packet could not be
//         read, otherwise the result of the channel write (returnvalue::OK on success or when
//         the PTME is locked).
ReturnValue_t LiveTmTask::handleGenericTmQueue(MessageQueueIF& queue, bool isCfdp) {
  TmTcMessage message;
  ReturnValue_t result = queue.receiveMessage(&message);
  if (result != returnvalue::OK) {
    // Covers MessageQueueIF::EMPTY as well as real receive failures. Without a received message
    // there is no valid store ID to work on and the CFDP counter must not be decremented.
    return result;
  }
  // Only decrement if larger than zero so the unsigned counter can not wrap around.
  if (isCfdp and signals::CFDP_MSG_COUNTER > 0) {
    signals::CFDP_MSG_COUNTER--;
  }
  if (DEBUG_CFDP_TO_LIVE_TM_TASK and signals::CFDP_MSG_COUNTER > 0) {
    sif::debug << "LiveTmTask: CFDP message counter: " << signals::CFDP_MSG_COUNTER << std::endl;
  }
  store_address_t storeId = message.getStorageId();
  const uint8_t* data = nullptr;
  size_t size = 0;
  result = tmStore->getData(storeId, &data, &size);
  if (result != returnvalue::OK) {
    sif::warning << "LiveTmTask: Failed to read data from TM store" << std::endl;
    tmStore->deleteData(storeId);
    return result;
  }
  // While the PTME is locked nothing is written, the packet is only removed from the store.
  if (!ptmeLocked) {
    size_t writtenSize = 0;
    result = channel.write(data, size, writtenSize);
    if (result == DirectTmSinkIF::PARTIALLY_WRITTEN) {
      // NOTE(review): 400 is presumably a timeout or retry limit for finishing the write -
      // confirm against the VirtualChannel interface.
      result = channel.handleWriteCompletionSynchronously(writtenSize, 400);
      if (result != returnvalue::OK) {
        // TODO: Event? Might lead to dangerous spam though..
        sif::warning << "LiveTmTask: Synchronous write of last segment failed with code 0x"
                     << std::setfill('0') << std::setw(4) << std::hex << result << std::dec
                     << std::endl;
      }
    } else if (result != returnvalue::OK) {
      sif::error << "LiveTmTask: Channel write failed with code 0x" << std::setfill('0')
                 << std::hex << std::setw(4) << result << std::dec << std::endl;
    }
  }
  // Try delete in any case, ignore failures (which should not happen), it is more important to
  // propagate write errors.
  tmStore->deleteData(storeId);
  return result;
}

// Starts a throttle period and raises the global CFDP throttle signal.
void LiveTmTask::throttleCfdp() {
  throttlePeriodOngoing = true;
  signals::CFDP_CHANNEL_THROTTLE_SIGNAL = true;
  if (DEBUG_CFDP_TO_LIVE_TM_TASK) {
    sif::debug << "Throttling CFDP" << std::endl;
  }
}

// Ends the throttle period and clears the global CFDP throttle signal.
void LiveTmTask::releaseCfdp() {
  throttlePeriodOngoing = false;
  signals::CFDP_CHANNEL_THROTTLE_SIGNAL = false;
  if (DEBUG_CFDP_TO_LIVE_TM_TASK) {
    sif::debug << "Releasing CFDP" << std::endl;
  }
}

void LiveTmTask::updateBusyFlag() {
  // We cache this as a member, because the busy bit can toggle very quickly..
  channelIsBusy = channel.isBusy();
}

// Flow control for the CFDP packet creator, based on the cached channel busy flag and the global
// CFDP message counter. Throttling starts when the channel is busy and the counter has reached
// half of LIVE_CHANNEL_CFDP_QUEUE_SIZE. It ends when the channel is not busy and the counter has
// dropped to a quarter of that size or less.
void LiveTmTask::cfdpBackpressureHandling() {
  if (channelIsBusy and !throttlePeriodOngoing) {
    // Throttle CFDP packet creator. It is by far the most relevant data creator, so throttling
    // it is the easiest way to handle back pressure for now in a sensible way.
    if (signals::CFDP_MSG_COUNTER >= (config::LIVE_CHANNEL_CFDP_QUEUE_SIZE / 2)) {
      throttleCfdp();
    }
  } else if (!channelIsBusy and throttlePeriodOngoing) {
    // Half full/empty flow control: Release the CFDP if the queue is empty enough.
    if (signals::CFDP_MSG_COUNTER <= (config::LIVE_CHANNEL_CFDP_QUEUE_SIZE / 4)) {
      releaseCfdp();
    }
  }
}

ModeTreeChildIF& LiveTmTask::getModeTreeChildIF() { return *this; }

// Initializes the mode helper and looks up the TM store.
//
// @return ObjectManagerIF::CHILD_INIT_FAILED if the TM store object could not be found,
//         returnvalue::OK otherwise.
ReturnValue_t LiveTmTask::initialize() {
  modeHelper.initialize();
  // NOTE(review): the template argument of get() was missing in the reviewed copy of this file.
  // StorageManagerIF matches the getData()/deleteData() calls made on tmStore - confirm against
  // the member declaration in LiveTmTask.h.
  tmStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TM_STORE);
  if (tmStore == nullptr) {
    return ObjectManagerIF::CHILD_INIT_FAILED;
  }
  return returnvalue::OK;
}

// Returns the ID of the queue for regular live TM.
MessageQueueId_t LiveTmTask::getNormalLiveQueueId() const { return regularTmQueue->getId(); }

// Returns the ID of the queue for CFDP live TM.
MessageQueueId_t LiveTmTask::getCfdpLiveQueueId() const { return cfdpTmQueue->getId(); }