#include "PersistentTmStore.h"

// NOTE(review): the angle-bracket include targets and all template arguments were missing from
// the reviewed text. The standard headers below and the template arguments used in this file
// (for example std::optional<uint8_t> for file suffixes) were reconstructed from usage. Confirm
// them against PersistentTmStore.h and re-add any project header that was included here.
#include <cctype>
#include <cstring>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <limits>
#include <optional>
#include <stdexcept>
#include <string>
#include <system_error>
#include <utility>

#include "eive/definitions.h"
#include "fsfw/ipc/CommandMessage.h"
#include "fsfw/ipc/QueueFactory.h"
#include "fsfw/tmstorage/TmStoreMessage.h"
#include "mission/persistentTmStoreDefs.h"

using namespace returnvalue;

static constexpr bool DEBUG_DUMPS = false;

/**
 * Creates the store object, its command queue and derives the file rollover interval in seconds
 * from the configured interval unit and count.
 */
PersistentTmStore::PersistentTmStore(PersistentTmStoreArgs args)
    : SystemObject(args.objectId),
      tmStore(args.tmStore),
      baseDir(args.baseDir),
      baseName(std::move(args.baseName)),
      sdcMan(args.sdcMan) {
  tcQueue = QueueFactory::instance()->createMessageQueue();
  calcDiffSeconds(args.intervalUnit, args.intervalCount);
}

/**
 * Aborts a running dump by switching back to the idle state.
 * @return Always returnvalue::OK.
 */
ReturnValue_t PersistentTmStore::cancelDump() {
  // NOTE(review): dumpParams.pendingPacketDump is not reset here. Verify that callers always
  // call confirmDump() for a pending packet before or after cancelling.
  state = State::IDLE;
  return returnvalue::OK;
}

/**
 * Scans the base path and fills dumpParams.orderedDumpFilestamps with one entry per file
 * timestamp inside the requested time window. Multiple files with the same timestamp are
 * folded into one entry whose additionalFiles counter is incremented.
 *
 * Files which are larger than the file buffer are deleted and reported with an event.
 *
 * @param fromUnixSeconds  Exclusive lower bound for the file timestamp.
 * @param upToUnixSeconds  Inclusive upper bound for file timestamp plus rollover interval.
 * @return returnvalue::OK, or returnvalue::FAILED if the base path can not be iterated.
 */
ReturnValue_t PersistentTmStore::buildDumpSet(uint32_t fromUnixSeconds, uint32_t upToUnixSeconds) {
  using namespace std::filesystem;
  std::error_code e;
  dumpParams.orderedDumpFilestamps.clear();
  // Non-throwing overload: a missing or unreadable directory must not raise an exception.
  directory_iterator dirIter(basePath, e);
  if (e) {
    sif::error << "PersistentTmStore: Could not iterate base path: " << e.message() << std::endl;
    return returnvalue::FAILED;
  }
  for (auto const& fileOrDir : dirIter) {
    if (not fileOrDir.is_regular_file(e)) {
      continue;
    }
    const path& file = fileOrDir.path();
    // Local variable on purpose: dumpParams.fileSize describes the currently loaded dump file.
    const auto fileSize = std::filesystem::file_size(file, e);
    if (e) {
      sif::error << "PersistentTmStore: Could not retrieve file size: " << e.message()
                 << std::endl;
      continue;
    }
    // File empty or can't even read CCSDS header.
    if (fileSize <= 6) {
      continue;
    }
    if (fileSize > fileBuf.size()) {
      sif::error << "PersistentTmStore: File too large, is deleted" << std::endl;
      triggerEvent(persTmStore::FILE_TOO_LARGE, fileSize, fileBuf.size());
      std::filesystem::remove(file, e);
      continue;
    }
    struct tm fileTime {};
    if (pathToTime(file, fileTime) != returnvalue::OK) {
      sif::error << "Time extraction for file " << file << " failed" << std::endl;
      continue;
    }
    auto fileEpoch = static_cast<uint32_t>(timegm(&fileTime));
    if ((fileEpoch > fromUnixSeconds) and (fileEpoch + rolloverDiffSeconds <= upToUnixSeconds)) {
      std::ifstream ifile(file, std::ios::binary);
      // fail() also covers a failed open, bad() does not.
      if (ifile.fail()) {
        sif::error << "PersistentTmStore: File is bad" << std::endl;
        // TODO: Consider deleting file here?
        continue;
      }
      if (DEBUG_DUMPS) {
        sif::debug << "Inserting file " << file << std::endl;
      }
      DumpIndex dumpIndex;
      dumpIndex.epoch = fileEpoch;
      // Multiple files for the same time are supported via a special suffix. We simply count the
      // number of copies and later try to dump the same number of files with the additional
      // suffixes
      auto iter = dumpParams.orderedDumpFilestamps.find(dumpIndex);
      if (iter != dumpParams.orderedDumpFilestamps.end()) {
        dumpIndex.epoch = iter->epoch;
        dumpIndex.additionalFiles = iter->additionalFiles + 1;
        dumpParams.orderedDumpFilestamps.erase(dumpIndex);
      } else {
        dumpIndex.additionalFiles = 0;
      }
      dumpParams.orderedDumpFilestamps.emplace(dumpIndex);
    }
  }
  return returnvalue::OK;
}

