Refactor TM handling #450

Merged
muellerr merged 47 commits from refactor_tm_handling into develop 2023-03-11 15:05:22 +01:00
16 changed files with 176 additions and 148 deletions
Showing only changes of commit e5636f0b6c - Show all commits

View File

@ -8,7 +8,6 @@
#include <mission/tmtc/CcsdsIpCoreHandler.h>
#include <mission/tmtc/CfdpTmFunnel.h>
#include <mission/tmtc/PusTmFunnel.h>
#include <mission/tmtc/TmStoreRouter.h>
#include <string>

View File

@ -36,8 +36,8 @@
#include <mission/tmtc/PersistentTmStore.h>
#include <mission/tmtc/PusPacketFilter.h>
#include <mission/tmtc/PusTmFunnel.h>
#include <mission/tmtc/PusTmRouteByFilterHelper.h>
#include <mission/tmtc/TmFunnelHandler.h>
#include <mission/tmtc/TmStoreRouter.h>
#include "OBSWConfig.h"
#include "devices/gpioIds.h"
@ -160,12 +160,12 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
*pusFunnel = new PusTmFunnel(pusFunnelCfg, *timeStamper, sdcMan);
#if OBSW_ADD_TCPIP_SERVERS == 1
#if OBSW_ADD_TMTC_UDP_SERVER == 1
(*cfdpFunnel)->addDestination("UDP Server", *udpBridge, 0);
(*pusFunnel)->addDestination("UDP Server", *udpBridge, 0);
(*cfdpFunnel)->addLiveDestination("UDP Server", *udpBridge, 0);
(*pusFunnel)->addLiveDestination("UDP Server", *udpBridge, 0);
#endif
#if OBSW_ADD_TMTC_TCP_SERVER == 1
(*cfdpFunnel)->addDestination("TCP Server", *tcpBridge, 0);
(*pusFunnel)->addDestination("TCP Server", *tcpBridge, 0);
(*cfdpFunnel)->addLiveDestination("TCP Server", *tcpBridge, 0);
(*pusFunnel)->addLiveDestination("TCP Server", *tcpBridge, 0);
#endif
#endif
// Every TM packet goes through this funnel

View File

@ -3,7 +3,6 @@
#include <fsfw/devicehandlers/DeviceHandlerBase.h>
#include <mission/memory/SdCardMountedIF.h>
#include <mission/tmtc/TmStoreRouter.h>
#include "fsfw/objectmanager/SystemObjectIF.h"
#include "fsfw/power/PowerSwitchIF.h"

View File

@ -6,8 +6,9 @@ target_sources(
TmFunnelBase.cpp
CfdpTmFunnel.cpp
tmFilters.cpp
PusLiveDemux.cpp
PusPacketFilter.cpp
TmStoreRouter.cpp
PusTmRouteByFilterHelper.cpp
Service15TmStorage.cpp
PersistentTmStore.cpp
PusTmFunnel.cpp)

View File

@ -75,36 +75,6 @@ ReturnValue_t CfdpTmFunnel::handlePacket(TmTcMessage& msg) {
tmStore.deleteData(msg.getStorageId());
msg.setStorageId(newStoreId);
store_address_t origStoreId = newStoreId;
for (unsigned int idx = 0; idx < destinations.size(); idx++) {
const auto& dest = destinations[idx];
if (destinations.size() > 1) {
if (idx < destinations.size() - 1) {
// Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
// to bother with send order and where the data is deleted.
store_address_t storeId;
result = tmStore.addData(&storeId, newPacketData, packetLen);
if (result == returnvalue::OK) {
msg.setStorageId(storeId);
} else {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PusTmFunnel::handlePacket: Store too full to create data copy or store "
"error"
<< std::endl;
#endif
break;
}
} else {
msg.setStorageId(origStoreId);
}
}
result = tmQueue->sendMessage(dest.queueId, &msg);
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler " << dest.name
<< " failed" << std::endl;
#endif
tmStore.deleteData(msg.getStorageId());
}
}
return result;
// TODO: Also route to persistent TM store
return demultiplexLivePackets(origStoreId, newPacketData, packetLen);
}

