CFDP: Add TM support #307

Merged
muellerr merged 8 commits from mueller/cfdp-add-closure-support into develop 2022-10-21 15:30:55 +02:00
22 changed files with 295 additions and 203 deletions

View File

@ -54,10 +54,6 @@ void Factory::setStaticFrameworkObjectIds() {
CommandingServiceBase::defaultPacketSource = objects::PUS_PACKET_DISTRIBUTOR;
CommandingServiceBase::defaultPacketDestination = objects::TM_FUNNEL;
TmFunnel::downlinkDestination = objects::TMTC_BRIDGE;
// No storage object for now.
TmFunnel::storageDestination = objects::NO_OBJECT;
VerificationReporter::DEFAULT_RECEIVER = objects::PUS_SERVICE_1_VERIFICATION;
}

View File

@ -16,8 +16,6 @@ enum sourceObjects : uint32_t {
PUS_SERVICE_23 = 0x51002300,
PUS_SERVICE_201 = 0x51020100,
TM_FUNNEL = 0x52000002,
/* Test Task */
TEST_TASK = 0x42694269,

View File

@ -97,10 +97,10 @@ ResetArgs RESET_ARGS_GNSS;
void Factory::setStaticFrameworkObjectIds() {
PusServiceBase::PUS_DISTRIBUTOR = objects::PUS_PACKET_DISTRIBUTOR;
PusServiceBase::PACKET_DESTINATION = objects::TM_FUNNEL;
PusServiceBase::PACKET_DESTINATION = objects::PUS_TM_FUNNEL;
CommandingServiceBase::defaultPacketSource = objects::PUS_PACKET_DISTRIBUTOR;
CommandingServiceBase::defaultPacketDestination = objects::TM_FUNNEL;
CommandingServiceBase::defaultPacketDestination = objects::PUS_TM_FUNNEL;
#if OBSW_Q7S_EM == 1
DeviceHandlerBase::powerSwitcherId = objects::NO_OBJECT;
@ -108,14 +108,6 @@ void Factory::setStaticFrameworkObjectIds() {
DeviceHandlerBase::powerSwitcherId = objects::PCDU_HANDLER;
#endif /* OBSW_Q7S_EM == 1 */
#if OBSW_TM_TO_PTME == 1
TmFunnel::downlinkDestination = objects::CCSDS_HANDLER;
#else
TmFunnel::downlinkDestination = objects::TMTC_BRIDGE;
#endif /* OBSW_TM_TO_PTME == 1 */
// No storage object for now.
TmFunnel::storageDestination = objects::NO_OBJECT;
LocalDataPoolManager::defaultHkDestination = objects::PUS_SERVICE_3_HOUSEKEEPING;
VerificationReporter::DEFAULT_RECEIVER = objects::PUS_SERVICE_1_VERIFICATION;

View File

@ -73,7 +73,8 @@ void ObjectFactory::produce(void* args) {
createTestComponents(gpioComIF);
#endif /* OBSW_ADD_TEST_CODE == 1 */
#if OBSW_ADD_SCEX_DEVICE == 1
createScexComponents(q7s::UART_SCEX_DEV, pwrSwitcher, *SdCardManager::instance(), true, std::nullopt);
createScexComponents(q7s::UART_SCEX_DEV, pwrSwitcher, *SdCardManager::instance(), true,
std::nullopt);
#endif
createMiscComponents();

View File

@ -128,7 +128,10 @@ enum commonObjects : uint32_t {
TCS_BOARD_ASS = 0x73000003,
RW_ASS = 0x73000004,
CFDP_HANDLER = 0x73000005,
CFDP_DISTRIBUTOR = 0x73000006
CFDP_DISTRIBUTOR = 0x73000006,
TM_FUNNEL = 0x73000100,
PUS_TM_FUNNEL = 0x73000101,
CFDP_TM_FUNNEL = 0x73000102,
};
}

View File

@ -1,10 +1,7 @@
#include "SaDeploymentDummy.h"
SaDeplDummy::SaDeplDummy(object_id_t objectId): SystemObject(objectId) {
}
SaDeplDummy::SaDeplDummy(object_id_t objectId) : SystemObject(objectId) {}
SaDeplDummy::~SaDeplDummy() = default;
ReturnValue_t SaDeplDummy::performOperation(uint8_t opCode) {
return returnvalue::OK;
}
ReturnValue_t SaDeplDummy::performOperation(uint8_t opCode) { return returnvalue::OK; }

View File

@ -2,12 +2,12 @@
#ifndef DUMMIES_SADEPLOYMENT_H_
#define DUMMIES_SADEPLOYMENT_H_
#include "SaDeploymentDummy.h"
#include <fsfw/devicehandlers/DeviceHandlerBase.h>
#include "SaDeploymentDummy.h"
class SaDeplDummy : public SystemObject, public ExecutableObjectIF {
public:
SaDeplDummy(object_id_t objectId);
virtual ~SaDeplDummy();

2
fsfw

@ -1 +1 @@
Subproject commit 73454c629c042de8efe7aa4fa6dcbf1e184b0961
Subproject commit 754b71a35fc27208d7c679ea58783cacda973996

View File

@ -40,7 +40,6 @@ enum sourceObjects : uint32_t {
PUS_SERVICE_6 = 0x51000500,
CCSDS_IP_CORE_BRIDGE = 0x73500000,
TM_FUNNEL = 0x73000100,
/* 0x49 ('I') for Communication Interfaces **/
ARDUINO_COM_IF = 0x49000000,

View File

@ -22,6 +22,8 @@
#include <fsfw/tcdistribution/PusDistributor.h>
#include <fsfw/timemanager/CdsShortTimeStamper.h>
#include <fsfw_hal/host/HostFilesystem.h>
#include <mission/tmtc/CfdpTmFunnel.h>
#include <mission/tmtc/PusTmFunnel.h>
#include <mission/tmtc/TmFunnel.h>
#include "OBSWConfig.h"
@ -94,6 +96,26 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) {
new PoolManager(objects::IPC_STORE, poolCfg);
}
#if OBSW_ADD_TCPIP_BRIDGE == 1
#if OBSW_USE_TMTC_TCP_BRIDGE == 0
auto tmtcBridge = new UdpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR);
new UdpTcPollingTask(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE);
sif::info << "Created UDP server for TMTC commanding with listener port "
<< udpBridge->getUdpPort() << std::endl;
#else
auto tmtcBridge = new TcpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR);
auto tcpServer = new TcpTmTcServer(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE);
// TCP is stream based. Use packet ID as start marker when parsing for space packets
tcpServer->setSpacePacketParsingOptions({common::PUS_PACKET_ID, common::CFDP_PACKET_ID});
sif::info << "Created TCP server for TMTC commanding with listener port "
<< tcpServer->getTcpPort() << std::endl;
#if OBSW_TCP_SERVER_WIRETAPPING == 1
tcpServer->enableWiretapping(true);
#endif /* OBSW_TCP_SERVER_WIRETAPPING == 1 */
#endif /* OBSW_USE_TMTC_TCP_BRIDGE == 0 */
tmtcBridge->setMaxNumberOfPacketsStored(300);
#endif /* OBSW_ADD_TCPIP_BRIDGE == 1 */
auto* ccsdsDistrib =
new CcsdsDistributor(config::EIVE_PUS_APID, objects::CCSDS_PACKET_DISTRIBUTOR);
new PusDistributor(config::EIVE_PUS_APID, objects::PUS_PACKET_DISTRIBUTOR, ccsdsDistrib);
@ -102,12 +124,16 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) {
#if OBSW_TM_TO_PTME == 1
vc = config::LIVE_TM;
#endif
auto* cfdpFunnel =
new CfdpTmFunnel(objects::CFDP_TM_FUNNEL, config::EIVE_CFDP_APID, *tmtcBridge, *tmStore, vc);
auto* pusFunnel =
new PusTmFunnel(objects::PUS_TM_FUNNEL, *tmtcBridge, *timeStamper, *tmStore, vc);
// Every TM packet goes through this funnel
auto* funnel = new TmFunnel(objects::TM_FUNNEL, *timeStamper, 50, vc);
new TmFunnel(objects::TM_FUNNEL, *pusFunnel, *cfdpFunnel);
// PUS service stack
new Service1TelecommandVerification(objects::PUS_SERVICE_1_VERIFICATION, config::EIVE_PUS_APID,
pus::PUS_SERVICE_1, objects::TM_FUNNEL, 20);
pus::PUS_SERVICE_1, objects::PUS_TM_FUNNEL, 20);
new Service2DeviceAccess(objects::PUS_SERVICE_2_DEVICE_ACCESS, config::EIVE_PUS_APID,
pus::PUS_SERVICE_2, 3, 10);
new Service3Housekeeping(objects::PUS_SERVICE_3_HOUSEKEEPING, config::EIVE_PUS_APID,
@ -131,25 +157,6 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) {
pus::PUS_SERVICE_200, 8);
new CService201HealthCommanding(objects::PUS_SERVICE_201_HEALTH, config::EIVE_PUS_APID,
pus::PUS_SERVICE_201);
#if OBSW_ADD_TCPIP_BRIDGE == 1
#if OBSW_USE_TMTC_TCP_BRIDGE == 0
auto tmtcBridge = new UdpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR);
new UdpTcPollingTask(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE);
sif::info << "Created UDP server for TMTC commanding with listener port "
<< udpBridge->getUdpPort() << std::endl;
#else
auto tmtcBridge = new TcpTmTcBridge(objects::TMTC_BRIDGE, objects::CCSDS_PACKET_DISTRIBUTOR);
auto tcpServer = new TcpTmTcServer(objects::TMTC_POLLING_TASK, objects::TMTC_BRIDGE);
// TCP is stream based. Use packet ID as start marker when parsing for space packets
tcpServer->setSpacePacketParsingOptions({common::PUS_PACKET_ID, common::CFDP_PACKET_ID});
sif::info << "Created TCP server for TMTC commanding with listener port "
<< tcpServer->getTcpPort() << std::endl;
#if OBSW_TCP_SERVER_WIRETAPPING == 1
tcpServer->enableWiretapping(true);
#endif /* OBSW_TCP_SERVER_WIRETAPPING == 1 */
#endif /* OBSW_USE_TMTC_TCP_BRIDGE == 0 */
tmtcBridge->setMaxNumberOfPacketsStored(300);
#endif /* OBSW_ADD_TCPIP_BRIDGE == 1 */
#if OBSW_ADD_CFDP_COMPONENTS == 1
using namespace cfdp;
@ -158,7 +165,8 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_) {
new CfdpDistributor(distribCfg);
auto* msgQueue = QueueFactory::instance()->createMessageQueue(32);
FsfwHandlerParams params(objects::CFDP_HANDLER, HOST_FS, *funnel, *tcStore, *tmStore, *msgQueue);
FsfwHandlerParams params(objects::CFDP_HANDLER, HOST_FS, *cfdpFunnel, *tcStore, *tmStore,
*msgQueue);
cfdp::IndicationCfg indicationCfg;
UnsignedByteField<uint16_t> apid(config::EIVE_LOCAL_CFDP_ENTITY_ID);
cfdp::EntityId localId(apid);

View File

@ -180,9 +180,9 @@ void CCSDSHandler::addVirtualChannel(VcId_t vcId, VirtualChannel* virtualChannel
}
}
MessageQueueId_t CCSDSHandler::getReportReceptionQueue(uint8_t virtualChannel) {
MessageQueueId_t CCSDSHandler::getReportReceptionQueue(uint8_t virtualChannel) const {
if (virtualChannel < common::NUMBER_OF_VIRTUAL_CHANNELS) {
VirtualChannelMapIter iter = virtualChannelMap.find(virtualChannel);
auto iter = virtualChannelMap.find(virtualChannel);
if (iter != virtualChannelMap.end()) {
return iter->second->getReportReceptionQueue();
} else {

View File

@ -67,7 +67,7 @@ class CCSDSHandler : public SystemObject,
*/
void addVirtualChannel(VcId_t virtualChannelId, VirtualChannel* virtualChannel);
MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0);
MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override;
ReturnValue_t getParameter(uint8_t domainId, uint8_t uniqueIdentifier,
ParameterWrapper* parameterWrapper, const ParameterWrapper* newValues,
uint16_t startAtIndex);

View File

@ -1,2 +1,3 @@
target_sources(${LIB_EIVE_MISSION} PRIVATE CCSDSHandler.cpp VirtualChannel.cpp
TmFunnel.cpp)
target_sources(
${LIB_EIVE_MISSION} PRIVATE CCSDSHandler.cpp VirtualChannel.cpp TmFunnel.cpp
CfdpTmFunnel.cpp PusTmFunnel.cpp)

View File

@ -0,0 +1,85 @@
#include "CfdpTmFunnel.h"
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tmtcpacket/ccsds/SpacePacketCreator.h"
#include "fsfw/tmtcservices/TmTcMessage.h"
CfdpTmFunnel::CfdpTmFunnel(object_id_t objectId, uint16_t cfdpInCcsdsApid,
const AcceptsTelemetryIF& downlinkDestination, StorageManagerIF& tmStore,
uint8_t vc)
: SystemObject(objectId), cfdpInCcsdsApid(cfdpInCcsdsApid), tmStore(tmStore) {
msgQueue = QueueFactory::instance()->createMessageQueue(5);
msgQueue->setDefaultDestination(downlinkDestination.getReportReceptionQueue(vc));
}
const char* CfdpTmFunnel::getName() const { return "CFDP TM Funnel"; }
MessageQueueId_t CfdpTmFunnel::getReportReceptionQueue(uint8_t virtualChannel) const {
return msgQueue->getId();
}
ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) {
TmTcMessage currentMessage;
ReturnValue_t status = msgQueue->receiveMessage(&currentMessage);
while (status == returnvalue::OK) {
status = handlePacket(currentMessage);
if (status != returnvalue::OK) {
sif::warning << "CfdpTmFunnel packet handling failed" << std::endl;
break;
}
status = msgQueue->receiveMessage(&currentMessage);
}
if (status == MessageQueueIF::EMPTY) {
return returnvalue::OK;
}
return status;
}
ReturnValue_t CfdpTmFunnel::initialize() { return returnvalue::OK; }
ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) {
const uint8_t* cfdpPacket = nullptr;
size_t cfdpPacketLen = 0;
ReturnValue_t result = tmStore.getData(msg.getStorageId(), &cfdpPacket, &cfdpPacketLen);
if (result != returnvalue::OK) {
return result;
}
auto spacePacketHeader =
SpacePacketCreator(ccsds::PacketType::TM, false, cfdpInCcsdsApid,
ccsds::SequenceFlags::UNSEGMENTED, sourceSequenceCount++, 0);
sourceSequenceCount = sourceSequenceCount & ccsds::LIMIT_SEQUENCE_COUNT;
spacePacketHeader.setCcsdsLenFromTotalDataFieldLen(cfdpPacketLen);
uint8_t* newPacketData = nullptr;
store_address_t newStoreId{};
result =
tmStore.getFreeElement(&newStoreId, spacePacketHeader.getFullPacketLen(), &newPacketData);
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "CfdpTmFunnel::handlePacket: Error getting TM store element of size "
<< spacePacketHeader.getFullPacketLen() << std::endl;
#endif
return result;
}
size_t serSize = 0;
result =
spacePacketHeader.serializeBe(&newPacketData, &serSize, spacePacketHeader.getFullPacketLen());
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "CfdpTmFunnel::handlePacket: Error serializing packet" << std::endl;
#endif
return result;
}
std::memcpy(newPacketData, cfdpPacket, cfdpPacketLen);
// Delete old packet
tmStore.deleteData(msg.getStorageId());
msg.setStorageId(newStoreId);
result = msgQueue->sendToDefault(&msg);
if (result != returnvalue::OK) {
tmStore.deleteData(msg.getStorageId());
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "CfdpTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl;
#endif
}
return result;
}