/**
 * Extracts a numeric suffix which follows the last dot of a path string, for example 3 for
 * "name.bin.3".
 * @return The suffix, or std::nullopt if there is no dot, no digit follows the dot, or the
 *         number does not fit into an uint8_t.
 */
std::optional<uint8_t> PersistentTmStore::extractSuffix(const std::string& pathStr) {
  // Find the position of the dot at the end of the file path
  const size_t dotPos = pathStr.find_last_of('.');
  if (dotPos == std::string::npos or dotPos + 1 >= pathStr.length()) {
    return std::nullopt;
  }
  // Cast required: passing a negative char value to std::isdigit is undefined behaviour.
  if (not std::isdigit(static_cast<unsigned char>(pathStr[dotPos + 1]))) {
    return std::nullopt;
  }
  // Extract the substring after the dot
  const std::string numberStr = pathStr.substr(dotPos + 1);
  try {
    const int parsed = std::stoi(numberStr);
    // Range check must happen before narrowing, otherwise it can never trigger.
    if (parsed < 0 or parsed > std::numeric_limits<uint8_t>::max()) {
      return std::nullopt;
    }
    return static_cast<uint8_t>(parsed);
  } catch (const std::invalid_argument& exception) {
    sif::error << "PersistentTmStore::extractSuffix: Exception " << exception.what()
               << ", invald input string: " << numberStr << std::endl;
  } catch (const std::out_of_range& exception) {
    sif::error << "PersistentTmStore::extractSuffix: Exception " << exception.what()
               << ", number out of range: " << numberStr << std::endl;
  }
  return std::nullopt;
}

/**
 * Creates a new active file if there is none yet.
 * @return returnvalue::OK if an active file exists afterwards, otherwise the creation error.
 */
ReturnValue_t PersistentTmStore::assignAndOrCreateMostRecentFile() {
  if (not activeFile.has_value()) {
    return createMostRecentFile(std::nullopt);
  }
  return returnvalue::OK;
}

/**
 * Polls the command queue once and executes a received TM store deletion or dump command.
 * @param ipcStore  Store which holds the command parameters.
 * @param execCmd   Set to the executed command, or CommandMessageIF::CMD_NONE.
 * @return Result of the queue reception or of the executed command handler.
 */
ReturnValue_t PersistentTmStore::handleCommandQueue(StorageManagerIF& ipcStore,
                                                    Command_t& execCmd) {
  execCmd = CommandMessageIF::CMD_NONE;
  CommandMessage cmdMessage;
  ReturnValue_t result = tcQueue->receiveMessage(&cmdMessage);
  if (result != returnvalue::OK) {
    return result;
  }
  if (cmdMessage.getMessageType() == messagetypes::TM_STORE) {
    Command_t cmd = cmdMessage.getCommand();
    if (cmd == TmStoreMessage::DELETE_STORE_CONTENT_TIME) {
      result = handleDeletionCmd(ipcStore, cmdMessage);
      execCmd = cmd;
    } else if (cmd == TmStoreMessage::DOWNLINK_STORE_CONTENT_TIME) {
      result = handleDumpCmd(ipcStore, cmdMessage);
      execCmd = cmd;
    }
  }
  return result;
}

/**
 * Handles a deletion command. The parameter data holds either one network-endian 32-bit
 * "delete up to" UNIX time (4 bytes), or a "delete from" time followed by a "delete up to"
 * time (8 bytes).
 * @return returnvalue::OK, or returnvalue::FAILED for any other parameter size.
 */