View File

@ -69,14 +69,14 @@ ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore,
SerializeIF::Endianness::NETWORK);
SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data() + 4, &size,
SerializeIF::Endianness::NETWORK);
dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, tmFunnel);
dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds);
}
}
return returnvalue::OK;
}
void PersistentTmStore::dumpFrom(uint32_t fromUnixSeconds, TmFunnelBase& tmFunnel) {
return dumpFromUpTo(fromUnixSeconds, currentTv.tv_sec, tmFunnel);
void PersistentTmStore::dumpFrom(uint32_t fromUnixSeconds) {
return dumpFromUpTo(fromUnixSeconds, currentTv.tv_sec);
}
ReturnValue_t PersistentTmStore::storePacket(PusTmReader& reader) {
@ -166,8 +166,7 @@ void PersistentTmStore::deleteUpTo(uint32_t unixSeconds) {
}
}
void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
TmFunnelBase& funnel) {
void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds) {
using namespace std::filesystem;
for (auto const& file : directory_iterator(basePath)) {
if (file.is_directory()) {
@ -180,7 +179,7 @@ void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnix
}
auto fileEpoch = static_cast<uint32_t>(timegm(&fileTime));
if ((fileEpoch > fromUnixSeconds) and (fileEpoch + rolloverDiffSeconds <= upToUnixSeconds)) {
fileToPackets(file, fileEpoch, funnel);
fileToPackets(file, fileEpoch);
}
}
}
@ -195,8 +194,7 @@ ReturnValue_t PersistentTmStore::pathToTm(const std::filesystem::path& path, str
return returnvalue::OK;
}
void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStamp,
TmFunnelBase& funnel) {
void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStamp) {
store_address_t storeId;
TmTcMessage message;
size_t size = std::filesystem::file_size(path);
@ -212,12 +210,13 @@ void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_
// CRC check to fully ensure this is a valid TM
ReturnValue_t result = reader.parseDataWithCrcCheck();
if (result == returnvalue::OK) {
result = tmStore.addData(&storeId, fileBuf.data() + currentIdx, reader.getFullPacketLen());
if (result != returnvalue::OK) {
continue;
}
funnel.sendPacketToDestinations(storeId, message, fileBuf.data() + currentIdx,
reader.getFullPacketLen());
// TODO: Blow the data out to the VC directly. Use IF function to do this.
// result = tmStore.addData(&storeId, fileBuf.data() + currentIdx,
// reader.getFullPacketLen()); if (result != returnvalue::OK) {
// continue;
// }
// funnel.sendPacketToLiveDestinations(storeId, message, fileBuf.data() + currentIdx,
// reader.getFullPacketLen());
currentIdx += reader.getFullPacketLen();
} else {
sif::error << "Parsing of PUS TM failed with code " << result << std::endl;

View File

@ -32,8 +32,8 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject {
ReturnValue_t handleCommandQueue(StorageManagerIF& ipcStore, TmFunnelBase& tmFunnel);
void deleteUpTo(uint32_t unixSeconds);
void dumpFrom(uint32_t fromUnixSeconds, TmFunnelBase& tmFunnel);
void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds, TmFunnelBase& tmFunnel);
void dumpFrom(uint32_t fromUnixSeconds);
void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds);
// ReturnValue_t passPacket(PusTmReader& reader);
ReturnValue_t storePacket(PusTmReader& reader);
@ -69,7 +69,7 @@ class PersistentTmStore : public TmStoreFrontendSimpleIF, public SystemObject {
void calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount);
ReturnValue_t createMostRecentFile(std::optional<uint8_t> suffix);
static ReturnValue_t pathToTm(const std::filesystem::path& path, struct tm& time);
void fileToPackets(const std::filesystem::path& path, uint32_t unixStamp, TmFunnelBase& funnel);
void fileToPackets(const std::filesystem::path& path, uint32_t unixStamp);
bool updateBaseDir();
ReturnValue_t assignAndOrCreateMostRecentFile();
};

