From 902db94d154493dfbd0ec932c3e725ebfee21688 Mon Sep 17 00:00:00 2001
From: Miroslav Crnic
Date: Wed, 15 Apr 2026 10:54:27 +0000
Subject: [PATCH] terndbtools: add clean-block-services-to-files command

One-off cleanup tool that removes zero-value entries from the
blockServicesToFiles CF.
---
Review notes (ignored by git-am):
- Text inside angle brackets was missing from the reviewed copy of this patch.
  <filesystem>, <rocksdb/utilities/checkpoint.h>, XmonAgent, rocksdb::Checkpoint,
  rocksdb::Iterator and I64Value are reconstructed from usage and must be checked
  against the tree; the context "#include" lines and the author e-mail could not
  be recovered.
- The swap of the cleaned checkpoint into place no longer deletes the DB first,
  and a trailing '/' on DB_PATH no longer places the checkpoint inside the DB.

 cpp/dbtools/ShardDBTools.cpp | 184 +++++++++++++++++++++++++++++++++++
 cpp/dbtools/ShardDBTools.hpp |   1 +
 cpp/dbtools/terndbtools.cpp  |   5 ++
 3 files changed, 190 insertions(+)

diff --git a/cpp/dbtools/ShardDBTools.cpp b/cpp/dbtools/ShardDBTools.cpp
index ccb0b308..d4cbe9b3 100644
--- a/cpp/dbtools/ShardDBTools.cpp
+++ b/cpp/dbtools/ShardDBTools.cpp
@@ -5,6 +5,7 @@
 #include "ShardDBTools.hpp"
 
 #include
+#include <filesystem>
 #include
 #include
 #include
@@ -12,6 +13,7 @@
 #include
 #include
 #include
+#include <rocksdb/utilities/checkpoint.h>
 #include
 #include
 #include
@@ -918,3 +920,185 @@ void ShardDBTools::rebuildBlockServicesToFiles(const std::string& dbPath) {
     LOG_INFO(env, "Rebuild complete: %s spans in %s files, wrote %s blockServicesToFiles entries",
         analyzedSpans, analyzedFiles, totalEntries);
 }