ReturnValue_t PersistentTmStore::handleDeletionCmd(StorageManagerIF& ipcStore,
                                                   CommandMessage& cmdMessage) {
  Clock::getClock_timeval(&currentTv);
  store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
  auto accessor = ipcStore.getData(storeId);
  size_t size = accessor.second.size();
  if (size < 4) {
    return returnvalue::FAILED;
  }
  const uint8_t* data = accessor.second.data();
  uint32_t deleteUpToUnixSeconds = 0;
  if (size == 4) {
    SerializeAdapter::deSerialize(&deleteUpToUnixSeconds, &data, &size,
                                  SerializeIF::Endianness::NETWORK);
    deleteUpTo(deleteUpToUnixSeconds);
  } else if (size == 8) {
    uint32_t deleteFromUnixSeconds = 0;
    SerializeAdapter::deSerialize(&deleteFromUnixSeconds, &data, &size,
                                  SerializeIF::Endianness::NETWORK);
    SerializeAdapter::deSerialize(&deleteUpToUnixSeconds, &data, &size,
                                  SerializeIF::Endianness::NETWORK);
    deleteFromUpTo(deleteFromUnixSeconds, deleteUpToUnixSeconds);
  } else {
    sif::warning << "PersistentTmStore: Unknown deletion time specification" << std::endl;
    return returnvalue::FAILED;
  }
  return returnvalue::OK;
}

/**
 * Handles a dump command. The parameter data holds two network-endian 32-bit UNIX times,
 * the start and the end of the dump window.
 * @return BUSY_DUMPING (together with an event) if a dump is already running,
 *         returnvalue::FAILED if less than 8 parameter bytes were supplied, otherwise
 *         returnvalue::OK.
 */
ReturnValue_t PersistentTmStore::handleDumpCmd(StorageManagerIF& ipcStore,
                                               CommandMessage& cmdMessage) {
  Clock::getClock_timeval(&currentTv);
  store_address_t storeId = TmStoreMessage::getStoreId(&cmdMessage);
  auto accessor = ipcStore.getData(storeId);
  if (accessor.second.size() < 8) {
    return returnvalue::FAILED;
  }
  uint32_t dumpFromUnixSeconds = 0;
  uint32_t dumpUntilUnixSeconds = 0;
  size_t size = 8;
  SerializeAdapter::deSerialize(&dumpFromUnixSeconds, accessor.second.data(), &size,
                                SerializeIF::Endianness::NETWORK);
  SerializeAdapter::deSerialize(&dumpUntilUnixSeconds, accessor.second.data() + 4, &size,
                                SerializeIF::Endianness::NETWORK);
  ReturnValue_t result = startDumpFromUpTo(dumpFromUnixSeconds, dumpUntilUnixSeconds);
  if (result == BUSY_DUMPING) {
    triggerEvent(persTmStore::BUSY_DUMPING_EVENT);
    return result;
  }
  return returnvalue::OK;
}

/**
 * Starts a dump from the given UNIX time up to the most recently cached current time.
 */
ReturnValue_t PersistentTmStore::startDumpFrom(uint32_t fromUnixSeconds) {
  return startDumpFromUpTo(fromUnixSeconds, currentTv.tv_sec);
}

/**
 * Appends the full PUS TM packet to the active file. A new file is created first if the
 * rollover interval has elapsed or if the packet would not fit into the file buffer anymore.
 * Files created within the same second as the active file get a numeric suffix.
 * @return returnvalue::OK on success, otherwise an error code if the file could not be
 *         created, inspected or written.
 */