View File

@ -0,0 +1,48 @@
#include "PusLiveDemux.h"
#include <fsfw/storagemanager/storeAddress.h>
#include <fsfw/tmtcservices/TmTcMessage.h>
PusLiveDemux::PusLiveDemux(MessageQueueIF& ownerQueue) : ownerQueue(ownerQueue) {}
ReturnValue_t PusLiveDemux::demultiplexPackets(StorageManagerIF& tmStore,
store_address_t origStoreId, const uint8_t* tmData,
size_t tmSize) {
ReturnValue_t result = returnvalue::OK;
for (unsigned int idx = 0; idx < destinations.size(); idx++) {
const auto& dest = destinations[idx];
if (destinations.size() > 1) {
if (idx < destinations.size() - 1) {
// Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
// to bother with send order and where the data is deleted.
store_address_t storeId;
result = tmStore.addData(&storeId, tmData, tmSize);
if (result == returnvalue::OK) {
message.setStorageId(storeId);
} else {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy"
<< std::endl;
#endif
}
} else {
message.setStorageId(origStoreId);
}
}
result = ownerQueue.sendMessage(dest.queueId, &message);
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << "PusLiveDemux::handlePacket: Error sending TM to downlink handler " << dest.name
<< std::endl;
#endif
tmStore.deleteData(message.getStorageId());
}
}
return result;
}
void PusLiveDemux::addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid) {
auto queueId = downlinkDestination.getReportReceptionQueue(vcid);
destinations.emplace_back(name, queueId, vcid);
}

View File

@ -0,0 +1,34 @@
#ifndef MISSION_TMTC_PUSLIVEDEMUX_H_
#define MISSION_TMTC_PUSLIVEDEMUX_H_
#include <fsfw/ipc/messageQueueDefinitions.h>
#include <fsfw/storagemanager/StorageManagerIF.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <vector>
class PusLiveDemux {
public:
PusLiveDemux(MessageQueueIF& ownerQueue);
ReturnValue_t demultiplexPackets(StorageManagerIF& tmStore, store_address_t origStoreId,
const uint8_t* tmData, size_t tmSize);
void addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);
private:
struct Destination {
Destination(const char* name, MessageQueueId_t queueId, uint8_t virtualChannel)
: name(name), queueId(queueId), virtualChannel(virtualChannel) {}
const char* name;
MessageQueueId_t queueId;
uint8_t virtualChannel = 0;
};
MessageQueueIF& ownerQueue;
TmTcMessage message;
std::vector<Destination> destinations;
};
#endif /* MISSION_TMTC_PUSLIVEDEMUX_H_ */

View File

