// NOTE(review): The text between angle brackets in this file (header names, template arguments,
// cast target types) was lost before review. It has been reconstructed from how each name is used
// below; please confirm it against PersistentTmStore.h and the repository layout.
#include "PersistentTmStore.h"

// NOTE(review): reconstructed project path for SdCardMountedIF — confirm.
#include <mission/memory/SdCardMountedIF.h>

#include <algorithm>
#include <cinttypes>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <optional>
#include <string>
#include <system_error>
#include <utility>

#include "fsfw/ipc/CommandMessage.h"
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tmstorage/TmStoreMessage.h"

using namespace returnvalue;

/**
 * Creates the command queue and derives the rollover interval in seconds from
 * @p intervalUnit and @p intervalCount. No filesystem access happens here.
 */
PersistentTmStore::PersistentTmStore(object_id_t objectId, const char* baseDir,
                                     std::string baseName, RolloverInterval intervalUnit,
                                     uint32_t intervalCount, StorageManagerIF& tmStore,
                                     SdCardMountedIF& sdcMan)
    : SystemObject(objectId),
      baseDir(baseDir),
      baseName(std::move(baseName)),
      sdcMan(sdcMan),
      tmStore(tmStore) {
  tcQueue = QueueFactory::instance()->createMessageQueue();
  calcDiffSeconds(intervalUnit, intervalCount);
}

/**
 * Scans the base path for a file of this store whose time window still covers the current time
 * and makes it the active file. If no such file exists, a new one is created.
 *
 * @return returnvalue::OK on success, returnvalue::FAILED if the directory can not be read or the
 *         new file can not be created.
 */
ReturnValue_t PersistentTmStore::assignAndOrCreateMostRecentFile() {
  using namespace std::filesystem;
  std::error_code iterEc;
  directory_iterator dirIter(basePath, iterEc);
  if (iterEc) {
    sif::error << "PersistentTmStore::assignOrCreateMostRecentFile: Can not read directory "
               << basePath << std::endl;
    return returnvalue::FAILED;
  }
  for (auto const& file : dirIter) {
    if (file.is_directory()) {
      continue;
    }
    // Only look at the file name: the directory part also contains the base name.
    const std::string fileName = file.path().filename().string();
    if (fileName.find(baseName) == std::string::npos) {
      continue;
    }
    struct tm time {};
    if (pathToTm(file.path(), time) != returnvalue::OK) {
      sif::error << "PersistentTmStore::assignOrCreateMostRecentFile: Error reading timestamp"
                 << std::endl;
      // Delete the file and re-create it.
      activeFile = std::nullopt;
      std::error_code removeEc;
      std::filesystem::remove(file.path(), removeEc);
      break;
    }
    time_t fileEpoch = timegm(&time);
    // There is still a file within the active time window, so re-use that file for new TMs to
    // store.
    if (fileEpoch + static_cast<time_t>(rolloverDiffSeconds) > currentTv.tv_sec) {
      activeFileTv.tv_sec = fileEpoch;
      activeFile = file.path();
      break;
    }
  }
  if (not activeFile.has_value()) {
    return createMostRecentFile(std::nullopt);
  }
  return returnvalue::OK;
}