ReturnValue_t PersistentTmStore::storePacket(PusTmReader& reader) {
  using namespace std::filesystem;
  if (baseDirUninitialized) {
    updateBaseDir();
  }
  Clock::getClock_timeval(&currentTv);
  // It is assumed here that the filesystem is usable.
  if (not activeFile.has_value()) {
    ReturnValue_t result = assignAndOrCreateMostRecentFile();
    if (result != returnvalue::OK) {
      return result;
    }
  }

  bool createNewFile = false;
  std::optional<uint8_t> suffix = std::nullopt;
  std::error_code e;
  size_t fileSize = file_size(activeFile.value(), e);
  if (e) {
    sif::error << "PersistentTmStore: Could not retrieve file size, "
                  "error "
               << e.message() << std::endl;
    return returnvalue::FAILED;
  }
  if (currentTv.tv_sec > activeFileTv.tv_sec + static_cast<time_t>(rolloverDiffSeconds)) {
    createNewFile = true;
    currentSameSecNumber = 0;
  } else if (fileSize + reader.getFullPacketLen() > fileBuf.size()) {
    createNewFile = true;
    if (currentSameSecNumber >= MAX_FILES_IN_ONE_SECOND) {
      currentSameSecNumber = 0;
    }
    if (currentTv.tv_sec == activeFileTv.tv_sec) {
      suffix = currentSameSecNumber++;
    } else {
      currentSameSecNumber = 0;
    }
  }
  if (createNewFile) {
    ReturnValue_t result = createMostRecentFile(suffix);
    if (result != returnvalue::OK) {
      return result;
    }
  }

  // Rollover conditions were handled, write to file now
  std::ofstream of(activeFile.value(), std::ios::app | std::ios::binary);
  of.write(reinterpret_cast<const char*>(reader.getFullData()),
           static_cast<std::streamsize>(reader.getFullPacketLen()));
  // Covers both a failed open and a failed write.
  if (of.fail()) {
    sif::error << "PersistentTmStore: Writing packet to file failed" << std::endl;
    return returnvalue::FAILED;
  }
  return returnvalue::OK;
}

/** Returns the ID of the queue which receives the TM store commands. */
MessageQueueId_t PersistentTmStore::getCommandQueue() const { return tcQueue->getId(); }

/**
 * Converts the rollover interval into seconds and stores it in rolloverDiffSeconds.
 * The value is left untouched for any other interval unit.
 */
void PersistentTmStore::calcDiffSeconds(RolloverInterval intervalUnit, uint32_t intervalCount) {
  if (intervalUnit == RolloverInterval::MINUTELY) {
    rolloverDiffSeconds = 60 * intervalCount;
  } else if (intervalUnit == RolloverInterval::HOURLY) {
    rolloverDiffSeconds = 60 * 60 * intervalCount;
  } else if (intervalUnit == RolloverInterval::DAILY) {
    rolloverDiffSeconds = 60 * 60 * 24 * intervalCount;
  }
}

/**
 * Determines the base path from the current mount prefix, creates the directory if required
 * and prepares the constant "<basePath>/<baseName>_" prefix inside the file path buffer.
 * @return true on success. false if no mount prefix is available, the directory could not be
 *         created or the prefix does not fit into the path buffer. The base directory stays
 *         marked as uninitialized in that case.
 */
bool PersistentTmStore::updateBaseDir() {
  using namespace std::filesystem;
  const char* currentPrefix = sdcMan.getCurrentMountPrefix();
  if (currentPrefix == nullptr) {
    return false;
  }
  basePath = path(currentPrefix) / baseDir / baseName;
  std::error_code e;
  if (not exists(basePath, e)) {
    // Non-throwing overload, a failure is reported to the caller instead.
    create_directories(basePath, e);
    if (e) {
      sif::error << "PersistentTmStore: Could not create base directory: " << e.message()
                 << std::endl;
      return false;
    }
  }
  // Each file will have the base name as a prefix again
  const std::string filePrefix = (basePath / baseName).string();
  // One additional byte is required for the '_' separator.
  if (filePrefix.length() + 1 > filePathBuf.size()) {
    sif::error << "PersistentTmStore: Base path too long for file path buffer" << std::endl;
    return false;
  }
  basePathSize = filePrefix.length();
  std::memcpy(filePathBuf.data(), filePrefix.data(), basePathSize);
  filePathBuf[basePathSize] = '_';
  basePathSize += 1;
  baseDirUninitialized = false;
  return true;
}

/** Deletes all files up to the given UNIX time, see deleteFromUpTo. */
void PersistentTmStore::deleteUpTo(uint32_t unixSeconds) { deleteFromUpTo(0, unixSeconds); }

/**
 * Deletes all files in the base path whose timestamp is at least startUnixTime and whose
 * timestamp plus rollover interval is smaller than endUnixTime. Directories and the active
 * file are never deleted.
 */
