Persistent TM Store #320
@ -98,7 +98,7 @@ ReturnValue_t PusTmFunnel::performOperation(uint8_t) {
|
|||||||
SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data(), &size,
|
SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data(), &size,
|
||||||
SerializeIF::Endianness::NETWORK);
|
SerializeIF::Endianness::NETWORK);
|
||||||
// TODO: TM store missing, and maybe there is a better way to do this?
|
// TODO: TM store missing, and maybe there is a better way to do this?
|
||||||
tmStore->dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, destinations, tmQueue);
|
tmStore->dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, *this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -149,36 +149,7 @@ ReturnValue_t PusTmFunnel::handlePacket(TmTcMessage &message) {
|
|||||||
miscStore.passPacket(packet);
|
miscStore.passPacket(packet);
|
||||||
okStore.passPacket(packet);
|
okStore.passPacket(packet);
|
||||||
}
|
}
|
||||||
|
return sendPacketToDestinations(origStoreId, message, packetData, size);
|
||||||
for (unsigned int idx = 0; idx < destinations.size(); idx++) {
|
|
||||||
const auto &destVcidPair = destinations[idx];
|
|
||||||
if (destinations.size() > 1) {
|
|
||||||
if (idx < destinations.size() - 1) {
|
|
||||||
// Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
|
|
||||||
// to bother with send order and where the data is deleted.
|
|
||||||
store_address_t storeId;
|
|
||||||
result = tmStore.addData(&storeId, packetData, size);
|
|
||||||
if (result == returnvalue::OK) {
|
|
||||||
message.setStorageId(storeId);
|
|
||||||
} else {
|
|
||||||
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
|
||||||
sif::error << "PusTmFunnel::handlePacket: Store too full to create data copy"
|
|
||||||
<< std::endl;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
message.setStorageId(origStoreId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result = tmQueue->sendMessage(destVcidPair.first, &message);
|
|
||||||
if (result != returnvalue::OK) {
|
|
||||||
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
|
||||||
sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl;
|
|
||||||
#endif
|
|
||||||
tmStore.deleteData(message.getStorageId());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; }
|
const char *PusTmFunnel::getName() const { return "PUS TM Funnel"; }
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
#include "TmFunnelBase.h"
|
#include "TmFunnelBase.h"
|
||||||
|
|
||||||
|
#include <fsfw/tmtcservices/TmTcMessage.h>
|
||||||
|
|
||||||
#include "fsfw/ipc/QueueFactory.h"
|
#include "fsfw/ipc/QueueFactory.h"
|
||||||
|
|
||||||
TmFunnelBase::TmFunnelBase(object_id_t objectId, StorageManagerIF &tmStore, uint32_t tmMsgDepth,
|
TmFunnelBase::TmFunnelBase(object_id_t objectId, StorageManagerIF &tmStore, uint32_t tmMsgDepth,
|
||||||
@ -21,3 +23,38 @@ void TmFunnelBase::addDestination(const AcceptsTelemetryIF &downlinkDestination,
|
|||||||
auto queueId = downlinkDestination.getReportReceptionQueue(vcid);
|
auto queueId = downlinkDestination.getReportReceptionQueue(vcid);
|
||||||
destinations.emplace_back(queueId, vcid);
|
destinations.emplace_back(queueId, vcid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReturnValue_t TmFunnelBase::sendPacketToDestinations(store_address_t origStoreId,
|
||||||
|
TmTcMessage &message,
|
||||||
|
const uint8_t *packetData, size_t size) {
|
||||||
|
ReturnValue_t result;
|
||||||
|
for (unsigned int idx = 0; idx < destinations.size(); idx++) {
|
||||||
|
const auto &destVcidPair = destinations[idx];
|
||||||
|
if (destinations.size() > 1) {
|
||||||
|
if (idx < destinations.size() - 1) {
|
||||||
|
// Create copy of data to ensure each TM recipient has its own copy. That way, we don't need
|
||||||
|
// to bother with send order and where the data is deleted.
|
||||||
|
store_address_t storeId;
|
||||||
|
result = tmStore.addData(&storeId, packetData, size);
|
||||||
|
if (result == returnvalue::OK) {
|
||||||
|
message.setStorageId(storeId);
|
||||||
|
} else {
|
||||||
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
|
sif::error << "PusTmFunnel::handlePacket: Store too full to create data copy"
|
||||||
|
<< std::endl;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
message.setStorageId(origStoreId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
result = tmQueue->sendMessage(destVcidPair.first, &message);
|
||||||
|
if (result != returnvalue::OK) {
|
||||||
|
#if FSFW_CPP_OSTREAM_ENABLED == 1
|
||||||
|
sif::error << "PusTmFunnel::handlePacket: Error sending TM to downlink handler" << std::endl;
|
||||||
|
#endif
|
||||||
|
tmStore.deleteData(message.getStorageId());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
#include <fsfw/storagemanager/StorageManagerIF.h>
|
#include <fsfw/storagemanager/StorageManagerIF.h>
|
||||||
#include <fsfw/tmstorage/TmStoreFrontendSimpleIF.h>
|
#include <fsfw/tmstorage/TmStoreFrontendSimpleIF.h>
|
||||||
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
|
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
|
||||||
|
#include <fsfw/tmtcservices/TmTcMessage.h>
|
||||||
|
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
@ -15,6 +16,8 @@ class TmFunnelBase : public TmStoreFrontendSimpleIF,
|
|||||||
TmFunnelBase(object_id_t objectId, StorageManagerIF& tmStore, uint32_t tmMsgDepth,
|
TmFunnelBase(object_id_t objectId, StorageManagerIF& tmStore, uint32_t tmMsgDepth,
|
||||||
uint32_t tcMsgDepth, StorageManagerIF& ipcStore);
|
uint32_t tcMsgDepth, StorageManagerIF& ipcStore);
|
||||||
void addDestination(const AcceptsTelemetryIF& downlinkDestination, uint8_t vcid = 0);
|
void addDestination(const AcceptsTelemetryIF& downlinkDestination, uint8_t vcid = 0);
|
||||||
|
ReturnValue_t sendPacketToDestinations(store_address_t origStoreId, TmTcMessage& message,
|
||||||
|
const uint8_t* packetData, size_t size);
|
||||||
|
|
||||||
[[nodiscard]] MessageQueueId_t getCommandQueue() const override;
|
[[nodiscard]] MessageQueueId_t getCommandQueue() const override;
|
||||||
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
|
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
|
||||||
|
@ -185,5 +185,10 @@ void TmStore::addServiceSubservice(uint8_t service, uint8_t subservice) {
|
|||||||
void TmStore::deleteUpTo(uint32_t unixSeconds) {}
|
void TmStore::deleteUpTo(uint32_t unixSeconds) {}
|
||||||
|
|
||||||
void TmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
|
void TmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
|
||||||
std::vector<std::pair<MessageQueueId_t, uint8_t>>& destinations,
|
TmFunnelBase& funnel) {
|
||||||
MessageQueueIF* tmQueue) {}
|
store_address_t storeId;
|
||||||
|
TmTcMessage message;
|
||||||
|
const uint8_t* packetData = nullptr;
|
||||||
|
size_t size = 0;
|
||||||
|
funnel.sendPacketToDestinations(storeId, message, packetData, size);
|
||||||
|
}
|
||||||
|
@ -9,6 +9,8 @@
|
|||||||
|
|
||||||
#include <filesystem>
|
#include <filesystem>
|
||||||
|
|
||||||
|
#include "TmFunnelBase.h"
|
||||||
|
|
||||||
struct PacketFilter {
|
struct PacketFilter {
|
||||||
std::optional<std::vector<uint16_t>> apid;
|
std::optional<std::vector<uint16_t>> apid;
|
||||||
std::optional<std::vector<uint8_t>> services;
|
std::optional<std::vector<uint8_t>> services;
|
||||||
@ -28,9 +30,7 @@ class TmStore : public SystemObject {
|
|||||||
void addServiceSubservice(uint8_t service, uint8_t subservice);
|
void addServiceSubservice(uint8_t service, uint8_t subservice);
|
||||||
|
|
||||||
void deleteUpTo(uint32_t unixSeconds);
|
void deleteUpTo(uint32_t unixSeconds);
|
||||||
void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
|
void dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds, TmFunnelBase& tmFunnel);
|
||||||
std::vector<std::pair<MessageQueueId_t, uint8_t>>& destinations,
|
|
||||||
MessageQueueIF* tmQueue);
|
|
||||||
|
|
||||||
void updateBaseDir();
|
void updateBaseDir();
|
||||||
ReturnValue_t passPacket(PusTmReader& reader);
|
ReturnValue_t passPacket(PusTmReader& reader);
|
||||||
|
Loading…
Reference in New Issue
Block a user