Persistent TM Store #320

Merged
muellerr merged 109 commits from mueller/pus-15-tm-storage into develop 2023-02-24 19:03:39 +01:00
12 changed files with 101 additions and 116 deletions
Showing only changes of commit e416d94224 - Show all commits

View File

@ -3,8 +3,9 @@ if(EIVE_BUILD_GPSD_GPS_HANDLER)
endif()
target_sources(
${OBSW_NAME} PRIVATE Max31865RtdPolling.cpp ScexUartReader.cpp ImtqPollingTask.cpp
ScexDleParser.cpp ScexHelper.cpp RwPollingTask.cpp)
${OBSW_NAME}
PRIVATE Max31865RtdPolling.cpp ScexUartReader.cpp ImtqPollingTask.cpp
ScexDleParser.cpp ScexHelper.cpp RwPollingTask.cpp)
add_subdirectory(ploc)

View File

@ -139,10 +139,10 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
new CcsdsDistributor(config::EIVE_PUS_APID, objects::CCSDS_PACKET_DISTRIBUTOR);
new PusDistributor(config::EIVE_PUS_APID, objects::PUS_PACKET_DISTRIBUTOR, ccsdsDistrib);
PusTmFunnel::FunnelCfg cfdpFunnelCfg(objects::CFDP_TM_FUNNEL, *tmStore, *ipcStore, 50, 15);
PusTmFunnel::FunnelCfg cfdpFunnelCfg(objects::CFDP_TM_FUNNEL, *tmStore, *ipcStore, 50);
*cfdpFunnel = new CfdpTmFunnel(cfdpFunnelCfg, config::EIVE_CFDP_APID);
PusTmFunnel::FunnelCfg pusFunnelCfg(objects::PUS_TM_FUNNEL, *tmStore, *ipcStore,
config::MAX_PUS_FUNNEL_QUEUE_DEPTH, 15);
config::MAX_PUS_FUNNEL_QUEUE_DEPTH);
*pusFunnel = new PusTmFunnel(pusFunnelCfg, *timeStamper, sdcMan);
#if OBSW_ADD_TCPIP_SERVERS == 1
#if OBSW_ADD_TMTC_UDP_SERVER == 1

View File