void PersistentTmStore::deleteFromUpTo(uint32_t startUnixTime, uint32_t endUnixTime) {
  using namespace std::filesystem;
  std::error_code e;
  // Non-throwing overload: yields an empty range if the base path can not be opened.
  for (auto const& file : directory_iterator(basePath, e)) {
    if (file.is_directory(e) or
        (activeFile.has_value() and (activeFile.value() == file.path()))) {
      continue;
    }
    // Convert file time to the UNIX epoch
    struct tm fileTime {};
    if (pathToTime(file.path(), fileTime) != returnvalue::OK) {
      sif::error << "Time extraction for " << file << " failed" << std::endl;
      continue;
    }
    time_t fileEpoch = timegm(&fileTime);
    if (fileEpoch + rolloverDiffSeconds < endUnixTime and
        static_cast<uint32_t>(fileEpoch) >= startUnixTime) {
      std::filesystem::remove(file.path(), e);
    }
  }
}

/**
 * Starts a dump of all files inside the given time window and loads the first file.
 * @return BUSY_DUMPING if a dump is already running, returnvalue::FAILED if the base path is
 *         empty or can not be read, DUMP_DONE if no file lies inside the window, otherwise
 *         the result of loading the first dump file.
 */
ReturnValue_t PersistentTmStore::startDumpFromUpTo(uint32_t fromUnixSeconds,
                                                   uint32_t upToUnixSeconds) {
  using namespace std::filesystem;
  if (state == State::DUMPING) {
    // handleDumpCmd checks for exactly this code to report the busy event.
    return BUSY_DUMPING;
  }
  std::error_code e;
  auto dirIter = directory_iterator(basePath, e);
  // Directory unreadable or empty case.
  if (e or dirIter == directory_iterator()) {
    return returnvalue::FAILED;
  }
  dumpParams.fromUnixTime = fromUnixSeconds;
  dumpParams.untilUnixTime = upToUnixSeconds;
  buildDumpSet(fromUnixSeconds, upToUnixSeconds);
  // No files in time window found.
  if (dumpParams.orderedDumpFilestamps.empty()) {
    return DUMP_DONE;
  }
  dumpParams.dumpIter = dumpParams.orderedDumpFilestamps.begin();
  dumpParams.currentSameFileIdx = std::nullopt;
  state = State::DUMPING;
  return loadNextDumpFile();
}

/**
 * Loads the next file of the dump set into the file buffer. Files which can not be loaded are
 * skipped.
 * @return returnvalue::OK if a file was loaded, DUMP_DONE if the dump set was consumed. The
 *         state is set back to idle in the latter case.
 */
ReturnValue_t PersistentTmStore::loadNextDumpFile() {
  using namespace std::filesystem;
  dumpParams.currentSize = 0;
  std::error_code e;
  // Handle iteration, which does not necessarily have to increment the set iterator
  // if there are multiple files for a given timestamp.
  auto handleIteration = [&](const DumpIndex& dumpIndex) {
    if (dumpIndex.additionalFiles > 0) {
      if (not dumpParams.currentSameFileIdx.has_value()) {
        // Initialize the file index and stay on same file
        dumpParams.currentSameFileIdx = 0;
        return;
      } else if (dumpParams.currentSameFileIdx.value() < dumpIndex.additionalFiles - 1) {
        dumpParams.currentSameFileIdx = dumpParams.currentSameFileIdx.value() + 1;
        return;
      }
    }
    // File will change, reset this field for correct state-keeping.
    dumpParams.currentSameFileIdx = std::nullopt;
    // Increment iterator for next cycle.
    ++dumpParams.dumpIter;
  };
  while (dumpParams.dumpIter != dumpParams.orderedDumpFilestamps.end()) {
    DumpIndex dumpIndex = *dumpParams.dumpIter;
    // Always track the stamp of the file being loaded, also for files with a suffix.
    dumpParams.currentFileUnixStamp = dumpIndex.epoch;
    timeval tv{};
    tv.tv_sec = dumpIndex.epoch;
    size_t fullPathLength = 0;
    if (createFileName(tv, dumpParams.currentSameFileIdx, fullPathLength) != returnvalue::OK) {
      handleIteration(dumpIndex);
      continue;
    }
    dumpParams.currentFile =
        path(std::string(reinterpret_cast<const char*>(filePathBuf.data()), fullPathLength));
    if (DEBUG_DUMPS) {
      sif::debug << baseName << " dump: Loading " << dumpParams.currentFile << std::endl;
    }
    dumpParams.fileSize = std::filesystem::file_size(dumpParams.currentFile, e);
    if (e) {
      // TODO: Event?
      sif::error << "PersistentTmStore: Could not load next dump file: " << e.message()
                 << std::endl;
      handleIteration(dumpIndex);
      continue;
    }
    // Never read more than the buffer can hold.
    if (dumpParams.fileSize > fileBuf.size()) {
      sif::error << "PersistentTmStore: File too large for file buffer. Loading next file"
                 << std::endl;
      handleIteration(dumpIndex);
      continue;
    }
    std::ifstream ifile(dumpParams.currentFile, std::ios::binary);
    // fail() also covers a failed open, bad() does not.
    if (ifile.fail()) {
      sif::error << "PersistentTmStore: File is bad. Loading next file" << std::endl;
      handleIteration(dumpIndex);
      continue;
    }
    ifile.read(reinterpret_cast<char*>(fileBuf.data()),
               static_cast<std::streamsize>(dumpParams.fileSize));
    // Next file is loaded, exit the loop.
    handleIteration(dumpIndex);
    return returnvalue::OK;
  }
  // Directory iterator was consumed and we are done.
  state = State::IDLE;
  return DUMP_DONE;
}