+
+// One-off maintenance command: rewrites the blockServicesToFiles CF without its
+// zero-valued entries.
+//
+// The work is done on a RocksDB checkpoint created next to the DB (dbPath + "_clean"):
+//   1. open the DB in write mode to verify exclusive access, then take a
+//      checkpoint of it;
+//   2. drop and recreate blockServicesToFiles in the checkpoint;
+//   3. copy every entry with a non-zero value from the original CF;
+//   4. swap the checkpoint into the place of the original DB directory.
+//
+// dbPath: path of the DB directory, trailing '/' characters are ignored.
+// Throws on any RocksDB or filesystem failure. Before step 4 a failure leaves the
+// original directory in place and removes the checkpoint (best effort); during step 4
+// the original is kept as dbPath + "_old" until the cleaned copy has been moved in.
+void ShardDBTools::cleanBlockServicesToFiles(const std::string& dbPath) {
+    namespace fs = std::filesystem;
+    Logger logger(LogLevel::LOG_INFO, STDERR_FILENO, false, false);
+    std::shared_ptr<XmonAgent> xmon;
+    Env env(logger, xmon, "ShardDBTools");
+
+    // Strip trailing separators: for "db/" the sibling paths below would otherwise
+    // live inside the DB directory ("db/_clean") and be deleted together with it.
+    std::string rootPath = dbPath;
+    while (rootPath.size() > 1 && rootPath.back() == '/') {
+        rootPath.pop_back();
+    }
+    const std::string checkpointPath = rootPath + "_clean";
+    const std::string backupPath = rootPath + "_old";
+
+    const auto cleanupCheckpoint = [&checkpointPath]() {
+        std::error_code ec; // best effort, failures are deliberately ignored
+        fs::remove_all(checkpointPath, ec);
+    };
+    const auto registerAllCFs = [](SharedRocksDB& db) {
+        db.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
+        db.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
+        db.registerCFDescriptors(BlockServicesCacheDB::getColumnFamilyDescriptors());
+    };
+    const auto makeOptions = [](int maxOpenFiles) {
+        rocksdb::Options opts;
+        opts.compression = rocksdb::kLZ4Compression;
+        opts.bottommost_compression = rocksdb::kZSTD;
+        opts.max_open_files = maxOpenFiles;
+        return opts;
+    };
+
+    // A leftover "_old" directory may be the only intact copy of the DB from an
+    // interrupted run, so refuse to touch it.
+    if (fs::exists(backupPath)) {
+        FATAL_EXCEPTION("Backup path %s already exists, refusing to continue", backupPath);
+    }
+    // A leftover checkpoint is just a stale copy; no-op when it does not exist.
+    cleanupCheckpoint();
+
+    {
+        LOG_INFO(env, "Opening DB in write mode to verify exclusive access...");
+        SharedRocksDB sharedDb(logger, xmon, rootPath, "");
+        registerAllCFs(sharedDb);
+        rocksdb::Options rocksDBOptions = makeOptions(1000);
+        sharedDb.open(rocksDBOptions);
+
+        LOG_INFO(env, "Creating checkpoint %s -> %s", rootPath, checkpointPath);
+        rocksdb::Checkpoint* checkpoint = nullptr;
+        auto s = rocksdb::Checkpoint::Create(sharedDb.db(), &checkpoint);
+        if (!s.ok()) {
+            FATAL_EXCEPTION("Failed to create checkpoint object: %s", s.ToString());
+        }
+        std::unique_ptr<rocksdb::Checkpoint> checkpointGuard(checkpoint);
+
+        s = checkpoint->CreateCheckpoint(checkpointPath);
+        if (!s.ok()) {
+            cleanupCheckpoint();
+            FATAL_EXCEPTION("Failed to create checkpoint at %s: %s", checkpointPath, s.ToString());
+        }
+        LOG_INFO(env, "Checkpoint created");
+    }
+
+    SharedRocksDB srcDb(logger, xmon, rootPath, "");
+    registerAllCFs(srcDb);
+    {
+        rocksdb::Options opts = makeOptions(1000);
+        try {
+            srcDb.openForReadOnly(opts);
+        } catch (...) {
+            cleanupCheckpoint();
+            throw;
+        }
+    }
+
+    SharedRocksDB dstDb(logger, xmon, checkpointPath, "");
+    registerAllCFs(dstDb);
+    {
+        rocksdb::Options opts = makeOptions(10000);
+        try {
+            dstDb.open(opts);
+        } catch (...) {
+            cleanupCheckpoint();
+            throw;
+        }
+    }
+
+    try {
+        LOG_INFO(env, "Dropping blockServicesToFiles CF in checkpoint...");
+        dstDb.deleteCF("blockServicesToFiles");
+
+        LOG_INFO(env, "Recreating blockServicesToFiles CF in checkpoint...");
+        rocksdb::ColumnFamilyOptions cfOptions;
+        cfOptions.merge_operator = CreateInt64AddOperator();
+        auto newCf = dstDb.createCF({"blockServicesToFiles", cfOptions});
+
+        auto srcCf = srcDb.getCF("blockServicesToFiles");
+        // Only used for the log line below; stays 0 if the property is unavailable.
+        uint64_t estimatedKeys = 0;
+        srcDb.db()->GetIntProperty(srcCf, "rocksdb.estimate-num-keys", &estimatedKeys);
+        LOG_INFO(env, "Copying non-zero entries from blockServicesToFiles (estimated %s keys)...", estimatedKeys);
+
+        // Number of copied keys buffered in memory before they are flushed to the checkpoint.
+        static constexpr size_t BATCH_SIZE = 10000000;
+        size_t keysCopied = 0;
+        size_t keysDropped = 0;
+        rocksdb::WriteBatch batch;
+
+        std::unique_ptr<rocksdb::Iterator> it(srcDb.db()->NewIterator(rocksdb::ReadOptions(), srcCf));
+        for (it->SeekToFirst(); it->Valid(); it->Next()) {
+            int64_t count = ExternalValue<I64Value>::FromSlice(it->value())().i64();
+            if (count == 0) {
+                keysDropped++;
+                continue;
+            }
+
+            batch.Put(newCf, it->key(), it->value());
+            keysCopied++;
+
+            if (keysCopied % BATCH_SIZE == 0) {
+                ROCKS_DB_CHECKED(dstDb.db()->Write(rocksdb::WriteOptions(), &batch));
+                batch.Clear();
+                LOG_INFO(env, " %s keys copied, %s keys dropped so far...", keysCopied, keysDropped);
+            }
+        }
+        // A read error ends the loop the same way as reaching the end, so check it.
+        ROCKS_DB_CHECKED(it->status());
+
+        if (batch.Count() > 0) {
+            ROCKS_DB_CHECKED(dstDb.db()->Write(rocksdb::WriteOptions(), &batch));
+        }
+
+        LOG_INFO(env, "Done: %s keys copied, %s keys dropped", keysCopied, keysDropped);
+    } catch (...) {
+        srcDb.close();
+        dstDb.close();
+        cleanupCheckpoint();
+        throw;
+    }
+
+    srcDb.close();
+    dstDb.close();
+
+    LOG_INFO(env, "Replacing %s with cleaned checkpoint...", rootPath);
+    // Never delete the original before its replacement is in place: move it aside,
+    // move the checkpoint in, and only then remove the old copy.
+    try {
+        fs::rename(rootPath, backupPath);
+    } catch (...) {
+        cleanupCheckpoint(); // the original is untouched, only the checkpoint to undo
+        throw;
+    }
+    try {
+        fs::rename(checkpointPath, rootPath);
+    } catch (...) {
+        // Try to put the original back; if even that fails both copies are left on
+        // disk (backupPath, checkpointPath) for manual recovery.
+        std::error_code ec;
+        fs::rename(backupPath, rootPath, ec);
+        if (!ec) {
+            cleanupCheckpoint();
+        }
+        throw;
+    }
+    fs::remove_all(backupPath);
+    LOG_INFO(env, "Clean complete: %s", rootPath);
+}
diff --git a/cpp/dbtools/ShardDBTools.hpp b/cpp/dbtools/ShardDBTools.hpp
index 6e3e9938..ea4ab1eb 100644
--- a/cpp/dbtools/ShardDBTools.hpp
+++ b/cpp/dbtools/ShardDBTools.hpp
@@ -19,4 +19,5 @@ public:
     static void outputFilesWithDuplicateFailureDomains(const std::string& dbPath, const std::string& outputFilePath);
     static void outputBlockServiceUsage(const std::string& dbPath, const std::string& outputFilePath);
     static void rebuildBlockServicesToFiles(const std::string& dbPath);
