diff --git a/bsp_hosted/ObjectFactory.cpp b/bsp_hosted/ObjectFactory.cpp index c5af74e4..29b1dda8 100644 --- a/bsp_hosted/ObjectFactory.cpp +++ b/bsp_hosted/ObjectFactory.cpp @@ -54,10 +54,6 @@ void Factory::setStaticFrameworkObjectIds() { CommandingServiceBase::defaultPacketSource = objects::PUS_PACKET_DISTRIBUTOR; CommandingServiceBase::defaultPacketDestination = objects::TM_FUNNEL; - TmFunnel::downlinkDestination = objects::TMTC_BRIDGE; - // No storage object for now. - TmFunnel::storageDestination = objects::NO_OBJECT; - VerificationReporter::DEFAULT_RECEIVER = objects::PUS_SERVICE_1_VERIFICATION; } diff --git a/bsp_hosted/fsfwconfig/objects/systemObjectList.h b/bsp_hosted/fsfwconfig/objects/systemObjectList.h index 8b264882..703cd8b8 100644 --- a/bsp_hosted/fsfwconfig/objects/systemObjectList.h +++ b/bsp_hosted/fsfwconfig/objects/systemObjectList.h @@ -16,8 +16,6 @@ enum sourceObjects : uint32_t { PUS_SERVICE_23 = 0x51002300, PUS_SERVICE_201 = 0x51020100, - TM_FUNNEL = 0x52000002, - /* Test Task */ TEST_TASK = 0x42694269, diff --git a/bsp_q7s/core/ObjectFactory.cpp b/bsp_q7s/core/ObjectFactory.cpp index 24a57f1e..f43277ac 100644 --- a/bsp_q7s/core/ObjectFactory.cpp +++ b/bsp_q7s/core/ObjectFactory.cpp @@ -99,10 +99,10 @@ ResetArgs RESET_ARGS_GNSS; void Factory::setStaticFrameworkObjectIds() { PusServiceBase::PUS_DISTRIBUTOR = objects::PUS_PACKET_DISTRIBUTOR; - PusServiceBase::PACKET_DESTINATION = objects::TM_FUNNEL; + PusServiceBase::PACKET_DESTINATION = objects::PUS_TM_FUNNEL; CommandingServiceBase::defaultPacketSource = objects::PUS_PACKET_DISTRIBUTOR; - CommandingServiceBase::defaultPacketDestination = objects::TM_FUNNEL; + CommandingServiceBase::defaultPacketDestination = objects::PUS_TM_FUNNEL; #if OBSW_Q7S_EM == 1 DeviceHandlerBase::powerSwitcherId = objects::NO_OBJECT; @@ -110,14 +110,6 @@ void Factory::setStaticFrameworkObjectIds() { DeviceHandlerBase::powerSwitcherId = objects::PCDU_HANDLER; #endif /* OBSW_Q7S_EM == 1 */ -#if OBSW_TM_TO_PTME == 1 - TmFunnel::downlinkDestination = objects::CCSDS_HANDLER; -#else - TmFunnel::downlinkDestination = objects::TMTC_BRIDGE; -#endif /* OBSW_TM_TO_PTME == 1 */ - // No storage object for now. - TmFunnel::storageDestination = objects::NO_OBJECT; - LocalDataPoolManager::defaultHkDestination = objects::PUS_SERVICE_3_HOUSEKEEPING; VerificationReporter::DEFAULT_RECEIVER = objects::PUS_SERVICE_1_VERIFICATION; diff --git a/bsp_q7s/em/emObjectFactory.cpp b/bsp_q7s/em/emObjectFactory.cpp index d80d0af7..4202cfdd 100644 --- a/bsp_q7s/em/emObjectFactory.cpp +++ b/bsp_q7s/em/emObjectFactory.cpp @@ -73,7 +73,8 @@ void ObjectFactory::produce(void* args) { createTestComponents(gpioComIF); #endif /* OBSW_ADD_TEST_CODE == 1 */ #if OBSW_ADD_SCEX_DEVICE == 1 - createScexComponents(q7s::UART_SCEX_DEV, pwrSwitcher, *SdCardManager::instance(), true, std::nullopt); + createScexComponents(q7s::UART_SCEX_DEV, pwrSwitcher, *SdCardManager::instance(), true, + std::nullopt); #endif createMiscComponents(); diff --git a/common/config/eive/objects.h b/common/config/eive/objects.h index bda3e987..4aa03c7a 100644 --- a/common/config/eive/objects.h +++ b/common/config/eive/objects.h @@ -133,8 +133,11 @@ enum commonObjects : uint32_t { PL_SUBSYSTEM = 0x73010002, PLOC_SUBSYSTEM = 0x73010003, EIVE_SYSTEM = 0x73010000, - CFDP_HANDLER = 0x7302000, - CFDP_DISTRIBUTOR = 0x73020001 + CFDP_HANDLER = 0x73000005, + CFDP_DISTRIBUTOR = 0x73000006, + TM_FUNNEL = 0x73000100, + PUS_TM_FUNNEL = 0x73000101, + CFDP_TM_FUNNEL = 0x73000102, }; } diff --git a/dummies/SaDeploymentDummy.cpp b/dummies/SaDeploymentDummy.cpp index 8ea80545..28588ea7 100644 --- a/dummies/SaDeploymentDummy.cpp +++ b/dummies/SaDeploymentDummy.cpp @@ -1,10 +1,7 @@ #include "SaDeploymentDummy.h" -SaDeplDummy::SaDeplDummy(object_id_t objectId): SystemObject(objectId) { -} +SaDeplDummy::SaDeplDummy(object_id_t objectId) : SystemObject(objectId) {} SaDeplDummy::~SaDeplDummy() = default; -ReturnValue_t SaDeplDummy::performOperation(uint8_t opCode) { - return returnvalue::OK; -} +ReturnValue_t SaDeplDummy::performOperation(uint8_t opCode) { return returnvalue::OK; } diff --git a/dummies/SaDeploymentDummy.h b/dummies/SaDeploymentDummy.h index a0b5ccdf..a9b72391 100644 --- a/dummies/SaDeploymentDummy.h +++ b/dummies/SaDeploymentDummy.h @@ -2,12 +2,12 @@ #ifndef DUMMIES_SADEPLOYMENT_H_ #define DUMMIES_SADEPLOYMENT_H_ -#include "SaDeploymentDummy.h" #include +#include "SaDeploymentDummy.h" + class SaDeplDummy : public SystemObject, public ExecutableObjectIF { public: - SaDeplDummy(object_id_t objectId); virtual ~SaDeplDummy(); diff --git a/fsfw b/fsfw index ec7566fb..11422a65 160000 --- a/fsfw +++ b/fsfw @@ -1 +1 @@ -Subproject commit ec7566fb8c38ebe232fbbb71e2ed1095fcc864e2 +Subproject commit 11422a658cc927abd6dc2eee45e821365f22107b diff --git a/linux/fsfwconfig/objects/systemObjectList.h b/linux/fsfwconfig/objects/systemObjectList.h index 51f6b919..dba8c630 100644 --- a/linux/fsfwconfig/objects/systemObjectList.h +++ b/linux/fsfwconfig/objects/systemObjectList.h @@ -40,7 +40,6 @@ enum sourceObjects : uint32_t { PUS_SERVICE_6 = 0x51000500, CCSDS_IP_CORE_BRIDGE = 0x73500000, - TM_FUNNEL = 0x73000100, /* 0x49 ('I') for Communication Interfaces **/ ARDUINO_COM_IF = 0x49000000, diff --git a/mission/core/GenericFactory.cpp b/mission/core/GenericFactory.cpp index 24870504..e81818e9 100644 --- a/mission/core/GenericFactory.cpp +++ b/mission/core/GenericFactory.cpp @@ -22,6 +22,8 @@ #include #include #include +#include +#include #include #include "OBSWConfig.h" @@ -94,6 +96,26 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) { new PoolManager(objects::IPC_STORE, poolCfg); } +#if OBSW_ADD_TCPIP_BRIDGE == 1 +#if OBSW_USE_TMTC_TCP_BRIDGE == 0 + auto tmtcBridge = new UdpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR); + new UdpTcPollingTask(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE); + sif::info << "Created UDP server for TMTC commanding with listener port " + << udpBridge->getUdpPort() << std::endl; +#else + auto tmtcBridge = new TcpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR); + auto tcpServer = new TcpTmTcServer(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE); + // TCP is stream based. Use packet ID as start marker when parsing for space packets + tcpServer->setSpacePacketParsingOptions({common::PUS_PACKET_ID, common::CFDP_PACKET_ID}); + sif::info << "Created TCP server for TMTC commanding with listener port " + << tcpServer->getTcpPort() << std::endl; +#if OBSW_TCP_SERVER_WIRETAPPING == 1 + tcpServer->enableWiretapping(true); +#endif /* OBSW_TCP_SERVER_WIRETAPPING == 1 */ +#endif /* OBSW_USE_TMTC_TCP_BRIDGE == 0 */ + tmtcBridge->setMaxNumberOfPacketsStored(300); +#endif /* OBSW_ADD_TCPIP_BRIDGE == 1 */ + auto* ccsdsDistrib = new CcsdsDistributor(config::EIVE_PUS_APID, objects::CCSDS_PACKET_DISTRIBUTOR); new PusDistributor(config::EIVE_PUS_APID, objects::PUS_PACKET_DISTRIBUTOR, ccsdsDistrib); @@ -102,12 +124,16 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) { #if OBSW_TM_TO_PTME == 1 vc = config::LIVE_TM; #endif + auto* cfdpFunnel = + new CfdpTmFunnel(objects::CFDP_TM_FUNNEL, config::EIVE_CFDP_APID, *tmtcBridge, *tmStore, vc); + auto* pusFunnel = + new PusTmFunnel(objects::PUS_TM_FUNNEL, *tmtcBridge, *timeStamper, *tmStore, vc); // Every TM packet goes through this funnel - auto* funnel = new TmFunnel(objects::TM_FUNNEL, *timeStamper, 50, vc); + new TmFunnel(objects::TM_FUNNEL, *pusFunnel, *cfdpFunnel); // PUS service stack new Service1TelecommandVerification(objects::PUS_SERVICE_1_VERIFICATION, config::EIVE_PUS_APID, - pus::PUS_SERVICE_1, objects::TM_FUNNEL, 20); + pus::PUS_SERVICE_1, objects::PUS_TM_FUNNEL, 20); new Service2DeviceAccess(objects::PUS_SERVICE_2_DEVICE_ACCESS, config::EIVE_PUS_APID, pus::PUS_SERVICE_2, 3, 10); new Service3Housekeeping(objects::PUS_SERVICE_3_HOUSEKEEPING, config::EIVE_PUS_APID, @@ -131,25 +157,6 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) { pus::PUS_SERVICE_200, 8); new CService201HealthCommanding(objects::PUS_SERVICE_201_HEALTH, config::EIVE_PUS_APID, pus::PUS_SERVICE_201); -#if OBSW_ADD_TCPIP_BRIDGE == 1 -#if OBSW_USE_TMTC_TCP_BRIDGE == 0 - auto tmtcBridge = new UdpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR); - new UdpTcPollingTask(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE); - sif::info << "Created UDP server for TMTC commanding with listener port " - << udpBridge->getUdpPort() << std::endl; -#else - auto tmtcBridge = new TcpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR); - auto tcpServer = new TcpTmTcServer(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE); - // TCP is stream based. Use packet ID as start marker when parsing for space packets - tcpServer->setSpacePacketParsingOptions({common::PUS_PACKET_ID, common::CFDP_PACKET_ID}); - sif::info << "Created TCP server for TMTC commanding with listener port " - << tcpServer->getTcpPort() << std::endl; -#if OBSW_TCP_SERVER_WIRETAPPING == 1 - tcpServer->enableWiretapping(true); -#endif /* OBSW_TCP_SERVER_WIRETAPPING == 1 */ -#endif /* OBSW_USE_TMTC_TCP_BRIDGE == 0 */ - tmtcBridge->setMaxNumberOfPacketsStored(300); -#endif /* OBSW_ADD_TCPIP_BRIDGE == 1 */ #if OBSW_ADD_CFDP_COMPONENTS == 1 using namespace cfdp; @@ -158,7 +165,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) { new CfdpDistributor(distribCfg); auto* msgQueue = QueueFactory::instance()->createMessageQueue(32); - FsfwHandlerParams params(objects::CFDP_HANDLER, HOST_FS, *funnel, *tcStore, *tmStore, *msgQueue); + FsfwHandlerParams params(objects::CFDP_HANDLER, HOST_FS, *cfdpFunnel, *tcStore, *tmStore, + *msgQueue); cfdp::IndicationCfg indicationCfg; UnsignedByteField apid(config::EIVE_LOCAL_CFDP_ENTITY_ID); cfdp::EntityId localId(apid); diff --git a/mission/tmtc/CCSDSHandler.cpp b/mission/tmtc/CCSDSHandler.cpp index b3b03d40..26abec25 100644 --- a/mission/tmtc/CCSDSHandler.cpp +++ b/mission/tmtc/CCSDSHandler.cpp @@ -180,9 +180,9 @@ void CCSDSHandler::addVirtualChannel(VcId_t vcId, VirtualChannel* virtualChannel } } -MessageQueueId_t CCSDSHandler::getReportReceptionQueue(uint8_t virtualChannel) { +MessageQueueId_t CCSDSHandler::getReportReceptionQueue(uint8_t virtualChannel) const { if (virtualChannel < common::NUMBER_OF_VIRTUAL_CHANNELS) { - VirtualChannelMapIter iter = virtualChannelMap.find(virtualChannel); + auto iter = virtualChannelMap.find(virtualChannel); if (iter != virtualChannelMap.end()) { return iter->second->getReportReceptionQueue(); } else { diff --git a/mission/tmtc/CCSDSHandler.h b/mission/tmtc/CCSDSHandler.h index 035f9e31..d76dbdaf 100644 --- a/mission/tmtc/CCSDSHandler.h +++ b/mission/tmtc/CCSDSHandler.h @@ -67,7 +67,7 @@ class CCSDSHandler : public SystemObject, */ void addVirtualChannel(VcId_t virtualChannelId, VirtualChannel* virtualChannel); - MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0); + MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override; ReturnValue_t getParameter(uint8_t domainId, uint8_t uniqueIdentifier, ParameterWrapper* parameterWrapper, const ParameterWrapper* newValues, uint16_t startAtIndex); diff --git a/mission/tmtc/CMakeLists.txt b/mission/tmtc/CMakeLists.txt index 0a931d0f..f34f9ccc 100644 --- a/mission/tmtc/CMakeLists.txt +++ b/mission/tmtc/CMakeLists.txt @@ -1,2 +1,3 @@ -target_sources(${LIB_EIVE_MISSION} PRIVATE CCSDSHandler.cpp VirtualChannel.cpp - TmFunnel.cpp) +target_sources( + ${LIB_EIVE_MISSION} PRIVATE CCSDSHandler.cpp VirtualChannel.cpp TmFunnel.cpp + CfdpTmFunnel.cpp PusTmFunnel.cpp) diff --git a/mission/tmtc/CfdpTmFunnel.cpp b/mission/tmtc/CfdpTmFunnel.cpp new file mode 100644 index 00000000..7969961b --- /dev/null +++ b/mission/tmtc/CfdpTmFunnel.cpp @@ -0,0 +1,85 @@ +#include "CfdpTmFunnel.h" + +#include "fsfw/ipc/QueueFactory.h" +#include "fsfw/tmtcpacket/ccsds/SpacePacketCreator.h" +#include "fsfw/tmtcservices/TmTcMessage.h" + +CfdpTmFunnel::CfdpTmFunnel(object_id_t objectId, uint16_t cfdpInCcsdsApid, + const AcceptsTelemetryIF& downlinkDestination, StorageManagerIF& tmStore, + uint8_t vc) + : SystemObject(objectId), cfdpInCcsdsApid(cfdpInCcsdsApid), tmStore(tmStore) { + msgQueue = QueueFactory::instance()->createMessageQueue(5); + msgQueue->setDefaultDestination(downlinkDestination.getReportReceptionQueue(vc)); +} + +const char* CfdpTmFunnel::getName() const { return "CFDP TM Funnel"; } + +MessageQueueId_t CfdpTmFunnel::getReportReceptionQueue(uint8_t virtualChannel) const { + return msgQueue->getId(); +} + +ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) { + TmTcMessage currentMessage; + ReturnValue_t status = msgQueue->receiveMessage(¤tMessage); + while (status == returnvalue::OK) { + status = handlePacket(currentMessage); + if (status != returnvalue::OK) { + sif::warning << "CfdpTmFunnel packet handling failed" << std::endl; + break; + } + status = msgQueue->receiveMessage(¤tMessage); + } + + if (status == MessageQueueIF::EMPTY) { + return returnvalue::OK; + } + return status; +} + +ReturnValue_t CfdpTmFunnel::initialize() { return returnvalue::OK; } + +ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) { + const uint8_t* cfdpPacket = nullptr; + size_t cfdpPacketLen = 0; + ReturnValue_t result = tmStore.getData(msg.getStorageId(), &cfdpPacket, &cfdpPacketLen); + if (result != returnvalue::OK) { + return result; + } + auto spacePacketHeader = + SpacePacketCreator(ccsds::PacketType::TM, false, cfdpInCcsdsApid, + ccsds::SequenceFlags::UNSEGMENTED, sourceSequenceCount++, 0); + sourceSequenceCount = sourceSequenceCount & ccsds::LIMIT_SEQUENCE_COUNT; + spacePacketHeader.setCcsdsLenFromTotalDataFieldLen(cfdpPacketLen); + uint8_t* newPacketData = nullptr; + store_address_t newStoreId{}; + result = + tmStore.getFreeElement(&newStoreId, spacePacketHeader.getFullPacketLen(), &newPacketData); + if (result != returnvalue::OK) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::warning << "CfdpTmFunnel::handlePacket: Error getting TM store element of size " + << spacePacketHeader.getFullPacketLen() << std::endl; +#endif + return result; + } + size_t serSize = 0; + result = + spacePacketHeader.serializeBe(&newPacketData, &serSize, spacePacketHeader.getFullPacketLen()); + if (result != returnvalue::OK) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "CfdpTmFunnel::handlePacket: Error serializing packet" << std::endl; +#endif + return result; + } + std::memcpy(newPacketData, cfdpPacket, cfdpPacketLen); + // Delete old packet + tmStore.deleteData(msg.getStorageId()); + msg.setStorageId(newStoreId); + result = msgQueue->sendToDefault(&msg); + if (result != returnvalue::OK) { + tmStore.deleteData(msg.getStorageId()); +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "CfdpTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl; +#endif + } + return result; +} diff --git a/mission/tmtc/CfdpTmFunnel.h b/mission/tmtc/CfdpTmFunnel.h new file mode 100644 index 00000000..fe2d664d --- /dev/null +++ b/mission/tmtc/CfdpTmFunnel.h @@ -0,0 +1,28 @@ +#ifndef FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H +#define FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H + +#include "fsfw/objectmanager/SystemObject.h" +#include "fsfw/storagemanager/StorageManagerIF.h" +#include "fsfw/tmtcservices/AcceptsTelemetryIF.h" +#include "fsfw/tmtcservices/TmTcMessage.h" + +class CfdpTmFunnel : public AcceptsTelemetryIF, public SystemObject { + public: + CfdpTmFunnel(object_id_t objectId, uint16_t cfdpInCcsdsApid, + const AcceptsTelemetryIF& downlinkDestination, StorageManagerIF& tmStore, + uint8_t vc); + [[nodiscard]] const char* getName() const override; + [[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override; + + ReturnValue_t performOperation(uint8_t opCode); + ReturnValue_t initialize() override; + + private: + ReturnValue_t handlePacket(TmTcMessage& msg); + + uint16_t sourceSequenceCount = 0; + uint16_t cfdpInCcsdsApid; + MessageQueueIF* msgQueue; + StorageManagerIF& tmStore; +}; +#endif // FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H diff --git a/mission/tmtc/PusTmFunnel.cpp b/mission/tmtc/PusTmFunnel.cpp new file mode 100644 index 00000000..f4072f0f --- /dev/null +++ b/mission/tmtc/PusTmFunnel.cpp @@ -0,0 +1,69 @@ +#include "PusTmFunnel.h" + +#include "fsfw/ipc/QueueFactory.h" +#include "fsfw/objectmanager.h" +#include "fsfw/tmtcpacket/pus/tm/PusTmZcWriter.h" + +PusTmFunnel::PusTmFunnel(object_id_t objectId, const AcceptsTelemetryIF &downlinkDestination, + TimeReaderIF &timeReader, StorageManagerIF &tmStore, uint8_t vcId, + uint32_t messageDepth) + : SystemObject(objectId), timeReader(timeReader), tmStore(tmStore) { + tmQueue = QueueFactory::instance()->createMessageQueue(messageDepth, + MessageQueueMessage::MAX_MESSAGE_SIZE); + tmQueue->setDefaultDestination(downlinkDestination.getReportReceptionQueue(vcId)); +} + +PusTmFunnel::~PusTmFunnel() = default; + +MessageQueueId_t PusTmFunnel::getReportReceptionQueue(uint8_t virtualChannel) const { + return tmQueue->getId(); +} + +ReturnValue_t PusTmFunnel::performOperation(uint8_t) { + TmTcMessage currentMessage; + ReturnValue_t status = tmQueue->receiveMessage(¤tMessage); + while (status == returnvalue::OK) { + status = handlePacket(currentMessage); + if (status != returnvalue::OK) { + sif::warning << "TmFunnel packet handling failed" << std::endl; + break; + } + status = tmQueue->receiveMessage(¤tMessage); + } + + if (status == MessageQueueIF::EMPTY) { + return returnvalue::OK; + } + return status; +} + +ReturnValue_t PusTmFunnel::handlePacket(TmTcMessage &message) { + uint8_t *packetData = nullptr; + size_t size = 0; + ReturnValue_t result = tmStore.modifyData(message.getStorageId(), &packetData, &size); + if (result != returnvalue::OK) { + return result; + } + PusTmZeroCopyWriter packet(timeReader, packetData, size); + result = packet.parseDataWithoutCrcCheck(); + if (result != returnvalue::OK) { +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::warning << "PusTmFunnel::handlePacket: Error parsing received PUS packet" << std::endl; +#endif + return result; + } + packet.setSequenceCount(sourceSequenceCount++); + sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT; + packet.updateErrorControl(); + + result = tmQueue->sendToDefault(&message); + if (result != returnvalue::OK) { + tmStore.deleteData(message.getStorageId()); +#if FSFW_CPP_OSTREAM_ENABLED == 1 + sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl; +#endif + } + return result; +} + +const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; } diff --git a/mission/tmtc/PusTmFunnel.h b/mission/tmtc/PusTmFunnel.h new file mode 100644 index 00000000..ae0390b3 --- /dev/null +++ b/mission/tmtc/PusTmFunnel.h @@ -0,0 +1,42 @@ +#ifndef FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H +#define FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H + +#include +#include +#include +#include +#include + +#include "fsfw/timemanager/TimeReaderIF.h" + +/** + * @brief TM Recipient. + * @details + * TODO: Add support for TM storage by using the (or a) LIVE flag provided by the CCSDS or Syrlinks + * handler. If we are in LIVE TM mode, forward TM to downlink destination directly. Otherwise, + * forward to TM storage backend which stores TMs into files. + * Main telemetry receiver. All generated telemetry is funneled into + * this object. + * @ingroup utility + * @author J. Meier, R. Mueller + */ +class PusTmFunnel : public AcceptsTelemetryIF, public SystemObject { + public: + explicit PusTmFunnel(object_id_t objectId, const AcceptsTelemetryIF &downlinkDestination, + TimeReaderIF &timeReader, StorageManagerIF &tmStore, uint8_t vdId, + uint32_t messageDepth = 20); + [[nodiscard]] const char *getName() const override; + ~PusTmFunnel() override; + + [[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override; + ReturnValue_t performOperation(uint8_t operationCode); + + private: + uint16_t sourceSequenceCount = 0; + TimeReaderIF &timeReader; + StorageManagerIF &tmStore; + MessageQueueIF *tmQueue = nullptr; + ReturnValue_t handlePacket(TmTcMessage &message); +}; + +#endif // FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H diff --git a/mission/tmtc/TmFunnel.cpp b/mission/tmtc/TmFunnel.cpp index 49e9ba6e..01bf0437 100644 --- a/mission/tmtc/TmFunnel.cpp +++ b/mission/tmtc/TmFunnel.cpp @@ -1,125 +1,16 @@ +#include "TmFunnel.h" + #include -#include -#include -#include -#include -#include -#include "OBSWConfig.h" +TmFunnel::TmFunnel(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel) + : SystemObject(objectId), pusFunnel(pusFunnel), cfdpFunnel(cfdpFunnel) {} -object_id_t TmFunnel::downlinkDestination = objects::NO_OBJECT; -object_id_t TmFunnel::storageDestination = objects::NO_OBJECT; - -TmFunnel::TmFunnel(object_id_t objectId, CdsShortTimeStamper& timeReader, uint32_t messageDepth, - uint8_t reportReceptionVc) - : SystemObject(objectId), - timeReader(timeReader), - messageDepth(messageDepth), - reportReceptionVc(reportReceptionVc) { - auto mqArgs = MqArgs(objectId, static_cast(this)); - tmQueue = QueueFactory::instance()->createMessageQueue( - messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); - storageQueue = QueueFactory::instance()->createMessageQueue( - messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs); -} - -TmFunnel::~TmFunnel() {} - -MessageQueueId_t TmFunnel::getReportReceptionQueue(uint8_t virtualChannel) { - return tmQueue->getId(); -} +TmFunnel::~TmFunnel() = default; ReturnValue_t TmFunnel::performOperation(uint8_t operationCode) { - TmTcMessage currentMessage; - ReturnValue_t status = tmQueue->receiveMessage(¤tMessage); - while (status == returnvalue::OK) { - status = handlePacket(¤tMessage); - if (status != returnvalue::OK) { - break; - } - status = tmQueue->receiveMessage(¤tMessage); - } - - if (status == MessageQueueIF::EMPTY) { - return returnvalue::OK; - } else { - return status; - } + pusFunnel.performOperation(operationCode); + cfdpFunnel.performOperation(operationCode); + return returnvalue::OK; } -ReturnValue_t TmFunnel::handlePacket(TmTcMessage* message) { - uint8_t* packetData = nullptr; - size_t size = 0; - ReturnValue_t result = tmStore->modifyData(message->getStorageId(), &packetData, &size); - if (result != returnvalue::OK) { - return result; - } - - PusTmZeroCopyWriter packet(timeReader, packetData, size); - result = packet.parseDataWithoutCrcCheck(); - if (result != returnvalue::OK) { - return result; - } - packet.setSequenceCount(sourceSequenceCount++); - sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT; - packet.updateErrorControl(); - - result = tmQueue->sendToDefault(message); - if (result != returnvalue::OK) { - tmStore->deleteData(message->getStorageId()); - sif::error << "TmFunnel::handlePacket: Error sending to downlink " - "handler" - << std::endl; - return result; - } - - if (storageDestination != objects::NO_OBJECT) { - result = storageQueue->sendToDefault(message); - if (result != returnvalue::OK) { - tmStore->deleteData(message->getStorageId()); - sif::error << "TmFunnel::handlePacket: Error sending to storage " - "handler" - << std::endl; - return result; - } - } - return result; -} - -ReturnValue_t TmFunnel::initialize() { - tmStore = ObjectManager::instance()->get(objects::TM_STORE); - if (tmStore == nullptr) { - sif::error << "TmFunnel::initialize: TM store not set." << std::endl; - sif::error << "Make sure the tm store is set up properly" - " and implements StorageManagerIF" - << std::endl; - return ObjectManagerIF::CHILD_INIT_FAILED; - } - - AcceptsTelemetryIF* tmTarget = - ObjectManager::instance()->get(downlinkDestination); - if (tmTarget == nullptr) { - sif::error << "TmFunnel::initialize: Downlink Destination not set." << std::endl; - sif::error << "Make sure the downlink destination object is set up " - "properly and implements AcceptsTelemetryIF" - << std::endl; - return ObjectManagerIF::CHILD_INIT_FAILED; - } - - tmQueue->setDefaultDestination(tmTarget->getReportReceptionQueue(reportReceptionVc)); - - // Storage destination is optional. - if (storageDestination == objects::NO_OBJECT) { - return SystemObject::initialize(); - } - - AcceptsTelemetryIF* storageTarget = - ObjectManager::instance()->get(storageDestination); - if (storageTarget != nullptr) { - storageQueue->setDefaultDestination(storageTarget->getReportReceptionQueue(0)); - } - - return SystemObject::initialize(); -} - -const char* TmFunnel::getName() const { return "TM Funnel"; } +ReturnValue_t TmFunnel::initialize() { return returnvalue::OK; } diff --git a/mission/tmtc/TmFunnel.h b/mission/tmtc/TmFunnel.h index 325f3709..5441db2b 100644 --- a/mission/tmtc/TmFunnel.h +++ b/mission/tmtc/TmFunnel.h @@ -4,13 +4,12 @@ #include #include #include -#include #include #include -namespace Factory { -void setStaticFrameworkObjectIds(); -} +#include "CfdpTmFunnel.h" +#include "PusTmFunnel.h" +#include "fsfw/timemanager/TimeReaderIF.h" /** * @brief TM Recipient. @@ -18,36 +17,19 @@ void setStaticFrameworkObjectIds(); * Main telemetry receiver. All generated telemetry is funneled into * this object. * @ingroup utility - * @author J. Meier + * @author J. Meier, R. Mueller */ -class TmFunnel : public AcceptsTelemetryIF, public ExecutableObjectIF, public SystemObject { - friend void(Factory::setStaticFrameworkObjectIds)(); - +class TmFunnel : public ExecutableObjectIF, public SystemObject { public: - TmFunnel(object_id_t objectId, CdsShortTimeStamper& timeReader, uint32_t messageDepth = 20, - uint8_t reportReceptionVc = 0); - virtual ~TmFunnel(); + TmFunnel(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel); + ~TmFunnel() override; - const char* getName() const override; - MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) override; - ReturnValue_t performOperation(uint8_t operationCode = 0) override; + ReturnValue_t performOperation(uint8_t operationCode) override; ReturnValue_t initialize() override; - protected: - static object_id_t downlinkDestination; - static object_id_t storageDestination; - private: - CdsShortTimeStamper& timeReader; - uint32_t messageDepth = 0; - uint8_t reportReceptionVc = 0; - uint16_t sourceSequenceCount = 0; - MessageQueueIF* tmQueue = nullptr; - MessageQueueIF* storageQueue = nullptr; - - StorageManagerIF* tmStore = nullptr; - - ReturnValue_t handlePacket(TmTcMessage* message); + PusTmFunnel& pusFunnel; + CfdpTmFunnel& cfdpFunnel; }; #endif /* MISSION_UTILITY_TMFUNNEL_H_ */ diff --git a/mission/tmtc/VirtualChannel.cpp b/mission/tmtc/VirtualChannel.cpp index 6ba404e4..96a37b3b 100644 --- a/mission/tmtc/VirtualChannel.cpp +++ b/mission/tmtc/VirtualChannel.cpp @@ -53,7 +53,7 @@ ReturnValue_t VirtualChannel::performOperation() { return result; } -MessageQueueId_t VirtualChannel::getReportReceptionQueue(uint8_t virtualChannel) { +MessageQueueId_t VirtualChannel::getReportReceptionQueue(uint8_t virtualChannel) const { return tmQueue->getId(); } diff --git a/mission/tmtc/VirtualChannel.h b/mission/tmtc/VirtualChannel.h index b6ba8241..024ccee6 100644 --- a/mission/tmtc/VirtualChannel.h +++ b/mission/tmtc/VirtualChannel.h @@ -27,7 +27,7 @@ class VirtualChannel : public AcceptsTelemetryIF { VirtualChannel(uint8_t vcId, uint32_t tmQueueDepth, object_id_t ownerId); ReturnValue_t initialize(); - MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) override; + MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override; ReturnValue_t performOperation(); /** diff --git a/tmtc b/tmtc index 90c9cf29..1c38b54f 160000 --- a/tmtc +++ b/tmtc @@ -1 +1 @@ -Subproject commit 90c9cf297bc7158893181dbe3694d7814fae7730 +Subproject commit 1c38b54f0127c9144cc2e4ecef1651bf20622b71