Ordered dumps #706

Merged
muellerr merged 16 commits from ordered-dumps into main 2023-06-26 18:11:08 +02:00
2 changed files with 31 additions and 23 deletions
Showing only changes of commit 25d10e3877 - Show all commits

View File

@ -79,8 +79,8 @@ ReturnValue_t PersistentTmStore::buildDumpSet(uint32_t fromUnixSeconds, uint32_t
} }
DumpIndex dumpIndex; DumpIndex dumpIndex;
dumpIndex.epoch = fileEpoch; dumpIndex.epoch = fileEpoch;
// Multiple files for the same time are supported via a special suffix. We smply count the // Multiple files for the same time are supported via a special suffix. We simply count the
// number of copies and later try to dump the same number of files with the additonal // number of copies and later try to dump the same number of files with the additional
// suffixes // suffixes
auto iter = dumpParams.orderedDumpFilestamps.find(dumpIndex); auto iter = dumpParams.orderedDumpFilestamps.find(dumpIndex);
if (iter != dumpParams.orderedDumpFilestamps.end()) { if (iter != dumpParams.orderedDumpFilestamps.end()) {
@ -295,6 +295,7 @@ ReturnValue_t PersistentTmStore::startDumpFromUpTo(uint32_t fromUnixSeconds,
return DUMP_DONE; return DUMP_DONE;
} }
dumpParams.dumpIter = dumpParams.orderedDumpFilestamps.begin(); dumpParams.dumpIter = dumpParams.orderedDumpFilestamps.begin();
dumpParams.currentSameFileIdx = std::nullopt;
state = State::DUMPING; state = State::DUMPING;
return loadNextDumpFile(); return loadNextDumpFile();
} }
@ -303,7 +304,25 @@ ReturnValue_t PersistentTmStore::loadNextDumpFile() {
using namespace std::filesystem; using namespace std::filesystem;
dumpParams.currentSize = 0; dumpParams.currentSize = 0;
std::error_code e; std::error_code e;
for (; dumpParams.dumpIter != dumpParams.orderedDumpFilestamps.end(); dumpParams.dumpIter++) { // Handle iteration, which does not necessarily have to increment the set iterator
// if there are multiple files for a given timestamp.
auto handleIteration = [&](DumpIndex& dumpIndex) {
if (dumpIndex.additionalFiles > 0) {
if (not dumpParams.currentSameFileIdx.has_value()) {
// Initialize the file index and stay on same file
dumpParams.currentSameFileIdx = 0;
return;
} else if (dumpParams.currentSameFileIdx.value() < dumpIndex.additionalFiles - 1) {
dumpParams.currentSameFileIdx = dumpParams.currentSameFileIdx.value() + 1;
return;
}
}
// File will change, reset this field for correct state-keeping.
dumpParams.currentSameFileIdx = std::nullopt;
// Increment iterator for next cycle.
dumpParams.dumpIter++;
};
while (dumpParams.dumpIter != dumpParams.orderedDumpFilestamps.end()) {
DumpIndex dumpIndex = *dumpParams.dumpIter; DumpIndex dumpIndex = *dumpParams.dumpIter;
timeval tv{}; timeval tv{};
tv.tv_sec = dumpIndex.epoch; tv.tv_sec = dumpIndex.epoch;
@ -311,38 +330,27 @@ ReturnValue_t PersistentTmStore::loadNextDumpFile() {
createFileName(tv, dumpParams.currentSameFileIdx, fullPathLength); createFileName(tv, dumpParams.currentSameFileIdx, fullPathLength);
dumpParams.currentFile = dumpParams.currentFile =
path(std::string(reinterpret_cast<const char*>(filePathBuf.data()), fullPathLength)); path(std::string(reinterpret_cast<const char*>(filePathBuf.data()), fullPathLength));
if (DEBUG_DUMPS) {
sif::debug << baseName << " dump: Loading " << dumpParams.currentFile << std::endl;
}
dumpParams.fileSize = std::filesystem::file_size(dumpParams.currentFile, e); dumpParams.fileSize = std::filesystem::file_size(dumpParams.currentFile, e);
if (e) { if (e) {
// TODO: Event? // TODO: Event?
sif::error << "PersistentTmStore: Could not load next dump file: " << e.message() sif::error << "PersistentTmStore: Could not load next dump file: " << e.message()
<< std::endl; << std::endl;
handleIteration(dumpIndex);
continue; continue;
} }
std::ifstream ifile(dumpParams.currentFile, std::ios::binary); std::ifstream ifile(dumpParams.currentFile, std::ios::binary);
if (ifile.bad()) { if (ifile.bad()) {
sif::error << "PersistentTmStore: File is bad. Loading next file" << std::endl; sif::error << "PersistentTmStore: File is bad. Loading next file" << std::endl;
handleIteration(dumpIndex);
continue; continue;
} }
if (DEBUG_DUMPS) {
sif::debug << baseName << " dump: Loading " << dumpParams.currentFile << std::endl;
}
ifile.read(reinterpret_cast<char*>(fileBuf.data()), ifile.read(reinterpret_cast<char*>(fileBuf.data()),
static_cast<std::streamsize>(dumpParams.fileSize)); static_cast<std::streamsize>(dumpParams.fileSize));
if (dumpIndex.additionalFiles > 0 and not dumpParams.currentSameFileIdx.has_value()) { // Next file is loaded, exit the loop.
if (not dumpParams.currentSameFileIdx.has_value()) { handleIteration(dumpIndex);
// Initialze the file index and stay on same file
dumpParams.currentSameFileIdx = 0;
continue;
} else if (dumpParams.currentSameFileIdx.value() < dumpIndex.additionalFiles) {
dumpParams.currentSameFileIdx = dumpParams.currentSameFileIdx.value() + 1;
continue;
}
}
// File will change, reset this field for correct state-keeping.
dumpParams.currentSameFileIdx = std::nullopt;
// Increment iterator for next cycle.
dumpParams.dumpIter++;
return returnvalue::OK; return returnvalue::OK;
} }
// Directory iterator was consumed and we are done. // Directory iterator was consumed and we are done.

View File

@ -39,9 +39,9 @@ struct PersistentTmStoreArgs {
}; };
struct DumpIndex { struct DumpIndex {
uint32_t epoch; uint32_t epoch = 0;
// Number of additional files with a suffix like .0, .1 etc. // Number of additional files with a suffix like .0, .1 etc.
uint8_t additionalFiles; uint8_t additionalFiles = 0;
// Define a custom comparison function based on the epoch variable // Define a custom comparison function based on the epoch variable
bool operator<(const DumpIndex& other) const { return epoch < other.epoch; } bool operator<(const DumpIndex& other) const { return epoch < other.epoch; }
}; };