shard: ignore block services which can't fit a few blocks

This commit is contained in:
Miroslav Crnic
2026-05-03 07:37:20 +00:00
parent d823fbf2e7
commit 33aee19163
11 changed files with 32 additions and 18 deletions
+1 -1
View File
@@ -713,7 +713,7 @@ void ShardDBTools::outputFilesWithDuplicateFailureDomains(const std::string& dbP
sharedDb.openForReadOnly(rocksDBOptions);
auto db = sharedDb.db();
ShardOptions shardOptions;
BlockServicesCacheDB blockServiceDB{logger, xmon, sharedDb, shardOptions.blockServiceWritableDelay, shardOptions.hddDriveThroughput, shardOptions.flashDriveThroughput};
BlockServicesCacheDB blockServiceDB{logger, xmon, sharedDb, shardOptions.blockServiceWritableDelay, shardOptions.hddDriveThroughput, shardOptions.flashDriveThroughput, shardOptions.minSpaceRequiredForWrite};
auto blockServiceCache = blockServiceDB.getCache();
rocksdb::ReadOptions options;
+7 -5
View File
@@ -18,8 +18,8 @@ namespace {
return (uint16_t(locationId) << 8) | uint16_t(storageClass);
}
inline bool blockServiceIsWritable(const BlockServiceCache& bs, Duration writableDelay) {
return bs.availableBytes > 0 && blockServiceFlagsWritable(bs.flags) && ternNow() - bs.firstSeen > writableDelay;
// Decides whether a block service may currently be picked for writes.
// All three conditions must hold, checked in this order:
//   1. it has strictly more than `minSpaceRequiredForWrite` bytes available,
//   2. its flags pass blockServiceFlagsWritable,
//   3. it was first seen more than `writableDelay` ago.
inline bool blockServiceIsWritable(const BlockServiceCache& bs, Duration writableDelay, uint64_t minSpaceRequiredForWrite) {
    // The threshold is exclusive: having exactly the threshold available is not enough.
    if (bs.availableBytes <= minSpaceRequiredForWrite) {
        return false;
    }
    if (!blockServiceFlagsWritable(bs.flags)) {
        return false;
    }
    // The clock is only read once the cheaper checks have passed.
    return ternNow() - bs.firstSeen > writableDelay;
}
// Cluster-wide bytes/sec from a single shard's accumulated bytes over `elapsed`.
@@ -246,10 +246,12 @@ namespace {
}
BlockServicePicker::BlockServicePicker(Logger& logger, std::shared_ptr<XmonAgent>& xmon, uint8_t maxBlocksToPick, Duration writableDelay,
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput)
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput,
uint64_t minSpaceRequiredForWrite)
: _state(nullptr), _rawState(nullptr), _rng(ternNow().ns), _env(logger, xmon, "block_service_picker"),
_maxBlocksToPick(maxBlocksToPick), _writableDelay(writableDelay),
_hddDriveThroughput(hddDriveThroughput), _flashDriveThroughput(flashDriveThroughput) {}
_hddDriveThroughput(hddDriveThroughput), _flashDriveThroughput(flashDriveThroughput),
_minSpaceRequiredForWrite(minSpaceRequiredForWrite) {}
void BlockServicePicker::update(
const std::unordered_map<uint64_t, BlockServiceCache>& allBlockServices
@@ -263,7 +265,7 @@ void BlockServicePicker::update(
for (const auto& [id, bs] : allBlockServices) {
distinctBlockServiceTypeLoc.insert(lcKey(bs.locationId, bs.storageClass));
if (!blockServiceIsWritable(bs, _writableDelay)) continue;
if (!blockServiceIsWritable(bs, _writableDelay, _minSpaceRequiredForWrite)) continue;
uint16_t key = lcKey(bs.locationId, bs.storageClass);
std::string fdStr(reinterpret_cast<const char*>(bs.failureDomain.data()), bs.failureDomain.size());
+3 -1
View File
@@ -80,9 +80,11 @@ struct BlockServicePicker {
const Duration _writableDelay;
const uint64_t _hddDriveThroughput; // bytes/sec per drive
const uint64_t _flashDriveThroughput; // bytes/sec per drive
const uint64_t _minSpaceRequiredForWrite; // free-space threshold in bytes; a block service is treated as writable only when its availableBytes is strictly greater than this
BlockServicePicker(Logger& logger, std::shared_ptr<XmonAgent>& xmon, uint8_t maxBlocksToPick, Duration writableDelay,
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput);
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput,
uint64_t minSpaceRequiredForWrite);
void update(
const std::unordered_map<uint64_t, BlockServiceCache>& allBlockServices
+3 -2
View File
@@ -185,11 +185,12 @@ std::vector<rocksdb::ColumnFamilyDescriptor> BlockServicesCacheDB::getColumnFami
}
BlockServicesCacheDB::BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const SharedRocksDB& sharedDB, Duration blockServiceWritableDelay,
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput) :
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput,
uint64_t minSpaceRequiredForWrite) :
_env(logger, xmon, "bs_cache_db"),
_db(sharedDB.db()),
_blockServicesCF(sharedDB.getCF("blockServicesCache")),
_picker(logger, xmon, 15, blockServiceWritableDelay, hddDriveThroughput, flashDriveThroughput)
_picker(logger, xmon, 15, blockServiceWritableDelay, hddDriveThroughput, flashDriveThroughput, minSpaceRequiredForWrite)
{
LOG_INFO(_env, "Initializing block services cache DB");
+2 -1
View File
@@ -73,7 +73,8 @@ private:
public:
BlockServicesCacheDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const SharedRocksDB& sharedDB, Duration blockServiceWritableDelay,
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput);
uint64_t hddDriveThroughput, uint64_t flashDriveThroughput,
uint64_t minSpaceRequiredForWrite);
static std::vector<rocksdb::ColumnFamilyDescriptor> getColumnFamilyDescriptors();
void updateCache(const std::vector<FullBlockServiceInfo>& blockServices);
+1 -1
View File
@@ -2611,7 +2611,7 @@ void runShard(ShardOptions& options) {
sharedDB.open(rocksDBOptions);
BlockServicesCacheDB blockServicesCache(logger, xmon, sharedDB, options.blockServiceWritableDelay,
options.hddDriveThroughput, options.flashDriveThroughput);
options.hddDriveThroughput, options.flashDriveThroughput, options.minSpaceRequiredForWrite);
ShardDB shardDB(logger, xmon, options.shardId, options.logsDBOptions.location, options.transientDeadlineInterval, sharedDB, blockServicesCache);
LogsDB logsDB(logger, xmon, sharedDB, options.logsDBOptions.replicaId, shardDB.lastAppliedLogEntry(), options.logsDBOptions.noReplication, !options.logsDBOptions.leaderElection, options.logsDBOptions.avoidBeingLeader);
+1
View File
@@ -24,6 +24,7 @@ struct ShardOptions {
Duration blockServiceWritableDelay = 5_mins; // delay before new block service becomes writable
uint64_t hddDriveThroughput = 35'000'000; // bytes/sec per HDD drive
uint64_t flashDriveThroughput = 350'000'000; // bytes/sec per flash drive
uint64_t minSpaceRequiredForWrite = uint64_t(MAXIMUM_SPAN_SIZE); // free-space threshold in bytes; a block service is considered writable only when it has strictly more than this available
// implicit options
// Derived (not directly configurable) option: true unless logsDBOptions.avoidBeingLeader is set.
bool isLeader() const {
    const bool avoidsLeadership = logsDBOptions.avoidBeingLeader;
    return !avoidsLeadership;
}
+6
View File
@@ -56,6 +56,10 @@ static bool parseShardOptions(CommandLineArgs& args, ShardOptions& options) {
options.flashDriveThroughput = std::stoull(args.next().getArg());
continue;
}
// -min-space-required-for-write <bytes>: overrides options.minSpaceRequiredForWrite with the
// next argument, parsed as an unsigned base-10 integer.
// NOTE(review): std::stoull throws std::invalid_argument / std::out_of_range on malformed or
// too-large input, and accepts a leading '-' (the result wraps to a huge value). Nothing in the
// visible lines catches or range-checks this — confirm how the caller handles it.
if (arg == "-min-space-required-for-write") {
options.minSpaceRequiredForWrite = std::stoull(args.next().getArg());
continue;
}
fprintf(stderr, "unknown argument %s\n", args.peekArg().c_str());
return false;
}
@@ -84,6 +88,8 @@ static void printShardOptionsUsage() {
fprintf(stderr, " Max throughput per HDD drive in bytes/sec. Default: 35000000\n");
fprintf(stderr, " -flash-drive-throughput\n");
fprintf(stderr, " Max throughput per flash drive in bytes/sec. Default: 350000000\n");
fprintf(stderr, " -min-space-required-for-write\n");
fprintf(stderr, " Min available bytes for a block service to be considered writable. Default: MAXIMUM_SPAN_SIZE\n");
}
static bool validateShardOptions(const ShardOptions& options) {
+3 -2
View File
@@ -19,9 +19,10 @@ static std::shared_ptr<XmonAgent> testXmon;
static BlockServicePicker makePicker(uint8_t maxBlocksToPick = 15,
Duration writableDelay = 0_sec,
uint64_t hddDriveThroughput = 0,
uint64_t flashDriveThroughput = 0) {
uint64_t flashDriveThroughput = 0,
uint64_t minSpaceRequiredForWrite = 0) {
return BlockServicePicker(testLogger, testXmon, maxBlocksToPick, writableDelay,
hddDriveThroughput, flashDriveThroughput);
hddDriveThroughput, flashDriveThroughput, minSpaceRequiredForWrite);
}
static FailureDomain fdWith(uint8_t v) {
+3 -3
View File
@@ -25,7 +25,7 @@ TEST_CASE("BlockServicesCacheDB initializes empty") {
tdb.open();
auto* cf = tdb.sharedDB->createCF({"blockServicesCache", {}});
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0);
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0, 0);
auto cache = db.getCache();
CHECK(cache.blockServices.size() == 0);
}
@@ -58,13 +58,13 @@ TEST_CASE("BlockServicesCacheDB persists blockServices metadata on update") {
blockSvcs.push_back(e);
{
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0);
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0, 0);
db.updateCache(blockSvcs);
}
// Re-open and verify block service metadata persisted
{
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0);
BlockServicesCacheDB db(logger, tdb.xmon, *tdb.sharedDB, 0_sec, 0, 0, 0);
auto cache = db.getCache();
auto it = cache.blockServices.find(idBase);
+2 -2
View File
@@ -398,7 +398,7 @@ struct TempShardDB {
std::shared_ptr<XmonAgent> xmon;
sharedDB = std::make_unique<SharedRocksDB>(logger, xmon, dbDir + "/db", dbDir + "/db-statistics.txt");
initSharedDB();
blockServicesCacheDB = std::make_unique<BlockServicesCacheDB>(logger, xmon, *sharedDB, 0_sec, 0, 0);
blockServicesCacheDB = std::make_unique<BlockServicesCacheDB>(logger, xmon, *sharedDB, 0_sec, 0, 0, 0);
db = std::make_unique<ShardDB>(logger, xmon, shid, 0, DEFAULT_DEADLINE_INTERVAL, *sharedDB, *blockServicesCacheDB);
}
@@ -408,7 +408,7 @@ struct TempShardDB {
db->close();
sharedDB = std::make_unique<SharedRocksDB>(logger, xmon, dbDir + "/db", dbDir + "/db-statistics.txt");
initSharedDB();
blockServicesCacheDB = std::make_unique<BlockServicesCacheDB>(logger, xmon, *sharedDB, 0_sec, 0, 0);
blockServicesCacheDB = std::make_unique<BlockServicesCacheDB>(logger, xmon, *sharedDB, 0_sec, 0, 0, 0);
db = std::make_unique<ShardDB>(logger, xmon, shid, 0, DEFAULT_DEADLINE_INTERVAL, *sharedDB, *blockServicesCacheDB);
}