shard: support for sharing rocksdb and init LogsDB CFs

This commit is contained in:
Miroslav Crnic
2024-02-08 17:44:03 +00:00
committed by GitHub Enterprise
parent 38707535e3
commit 37ba9bc457
8 changed files with 271 additions and 142 deletions
+28 -8
View File
@@ -4,20 +4,19 @@
#include <filesystem>
#include <rocksdb/db.h>
#include <sstream>
#include <random>
#include <unistd.h>
#include "Common.hpp"
#include "Bincode.hpp"
#include "Msgs.hpp"
#include "ShardDB.hpp"
#include "ShardDBData.hpp"
#include "Crypto.hpp"
#include "RocksDBUtils.hpp"
#include "SharedRocksDB.hpp"
#include "Time.hpp"
#include "CDCKey.hpp"
#include "wyhash.h"
#include "Timings.hpp"
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include "doctest.h"
@@ -124,7 +123,7 @@ TEST_CASE("ShardDB data") {
CHECK(transientFileBody().mtime() == readTransientFileBody().mtime());
CHECK(transientFileBody().fileSize() == readTransientFileBody().fileSize());
CHECK(transientFileBody().deadline() == readTransientFileBody().deadline());
CHECK(transientFileBody().deadline() == readTransientFileBody().deadline());
CHECK(transientFileBody().note() == readTransientFileBody().note());
}
@@ -281,7 +280,7 @@ TEST_CASE("ShardDB data") {
snapshotKey->nameHash = 123;
snapshotKey->setName(snapshotName);
snapshotKey->setCreationTime(456);
SnapshotEdgeBody snapshot;
snapshot.setTargetId(InodeIdExtra(InodeId(InodeType::FILE, ShardId(4), 32), false));
@@ -330,7 +329,7 @@ TEST_CASE("ShardDB data") {
CHECK(key->current == snapshotKey->current);
CHECK(key->dirId() == snapshotKey->dirId());
CHECK(key->nameHash == snapshotKey->nameHash);
CHECK(key->name() == snapshotKey->name());
CHECK(key->name() == snapshotKey->name());
CHECK(key->creationTime() == snapshotKey->creationTime());
CHECK(sliceEq(it->value(), snapshot.toSlice()));
SnapshotEdgeBody snapshotValue(it->value());
@@ -383,6 +382,7 @@ struct TempShardDB {
Logger logger;
std::unique_ptr<Env> env;
ShardId shid;
std::unique_ptr<SharedRocksDB> sharedDB;
std::unique_ptr<ShardDB> db;
TempShardDB(LogLevel level, ShardId shid_): logger(level, STDERR_FILENO, false, false), shid(shid_) {
@@ -391,13 +391,18 @@ struct TempShardDB {
throw SYSCALL_EXCEPTION("mkdtemp");
}
std::shared_ptr<XmonAgent> xmon;
db = std::make_unique<ShardDB>(logger, xmon, shid, DEFAULT_DEADLINE_INTERVAL, dbDir);
sharedDB = std::make_unique<SharedRocksDB>(logger, xmon);
initSharedDB();
db = std::make_unique<ShardDB>(logger, xmon, shid, DEFAULT_DEADLINE_INTERVAL, *sharedDB);
}
// useful to test recovery
void restart() {
std::shared_ptr<XmonAgent> xmon;
db = std::make_unique<ShardDB>(logger, xmon, shid, DEFAULT_DEADLINE_INTERVAL, dbDir);
db->close();
sharedDB = std::make_unique<SharedRocksDB>(logger, xmon);
initSharedDB();
db = std::make_unique<ShardDB>(logger, xmon, shid, DEFAULT_DEADLINE_INTERVAL, *sharedDB);
}
~TempShardDB() {
@@ -410,6 +415,21 @@ struct TempShardDB {
std::unique_ptr<ShardDB>& operator->() {
return db;
}
void initSharedDB() {
sharedDB->registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
rocksdb::Options rocksDBOptions;
rocksDBOptions.create_if_missing = true;
rocksDBOptions.create_missing_column_families = true;
rocksDBOptions.compression = rocksdb::kLZ4Compression;
rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
// 1000*256 = 256k open files at once, given that we currently run on a
// single machine this is appropriate.
rocksDBOptions.max_open_files = 1000;
// We batch writes and flush manually.
rocksDBOptions.manual_wal_flush = true;
sharedDB->open(rocksDBOptions, dbDir);
}
};
#define NO_EGGS_ERROR(expr) \