/**
 * Polls the command queue once and handles the time based delete and downlink commands.
 *
 * @param ipcStore Store holding the command parameters referenced by the message.
 * @param tmFunnel Destination for packets which are dumped.
 * @return returnvalue::OK if the queue was empty or the command was handled, the queue or store
 *         error code otherwise, returnvalue::FAILED for malformed command data.
 */
ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore,
                                                    TmFunnelBase& tmFunnel) {
  CommandMessage cmdMessage;
  ReturnValue_t result = tcQueue->receiveMessage(&cmdMessage);
  if (result == MessageQueueIF::EMPTY) {
    return returnvalue::OK;
  }
  if (result != returnvalue::OK) {
    return result;
  }
  if (cmdMessage.getMessageType() != messagetypes::TM_STORE) {
    return returnvalue::OK;
  }
  const Command_t cmd = cmdMessage.getCommand();
  if (cmd == TmStoreMessage::DELETE_STORE_CONTENT_TIME) {
    Clock::getClock_timeval(&currentTv);
    store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
    auto accessor = ipcStore.getData(storeId);
    if (accessor.first != returnvalue::OK) {
      return accessor.first;
    }
    // One 32 bit UNIX timestamp is expected.
    if (accessor.second.size() < sizeof(uint32_t)) {
      return returnvalue::FAILED;
    }
    uint32_t deleteUpToUnixSeconds = 0;
    size_t size = accessor.second.size();
    result = SerializeAdapter::deSerialize(&deleteUpToUnixSeconds, accessor.second.data(), &size,
                                           SerializeIF::Endianness::NETWORK);
    if (result != returnvalue::OK) {
      return result;
    }
    deleteUpTo(deleteUpToUnixSeconds);
  } else if (cmd == TmStoreMessage::DOWNLINK_STORE_CONTENT_TIME) {
    Clock::getClock_timeval(&currentTv);
    store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
    auto accessor = ipcStore.getData(storeId);
    if (accessor.first != returnvalue::OK) {
      return accessor.first;
    }
    // Two consecutive 32 bit UNIX timestamps are expected: dump start and dump end.
    if (accessor.second.size() < 8) {
      return returnvalue::FAILED;
    }
    uint32_t dumpFromUnixSeconds = 0;
    uint32_t dumpUntilUnixSeconds = 0;
    size_t size = 8;
    result = SerializeAdapter::deSerialize(&dumpFromUnixSeconds, accessor.second.data(), &size,
                                           SerializeIF::Endianness::NETWORK);
    if (result != returnvalue::OK) {
      return result;
    }
    result = SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data() + 4,
                                           &size, SerializeIF::Endianness::NETWORK);
    if (result != returnvalue::OK) {
      return result;
    }
    dumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds, tmFunnel);
  }
  return returnvalue::OK;
}

/**
 * Stores the packet if it passes the configured filters, otherwise does nothing.
 *
 * A packet is stored if its APID is in the APID filter and either no service filters are
 * configured, or its service or (service, subservice) pair is in the corresponding filter.
 *
 * NOTE(review): service and service/subservice filters only take effect together with a matching
 * APID filter. A store configured with service filters only never stores anything. Confirm that
 * this is intended.
 *
 * @return Result of storePacket() if the packet matched, returnvalue::OK otherwise.
 */
ReturnValue_t PersistentTmStore::passPacket(PusTmReader& reader) {
  bool inApidList = false;
  if (filter.apid) {
    const auto& apidFilter = filter.apid.value();
    if (std::find(apidFilter.begin(), apidFilter.end(), reader.getApid()) != apidFilter.end()) {
      if (not filter.serviceSubservices and not filter.services) {
        return storePacket(reader);
      }
      inApidList = true;
    }
  }
  const std::pair<uint8_t, uint8_t> serviceSubservice(reader.getService(),
                                                      reader.getSubService());
  if (filter.services) {
    const auto& serviceFilter = filter.services.value();
    const bool serviceMatches = std::find(serviceFilter.begin(), serviceFilter.end(),
                                          serviceSubservice.first) != serviceFilter.end();
    // inApidList can only be set if an APID filter exists.
    if (serviceMatches and inApidList) {
      return storePacket(reader);
    }
  }
  if (filter.serviceSubservices) {
    const auto& serviceSubserviceFilter = filter.serviceSubservices.value();
    const bool pairMatches =
        std::find(serviceSubserviceFilter.begin(), serviceSubserviceFilter.end(),
                  serviceSubservice) != serviceSubserviceFilter.end();
    if (pairMatches and inApidList) {
      return storePacket(reader);
    }
  }
  return returnvalue::OK;
}

/**
 * Dumps all files between @p fromUnixSeconds and the last cached current time.
 * The cached time is the one set by the most recent store or command operation.
 */
void PersistentTmStore::dumpFrom(uint32_t fromUnixSeconds, TmFunnelBase& tmFunnel) {
  dumpFromUpTo(fromUnixSeconds, static_cast<uint32_t>(currentTv.tv_sec), tmFunnel);
}

/**
 * Appends the full packet to the active file. A new file is started when the rollover interval
 * has elapsed or when the active file would grow beyond the size of the file buffer.
 *
 * @return returnvalue::OK if the packet was written, an error code otherwise.
 */