/**
 * Parses the next packet of the currently loaded dump file into the given reader.
 *
 * If parsing fails, the file is considered corrupt: An event is triggered, the file is
 * deleted and the next file is loaded. If the deleted file was the active file, a new active
 * file is created.
 *
 * @param reader          Reader which is set to the next packet.
 * @param fileHasSwapped  Set to true if the next dump file was loaded.
 * @return DUMP_DONE if no dump is running, returnvalue::FAILED if the previous packet was not
 *         confirmed yet, returnvalue::OK if a packet is available, otherwise the result of
 *         loading the next dump file.
 */
ReturnValue_t PersistentTmStore::getNextDumpPacket(PusTmReader& reader, bool& fileHasSwapped) {
  if (state == State::IDLE) {
    return DUMP_DONE;
  }
  if (dumpParams.pendingPacketDump) {
    return returnvalue::FAILED;
  }
  fileHasSwapped = false;
  reader.setReadOnlyData(fileBuf.data() + dumpParams.currentSize,
                         fileBuf.size() - dumpParams.currentSize);
  // CRC check to fully ensure this is a valid TM
  ReturnValue_t result = reader.parseDataWithCrcCheck();
  if (result != returnvalue::OK) {
    sif::error << "PersistentTmStore: Parsing of PUS TM failed with code " << result << std::endl;
    triggerEvent(persTmStore::POSSIBLE_FILE_CORRUPTION, result, dumpParams.currentFileUnixStamp);
    // Delete the file and load next. Could use better algorithm to partially
    // restore the file dump, but for now do not trust the file.
    std::error_code e;
    std::filesystem::remove(dumpParams.currentFile.c_str(), e);
    if (dumpParams.currentFile == activeFile) {
      // Assignment, not comparison: the active file was just deleted and must be re-created.
      activeFile = std::nullopt;
      assignAndOrCreateMostRecentFile();
    }
    fileHasSwapped = true;
    return loadNextDumpFile();
  }
  dumpParams.pendingPacketDump = true;
  return returnvalue::OK;
}

/**
 * Confirms that the packet retrieved with getNextDumpPacket was dumped and advances inside
 * the current file. Loads the next file if the current file was dumped completely.
 * @param reader          Reader holding the dumped packet.
 * @param fileHasSwapped  Set to true if the next dump file was loaded.
 * @return returnvalue::OK, or the result of loading the next dump file.
 */
ReturnValue_t PersistentTmStore::confirmDump(const PusTmReader& reader, bool& fileHasSwapped) {
  dumpParams.pendingPacketDump = false;
  dumpParams.currentSize += reader.getFullPacketLen();
  if (dumpParams.currentSize >= dumpParams.fileSize) {
    fileHasSwapped = true;
    return loadNextDumpFile();
  }
  fileHasSwapped = false;
  return returnvalue::OK;
}

/**
 * Extracts the timestamp from a file path. The timestamp is expected after the first
 * underscore of the file name, formatted according to config::FILE_DATE_FORMAT.
 * @return returnvalue::OK on success, returnvalue::FAILED if no timestamp could be parsed.
 */
ReturnValue_t PersistentTmStore::pathToTime(const std::filesystem::path& path, struct tm& time) {
  // Only look at the file name, underscores in parent directories are irrelevant.
  const std::string fileName = path.filename().string();
  const size_t splitChar = fileName.find('_');
  if (splitChar == std::string::npos) {
    return returnvalue::FAILED;
  }
  const std::string timeOnlyStr = fileName.substr(splitChar + 1);
  if (nullptr == strptime(timeOnlyStr.c_str(), config::FILE_DATE_FORMAT, &time)) {
    return returnvalue::FAILED;
  }
  return returnvalue::OK;
}

