#include #include #include #include #include #include #include "OBSWConfig.h" object_id_t TmFunnel::downlinkDestination = objects::NO_OBJECT; object_id_t TmFunnel::storageDestination = objects::NO_OBJECT; TmFunnel::TmFunnel(object_id_t objectId, CdsShortTimeStamper& timeReader, uint32_t messageDepth, uint8_t reportReceptionVc) : SystemObject(objectId), timeReader(timeReader), messageDepth(messageDepth), reportReceptionVc(reportReceptionVc) { auto mqArgs = MqArgs(objectId, static_cast(this)); tmQueue = QueueFactory::instance()->createMessageQueue( messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); storageQueue = QueueFactory::instance()->createMessageQueue( messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); } TmFunnel::~TmFunnel() {} MessageQueueId_t TmFunnel::getReportReceptionQueue(uint8_t virtualChannel) { return tmQueue->getId(); } ReturnValue_t TmFunnel::performOperation(uint8_t operationCode) { TmTcMessage currentMessage; ReturnValue_t status = tmQueue->receiveMessage(¤tMessage); while (status == HasReturnvaluesIF::RETURN_OK) { status = handlePacket(¤tMessage); if (status != HasReturnvaluesIF::RETURN_OK) { break; } status = tmQueue->receiveMessage(¤tMessage); } if (status == MessageQueueIF::EMPTY) { return HasReturnvaluesIF::RETURN_OK; } else { return status; } } ReturnValue_t TmFunnel::handlePacket(TmTcMessage* message) { uint8_t* packetData = nullptr; size_t size = 0; ReturnValue_t result = tmStore->modifyData(message->getStorageId(), &packetData, &size); if (result != HasReturnvaluesIF::RETURN_OK) { return result; } PusTmZeroCopyWriter packet(timeReader, packetData, size); result = packet.parseDataWithoutCrcCheck(); if (result != HasReturnvaluesIF::RETURN_OK) { return result; } packet.setSequenceCount(sourceSequenceCount++); sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT; packet.updateErrorControl(); result = tmQueue->sendToDefault(message); if (result != HasReturnvaluesIF::RETURN_OK) { tmStore->deleteData(message->getStorageId()); sif::error << "TmFunnel::handlePacket: Error sending to downlink " "handler" << std::endl; return result; } if (storageDestination != objects::NO_OBJECT) { result = storageQueue->sendToDefault(message); if (result != HasReturnvaluesIF::RETURN_OK) { tmStore->deleteData(message->getStorageId()); sif::error << "TmFunnel::handlePacket: Error sending to storage " "handler" << std::endl; return result; } } return result; } ReturnValue_t TmFunnel::initialize() { tmStore = ObjectManager::instance()->get(objects::TM_STORE); if (tmStore == nullptr) { sif::error << "TmFunnel::initialize: TM store not set." << std::endl; sif::error << "Make sure the tm store is set up properly" " and implements StorageManagerIF" << std::endl; return ObjectManagerIF::CHILD_INIT_FAILED; } AcceptsTelemetryIF* tmTarget = ObjectManager::instance()->get(downlinkDestination); if (tmTarget == nullptr) { sif::error << "TmFunnel::initialize: Downlink Destination not set." << std::endl; sif::error << "Make sure the downlink destination object is set up " "properly and implements AcceptsTelemetryIF" << std::endl; return ObjectManagerIF::CHILD_INIT_FAILED; } tmQueue->setDefaultDestination(tmTarget->getReportReceptionQueue(reportReceptionVc)); // Storage destination is optional. if (storageDestination == objects::NO_OBJECT) { return SystemObject::initialize(); } AcceptsTelemetryIF* storageTarget = ObjectManager::instance()->get(storageDestination); if (storageTarget != nullptr) { storageQueue->setDefaultDestination(storageTarget->getReportReceptionQueue(0)); } return SystemObject::initialize(); }