View File

@ -0,0 +1,28 @@
#ifndef FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H
#define FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H
#include "fsfw/objectmanager/SystemObject.h"
#include "fsfw/storagemanager/StorageManagerIF.h"
#include "fsfw/tmtcservices/AcceptsTelemetryIF.h"
#include "fsfw/tmtcservices/TmTcMessage.h"
class CfdpTmFunnel : public AcceptsTelemetryIF, public SystemObject {
public:
CfdpTmFunnel(object_id_t objectId, uint16_t cfdpInCcsdsApid,
const AcceptsTelemetryIF& downlinkDestination, StorageManagerIF& tmStore,
uint8_t vc);
[[nodiscard]] const char* getName() const override;
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
ReturnValue_t performOperation(uint8_t opCode);
ReturnValue_t initialize() override;
private:
ReturnValue_t handlePacket(TmTcMessage& msg);
uint16_t sourceSequenceCount = 0;
uint16_t cfdpInCcsdsApid;
MessageQueueIF* msgQueue;
StorageManagerIF& tmStore;
};
#endif // FSFW_EXAMPLE_COMMON_CFDPTMFUNNEL_H

View File

@ -0,0 +1,69 @@
#include "PusTmFunnel.h"
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/objectmanager.h"
#include "fsfw/tmtcpacket/pus/tm/PusTmZcWriter.h"
PusTmFunnel::PusTmFunnel(object_id_t objectId, const AcceptsTelemetryIF &downlinkDestination,
TimeReaderIF &timeReader, StorageManagerIF &tmStore, uint8_t vcId,
uint32_t messageDepth)
: SystemObject(objectId), timeReader(timeReader), tmStore(tmStore) {
tmQueue = QueueFactory::instance()->createMessageQueue(messageDepth,
MessageQueueMessage::MAX_MESSAGE_SIZE);
tmQueue->setDefaultDestination(downlinkDestination.getReportReceptionQueue(vcId));
}
PusTmFunnel::~PusTmFunnel() = default;
MessageQueueId_t PusTmFunnel::getReportReceptionQueue(uint8_t virtualChannel) const {
return tmQueue->getId();
}
ReturnValue_t PusTmFunnel::performOperation(uint8_t) {
TmTcMessage currentMessage;
ReturnValue_t status = tmQueue->receiveMessage(&currentMessage);
while (status == returnvalue::OK) {
status = handlePacket(currentMessage);
if (status != returnvalue::OK) {
sif::warning << "TmFunnel packet handling failed" << std::endl;
break;
}
status = tmQueue->receiveMessage(&currentMessage);
}
if (status == MessageQueueIF::EMPTY) {
return returnvalue::OK;
}
return status;
}
ReturnValue_t PusTmFunnel::handlePacket(TmTcMessage &message) {
uint8_t *packetData = nullptr;
size_t size = 0;
ReturnValue_t result = tmStore.modifyData(message.getStorageId(), &packetData, &size);
if (result != returnvalue::OK) {
return result;
}
PusTmZeroCopyWriter packet(timeReader, packetData, size);
result = packet.parseDataWithoutCrcCheck();
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::warning << "PusTmFunnel::handlePacket: Error parsing received PUS packet" << std::endl;
#endif
return result;
}
packet.setSequenceCount(sourceSequenceCount++);
sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT;
packet.updateErrorControl();
result = tmQueue->sendToDefault(&message);
if (result != returnvalue::OK) {
tmStore.deleteData(message.getStorageId());
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl;
#endif
}
return result;
}
const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; }