@ -256,7 +256,7 @@ ReturnValue_t ImtqHandler::scanForReply(const uint8_t* start, size_t remainingSi
ReturnValue_t ImtqHandler::interpretDeviceReply(DeviceCommandId_t id, const uint8_t* packet) {
ReturnValue_t result;
ReturnValue_t status = returnvalue::OK;
if(getMode() != MODE_NORMAL) {
if (getMode() != MODE_NORMAL) {
// Ignore replies during transitions.
return returnvalue::OK;
}

View File

@ -20,7 +20,7 @@ ReturnValue_t CfdpTmFunnel::performOperation(uint8_t) {
break;
}
count++;
if(count == 500) {
if (count == 500) {
sif::error << "CfdpTmFunnel: Possible message storm detected" << std::endl;
break;
}

View File

@ -8,6 +8,10 @@
#include <fstream>
#include <utility>
#include "fsfw/ipc/CommandMessage.h"
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tmstorage/TmStoreMessage.h"
using namespace returnvalue;
TmStore::TmStore(object_id_t objectId, const char* baseDir, std::string baseName,
muellerr marked this conversation as resolved
Review

I don't get why the stores need a reference to a central time. They can look that up by their own I think. Is it necessary to have all stores working on the exact same time?

I don't get why the stores need a reference to a central time. They can look that up by their own I think. Is it necessary to have all stores working on the exact same time?
Review

No. I'll probably update the code so each persistent store calls `Clock::getClock_timeval

No. I'll probably update the code so each persistent store calls `Clock::getClock_timeval
@ -19,9 +23,46 @@ TmStore::TmStore(object_id_t objectId, const char* baseDir, std::string baseName
currentTv(currentTv),
sdcMan(sdcMan),
tmStore(tmStore) {
tcQueue = QueueFactory::instance()->createMessageQueue();
calcDiffSeconds(intervalUnit, intervalCount);
}
ReturnValue_t TmStore::handleCommandQueue(StorageManagerIF& ipcStore, TmFunnelBase& tmFunnel) {
CommandMessage cmdMessage;
ReturnValue_t result = tcQueue->receiveMessage(&cmdMessage);
if (result == MessageQueueIF::EMPTY) {
return returnvalue::OK;
}
if (result != returnvalue::OK) {
return result;
}
if (cmdMessage.getMessageType() == messagetypes::TM_STORE) {
Command_t cmd = cmdMessage.getCommand();
if (cmd == TmStoreMessage::DELETE_STORE_CONTENT_TIME) {
store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
auto accessor = ipcStore.getData(storeId);
uint32_t deleteUpToUnixSeconds = 0;
size_t size = accessor.second.size();
SerializeAdapter::deSerialize(&deleteUpToUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
deleteUpTo(deleteUpToUnixSeconds);
} else if (cmd == TmStoreMessage::DOWNLINK_STORE_CONTENT_TIME) {
store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
auto accessor = ipcStore.getData(storeId);
uint32_t dumpFromUnixSeconds;
uint32_t dumpUntilUnixSeconds;
size_t size = accessor.second.size();
SerializeAdapter::deSerialize(&dumpFromUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
// TODO: TM store missing, and maybe there is a better way to do this?
dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, tmFunnel);
}
}
return returnvalue::OK;
}
ReturnValue_t TmStore::passPacket(PusTmReader& reader) {
bool inApidList = false;
if (filter.apid) {
@ -91,11 +132,12 @@ ReturnValue_t TmStore::storePacket(PusTmReader& reader) {
// Rollover conditions were handled, write to file now
std::ofstream of(mostRecentFile.value(), std::ios::app | std::ios::binary);
of.write(reinterpret_cast<const char*>(reader.getFullData()), reader.getFullPacketLen());
of.write(reinterpret_cast<const char*>(reader.getFullData()),
static_cast<std::streamsize>(reader.getFullPacketLen()));
return returnvalue::OK;
}
MessageQueueId_t TmStore::getCommandQueue() { return MessageQueueIF::NO_QUEUE; }
MessageQueueId_t TmStore::getCommandQueue() const { return tcQueue->getId(); }
void TmStore::calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount) {
if (intervalUnit == RolloverInterval::MINUTELY) {
@ -184,7 +226,7 @@ void TmStore::addServiceSubservice(uint8_t service, uint8_t subservice) {
std::vector<std::pair<uint8_t, uint8_t>>({std::pair(service, subservice)});
return;
}
filter.serviceSubservices.value().push_back({service, subservice});
filter.serviceSubservices.value().emplace_back(service, subservice);
}
void TmStore::deleteUpTo(uint32_t unixSeconds) {
@ -196,7 +238,7 @@ void TmStore::deleteUpTo(uint32_t unixSeconds) {
}
Clock::TimeOfDay_t tod;
pathToTod(file.path(), tod);
timeval time;
timeval time{};
ReturnValue_t result = Clock::convertTimeOfDayToTimeval(&tod, &time);
if (result != returnvalue::OK) {
sif::error << "TOD to time conversion failed for file " << file << std::endl;
@ -222,13 +264,13 @@ void TmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
}
Clock::TimeOfDay_t tod;
pathToTod(file.path(), tod);
timeval time;
timeval time{};
ReturnValue_t result = Clock::convertTimeOfDayToTimeval(&tod, &time);
if (result != returnvalue::OK) {
sif::error << "TOD to time conversion failed for file " << file << std::endl;
continue;
}
uint32_t timeUnsigned = static_cast<uint32_t>(time.tv_sec);
auto timeUnsigned = static_cast<uint32_t>(time.tv_sec);
if (timeUnsigned > fromUnixSeconds && timeUnsigned + rolloverDiffSeconds < upToUnixSeconds) {
fileToPackets(file, timeUnsigned, funnel);
}
@ -237,7 +279,7 @@ void TmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
void TmStore::pathToTod(const std::filesystem::path& path, Clock::TimeOfDay_t& tod) {
auto pathStr = path.string();
size_t splitChar = pathStr.find("_");
size_t splitChar = pathStr.find('_');
auto timeOnlyStr = pathStr.substr(splitChar);
sscanf(timeOnlyStr.data(),
"%04" SCNu32 "-%02" SCNu32 "-%02" SCNu32 "T%02" SCNu32 "-%02" SCNu32 "-%02" SCNu32 "Z",
@ -254,7 +296,7 @@ void TmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStam
return;
}
std::ifstream ifile(path, std::ios::binary);
ifile.read(reinterpret_cast<char*>(fileBuf.data()), size);
ifile.read(reinterpret_cast<char*>(fileBuf.data()), static_cast<std::streamsize>(size));
size_t currentIdx = 0;
while (currentIdx < size) {
PusTmReader reader(&timeReader, fileBuf.data(), fileBuf.size());

View File

@ -3,7 +3,7 @@
#include <fsfw/objectmanager/SystemObject.h>
#include <fsfw/timemanager/CdsShortTimeStamper.h>
#include <fsfw/tmstorage/TmStoreFrontendIF.h>
#include <fsfw/tmstorage/TmStoreFrontendSimpleIF.h>
#include <fsfw/tmtcpacket/pus/tm/PusTmReader.h>
#include <fsfw/tmtcservices/AcceptsTelemetryIF.h>
#include <mission/memory/SdCardMountedIF.h>
@ -21,7 +21,7 @@ struct PacketFilter {
enum class RolloverInterval { MINUTELY, HOURLY, DAILY };
class TmStore : public SystemObject {
class TmStore : public TmStoreFrontendSimpleIF, public SystemObject {
public:
static constexpr uint8_t SUBSYSTEM_ID = SUBSYSTEM_ID::PERSISTENT_TM_STORE;
@ -34,6 +34,8 @@ class TmStore : public SystemObject {
RolloverInterval intervalUnit, uint32_t intervalCount, timeval& currentTv,
StorageManagerIF& tmStore, SdCardMountedIF& sdcMan);
ReturnValue_t handleCommandQueue(StorageManagerIF& ipcStore, TmFunnelBase& tmFunnel);
void addApid(uint16_t apid);
void addService(uint8_t service);
void addServiceSubservice(uint8_t service, uint8_t subservice);
@ -48,11 +50,7 @@ class TmStore : public SystemObject {
private:
static constexpr size_t MAX_FILESIZE = 8192;
/**
* To get the queue where commands shall be sent.
* @return Id of command queue.
*/
MessageQueueId_t getCommandQueue();
MessageQueueIF* tcQueue;
PacketFilter filter;
CdsShortTimeStamper timeReader;
bool baseDirUninitialized = true;
@ -67,9 +65,15 @@ class TmStore : public SystemObject {
SdCardMountedIF& sdcMan;
StorageManagerIF& tmStore;
/**
* To get the queue where commands shall be sent.
* @return Id of command queue.
*/
[[nodiscard]] MessageQueueId_t getCommandQueue() const override;
void calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount);
void assignAndOrCreateMostRecentFile();
void pathToTod(const std::filesystem::path& path, Clock::TimeOfDay_t& tod);
static void pathToTod(const std::filesystem::path& path, Clock::TimeOfDay_t& tod);
void fileToPackets(const std::filesystem::path& path, uint32_t unixStamp, TmFunnelBase& funnel);
ReturnValue_t storePacket(PusTmReader& reader);
};

View File

@ -51,35 +51,44 @@ PusTmFunnel::~PusTmFunnel() = default;
ReturnValue_t PusTmFunnel::performOperation(uint8_t) {
CommandMessage cmdMessage;
ReturnValue_t status = tcQueue->receiveMessage(&cmdMessage);
if (status == returnvalue::OK) {
ReturnValue_t result = handleTcRequest(cmdMessage);
if (result != returnvalue::OK) {
sif::error << "PusTmFunnel::performOperation: Error handling TC request" << std::endl;
}
ReturnValue_t result = okStore.handleCommandQueue(ipcStore, *this);
if (result != returnvalue::OK) {
sif::error << "PusTmFunnel::performOperation: Issue handling OK store command" << std::endl;
}
result = notOkStore.handleCommandQueue(ipcStore, *this);
if (result != returnvalue::OK) {
sif::error << "PusTmFunnel::performOperation: Issue handling NOT OK store command" << std::endl;
}
result = hkStore.handleCommandQueue(ipcStore, *this);
if (result != returnvalue::OK) {
sif::error << "PusTmFunnel::performOperation: Issue handling HK store command" << std::endl;
}
result = miscStore.handleCommandQueue(ipcStore, *this);
if (result != returnvalue::OK) {
sif::error << "PusTmFunnel::performOperation: Issue handling MISC store command" << std::endl;
}
TmTcMessage currentMessage;
unsigned int count = 0;
ReturnValue_t status = tmQueue->receiveMessage(&currentMessage);
while (status == returnvalue::OK) {
status = handleTmPacket(tmMessage);
if (status != returnvalue::OK) {
result = tmQueue->receiveMessage(&currentMessage);
while (result == returnvalue::OK) {
result = handleTmPacket(currentMessage);
if (result != returnvalue::OK) {
sif::warning << "TmFunnel packet handling failed" << std::endl;
break;
}
count++;
if(count == 500) {
if (count == 500) {
sif::error << "PusTmFunnel: Possible message storm detected" << std::endl;
break;
}
status = tmQueue->receiveMessage(&currentMessage);
result = tmQueue->receiveMessage(&currentMessage);
}
if (status == MessageQueueIF::EMPTY) {
if (result == MessageQueueIF::EMPTY) {
return returnvalue::OK;
}
return status;
return result;
}
ReturnValue_t PusTmFunnel::handleTmPacket(TmTcMessage &message) {
@ -137,56 +146,3 @@ ReturnValue_t PusTmFunnel::initialize() {
initStoresIfPossible(sdcMan.isSdCardUsable(std::nullopt));
return returnvalue::OK;
}
ReturnValue_t PusTmFunnel::handleTcRequest(CommandMessage &cmdMessage) {
if (cmdMessage.getMessageType() == messagetypes::TM_STORE) {
Command_t cmd = cmdMessage.getCommand();
object_id_t objectId = TmStoreMessage::getObjectId(&cmdMessage);
TmStore *tmStore = nullptr;
switch (objectId) {
case (objects::HK_TM_STORE): {
tmStore = &hkStore;
break;
}
case (objects::OK_TM_STORE): {
tmStore = &okStore;
break;
}
case (objects::NOT_OK_TM_STORE): {
tmStore = &notOkStore;
break;
}
case (objects::MISC_TM_STORE): {
tmStore = &miscStore;
break;
}
default: {
}
}
if (tmStore == nullptr) {
return returnvalue::FAILED;
}
if (cmd == TmStoreMessage::DELETE_STORE_CONTENT_TIME) {
store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
auto accessor = ipcStore.getData(storeId);
uint32_t deleteUpToUnixSeconds = 0;
size_t size = accessor.second.size();
SerializeAdapter::deSerialize(&deleteUpToUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
tmStore->deleteUpTo(deleteUpToUnixSeconds);
} else if (cmd == TmStoreMessage::DOWNLINK_STORE_CONTENT_TIME) {
store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
auto accessor = ipcStore.getData(storeId);
uint32_t dumpFromUnixSeconds;
uint32_t dumpUntilUnixSeconds;
size_t size = accessor.second.size();
SerializeAdapter::deSerialize(&dumpFromUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data(), &size,
SerializeIF::Endianness::NETWORK);
// TODO: TM store missing, and maybe there is a better way to do this?
tmStore->dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, *this);
}
}
return returnvalue::OK;
}

View File

@ -44,7 +44,6 @@ class PusTmFunnel : public TmFunnelBase {
TmStore hkStore;
SdCardMountedIF &sdcMan;
ReturnValue_t handleTcRequest(CommandMessage &msg);
ReturnValue_t handleTmPacket(TmTcMessage &message);
void initStoresIfPossible(bool sdCardUsable);
ReturnValue_t initialize() override;

View File

@ -30,14 +30,10 @@ ReturnValue_t Service15TmStorage::getMessageQueueAndObject(uint8_t subservice,
const uint8_t *tcData, size_t tcDataLen,
MessageQueueId_t *id,
object_id_t *objectId) {
object_id_t targetObjectId;
SerializeAdapter::deSerialize(&targetObjectId, &tcData, &tcDataLen,
SerializeIF::Endianness::NETWORK);
if (targetObjectId == objects::CFDP_TM_STORE) {
*objectId = objects::CFDP_TM_FUNNEL;
} else {
*objectId = objects::PUS_TM_FUNNEL;
if (tcDataLen < 4) {
return CommandingServiceBase::INVALID_TC;
}
SerializeAdapter::deSerialize(objectId, &tcData, &tcDataLen, SerializeIF::Endianness::NETWORK);
auto *frontendIF = ObjectManager::instance()->get<TmStoreFrontendSimpleIF>(*objectId);
if (frontendIF == nullptr) {
return FAILED;
muellerr marked this conversation as resolved Outdated

I think there is an Error code for unknown object. (CommandingServiceBase::INVALID_OBJECT)

I think there is an Error code for unknown object. (CommandingServiceBase::INVALID_OBJECT)

View File

@ -7,7 +7,6 @@
TmFunnelBase::TmFunnelBase(FunnelCfg cfg)
: SystemObject(cfg.objectId), tmStore(cfg.tmStore), ipcStore(cfg.ipcStore) {
tmQueue = QueueFactory::instance()->createMessageQueue(cfg.tmMsgDepth);
tcQueue = QueueFactory::instance()->createMessageQueue(cfg.tcMsgDepth);
}
TmFunnelBase::~TmFunnelBase() { QueueFactory::instance()->deleteMessageQueue(tmQueue); }
@ -56,5 +55,3 @@ ReturnValue_t TmFunnelBase::sendPacketToDestinations(store_address_t origStoreId
}
return result;
}
MessageQueueId_t TmFunnelBase::getCommandQueue() const { return tcQueue->getId(); }

View File

@ -9,32 +9,25 @@
#include <vector>
class TmFunnelBase : public TmStoreFrontendSimpleIF,
public AcceptsTelemetryIF,
public SystemObject {
class TmFunnelBase : public AcceptsTelemetryIF, public SystemObject {
public:
struct FunnelCfg {
FunnelCfg(object_id_t objId, StorageManagerIF& tmStore, StorageManagerIF& ipcStore,
uint32_t tmMsgDepth, uint32_t tcMsgDepth)
: objectId(objId),
tmStore(tmStore),
ipcStore(ipcStore),
tmMsgDepth(tmMsgDepth),
tcMsgDepth(tcMsgDepth) {}
uint32_t tmMsgDepth)
: objectId(objId), tmStore(tmStore), ipcStore(ipcStore), tmMsgDepth(tmMsgDepth) {}
object_id_t objectId;
StorageManagerIF& tmStore;
StorageManagerIF& ipcStore;
uint32_t tmMsgDepth;
uint32_t tcMsgDepth;
};
TmFunnelBase(FunnelCfg cfg);
explicit TmFunnelBase(FunnelCfg cfg);
void addDestination(const char* name, const AcceptsTelemetryIF& downlinkDestination,
uint8_t vcid = 0);
ReturnValue_t sendPacketToDestinations(store_address_t origStoreId, TmTcMessage& message,
const uint8_t* packetData, size_t size);
[[nodiscard]] MessageQueueId_t getReportReceptionQueue(uint8_t virtualChannel) const override;
virtual ~TmFunnelBase();
~TmFunnelBase() override;
protected:
StorageManagerIF& tmStore;
@ -52,9 +45,6 @@ class TmFunnelBase : public TmStoreFrontendSimpleIF,
std::vector<Destination> destinations;
MessageQueueIF* tmQueue = nullptr;
MessageQueueIF* tcQueue = nullptr;
MessageQueueId_t getCommandQueue() const override;
};
#endif /* MISSION_TMTC_TMFUNNELBASE_H_ */

View File

@ -50,7 +50,7 @@ ReturnValue_t VirtualChannel::performOperation() {
}
count++;
if(count == 500) {
if (count == 500) {
sif::error << "VirtualChannel: Possible message storm detected" << std::endl;
break;
}