ReturnValue_t PersistentTmStore::storePacket(PusTmReader& reader) {
  using namespace std::filesystem;
  if (baseDirUninitialized and not updateBaseDir()) {
    // Without a base path there is nowhere to store the packet.
    return returnvalue::FAILED;
  }
  Clock::getClock_timeval(&currentTv);
  // It is assumed here that the filesystem is usable.
  if (not activeFile.has_value()) {
    ReturnValue_t result = assignAndOrCreateMostRecentFile();
    if (result != returnvalue::OK) {
      return result;
    }
  }

  bool createNewFile = false;
  std::optional<uint8_t> suffix = std::nullopt;
  if (currentTv.tv_sec > activeFileTv.tv_sec + static_cast<time_t>(rolloverDiffSeconds)) {
    createNewFile = true;
    currentSameSecNumber = 0;
  } else {
    std::error_code sizeEc;
    const auto activeSize = file_size(activeFile.value(), sizeEc);
    // An active file which can not be inspected anymore is treated like a full one.
    if (sizeEc or (activeSize + reader.getFullPacketLen() > fileBuf.size())) {
      createNewFile = true;
      if (currentSameSecNumber >= MAX_FILES_IN_ONE_SECOND) {
        currentSameSecNumber = 0;
      }
      // The timestamp has second resolution, so a numbered suffix keeps file names unique
      // within the same second.
      if (currentTv.tv_sec == activeFileTv.tv_sec) {
        suffix = currentSameSecNumber++;
      } else {
        currentSameSecNumber = 0;
      }
    }
  }
  if (createNewFile) {
    ReturnValue_t result = createMostRecentFile(suffix);
    if (result != returnvalue::OK) {
      return result;
    }
  }

  // Rollover conditions were handled, write to file now
  std::ofstream of(activeFile.value(), std::ios::app | std::ios::binary);
  if (not of.is_open()) {
    sif::error << "PersistentTmStore::storePacket: Could not open " << activeFile.value()
               << std::endl;
    return returnvalue::FAILED;
  }
  of.write(reinterpret_cast<const char*>(reader.getFullData()),
           static_cast<std::streamsize>(reader.getFullPacketLen()));
  of.flush();
  if (not of.good()) {
    sif::error << "PersistentTmStore::storePacket: Writing to " << activeFile.value()
               << " failed" << std::endl;
    return returnvalue::FAILED;
  }
  return returnvalue::OK;
}

/** @return ID of the queue polled by handleCommandQueue(). */
MessageQueueId_t PersistentTmStore::getCommandQueue() const { return tcQueue->getId(); }

/**
 * Converts the rollover interval to seconds. Units other than MINUTELY, HOURLY and DAILY leave
 * the current value untouched.
 */
void PersistentTmStore::calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount) {
  switch (intervalUnit) {
    case RolloverInterval::MINUTELY:
      rolloverDiffSeconds = 60 * intervalCount;
      break;
    case RolloverInterval::HOURLY:
      rolloverDiffSeconds = 60 * 60 * intervalCount;
      break;
    case RolloverInterval::DAILY:
      rolloverDiffSeconds = 60 * 60 * 24 * intervalCount;
      break;
    default:
      break;
  }
}

/**
 * Builds the base path from the current mount prefix, the base directory and the base name and
 * creates the directory if it does not exist yet.
 *
 * @return true if the base path is usable, false if there is no mount prefix or the directory
 *         could not be created.
 */
bool PersistentTmStore::updateBaseDir() {
  using namespace std::filesystem;
  const char* currentPrefix = sdcMan.getCurrentMountPrefix();
  if (currentPrefix == nullptr) {
    return false;
  }
  basePath = path(currentPrefix) / baseDir / baseName;
  std::error_code ec;
  if (not exists(basePath, ec)) {
    create_directories(basePath, ec);
    if (ec) {
      sif::error << "PersistentTmStore::updateBaseDir: Could not create " << basePath
                 << std::endl;
      return false;
    }
  }
  baseDirUninitialized = false;
  return true;
}

/** Adds @p apid to the APID filter, creating the filter on first use. */
void PersistentTmStore::addApid(uint16_t apid) {
  if (not filter.apid) {
    filter.apid.emplace();
  }
  filter.apid.value().push_back(apid);
}

/** Adds @p service to the service filter, creating the filter on first use. */
void PersistentTmStore::addService(uint8_t service) {
  if (not filter.services) {
    filter.services.emplace();
  }
  filter.services.value().push_back(service);
}

/** Adds the (service, subservice) pair to its filter, creating the filter on first use. */
void PersistentTmStore::addServiceSubservice(uint8_t service, uint8_t subservice) {
  if (not filter.serviceSubservices) {
    filter.serviceSubservices.emplace();
  }
  filter.serviceSubservices.value().emplace_back(service, subservice);
}

/**
 * Deletes all files, except the active one, whose time window ended before @p unixSeconds.
 */