View File

@ -0,0 +1,42 @@
#ifndef FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H
#define FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H
#include <fsfw/ipc/MessageQueueIF.h>
#include <fsfw/objectmanager/SystemObject.h>
#include <fsfw/tasks/ExecutableObjectIF.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <fsfw/tmtcservices/TmTcMessage.h>
#include "fsfw/timemanager/TimeReaderIF.h"
/**
* @brief TM Recipient.
* @details
* TODO: Add support for TM storage by using the (or a) LIVE flag provided by the CCSDS or Syrlinks
* handler. If we are in LIVE TM mode, forward TM to downlink destination directly. Otherwise,
* forward to TM storage backend which stores TMs into files.
* Main telemetry receiver. All generated telemetry is funneled into
* this object.
* @ingroup utility
* @author J. Meier, R. Mueller
*/
class PusTmFunnel : public AcceptsTelemetryIF, public SystemObject {
public:
explicit PusTmFunnel(object_id_t objectId, const AcceptsTelemetryIF &downlinkDestination,
TimeReaderIF &timeReader, StorageManagerIF &tmStore, uint8_t vdId,
uint32_t messageDepth = 20);
[[nodiscard]] const char *getName() const override;
~PusTmFunnel() override;
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
ReturnValue_t performOperation(uint8_t operationCode);
private:
uint16_t sourceSequenceCount = 0;
TimeReaderIF &timeReader;
StorageManagerIF &tmStore;
MessageQueueIF *tmQueue = nullptr;
ReturnValue_t handlePacket(TmTcMessage &message);
};
#endif // FSFW_EXAMPLE_COMMON_PUSTMFUNNEL_H