/**
 * Creates an empty file named after the current time and the optional suffix and makes it
 * the active file.
 * @return returnvalue::OK on success, returnvalue::FAILED if the file name or the file could
 *         not be created. The active file is left unchanged in that case.
 */
ReturnValue_t PersistentTmStore::createMostRecentFile(std::optional<uint8_t> suffix) {
  using namespace std::filesystem;
  size_t pathLength = 0;
  ReturnValue_t result = createFileName(currentTv, suffix, pathLength);
  if (result != returnvalue::OK) {
    return result;
  }
  path newPath(std::string(reinterpret_cast<const char*>(filePathBuf.data()), pathLength));
  std::ofstream of(newPath, std::ios::binary);
  if (not of.is_open()) {
    sif::error << "PersistentTmStore: Could not create file " << newPath << std::endl;
    return returnvalue::FAILED;
  }
  activeFile = newPath;
  activeFileTv = currentTv;
  return returnvalue::OK;
}

/**
 * Updates the cached time and the base directory and creates a fresh active file.
 */
ReturnValue_t PersistentTmStore::initializeTmStore() {
  Clock::getClock_timeval(&currentTv);
  updateBaseDir();
  // Reset active file, base directory might have changed.
  activeFile = std::nullopt;
  return assignAndOrCreateMostRecentFile();
}

/** Returns whether the store is idle or dumping. */
PersistentTmStore::State PersistentTmStore::getState() const { return state; }

/** Returns the time window of the current or of the last dump. */
void PersistentTmStore::getStartAndEndTimeCurrentOrLastDump(uint32_t& startTime,
                                                            uint32_t& endTime) const {
  startTime = dumpParams.fromUnixTime;
  endTime = dumpParams.untilUnixTime;
}

/**
 * Writes "<timestamp>.bin[.<suffix>]" behind the prepared path prefix inside the file path
 * buffer.
 * @param tv              Time used for the timestamp, formatted as UTC.
 * @param suffix          Optional numeric suffix for multiple files with the same timestamp.
 * @param fullPathLength  Set to the length of the full path on success.
 * @return returnvalue::OK on success, returnvalue::FAILED if the timestamp could not be
 *         created or the path does not fit into the path buffer.
 */
ReturnValue_t PersistentTmStore::createFileName(timeval& tv, std::optional<uint8_t> suffix,
                                                size_t& fullPathLength) {
  size_t currentIdx = basePathSize;
  if (currentIdx >= filePathBuf.size()) {
    return returnvalue::FAILED;
  }
  time_t epoch = tv.tv_sec;
  // Reentrant variant with a local result buffer instead of the shared static one.
  struct tm timeBuf {};
  if (gmtime_r(&epoch, &timeBuf) == nullptr) {
    sif::error << "PersistentTmStore::createFileName: Could not convert time" << std::endl;
    return returnvalue::FAILED;
  }
  // Only the space behind the path prefix is available for the timestamp.
  size_t writtenBytes = strftime(reinterpret_cast<char*>(filePathBuf.data() + currentIdx),
                                 filePathBuf.size() - currentIdx, config::FILE_DATE_FORMAT,
                                 &timeBuf);
  if (writtenBytes == 0) {
    sif::error << "PersistentTmStore::createFileName: Could not create file timestamp"
               << std::endl;
    return returnvalue::FAILED;
  }
  currentIdx += writtenBytes;

  // Bounds-checked append of a null-terminated string, the terminator is copied as well.
  auto append = [&](const char* str, size_t len) {
    if (currentIdx + len + 1 > filePathBuf.size()) {
      return false;
    }
    std::memcpy(filePathBuf.data() + currentIdx, str, len + 1);
    currentIdx += len;
    return true;
  };
  if (not append(".bin", 4)) {
    return returnvalue::FAILED;
  }
  if (suffix.has_value()) {
    const std::string fullSuffix = "." + std::to_string(suffix.value());
    if (not append(fullSuffix.c_str(), fullSuffix.size())) {
      return returnvalue::FAILED;
    }
  }
  fullPathLength = currentIdx;
  return returnvalue::OK;
}