void PersistentTmStore::deleteUpTo(uint32_t unixSeconds) {
  using namespace std::filesystem;
  std::error_code iterEc;
  directory_iterator dirIter(basePath, iterEc);
  if (iterEc) {
    sif::error << "PersistentTmStore::deleteUpTo: Can not read directory " << basePath
               << std::endl;
    return;
  }
  for (auto const& file : dirIter) {
    if (file.is_directory() or
        (activeFile.has_value() and (activeFile.value() == file.path()))) {
      continue;
    }
    // Convert file time to the UNIX epoch
    struct tm fileTime {};
    if (pathToTm(file.path(), fileTime) != returnvalue::OK) {
      sif::error << "Time extraction for " << file << " failed" << std::endl;
      continue;
    }
    const time_t fileEpoch = timegm(&fileTime);
    // Compare in a wide signed type so the result does not depend on the width of time_t.
    if (static_cast<int64_t>(fileEpoch) + static_cast<int64_t>(rolloverDiffSeconds) <
        static_cast<int64_t>(unixSeconds)) {
      std::error_code removeEc;
      std::filesystem::remove(file.path(), removeEc);
    }
  }
}

/**
 * Sends the packets of every file which starts after @p fromUnixSeconds and whose time window
 * ends at or before @p upToUnixSeconds to @p funnel.
 */
void PersistentTmStore::dumpFromUpTo(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds,
                                     TmFunnelBase& funnel) {
  using namespace std::filesystem;
  std::error_code iterEc;
  directory_iterator dirIter(basePath, iterEc);
  if (iterEc) {
    sif::error << "PersistentTmStore::dumpFromUpTo: Can not read directory " << basePath
               << std::endl;
    return;
  }
  for (auto const& file : dirIter) {
    if (file.is_directory()) {
      continue;
    }
    struct tm fileTime {};
    if (pathToTm(file.path(), fileTime) != returnvalue::OK) {
      sif::error << "Time extraction for file " << file << " failed" << std::endl;
      continue;
    }
    const auto fileEpoch = static_cast<uint32_t>(timegm(&fileTime));
    if ((fileEpoch > fromUnixSeconds) and
        (fileEpoch + rolloverDiffSeconds <= upToUnixSeconds)) {
      fileToPackets(file.path(), fileEpoch, funnel);
    }
  }
}

/**
 * Extracts the timestamp from a file name of the form "<name>_<timestamp>...".
 *
 * Only the file name is inspected and the timestamp is taken after the last underscore, so
 * underscores in directory names or in the base name do not disturb the parsing.
 *
 * @param path File to extract the timestamp from.
 * @param time Filled by strptime() with FILE_DATE_FORMAT.
 * @return returnvalue::OK on success, returnvalue::FAILED if the name has no underscore or the
 *         timestamp can not be parsed.
 */
ReturnValue_t PersistentTmStore::pathToTm(const std::filesystem::path& path, struct tm& time) {
  const std::string fileName = path.filename().string();
  const size_t splitChar = fileName.find_last_of('_');
  if (splitChar == std::string::npos) {
    return returnvalue::FAILED;
  }
  const std::string timeOnlyStr = fileName.substr(splitChar + 1);
  if (nullptr == strptime(timeOnlyStr.c_str(), FILE_DATE_FORMAT, &time)) {
    return returnvalue::FAILED;
  }
  return returnvalue::OK;
}

/**
 * Reads a stored file into the file buffer and forwards every contained packet to @p funnel.
 *
 * Parsing stops at the first packet which fails the CRC check (POSSIBLE_FILE_CORRUPTION is
 * triggered in that case) or which can not be added to the TM store.
 *
 * @param path File to dump.
 * @param unixStamp Timestamp of the file, used as event parameter.
 * @param funnel Destination for the packets.
 */