@ -86,18 +86,21 @@ ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) {
sourceSequenceCount = sourceSequenceCount % ccsds::LIMIT_SEQUENCE_COUNT;
packet.updateErrorControl();
// TODO: 1. Send packet to persistent TM store when applicable.
// 2. Send packet to live TM VC
// 3. Send packet to TCP/IP destination
return sendPacketToDestinations(origStoreId, message, packetData, size);
// timeval currentUptime{};
// Clock::getUptime(&currentUptime);
// if (currentUptime.tv_sec - lastTvUpdate.tv_sec >
// static_cast<signed int>(TV_UPDATE_INTERVAL_SECS)) {
// Clock::getClock_timeval(&currentTv);
// lastTvUpdate = currentUptime;
// }
// Send to persistent TM store if the packet matches some filter.
MessageQueueId_t destination;
if (persistentTmMap.packetMatches(packet, destination)) {
store_address_t storageId;
TmTcMessage msg(storageId);
result = tmStore.addData(&storageId, packetData, size);
if (result != returnvalue::OK) {
sif::error << "PusLiveDemux::handlePacket: Store too full to create data copy" << std::endl;
} else {
tmQueue->sendMessage(destination, &msg);
}
}
return demultiplexLivePackets(origStoreId, packetData, size);
// TODO: Will be moved to own thread.
// bool sdcUsable = sdcMan.isSdCardUsable(std::nullopt);
// initStoresIfPossible(sdcUsable);
// if (sdcUsable) {
@ -111,6 +114,7 @@ ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) {
const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; }
void PusTmFunnel::initStoresIfPossible(bool sdCardUsable) {
// TODO: Those will be moved to own thread.
if (not storesInitialized and sdCardUsable and sdcMan.getCurrentMountPrefix() != nullptr) {
// miscStore.initializeTmStore();
// okStore.initializeTmStore();
@ -126,5 +130,5 @@ ReturnValue_t PusTmFunnel::initialize() {
}
void PusTmFunnel::addPersistentTmStoreRouting(PusPacketFilter filter, MessageQueueId_t dest) {
router.addRouting(filter, dest);
persistentTmMap.addRouting(filter, dest);
}

View File

@ -6,8 +6,8 @@
#include <fsfw/tasks/ExecutableObjectIF.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <fsfw/tmtcservices/TmTcMessage.h>
#include <mission/tmtc/PusTmRouteByFilterHelper.h>
#include <mission/tmtc/TmFunnelBase.h>
#include <mission/tmtc/TmStoreRouter.h>
#include <vector>
@ -39,7 +39,7 @@ class PusTmFunnel : public TmFunnelBase {
TimeReaderIF &timeReader;
bool storesInitialized = false;
SdCardMountedIF &sdcMan;
PersistentTmStoreRouter router;
PusTmRouteByFilterHelper persistentTmMap;
ReturnValue_t handleTmPacket(TmTcMessage &message);
void initStoresIfPossible(bool sdCardUsable);

View File

@ -0,0 +1,19 @@
#include "PusTmRouteByFilterHelper.h"
#include <fsfw/ipc/MessageQueueIF.h>
PusTmRouteByFilterHelper::PusTmRouteByFilterHelper() = default;
bool PusTmRouteByFilterHelper::packetMatches(PusTmReader& reader, MessageQueueId_t& destination) {
for (const auto filterAndDest : routerMap) {
if (filterAndDest.first.packetMatches(reader)) {
destination = filterAndDest.second;
return true;
}
}
return false;
}
void PusTmRouteByFilterHelper::addRouting(PusPacketFilter filter, MessageQueueId_t destination) {
routerMap.emplace_back(std::move(filter), destination);
}

View File

@ -4,10 +4,21 @@
#include <fsfw/ipc/messageQueueDefinitions.h>
#include <mission/tmtc/PusPacketFilter.h>
class PersistentTmStoreRouter {
/**
* Simple composition of concrecte @PusPacketFilters which also maps them to
* a destination ID.
*/
class PusTmRouteByFilterHelper {
public:
PersistentTmStoreRouter();
PusTmRouteByFilterHelper();
/**
* Checks whether the packet matches any of the inserted filters and returns
* the destination if it does. Currently only supports one destination.
* @param reader
* @param destination
* @return
*/
bool packetMatches(PusTmReader& reader, MessageQueueId_t& destination);
void addRouting(PusPacketFilter filter, MessageQueueId_t destination);

View File

@ -5,8 +5,16 @@
#include "fsfw/ipc/QueueFactory.h"
TmFunnelBase::TmFunnelBase(FunnelCfg cfg)
: SystemObject(cfg.objectId), name(cfg.name), tmStore(cfg.tmStore), ipcStore(cfg.ipcStore) {
tmQueue = QueueFactory::instance()->createMessageQueue(cfg.tmMsgDepth);
: SystemObject(cfg.objectId),
name(cfg.name),
tmStore(cfg.tmStore),
ipcStore(cfg.ipcStore),
tmQueue(QueueFactory::instance()->createMessageQueue(cfg.tmMsgDepth)),
liveDemux(*tmQueue) {}
ReturnValue_t TmFunnelBase::demultiplexLivePackets(store_address_t origStoreId,
const uint8_t *tmData, size_t tmSize) {
return liveDemux.demultiplexPackets(tmStore, origStoreId, tmData, tmSize);
}
TmFunnelBase::~TmFunnelBase() { QueueFactory::instance()->deleteMessageQueue(tmQueue); }
@ -15,43 +23,7 @@ MessageQueueId_t TmFunnelBase::getReportReceptionQueue(uint8_t virtualChannel) c
return tmQueue->getId();
}
void TmFunnelBase::addDestination(const char *name, const AcceptsTelemetryIF &downlinkDestination,
uint8_t vcid) {
auto queueId = downlinkDestination.getReportReceptionQueue(vcid);
destinations.emplace_back(name, queueId, vcid);
}
ReturnValue_t TmFunnelBase::sendPacketToDestinations(store_address_t origStoreId,
TmTcMessage &message,
const uint8_t *packetData, size_t size) {
ReturnValue_t result = returnvalue::OK;
for (unsigned int idx = 0; idx < destinations.size(); idx++) {
const auto &dest = destinations[idx];
if (destinations.size() > 1) {
if (idx < destinations.size() - 1) {
// Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
// to bother with send order and where the data is deleted.
store_address_t storeId;
result = tmStore.addData(&storeId, packetData, size);
if (result == returnvalue::OK) {
message.setStorageId(storeId);
} else {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << name << "::handlePacket: Store too full to create data copy" << std::endl;
#endif
}
} else {
message.setStorageId(origStoreId);
}
}
result = tmQueue->sendMessage(dest.queueId, &message);
if (result != returnvalue::OK) {
#if FSFW_CPP_OSTREAM_ENABLED == 1
sif::error << name << "::handlePacket: Error sending TM to downlink handler " << dest.name
<< std::endl;
#endif
tmStore.deleteData(message.getStorageId());
}
}
return result;
void TmFunnelBase::addLiveDestination(const char *name,
const AcceptsTelemetryIF &downlinkDestination, uint8_t vcid) {
liveDemux.addDestination(name, downlinkDestination, vcid);
}

View File

@ -6,6 +6,7 @@
#include <fsfw/tmstorage/TmStoreFrontendSimpleIF.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <fsfw/tmtcservices/TmTcMessage.h>
#include <mission/tmtc/PusLiveDemux.h>
#include <vector>
@ -26,11 +27,11 @@ class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject {
uint32_t tmMsgDepth;
};
explicit TmFunnelBase(FunnelCfg cfg);
void addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);
ReturnValue_t sendPacketToDestinations(store_address_t origStoreId, TmTcMessage& message,
const uint8_t* packetData, size_t size);
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
void addLiveDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);
ReturnValue_t demultiplexLivePackets(store_address_t origStoreId, const uint8_t* tmData,
size_t tmSize);
~TmFunnelBase() override;
@ -38,18 +39,8 @@ class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject {
const char* name;
StorageManagerIF& tmStore;
StorageManagerIF& ipcStore;
struct Destination {
Destination(const char* name, MessageQueueId_t queueId, uint8_t virtualChannel)
: name(name), queueId(queueId), virtualChannel(virtualChannel) {}
const char* name;
MessageQueueId_t queueId;
uint8_t virtualChannel = 0;
};
std::vector<Destination> destinations;
MessageQueueIF* tmQueue = nullptr;
PusLiveDemux liveDemux;
};
#endif /* MISSION_TMTC_TMFUNNELBASE_H_ */

View File

@ -1,19 +0,0 @@
#include "TmStoreRouter.h"
#include <fsfw/ipc/MessageQueueIF.h>
PersistentTmStoreRouter::PersistentTmStoreRouter() = default;
bool PersistentTmStoreRouter::packetMatches(PusTmReader& reader, MessageQueueId_t& destination) {
for (const auto filterAndDest : routerMap) {
if (filterAndDest.first.packetMatches(reader)) {
destination = filterAndDest.second;
return true;
}
}
return false;
}
void PersistentTmStoreRouter::addRouting(PusPacketFilter filter, MessageQueueId_t destination) {
routerMap.emplace_back(std::move(filter), destination);
}