+    static void cleanBlockServicesToFiles(const std::string& dbPath);
 };
diff --git a/cpp/dbtools/terndbtools.cpp b/cpp/dbtools/terndbtools.cpp
index 9b98915e..20af7fa1 100644
--- a/cpp/dbtools/terndbtools.cpp
+++ b/cpp/dbtools/terndbtools.cpp
@@ -293,6 +293,8 @@ static void usage(const char* binary) {
     fprintf(stderr, " Opens as secondary instance and retries on compaction races (default: 3 retries).\n");
     fprintf(stderr, " rebuild-block-services-to-files DB_PATH\n");
     fprintf(stderr, " Drops and recreates the blockServicesToFiles CF by scanning all spans.\n");
+    fprintf(stderr, " clean-block-services-to-files DB_PATH\n");
+    fprintf(stderr, " Removes zero-value entries from the blockServicesToFiles CF using a checkpoint.\n");
 }
 
 int main(int argc, char** argv) {
@@ -366,6 +368,9 @@ int main(int argc, char** argv) {
         } else if (arg == "rebuild-block-services-to-files") {
             std::string dbPath = getNextArg();
             ShardDBTools::rebuildBlockServicesToFiles(dbPath);
+        } else if (arg == "clean-block-services-to-files") {
+            std::string dbPath = getNextArg();
+            ShardDBTools::cleanBlockServicesToFiles(dbPath);
         } else {
             dieWithUsage();
         }