View File

@ -1,125 +1,16 @@
#include "TmFunnel.h"
#include <fsfw/ipc/QueueFactory.h>
#include <fsfw/objectmanager/ObjectManager.h>
#include <fsfw/serviceinterface/ServiceInterfaceStream.h>
#include <fsfw/timemanager/CdsShortTimeStamper.h>
#include <fsfw/tmtcpacket/pus/tm.h>
#include <mission/tmtc/TmFunnel.h>
#include "OBSWConfig.h"
TmFunnel::TmFunnel(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel)
: SystemObject(objectId), pusFunnel(pusFunnel), cfdpFunnel(cfdpFunnel) {}
object_id_t TmFunnel::downlinkDestination = objects::NO_OBJECT;
object_id_t TmFunnel::storageDestination = objects::NO_OBJECT;
TmFunnel::TmFunnel(object_id_t objectId, CdsShortTimeStamper& timeReader, uint32_t messageDepth,
uint8_t reportReceptionVc)
: SystemObject(objectId),
timeReader(timeReader),
messageDepth(messageDepth),
reportReceptionVc(reportReceptionVc) {
auto mqArgs = MqArgs(objectId, static_cast<void*>(this));
tmQueue = QueueFactory::instance()->createMessageQueue(
messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs);
storageQueue = QueueFactory::instance()->createMessageQueue(
messageDepth, MessageQueueMessage::MAX_MESSAGE_SIZE, &mqArgs);
}
TmFunnel::~TmFunnel() {}
MessageQueueId_t TmFunnel::getReportReceptionQueue(uint8_t virtualChannel) {
return tmQueue->getId();
}
TmFunnel::~TmFunnel() = default;
ReturnValue_t TmFunnel::performOperation(uint8_t operationCode) {
TmTcMessage currentMessage;
ReturnValue_t status = tmQueue->receiveMessage(&currentMessage);
while (status == returnvalue::OK) {
status = handlePacket(&currentMessage);
if (status != returnvalue::OK) {
break;
}
status = tmQueue->receiveMessage(&currentMessage);
}
if (status == MessageQueueIF::EMPTY) {
return returnvalue::OK;
} else {
return status;
}
pusFunnel.performOperation(operationCode);
cfdpFunnel.performOperation(operationCode);
return returnvalue::OK;
}
ReturnValue_t TmFunnel::handlePacket(TmTcMessage* message) {
uint8_t* packetData = nullptr;
size_t size = 0;
ReturnValue_t result = tmStore->modifyData(message->getStorageId(), &packetData, &size);
if (result != returnvalue::OK) {
return result;
}
PusTmZeroCopyWriter packet(timeReader, packetData, size);
result = packet.parseDataWithoutCrcCheck();
if (result != returnvalue::OK) {
return result;
}
packet.setSequenceCount(sourceSequenceCount++);
sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT;
packet.updateErrorControl();
result = tmQueue->sendToDefault(message);
if (result != returnvalue::OK) {
tmStore->deleteData(message->getStorageId());
sif::error << "TmFunnel::handlePacket: Error sending to downlink "
"handler"
<< std::endl;
return result;
}
if (storageDestination != objects::NO_OBJECT) {
result = storageQueue->sendToDefault(message);
if (result != returnvalue::OK) {
tmStore->deleteData(message->getStorageId());
sif::error << "TmFunnel::handlePacket: Error sending to storage "
"handler"
<< std::endl;
return result;
}
}
return result;
}
ReturnValue_t TmFunnel::initialize() {
tmStore = ObjectManager::instance()->get<StorageManagerIF>(objects::TM_STORE);
if (tmStore == nullptr) {
sif::error << "TmFunnel::initialize: TM store not set." << std::endl;
sif::error << "Make sure the tm store is set up properly"
" and implements StorageManagerIF"
<< std::endl;
return ObjectManagerIF::CHILD_INIT_FAILED;
}
AcceptsTelemetryIF* tmTarget =
ObjectManager::instance()->get<AcceptsTelemetryIF>(downlinkDestination);
if (tmTarget == nullptr) {
sif::error << "TmFunnel::initialize: Downlink Destination not set." << std::endl;
sif::error << "Make sure the downlink destination object is set up "
"properly and implements AcceptsTelemetryIF"
<< std::endl;
return ObjectManagerIF::CHILD_INIT_FAILED;
}
tmQueue->setDefaultDestination(tmTarget->getReportReceptionQueue(reportReceptionVc));
// Storage destination is optional.
if (storageDestination == objects::NO_OBJECT) {
return SystemObject::initialize();
}
AcceptsTelemetryIF* storageTarget =
ObjectManager::instance()->get<AcceptsTelemetryIF>(storageDestination);
if (storageTarget != nullptr) {
storageQueue->setDefaultDestination(storageTarget->getReportReceptionQueue(0));
}
return SystemObject::initialize();
}
const char* TmFunnel::getName() const { return "TM Funnel"; }
ReturnValue_t TmFunnel::initialize() { return returnvalue::OK; }