void PersistentTmStore::fileToPackets(const std::filesystem::path& path, uint32_t unixStamp,
                                      TmFunnelBase& funnel) {
  store_address_t storeId;
  TmTcMessage message;
  std::error_code sizeEc;
  const auto fileSize = std::filesystem::file_size(path, sizeEc);
  if (sizeEc) {
    sif::error << "PersistentTmStore::fileToPackets: Can not read size of " << path
               << std::endl;
    return;
  }
  if (fileSize < 6) {
    // Can't even read the CCSDS header
    return;
  }
  if (fileSize > fileBuf.size()) {
    // Reading the file would write past the end of the buffer.
    sif::error << "PersistentTmStore::fileToPackets: " << path
               << " is larger than the file buffer" << std::endl;
    return;
  }
  const auto size = static_cast<size_t>(fileSize);
  std::ifstream ifile(path, std::ios::binary);
  ifile.read(reinterpret_cast<char*>(fileBuf.data()), static_cast<std::streamsize>(size));
  if (ifile.gcount() != static_cast<std::streamsize>(size)) {
    sif::error << "PersistentTmStore::fileToPackets: Reading " << path << " failed"
               << std::endl;
    return;
  }
  size_t currentIdx = 0;
  while (currentIdx < size) {
    // Parse the packet at the current position, limited to the bytes which were actually read.
    PusTmReader reader(&timeReader, fileBuf.data() + currentIdx, size - currentIdx);
    // CRC check to fully ensure this is a valid TM
    ReturnValue_t result = reader.parseDataWithCrcCheck();
    if (result != returnvalue::OK) {
      sif::error << "Parsing of PUS TM failed with code " << result << std::endl;
      triggerEvent(POSSIBLE_FILE_CORRUPTION, result, unixStamp);
      // Stop for now, do not really know where to continue and we do not trust the file anymore.
      break;
    }
    const size_t packetLen = reader.getFullPacketLen();
    if (packetLen == 0 or packetLen > size - currentIdx) {
      // A packet which does not advance the index or exceeds the file can not be valid.
      triggerEvent(POSSIBLE_FILE_CORRUPTION, returnvalue::FAILED, unixStamp);
      break;
    }
    result = tmStore.addData(&storeId, fileBuf.data() + currentIdx, packetLen);
    if (result != returnvalue::OK) {
      // Retrying the same packet immediately would loop forever, so abort the dump of this file.
      sif::error << "PersistentTmStore::fileToPackets: Adding packet to TM store failed with code "
                 << result << std::endl;
      break;
    }
    funnel.sendPacketToDestinations(storeId, message, fileBuf.data() + currentIdx, packetLen);
    currentIdx += packetLen;
  }
}

/**
 * Creates an empty file named "<baseName>_<timestamp>.bin[.<suffix>]" in the base path, where the
 * timestamp is the cached current time formatted with FILE_DATE_FORMAT, and makes it the active
 * file.
 *
 * NOTE(review): the element type of the optional was reconstructed as uint8_t — confirm against
 * the declaration in the header.
 *
 * @param suffix Optional number appended to distinguish files created within the same second.
 * @return returnvalue::OK on success, returnvalue::FAILED if the timestamp or the file could not
 *         be created. The active file is left unchanged on failure.
 */
ReturnValue_t PersistentTmStore::createMostRecentFile(std::optional<uint8_t> suffix) {
  using namespace std::filesystem;
  const time_t epoch = currentTv.tv_sec;
  struct tm timeUtc {};
  // gmtime_r instead of gmtime: does not rely on a shared static result buffer.
  if (gmtime_r(&epoch, &timeUtc) == nullptr) {
    sif::error << "PersistentTmStore::createMostRecentFile: Could not create file timestamp"
               << std::endl;
    return returnvalue::FAILED;
  }
  char stamp[64] = {};
  const size_t writtenBytes = strftime(stamp, sizeof(stamp), FILE_DATE_FORMAT, &timeUtc);
  if (writtenBytes == 0) {
    sif::error << "PersistentTmStore::createMostRecentFile: Could not create file timestamp"
               << std::endl;
    return returnvalue::FAILED;
  }
  std::string fileName = baseName;
  fileName += '_';
  fileName.append(stamp, writtenBytes);
  fileName += ".bin";
  if (suffix.has_value()) {
    fileName += '.';
    fileName += std::to_string(suffix.value());
  }
  const path newPath = basePath / fileName;
  std::ofstream of(newPath, std::ios::binary);
  if (not of.is_open()) {
    sif::error << "PersistentTmStore::createMostRecentFile: Could not create " << newPath
               << std::endl;
    return returnvalue::FAILED;
  }
  activeFile = newPath;
  activeFileTv = currentTv;
  return returnvalue::OK;
}

/**
 * Prepares the store for use: refreshes the cached current time, sets up the base path and
 * assigns or creates the active file.
 *
 * @return returnvalue::OK on success, returnvalue::FAILED if the base path is not available,
 *         otherwise the result of assignAndOrCreateMostRecentFile().
 */
ReturnValue_t PersistentTmStore::initializeTmStore() {
  // The active file selection and new file names depend on the current time.
  Clock::getClock_timeval(&currentTv);
  if (not updateBaseDir()) {
    return returnvalue::FAILED;
  }
  return assignAndOrCreateMostRecentFile();
}