Refactor TM handling #450
@ -54,6 +54,7 @@ static constexpr uint32_t HK_STORE_QUEUE_SIZE = 300;
|
|||||||
static constexpr uint32_t CFDP_STORE_QUEUE_SIZE = 300;
|
static constexpr uint32_t CFDP_STORE_QUEUE_SIZE = 300;
|
||||||
|
|
||||||
static constexpr uint32_t MAX_PUS_FUNNEL_QUEUE_DEPTH = 100;
|
static constexpr uint32_t MAX_PUS_FUNNEL_QUEUE_DEPTH = 100;
|
||||||
|
static constexpr uint32_t MAX_CFDP_FUNNEL_QUEUE_DEPTH = 80;
|
||||||
|
|
||||||
static constexpr uint32_t MAX_STORED_CMDS_UDP = 120;
|
static constexpr uint32_t MAX_STORED_CMDS_UDP = 120;
|
||||||
static constexpr uint32_t MAX_STORED_CMDS_TCP = 150;
|
static constexpr uint32_t MAX_STORED_CMDS_TCP = 150;
|
||||||
|
@ -149,9 +149,6 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
|
|||||||
new CcsdsDistributor(config::EIVE_PUS_APID, objects::CCSDS_PACKET_DISTRIBUTOR);
|
new CcsdsDistributor(config::EIVE_PUS_APID, objects::CCSDS_PACKET_DISTRIBUTOR);
|
||||||
new PusDistributor(config::EIVE_PUS_APID, objects::PUS_PACKET_DISTRIBUTOR, ccsdsDistrib);
|
new PusDistributor(config::EIVE_PUS_APID, objects::PUS_PACKET_DISTRIBUTOR, ccsdsDistrib);
|
||||||
|
|
||||||
PusTmFunnel::FunnelCfg cfdpFunnelCfg(objects::CFDP_TM_FUNNEL, "CfdpTmFunnel", **tmStore,
|
|
||||||
**ipcStore, 50);
|
|
||||||
*cfdpFunnel = new CfdpTmFunnel(cfdpFunnelCfg, *ramToFileStore, config::EIVE_CFDP_APID);
|
|
||||||
PusTmFunnel::FunnelCfg pusFunnelCfg(objects::PUS_TM_FUNNEL, "PusTmFunnel", **tmStore, **ipcStore,
|
PusTmFunnel::FunnelCfg pusFunnelCfg(objects::PUS_TM_FUNNEL, "PusTmFunnel", **tmStore, **ipcStore,
|
||||||
config::MAX_PUS_FUNNEL_QUEUE_DEPTH);
|
config::MAX_PUS_FUNNEL_QUEUE_DEPTH);
|
||||||
// The PUS funnel routes all live TM to the live destinations and to the TM stores.
|
// The PUS funnel routes all live TM to the live destinations and to the TM stores.
|
||||||
@ -213,6 +210,10 @@ void ObjectFactory::produceGenericObjects(HealthTableIF** healthTable_, PusTmFun
|
|||||||
->addPersistentTmStoreRouting(filters::cfdpFilter(),
|
->addPersistentTmStoreRouting(filters::cfdpFilter(),
|
||||||
stores.cfdpStore->getReportReceptionQueue(0));
|
stores.cfdpStore->getReportReceptionQueue(0));
|
||||||
}
|
}
|
||||||
|
PusTmFunnel::FunnelCfg cfdpFunnelCfg(objects::CFDP_TM_FUNNEL, "CfdpTmFunnel", **tmStore,
|
||||||
|
**ipcStore, config::MAX_CFDP_FUNNEL_QUEUE_DEPTH);
|
||||||
|
*cfdpFunnel = new CfdpTmFunnel(cfdpFunnelCfg, stores.cfdpStore->getReportReceptionQueue(0),
|
||||||
|
*ramToFileStore, config::EIVE_CFDP_APID);
|
||||||
|
|
||||||
#if OBSW_ADD_TCPIP_SERVERS == 1
|
#if OBSW_ADD_TCPIP_SERVERS == 1
|
||||||
#if OBSW_ADD_TMTC_UDP_SERVER == 1
|
#if OBSW_ADD_TMTC_UDP_SERVER == 1
|
||||||
|
@ -92,10 +92,18 @@ ReturnValue_t PersistentTmStore::storePacket(PusTmReader& reader) {
|
|||||||
|
|
||||||
bool createNewFile = false;
|
bool createNewFile = false;
|
||||||
std::optional<uint8_t> suffix = std::nullopt;
|
std::optional<uint8_t> suffix = std::nullopt;
|
||||||
|
std::error_code e;
|
||||||
|
size_t fileSize = file_size(activeFile.value(), e);
|
||||||
|
if (e) {
|
||||||
|
sif::error << "PersistentTmStore: Could not retrieve file size, "
|
||||||
|
"error "
|
||||||
|
<< e.message() << std::endl;
|
||||||
|
return returnvalue::FAILED;
|
||||||
|
}
|
||||||
if (currentTv.tv_sec > activeFileTv.tv_sec + static_cast<int>(rolloverDiffSeconds)) {
|
if (currentTv.tv_sec > activeFileTv.tv_sec + static_cast<int>(rolloverDiffSeconds)) {
|
||||||
createNewFile = true;
|
createNewFile = true;
|
||||||
currentSameSecNumber = 0;
|
currentSameSecNumber = 0;
|
||||||
} else if (file_size(activeFile.value()) + reader.getFullPacketLen() > fileBuf.size()) {
|
} else if (fileSize + reader.getFullPacketLen() > fileBuf.size()) {
|
||||||
createNewFile = true;
|
createNewFile = true;
|
||||||
if (currentSameSecNumber >= MAX_FILES_IN_ONE_SECOND) {
|
if (currentSameSecNumber >= MAX_FILES_IN_ONE_SECOND) {
|
||||||
currentSameSecNumber = 0;
|
currentSameSecNumber = 0;
|
||||||
@ -158,7 +166,8 @@ void PersistentTmStore::deleteUpTo(uint32_t unixSeconds) {
|
|||||||
}
|
}
|
||||||
time_t fileEpoch = timegm(&fileTime);
|
time_t fileEpoch = timegm(&fileTime);
|
||||||
if (fileEpoch + rolloverDiffSeconds < unixSeconds) {
|
if (fileEpoch + rolloverDiffSeconds < unixSeconds) {
|
||||||
std::filesystem::remove(file.path());
|
std::error_code e;
|
||||||
|
std::filesystem::remove(file.path(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -184,10 +193,15 @@ ReturnValue_t PersistentTmStore::loadNextDumpFile() {
|
|||||||
using namespace std::filesystem;
|
using namespace std::filesystem;
|
||||||
std::error_code e;
|
std::error_code e;
|
||||||
for (; dumpParams.dirIter != directory_iterator(); dumpParams.dirIter++) {
|
for (; dumpParams.dirIter != directory_iterator(); dumpParams.dirIter++) {
|
||||||
|
dumpParams.dirEntry = *dumpParams.dirIter;
|
||||||
if (dumpParams.dirEntry.is_directory(e)) {
|
if (dumpParams.dirEntry.is_directory(e)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
dumpParams.fileSize = std::filesystem::file_size(dumpParams.dirEntry.path());
|
dumpParams.fileSize = std::filesystem::file_size(dumpParams.dirEntry.path(), e);
|
||||||
|
if (e) {
|
||||||
|
sif::error << "PersistentTmStore: Could not retrieve file size: " << e.message() << std::endl;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
// Can't even read CCSDS header.
|
// Can't even read CCSDS header.
|
||||||
if (dumpParams.fileSize <= 6) {
|
if (dumpParams.fileSize <= 6) {
|
||||||
continue;
|
continue;
|
||||||
@ -195,7 +209,7 @@ ReturnValue_t PersistentTmStore::loadNextDumpFile() {
|
|||||||
if (dumpParams.fileSize > fileBuf.size()) {
|
if (dumpParams.fileSize > fileBuf.size()) {
|
||||||
sif::error << "PersistentTmStore: File too large, is deleted" << std::endl;
|
sif::error << "PersistentTmStore: File too large, is deleted" << std::endl;
|
||||||
triggerEvent(FILE_TOO_LARGE, dumpParams.fileSize, fileBuf.size());
|
triggerEvent(FILE_TOO_LARGE, dumpParams.fileSize, fileBuf.size());
|
||||||
std::remove(dumpParams.dirEntry.path().c_str());
|
std::filesystem::remove(dumpParams.dirEntry.path(), e);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const path& file = dumpParams.dirEntry.path();
|
const path& file = dumpParams.dirEntry.path();
|
||||||
@ -249,7 +263,8 @@ ReturnValue_t PersistentTmStore::dumpNextPacket(DirectTmSinkIF& tmSink, size_t&
|
|||||||
// Delete the file and load next. Could use better algorithm to partially
|
// Delete the file and load next. Could use better algorithm to partially
|
||||||
// restore the file dump, but for now do not trust the file.
|
// restore the file dump, but for now do not trust the file.
|
||||||
dumpedLen = 0;
|
dumpedLen = 0;
|
||||||
std::remove(dumpParams.dirEntry.path().c_str());
|
std::error_code e;
|
||||||
|
std::filesystem::remove(dumpParams.dirEntry.path().c_str(), e);
|
||||||
fileHasSwapped = true;
|
fileHasSwapped = true;
|
||||||
return loadNextDumpFile();
|
return loadNextDumpFile();
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user