View File

@ -4,13 +4,12 @@
#include <fsfw/ipc/MessageQueueIF.h>
#include <fsfw/objectmanager/SystemObject.h>
#include <fsfw/tasks/ExecutableObjectIF.h>
#include <fsfw/timemanager/CdsShortTimeStamper.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <fsfw/tmtcservices/TmTcMessage.h>
namespace Factory {
void setStaticFrameworkObjectIds();
}
#include "CfdpTmFunnel.h"
#include "PusTmFunnel.h"
#include "fsfw/timemanager/TimeReaderIF.h"
/**
* @brief TM Recipient.
@ -18,36 +17,19 @@ void setStaticFrameworkObjectIds();
* Main telemetry receiver. All generated telemetry is funneled into
* this object.
* @ingroup utility
* @author J. Meier
* @author J. Meier, R. Mueller
*/
class TmFunnel : public AcceptsTelemetryIF, public ExecutableObjectIF, public SystemObject {
friend void(Factory::setStaticFrameworkObjectIds)();
class TmFunnel : public ExecutableObjectIF, public SystemObject {
public:
TmFunnel(object_id_t objectId, CdsShortTimeStamper& timeReader, uint32_t messageDepth = 20,
uint8_t reportReceptionVc = 0);
virtual ~TmFunnel();
TmFunnel(object_id_t objectId, PusTmFunnel& pusFunnel, CfdpTmFunnel& cfdpFunnel);
~TmFunnel() override;
const char* getName() const override;
MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) override;
ReturnValue_t performOperation(uint8_t operationCode = 0) override;
ReturnValue_t performOperation(uint8_t operationCode) override;
ReturnValue_t initialize() override;
protected:
static object_id_t downlinkDestination;
static object_id_t storageDestination;
private:
CdsShortTimeStamper& timeReader;
uint32_t messageDepth = 0;
uint8_t reportReceptionVc = 0;
uint16_t sourceSequenceCount = 0;
MessageQueueIF* tmQueue = nullptr;
MessageQueueIF* storageQueue = nullptr;
StorageManagerIF* tmStore = nullptr;
ReturnValue_t handlePacket(TmTcMessage* message);
PusTmFunnel& pusFunnel;
CfdpTmFunnel& cfdpFunnel;
};
#endif /* MISSION_UTILITY_TMFUNNEL_H_ */

View File

@ -53,7 +53,7 @@ ReturnValue_t VirtualChannel::performOperation() {
return result;
}
MessageQueueId_t VirtualChannel::getReportReceptionQueue(uint8_t virtualChannel) {
MessageQueueId_t VirtualChannel::getReportReceptionQueue(uint8_t virtualChannel) const {
return tmQueue->getId();
}

View File

@ -27,7 +27,7 @@ class VirtualChannel : public AcceptsTelemetryIF {
VirtualChannel(uint8_t vcId, uint32_t tmQueueDepth, object_id_t ownerId);
ReturnValue_t initialize();
MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) override;
MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel = 0) const override;
ReturnValue_t performOperation();
/**

2
tmtc

@ -1 +1 @@
Subproject commit 1dfc2fca2f58f8d226fab01c87eb529ba7ec8376
Subproject commit c13f4a8575f99c0e9f17b1868e7abfbe28fa5648