Files
ternfs-XTXMarkets/cpp/dbtools/ShardDBTools.cpp

121 lines
4.5 KiB
C++

#include "ShardDBTools.hpp"
#include <memory>
#include <rocksdb/slice.h>
#include <vector>
#include "Env.hpp"
#include "LogsDB.hpp"
#include "LogsDBTools.hpp"
#include "ShardDB.hpp"
#include "SharedRocksDB.hpp"
namespace rocksdb {
std::ostream& operator<<(std::ostream& out, const rocksdb::Slice& slice) {
return goLangBytesFmt(out, (const char*)slice.data(), slice.size());
}
}
void ShardDBTools::verifyEqual(const std::string &db1Path, const std::string &db2Path) {
Logger logger(LogLevel::LOG_INFO, STDERR_FILENO, false, false);
std::shared_ptr<XmonAgent> xmon;
Env env(logger, xmon, "ShardDBTools");
SharedRocksDB sharedDb1(logger, xmon);
sharedDb1.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
sharedDb1.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
rocksdb::Options rocksDBOptions;
rocksDBOptions.compression = rocksdb::kLZ4Compression;
rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
sharedDb1.openForReadOnly(rocksDBOptions, db1Path);
SharedRocksDB sharedDb2(logger, xmon);
sharedDb2.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
sharedDb2.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
sharedDb2.openForReadOnly(rocksDBOptions, db2Path);
auto db1 = sharedDb1.db();
auto db2 = sharedDb2.db();
size_t totalKeysCompared{0};
size_t totalKeySize{0};
size_t totalValueSize{0};
for(const auto& cf : ShardDB::getColumnFamilyDescriptors()) {
LOG_INFO(env, "Starting comparison on CF %s", cf.name);
auto cf1 = sharedDb1.getCF(cf.name);
auto cf2 = sharedDb2.getCF(cf.name);
size_t keysCompared{0};
size_t keySize{0};
size_t valueSize{0};
auto it1 = db1->NewIterator({}, cf1);
auto it2 = db2->NewIterator({}, cf2);
it1->SeekToFirst();
it2->SeekToFirst();
while (it1->Valid() && it2->Valid()) {
if (it1->key() != it2->key()) {
LOG_ERROR(env, "Found mismatch in key cf %s, key1: %s, key2: %s", cf.name, it1->key(), it2->key());
return;
}
if (it1->value() != it2->value()) {
LOG_ERROR(env, "Found mismatch in value cf %s", cf.name);
return;
}
++keysCompared;
keySize += it1->key().size();
valueSize += it1->value().size();
if (keysCompared % 100000000ull == 0) {
LOG_INFO(env, "Compared %s key/value pairs so far and they are identical", keysCompared);
}
it1->Next();
it2->Next();
}
if (it1->Valid()) {
LOG_ERROR(env, "Database %s has extra keys in cf %s", db1Path, cf.name);
return;
}
if (it2->Valid()) {
LOG_ERROR(env, "Database %s has extra keys in cf %s", db2Path, cf.name);
return;
}
delete it1;
delete it2;
LOG_INFO(env, "CF %s identical. Compared %s key/value pairs. KeySize: %s, ValueSize: %s", cf.name, keysCompared, keySize, valueSize);
totalKeysCompared += keysCompared;
totalKeySize += keySize;
totalValueSize += valueSize;
}
LOG_INFO(env, "Databases identical! Compared %s cfs, %s key/value pairs. Total key size: %s, Total value size: %s", ShardDB::getColumnFamilyDescriptors().size(), totalKeysCompared, totalKeySize, totalValueSize);
}
std::ostream& operator<<(std::ostream& out, const std::vector<LogIdx>& indices) {
out << "[";
for (auto idx : indices) {
out << idx << ", ";
}
out << "]";
return out;
}
void ShardDBTools::outputUnreleasedState(const std::string& dbPath) {
Logger logger(LogLevel::LOG_INFO, STDERR_FILENO, false, false);
std::shared_ptr<XmonAgent> xmon;
Env env(logger, xmon, "ShardDBTools");
SharedRocksDB sharedDb(logger, xmon);
sharedDb.registerCFDescriptors(ShardDB::getColumnFamilyDescriptors());
sharedDb.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
rocksdb::Options rocksDBOptions;
rocksDBOptions.compression = rocksdb::kLZ4Compression;
rocksDBOptions.bottommost_compression = rocksdb::kZSTD;
sharedDb.openForReadOnly(rocksDBOptions, dbPath);
LogIdx lastReleased;
std::vector<LogIdx> unreleasedLogEntries;
LogsDBTools::getUnreleasedLogEntries(env, sharedDb, lastReleased, unreleasedLogEntries);
LOG_INFO(env, "Last released: %s", lastReleased);
LOG_INFO(env, "Unreleased entries: %s", unreleasedLogEntries);
}