mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-05-06 20:31:17 -05:00
terndbtools: add clean-block-services-to-files command
One-off cleanup tool that removes zero-value entries from the blockServicesToFiles CF.
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
#include "ShardDBTools.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <filesystem>
|
||||
#include <random>
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
@@ -12,6 +13,7 @@
|
||||
#include <regex>
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/slice.h>
|
||||
#include <rocksdb/utilities/checkpoint.h>
|
||||
#include <rocksdb/write_batch.h>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
@@ -918,3 +920,141 @@ void ShardDBTools::rebuildBlockServicesToFiles(const std::string& dbPath) {
|
||||
LOG_INFO(env, "Rebuild complete: %s spans in %s files, wrote %s blockServicesToFiles entries",
|
||||
analyzedSpans, analyzedFiles, totalEntries);
|
||||
}
|
||||
|
||||
void ShardDBTools::cleanBlockServicesToFiles(const std::string& dbPath) {
|
||||
namespace fs = std::filesystem;
|
||||
Logger logger(LogLevel::LOG_INFO, STDERR_FILENO, false, false);
|
||||
std::shared_ptr<XmonAgent> xmon;
|
||||
Env env(logger, xmon, "ShardDBTools");
|
||||
|
||||
std::string checkpointPath = dbPath + "_clean";
|
||||
|
||||
const auto cleanupCheckpoint = [&checkpointPath]() {
|
||||
std::error_code ec;
|
||||
fs::remove_all(checkpointPath, ec);
|
||||
};
|
||||
|
||||
if (fs::exists(checkpointPath)) {
|
||||
cleanupCheckpoint();
|
||||
}
|
||||
|
||||
{
|
||||
LOG_INFO(env, "Opening DB in write mode to verify exclusive access...");
|
||||
SharedRocksDB sharedDb(logger, xmon, dbPath, "");
|
||||
sharedDb.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
|
||||
sharedDb.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
|
||||
sharedDb.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors());
|
||||
rocksdb::Options rocksDBOptions;
|
||||
rocksDBOptions.compression = rocksdb::kLZ4Compression;
|
||||
rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
|
||||
rocksDBOptions.max_open_files = 1000;
|
||||
sharedDb.open(rocksDBOptions);
|
||||
|
||||
LOG_INFO(env, "Creating checkpoint %s -> %s", dbPath, checkpointPath);
|
||||
rocksdb::Checkpoint* checkpoint = nullptr;
|
||||
auto s = rocksdb::Checkpoint::Create(sharedDb.db(), &checkpoint);
|
||||
if (!s.ok()) {
|
||||
FATAL_EXCEPTION("Failed to create checkpoint object: %s", s.ToString());
|
||||
}
|
||||
std::unique_ptr<rocksdb::Checkpoint> checkpointGuard(checkpoint);
|
||||
|
||||
s = checkpoint->CreateCheckpoint(checkpointPath);
|
||||
if (!s.ok()) {
|
||||
cleanupCheckpoint();
|
||||
FATAL_EXCEPTION("Failed to create checkpoint at %s: %s", checkpointPath, s.ToString());
|
||||
}
|
||||
LOG_INFO(env, "Checkpoint created");
|
||||
}
|
||||
|
||||
SharedRocksDB srcDb(logger, xmon, dbPath, "");
|
||||
srcDb.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
|
||||
srcDb.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
|
||||
srcDb.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors());
|
||||
{
|
||||
rocksdb::Options opts;
|
||||
opts.compression = rocksdb::kLZ4Compression;
|
||||
opts.bottommost_compression = rocksdb::kZSTD;
|
||||
opts.max_open_files = 1000;
|
||||
try {
|
||||
srcDb.openForReadOnly(opts);
|
||||
} catch (...) {
|
||||
cleanupCheckpoint();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
SharedRocksDB dstDb(logger, xmon, checkpointPath, "");
|
||||
dstDb.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
|
||||
dstDb.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
|
||||
dstDb.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors());
|
||||
{
|
||||
rocksdb::Options opts;
|
||||
opts.compression = rocksdb::kLZ4Compression;
|
||||
opts.bottommost_compression = rocksdb::kZSTD;
|
||||
opts.max_open_files = 10000;
|
||||
try {
|
||||
dstDb.open(opts);
|
||||
} catch (...) {
|
||||
cleanupCheckpoint();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
LOG_INFO(env, "Dropping blockServicesToFiles CF in checkpoint...");
|
||||
dstDb.deleteCF("blockServicesToFiles");
|
||||
|
||||
LOG_INFO(env, "Recreating blockServicesToFiles CF in checkpoint...");
|
||||
rocksdb::ColumnFamilyOptions cfOptions;
|
||||
cfOptions.merge_operator = CreateInt64AddOperator();
|
||||
auto newCf = dstDb.createCF({"blockServicesToFiles", cfOptions});
|
||||
|
||||
auto srcCf = srcDb.getCF("blockServicesToFiles");
|
||||
uint64_t estimatedKeys = 0;
|
||||
srcDb.db()->GetIntProperty(srcCf, "rocksdb.estimate-num-keys", &estimatedKeys);
|
||||
LOG_INFO(env, "Copying non-zero entries from blockServicesToFiles (estimated %s keys)...", estimatedKeys);
|
||||
|
||||
static constexpr size_t BATCH_SIZE = 10000000;
|
||||
size_t keysCopied = 0;
|
||||
size_t keysDropped = 0;
|
||||
rocksdb::WriteBatch batch;
|
||||
|
||||
std::unique_ptr<rocksdb::Iterator> it(srcDb.db()->NewIterator(rocksdb::ReadOptions(), srcCf));
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
int64_t count = ExternalValue<I64Value>::FromSlice(it->value())().i64();
|
||||
if (count == 0) {
|
||||
keysDropped++;
|
||||
continue;
|
||||
}
|
||||
|
||||
batch.Put(newCf, it->key(), it->value());
|
||||
keysCopied++;
|
||||
|
||||
if (keysCopied % BATCH_SIZE == 0) {
|
||||
ROCKS_DB_CHECKED(dstDb.db()->Write(rocksdb::WriteOptions(), &batch));
|
||||
batch.Clear();
|
||||
LOG_INFO(env, " %s keys copied, %s keys dropped so far...", keysCopied, keysDropped);
|
||||
}
|
||||
}
|
||||
ROCKS_DB_CHECKED(it->status());
|
||||
|
||||
if (batch.Count() > 0) {
|
||||
ROCKS_DB_CHECKED(dstDb.db()->Write(rocksdb::WriteOptions(), &batch));
|
||||
}
|
||||
|
||||
LOG_INFO(env, "Done: %s keys copied, %s keys dropped", keysCopied, keysDropped);
|
||||
} catch (...) {
|
||||
srcDb.close();
|
||||
dstDb.close();
|
||||
cleanupCheckpoint();
|
||||
throw;
|
||||
}
|
||||
|
||||
srcDb.close();
|
||||
dstDb.close();
|
||||
|
||||
LOG_INFO(env, "Replacing %s with cleaned checkpoint...", dbPath);
|
||||
fs::remove_all(dbPath);
|
||||
fs::rename(checkpointPath, dbPath);
|
||||
LOG_INFO(env, "Clean complete: %s", dbPath);
|
||||
}
|
||||
|
||||
@@ -19,4 +19,5 @@ public:
|
||||
static void outputFilesWithDuplicateFailureDomains(const std::string& dbPath, const std::string& outputFilePath);
|
||||
static void outputBlockServiceUsage(const std::string& dbPath, const std::string& outputFilePath);
|
||||
static void rebuildBlockServicesToFiles(const std::string& dbPath);
|
||||
static void cleanBlockServicesToFiles(const std::string& dbPath);
|
||||
};
|
||||
|
||||
@@ -293,6 +293,8 @@ static void usage(const char* binary) {
|
||||
fprintf(stderr, " Opens as secondary instance and retries on compaction races (default: 3 retries).\n");
|
||||
fprintf(stderr, " rebuild-block-services-to-files DB_PATH\n");
|
||||
fprintf(stderr, " Drops and recreates the blockServicesToFiles CF by scanning all spans.\n");
|
||||
fprintf(stderr, " clean-block-services-to-files DB_PATH\n");
|
||||
fprintf(stderr, " Removes zero-value entries from the blockServicesToFiles CF using a checkpoint.\n");
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
@@ -366,6 +368,9 @@ int main(int argc, char** argv) {
|
||||
} else if (arg == "rebuild-block-services-to-files") {
|
||||
std::string dbPath = getNextArg();
|
||||
ShardDBTools::rebuildBlockServicesToFiles(dbPath);
|
||||
} else if (arg == "clean-block-services-to-files") {
|
||||
std::string dbPath = getNextArg();
|
||||
ShardDBTools::cleanBlockServicesToFiles(dbPath);
|
||||
} else {
|
||||
dieWithUsage();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user