#pragma once #include #include #include "Env.hpp" #include "LogsDB.hpp" struct TempLogsDB { std::string dbDir; Logger logger; std::shared_ptr xmon; std::unique_ptr env; std::unique_ptr sharedDB; std::unique_ptr db; TempLogsDB( LogLevel level, ReplicaId replicaId = 0, LogIdx lastRead = 0, bool dontWaitForReplication = false, bool dontDoReplication = false, bool forceLeader = false, bool avoidBeingLeader = false, bool initialStart = true, LogIdx forcedLastReleased = false): logger(level, STDERR_FILENO, false, false), env(new Env(logger, xmon, "LogsDB")) { dbDir = std::string("temp-logs-db.XXXXXX"); if (mkdtemp(dbDir.data()) == nullptr) { throw SYSCALL_EXCEPTION("mkdtemp"); } sharedDB = std::make_unique(logger, xmon); initSharedDB(); db = std::make_unique(*env, *sharedDB, replicaId, lastRead, dontWaitForReplication, dontDoReplication, forceLeader, avoidBeingLeader, initialStart, forcedLastReleased); } // useful to test recovery void restart( ReplicaId replicaId = 0, LogIdx lastRead = 0, bool dontWaitForReplication = false, bool dontDoReplication = false, bool forceLeader = false, bool avoidBeingLeader = false, bool initialStart = true, LogIdx forcedLastReleased = false) { db->close(); sharedDB = std::make_unique(logger, xmon); initSharedDB(); db = std::make_unique(*env, *sharedDB, replicaId, lastRead, dontWaitForReplication, dontDoReplication, forceLeader, avoidBeingLeader, initialStart, forcedLastReleased); } ~TempLogsDB() { std::error_code err; if (std::filesystem::remove_all(std::filesystem::path(dbDir), err) < 0) { std::cerr << "Could not remove " << dbDir << ": " << err << std::endl; } } std::unique_ptr& operator->() { return db; } void initSharedDB() { sharedDB->registerCFDescriptors({{rocksdb::kDefaultColumnFamilyName, {}}}); sharedDB->registerCFDescriptors(LogsDB::getColumnFamilyDescriptors()); rocksdb::Options rocksDBOptions; rocksDBOptions.create_if_missing = true; rocksDBOptions.create_missing_column_families = true; rocksDBOptions.compression = rocksdb::kLZ4Compression; rocksDBOptions.bottommost_compression = rocksdb::kZSTD; // 1000*256 = 256k open files at once, given that we currently run on a // single machine this is appropriate. rocksDBOptions.max_open_files = 1000; // We batch writes and flush manually. rocksDBOptions.manual_wal_flush = true; sharedDB->open(rocksDBOptions, dbDir); } }; inline LogsDBLogEntry initEntry(uint64_t idx, std::string data) { LogsDBLogEntry e; e.idx = idx; e.value.assign(data.begin(), data.end()); return e; } inline std::ostream& operator<<(std::ostream& out, const std::vector& entries) { out << "{ "; for (auto& entry : entries) { out << "{" << entry << "}" << ","; } out << "} "; return out; }