mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-05-07 12:52:00 -05:00
terndbtools: add retry to checkpoint to mitigate compaction race
This commit is contained in:
+57
-24
@@ -4,9 +4,12 @@
|
||||
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <filesystem>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include <rocksdb/db.h>
|
||||
#include <rocksdb/options.h>
|
||||
#include <rocksdb/iterator.h>
|
||||
@@ -24,7 +27,8 @@
|
||||
|
||||
#define die(...) do { fprintf(stderr, __VA_ARGS__); exit(1); } while(false)
|
||||
|
||||
static void createCheckpoint(const std::string& dbPath, const std::string& checkpointPath) {
|
||||
static void createCheckpoint(const std::string& dbPath, const std::string& checkpointPath, int maxRetries) {
|
||||
namespace fs = std::filesystem;
|
||||
Logger logger(LogLevel::LOG_INFO, STDERR_FILENO, false, false);
|
||||
std::shared_ptr<XmonAgent> xmon;
|
||||
Env env(logger, xmon, "createCheckpoint");
|
||||
@@ -63,32 +67,55 @@ static void createCheckpoint(const std::string& dbPath, const std::string& check
|
||||
dbOptions.bottommost_compression = rocksdb::kZSTD;
|
||||
dbOptions.create_if_missing = false;
|
||||
dbOptions.create_missing_column_families = false;
|
||||
dbOptions.max_open_files = 1000;
|
||||
|
||||
rocksdb::DB* db = nullptr;
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handles;
|
||||
s = rocksdb::DB::OpenForReadOnly(dbOptions, dbPath, cfDescriptors, &handles, &db);
|
||||
if (!s.ok()) {
|
||||
die("Failed to open DB %s: %s\n", dbPath.c_str(), s.ToString().c_str());
|
||||
}
|
||||
dbOptions.max_open_files = -1;
|
||||
|
||||
LOG_INFO(env, "Creating checkpoint %s -> %s", dbPath, checkpointPath);
|
||||
rocksdb::Checkpoint* checkpoint = nullptr;
|
||||
s = rocksdb::Checkpoint::Create(db, &checkpoint);
|
||||
if (!s.ok()) {
|
||||
die("Failed to create checkpoint object: %s\n", s.ToString().c_str());
|
||||
}
|
||||
std::unique_ptr<rocksdb::Checkpoint> checkpointGuard(checkpoint);
|
||||
|
||||
s = checkpoint->CreateCheckpoint(checkpointPath);
|
||||
if (!s.ok()) {
|
||||
for (int attempt = 0; attempt < maxRetries; attempt++) {
|
||||
rocksdb::DB* db = nullptr;
|
||||
std::vector<rocksdb::ColumnFamilyHandle*> handles;
|
||||
s = rocksdb::DB::OpenForReadOnly(dbOptions, dbPath, cfDescriptors, &handles, &db);
|
||||
if (!s.ok()) {
|
||||
if (s.IsPathNotFound() && attempt + 1 < maxRetries) {
|
||||
LOG_INFO(env, "Open failed (compaction race), retrying (%s/%s): %s",
|
||||
attempt + 1, maxRetries, s.ToString());
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
die("Failed to open DB %s: %s\n", dbPath.c_str(), s.ToString().c_str());
|
||||
}
|
||||
|
||||
rocksdb::Checkpoint* checkpoint = nullptr;
|
||||
s = rocksdb::Checkpoint::Create(db, &checkpoint);
|
||||
if (!s.ok()) {
|
||||
for (auto* h : handles) db->DestroyColumnFamilyHandle(h);
|
||||
delete db;
|
||||
die("Failed to create checkpoint object: %s\n", s.ToString().c_str());
|
||||
}
|
||||
std::unique_ptr<rocksdb::Checkpoint> checkpointGuard(checkpoint);
|
||||
|
||||
s = checkpoint->CreateCheckpoint(checkpointPath);
|
||||
|
||||
for (auto* h : handles) db->DestroyColumnFamilyHandle(h);
|
||||
delete db;
|
||||
|
||||
if (s.ok()) {
|
||||
LOG_INFO(env, "Checkpoint created: %s", checkpointPath);
|
||||
return;
|
||||
}
|
||||
|
||||
if (s.IsPathNotFound() && attempt + 1 < maxRetries) {
|
||||
LOG_INFO(env, "Checkpoint failed (compaction race), retrying (%s/%s): %s",
|
||||
attempt + 1, maxRetries, s.ToString());
|
||||
std::error_code ec;
|
||||
fs::remove_all(checkpointPath + ".tmp", ec);
|
||||
fs::remove_all(checkpointPath, ec);
|
||||
sleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
die("Failed to create checkpoint at %s: %s\n", checkpointPath.c_str(), s.ToString().c_str());
|
||||
}
|
||||
|
||||
for (auto* h : handles) db->DestroyColumnFamilyHandle(h);
|
||||
delete db;
|
||||
|
||||
LOG_INFO(env, "Checkpoint created: %s", checkpointPath);
|
||||
}
|
||||
|
||||
static void copyDB(const std::string& srcPath, const std::string& dstPath) {
|
||||
@@ -261,8 +288,9 @@ static void usage(const char* binary) {
|
||||
fprintf(stderr, " Aggregates expected block usage per block service\n");
|
||||
fprintf(stderr, " copy-db OLD_DB_PATH NEW_DB_PATH\n");
|
||||
fprintf(stderr, " Copies all column families from OLD_DB_PATH to NEW_DB_PATH by iterating all keys.\n");
|
||||
fprintf(stderr, " create-checkpoint DB_PATH CHECKPOINT_PATH\n");
|
||||
fprintf(stderr, " create-checkpoint DB_PATH CHECKPOINT_PATH [--retries N]\n");
|
||||
fprintf(stderr, " Creates a RocksDB checkpoint (hardlink snapshot) of DB_PATH at CHECKPOINT_PATH.\n");
|
||||
fprintf(stderr, " Opens as secondary instance and retries on compaction races (default: 3 retries).\n");
|
||||
fprintf(stderr, " rebuild-block-services-to-files DB_PATH\n");
|
||||
fprintf(stderr, " Drops and recreates the blockServicesToFiles CF by scanning all spans.\n");
|
||||
}
|
||||
@@ -329,7 +357,12 @@ int main(int argc, char** argv) {
|
||||
} else if (arg == "create-checkpoint") {
|
||||
std::string dbPath = getNextArg();
|
||||
std::string checkpointPath = getNextArg();
|
||||
createCheckpoint(dbPath, checkpointPath);
|
||||
int maxRetries = 3;
|
||||
if (i + 2 < argc && std::string(argv[i+1]) == "--retries") {
|
||||
maxRetries = std::stoi(argv[i+2]);
|
||||
i += 2;
|
||||
}
|
||||
createCheckpoint(dbPath, checkpointPath, maxRetries);
|
||||
} else if (arg == "rebuild-block-services-to-files") {
|
||||
std::string dbPath = getNextArg();
|
||||
ShardDBTools::rebuildBlockServicesToFiles(dbPath);
|
||||
|
||||
Reference in New Issue
Block a user