From 2cb52f1e595dc57d94f0a38587569a2fa4dc627a Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Fri, 8 May 2026 16:16:43 +0000 Subject: [PATCH] blockservicepicker: sequential weighted pick + global per-disk cap --- cpp/shard/BlockServicePicker.cpp | 353 +++++++------------------- cpp/shard/BlockServicePicker.hpp | 5 +- cpp/shard/BlockServicesCacheDB.cpp | 2 +- cpp/shard/Shard.cpp | 1 - cpp/tests/blockservicepickertests.cpp | 318 +++++++---------------- 5 files changed, 196 insertions(+), 483 deletions(-) diff --git a/cpp/shard/BlockServicePicker.cpp b/cpp/shard/BlockServicePicker.cpp index 0d3394cf..a510477a 100644 --- a/cpp/shard/BlockServicePicker.cpp +++ b/cpp/shard/BlockServicePicker.cpp @@ -10,7 +10,6 @@ #include "Msgs.hpp" #include "BlockServicesCacheDB.hpp" #include -#include #include namespace { @@ -22,163 +21,59 @@ namespace { return bs.availableBytes > minSpaceRequiredForWrite && blockServiceFlagsWritable(bs.flags) && ternNow() - bs.firstSeen > writableDelay; } - // Cluster-wide bytes/sec from a single shard's accumulated bytes over `elapsed`. inline double throughputBytesPerSec(uint64_t accumulatedBytes, Duration elapsed) { double elapsedSec = static_cast(elapsed.ns) / 1'000'000'000.0; return static_cast(accumulatedBytes) * ShardId::SHARD_COUNT / elapsedSec; } - // Scale factor that pushes per-drive load toward `maxDriveThroughput`, clamped to >= 1. - // Returns 1.0 for the degenerate cases so callers can unconditionally use the result. inline double ratioFromThroughput(uint64_t maxDriveThroughput, uint64_t numDrives, double throughput) { if (maxDriveThroughput == 0 || numDrives == 0 || throughput <= 0.0) return 1.0; double ratio = static_cast(maxDriveThroughput) * numDrives / throughput; return ratio < 1.0 ? 1.0 : ratio; } - // Clamp dominant FD weights so that maxFdWeight * maxBlocksToPick <= totalWeight. - // If maxFdRatio >= 1.0, also clamp so that maxFdWeight <= minFdWeight * maxFdRatio. - // Both caps are computed on original weights, then min(ratioCap, strideCap) is applied once. - void applyClamping( + // Cap every disk's weight to globalMinSvcWeight * maxFdRatio across the (loc, sc). + // At ratio = 1.0 this makes per-disk weights uniform, so FD weights become + // proportional to disk count and per-disk pick probability becomes uniform. + // At high ratio (low load) the cap is large and original capacity-proportional + // weights flow through unchanged. + void applyGlobalCap( BlockServicePicker::LocationStorageInfo& lsInfo, std::unordered_map& serviceToFdInfo, - uint8_t maxBlocksToPick, - double maxFdRatio = 0.0 + double maxFdRatio ) { - auto& failureDomains = lsInfo.failureDomains; - if (failureDomains.empty()) return; + if (maxFdRatio < 1.0 || lsInfo.failureDomains.empty()) return; - // Phase 0: intra-FD (service-level) clamp. Prevents a single fresh disk - // in an otherwise-full FD from owning nearly all of the FD's weight, - // which would cause a single-disk hotspot under stridePick. Uses the - // same throughput-derived ratio as the FD-level pass below. - if (maxFdRatio >= 1.0) { - for (auto& fd : failureDomains) { - uint64_t minSvcWeight = UINT64_MAX; - for (const auto& svc : fd.services) { - if (svc.availableBytes > 0) { - minSvcWeight = std::min(minSvcWeight, svc.availableBytes); - } - } - if (minSvcWeight == UINT64_MAX) continue; - - double svcCapD = static_cast(minSvcWeight) * maxFdRatio; - uint64_t svcCap = (svcCapD >= static_cast(UINT64_MAX)) - ? UINT64_MAX : static_cast(svcCapD); - if (svcCap == 0) svcCap = 1; - - uint64_t newFdTotal = 0; - for (auto& svc : fd.services) { - if (svc.availableBytes == 0) continue; - if (svc.availableBytes > svcCap) { - svc.availableBytes = svcCap; - serviceToFdInfo[svc.id.u64].weight = svcCap; - } - newFdTotal += svc.availableBytes; - } - fd.totalWeight = newFdTotal; - } - uint64_t newLsTotal = 0; - for (const auto& fd : failureDomains) newLsTotal += fd.totalWeight; - lsInfo.totalWeight = newLsTotal; - } - - uint64_t minFdWeight = UINT64_MAX; - uint64_t maxFdWeight = 0; - for (const auto& fd : failureDomains) { - if (fd.totalWeight > 0) { - minFdWeight = std::min(minFdWeight, fd.totalWeight); - maxFdWeight = std::max(maxFdWeight, fd.totalWeight); - } - } - if (maxFdWeight == 0) return; - - // Compute ratio cap: maxFdWeight <= minFdWeight * maxFdRatio - uint64_t ratioCap = UINT64_MAX; - if (maxFdRatio >= 1.0 && minFdWeight > 0 && minFdWeight < UINT64_MAX) { - double ratioCapD = static_cast(minFdWeight) * maxFdRatio; - ratioCap = (ratioCapD >= static_cast(UINT64_MAX)) - ? UINT64_MAX : static_cast(ratioCapD); - } - - // Compute stride cap: maxFdWeight * maxBlocksToPick <= totalWeight - uint64_t strideCap = UINT64_MAX; - if (maxFdWeight > lsInfo.totalWeight / maxBlocksToPick) { - size_t numFds = failureDomains.size(); - - // Sort FD indices by weight descending - std::vector sortedIdx(numFds); - std::iota(sortedIdx.begin(), sortedIdx.end(), 0); - std::sort(sortedIdx.begin(), sortedIdx.end(), [&](size_t a, size_t b) { - return failureDomains[a].totalWeight > failureDomains[b].totalWeight; - }); - - // Find cap C: the largest value such that C * maxBlocksToPick <= c*C + S_unclamped, - // where c = number of FDs clamped (those with weight > C) and - // S_unclamped = sum of unclamped FD weights. - // Rearranging: C = S_unclamped / (maxBlocksToPick - c) - uint64_t sumTop = 0; - bool found = false; - - for (size_t c = 1; c <= numFds && c < static_cast(maxBlocksToPick); c++) { - sumTop += failureDomains[sortedIdx[c - 1]].totalWeight; - uint64_t sUnclamped = lsInfo.totalWeight - sumTop; - uint64_t candidateCap = sUnclamped / (static_cast(maxBlocksToPick) - c); - - bool topAboveCap = failureDomains[sortedIdx[c - 1]].totalWeight > candidateCap; - bool nextBelowOrEqCap = (c == numFds) || (failureDomains[sortedIdx[c]].totalWeight <= candidateCap); - - if (topAboveCap && nextBelowOrEqCap) { - strideCap = candidateCap; - found = true; - break; + uint64_t minSvcWeight = UINT64_MAX; + for (const auto& fd : lsInfo.failureDomains) { + for (const auto& svc : fd.services) { + if (svc.availableBytes > 0) { + minSvcWeight = std::min(minSvcWeight, svc.availableBytes); } } - - if (!found) { - strideCap = lsInfo.totalWeight / maxBlocksToPick; - } } + if (minSvcWeight == UINT64_MAX) return; - uint64_t cap = std::min(ratioCap, strideCap); - if (cap >= maxFdWeight) return; - if (cap == 0) cap = 1; + double svcCapD = static_cast(minSvcWeight) * maxFdRatio; + uint64_t svcCap = (svcCapD >= static_cast(UINT64_MAX)) + ? UINT64_MAX : static_cast(svcCapD); + if (svcCap == 0) svcCap = 1; - // Single clamping loop: scale and trim FDs above cap - lsInfo.totalWeight = 0; - for (auto& fd : failureDomains) { - if (fd.totalWeight <= cap) { - lsInfo.totalWeight += fd.totalWeight; - continue; - } - - double ratio = static_cast(cap) / fd.totalWeight; - fd.totalWeight = 0; + uint64_t newLsTotal = 0; + for (auto& fd : lsInfo.failureDomains) { + uint64_t newFdTotal = 0; for (auto& svc : fd.services) { if (svc.availableBytes == 0) continue; - svc.availableBytes = std::max(static_cast(svc.availableBytes * ratio), 1ul); - serviceToFdInfo[svc.id.u64].weight = svc.availableBytes; - fd.totalWeight += svc.availableBytes; - } - - // Trim if per-service rounding pushed FD weight above cap - while (fd.totalWeight > cap) { - BlockServicePicker::BlockServiceInfo* maxSvc = nullptr; - uint64_t maxSvcW = 0; - for (auto& svc : fd.services) { - if (svc.availableBytes > maxSvcW) { - maxSvcW = svc.availableBytes; - maxSvc = &svc; - } + if (svc.availableBytes > svcCap) { + svc.availableBytes = svcCap; + serviceToFdInfo[svc.id.u64].weight = svcCap; } - if (!maxSvc || maxSvcW <= 1) break; - maxSvc->availableBytes--; - serviceToFdInfo[maxSvc->id.u64].weight--; - fd.totalWeight--; + newFdTotal += svc.availableBytes; } - - lsInfo.totalWeight += fd.totalWeight; + fd.totalWeight = newFdTotal; + newLsTotal += newFdTotal; } + lsInfo.totalWeight = newLsTotal; } static constexpr size_t FAILURE_DOMAIN_NAME_SIZE = decltype(FailureDomain::name)::STATIC_SIZE; @@ -188,68 +83,75 @@ namespace { std::array fdData; }; - // Stride-based pick: selects `needed` services from evenly-spaced points in the - // cumulative weight distribution. Returns true if all `needed` services were picked. - bool stridePick( + // Sequential weighted sampling without replacement over failure domains. + // For each of `needed` draws: pick an FD with probability proportional to its + // remaining weight, then weighted-sample a non-blacklisted disk in that FD, + // then drop the FD from the live set. + // Caller guarantees: at least `needed` FDs have positive weight in fdWeights. + void sequentialWeightedPick( const std::vector& failureDomains, const std::vector& fdWeights, - uint64_t totalWeight, uint8_t needed, const std::unordered_set& blacklistedServices, RandomGenerator& rng, std::vector& results ) { - if (totalWeight == 0 || needed == 0) return false; - - uint64_t step = totalWeight / needed; - results.clear(); results.reserve(needed); - uint64_t offset = rng.generate64() % totalWeight; - - for (uint8_t i = 0; i < needed; i++) { - uint64_t target = (offset + i * step) % totalWeight; - - uint64_t cumulative = 0; - for (size_t fdIdx = 0; fdIdx < failureDomains.size(); fdIdx++) { - uint64_t fdWeight = fdWeights[fdIdx]; - if (fdWeight == 0) continue; - // FD weight should not exceed step, or we may pick multiple from same FD and break guarantees - if (fdWeight > step) { - results.clear(); - return false; - } - - if (target < cumulative + fdWeight) { - const auto& fdInfo = failureDomains[fdIdx]; - uint64_t fdTarget = target - cumulative; - uint64_t svcCumulative = 0; - - for (const auto& svc : fdInfo.services) { - if (blacklistedServices.contains(svc.id.u64)) continue; - - if (fdTarget < svcCumulative + svc.availableBytes) { - results.push_back({svc.id, fdInfo.failureDomain.name.data}); - break; - } - svcCumulative += svc.availableBytes; - } - break; - } - cumulative += fdWeight; + std::vector live; + std::vector liveWeights; + live.reserve(failureDomains.size()); + liveWeights.reserve(failureDomains.size()); + uint64_t totalWeight = 0; + for (size_t i = 0; i < failureDomains.size(); i++) { + if (fdWeights[i] > 0) { + live.push_back(i); + liveWeights.push_back(fdWeights[i]); + totalWeight += fdWeights[i]; } } - return results.size() == needed; + for (uint8_t k = 0; k < needed; k++) { + uint64_t target = rng.generate64() % totalWeight; + size_t chosen = 0; + uint64_t cumulative = 0; + for (size_t i = 0; i < live.size(); i++) { + cumulative += liveWeights[i]; + if (target < cumulative) { + chosen = i; + break; + } + } + + const auto& fdInfo = failureDomains[live[chosen]]; + uint64_t fdTarget = rng.generate64() % liveWeights[chosen]; + uint64_t svcCumulative = 0; + for (const auto& svc : fdInfo.services) { + if (blacklistedServices.contains(svc.id.u64)) continue; + svcCumulative += svc.availableBytes; + if (fdTarget < svcCumulative) { + results.push_back({svc.id, fdInfo.failureDomain.name.data}); + break; + } + } + + totalWeight -= liveWeights[chosen]; + live[chosen] = live.back(); + liveWeights[chosen] = liveWeights.back(); + live.pop_back(); + liveWeights.pop_back(); + } + + ALWAYS_ASSERT(results.size() == needed); } } -BlockServicePicker::BlockServicePicker(Logger& logger, std::shared_ptr& xmon, uint8_t maxBlocksToPick, Duration writableDelay, +BlockServicePicker::BlockServicePicker(Logger& logger, std::shared_ptr& xmon, Duration writableDelay, uint64_t hddDriveThroughput, uint64_t flashDriveThroughput, uint64_t minSpaceRequiredForWrite) : _state(nullptr), _rawState(nullptr), _rng(ternNow().ns), _env(logger, xmon, "block_service_picker"), - _maxBlocksToPick(maxBlocksToPick), _writableDelay(writableDelay), + _writableDelay(writableDelay), _hddDriveThroughput(hddDriveThroughput), _flashDriveThroughput(flashDriveThroughput), _minSpaceRequiredForWrite(minSpaceRequiredForWrite) {} @@ -258,8 +160,6 @@ void BlockServicePicker::update( ) { auto next = std::make_shared(); - // Build weighted structure: group by location/storageClass, then by failure domain - // Map: (location, storageClass) -> map of (failure domain string) -> failure domain index std::unordered_map> grouped; std::unordered_set distinctBlockServiceTypeLoc; @@ -302,7 +202,6 @@ void BlockServicePicker::update( _rawState.store(next, std::memory_order_release); - // Clone raw state and apply clamping, preserving existing throughput ratio auto clamped = std::make_shared(*next); auto nowNs = ternNow().ns; @@ -319,8 +218,6 @@ void BlockServicePicker::update( uint8_t storageClass = key & 0xFF; uint64_t maxDriveThroughput = (storageClass == FLASH_STORAGE) ? _flashDriveThroughput : _hddDriveThroughput; - // Recalculate throughput estimate from accumulated data, or initialize for new lcKeys. - // When maxDriveThroughput == 0 (throughput tracking disabled), skip ratio clamping. double ratio = 0.0; uint64_t lastEstimate = stats.lastThroughputEstimate.load(std::memory_order_relaxed); if (maxDriveThroughput > 0 && totalServices > 0) { @@ -342,7 +239,7 @@ void BlockServicePicker::update( ratio = ratioFromThroughput(maxDriveThroughput, totalServices, static_cast(lastEstimate)); } - applyClamping(lsInfo, clamped->serviceToFdInfo, _maxBlocksToPick, ratio); + applyGlobalCap(lsInfo, clamped->serviceToFdInfo, ratio); uint64_t maxW = 0, minW = UINT64_MAX; for (const auto& fd : lsInfo.failureDomains) { @@ -383,9 +280,9 @@ TernError BlockServicePicker::pick( uint64_t blockSize ) const { auto state = _state.load(std::memory_order_acquire); - if (!state || needed == 0 || needed > _maxBlocksToPick) { - LOG_DEBUG(_env, "pick failed: state=%s needed=%s maxBlocksToPick=%s", - state != nullptr, (int)needed, (int)_maxBlocksToPick); + if (!state || needed == 0) { + LOG_DEBUG(_env, "pick failed: state=%s needed=%s", + state != nullptr, (int)needed); return TernError::COULD_NOT_PICK_BLOCK_SERVICES; } @@ -396,7 +293,6 @@ TernError BlockServicePicker::pick( key = lcKey(locationId, storageClass); } - // Throughput tracking — spike-triggered recalc (periodic recalc happens via update()) if (blockSize > 0) { ALWAYS_ASSERT(blockSize <= MAXIMUM_SPAN_SIZE, "blockSize %s > MAXIMUM_SPAN_SIZE %s", blockSize, MAXIMUM_SPAN_SIZE); @@ -410,19 +306,17 @@ TernError BlockServicePicker::pick( Duration elapsed = now - lastRecalcTime; uint64_t lastEstimate = stats.lastThroughputEstimate.load(std::memory_order_relaxed); - // Trigger a recalc if we've seen a sustained spike above the last estimate — this helps us react faster to sudden load increases. - // We require at least 1 second of data to avoid overreacting to noise in very short intervals. - // We don't care in load reduction scenarios, since the periodic recalc in update() will eventually restore throughput ratios and thus unblock any clamped FDs. + // Trigger a recalc if we've seen a sustained spike above the last estimate. + // We don't react to load reductions — the periodic recalc in update() will + // restore throughput ratios and unblock any clamped FDs. bool spikeTriggered = elapsed >= 1_sec && lastEstimate > 0 && throughputBytesPerSec(accumulated, elapsed) > static_cast(lastEstimate) * 1.1; if (spikeTriggered && _recalcMutex.try_lock()) { std::unique_lock recalcLock(_recalcMutex, std::adopt_lock); - // Re-read after acquiring lock (another thread may have recalced) lastRecalcTime = TernTime(stats.lastRecalcTimeNs.load(std::memory_order_relaxed)); elapsed = now - lastRecalcTime; accumulated = stats.throughputBytes.load(std::memory_order_relaxed); - // we need to check again as update could have happened while we were waiting for the lock, and if so we can skip recalculation if (elapsed > 1_sec) { double currentThroughput = throughputBytesPerSec(accumulated, elapsed); uint64_t numDrives = stats.numDrives; @@ -459,7 +353,7 @@ TernError BlockServicePicker::pick( } } - applyClamping(lsInfoRef, newState->serviceToFdInfo, _maxBlocksToPick, newRatio); + applyGlobalCap(lsInfoRef, newState->serviceToFdInfo, newRatio); uint64_t maxW = 0, minW = UINT64_MAX; for (const auto& fd : lsInfoRef.failureDomains) { @@ -469,10 +363,8 @@ TernError BlockServicePicker::pick( stats.maxWeight.store(maxW, std::memory_order_relaxed); stats.minWeight.store(lsInfoRef.failureDomains.empty() ? 0 : minW, std::memory_order_relaxed); - // CAS swap — if update() raced, discard _state.compare_exchange_strong(expected, newState, std::memory_order_release); - // Reload state for this pick state = _state.load(std::memory_order_acquire); } } @@ -494,12 +386,9 @@ TernError BlockServicePicker::pick( blacklistedServices.insert(b.blockService.u64); } - // Build adjusted FD weights and lookup (copy and apply blacklist) std::vector fdWeights; std::unordered_set actuallyBlacklistedServices; fdWeights.reserve(lsInfo.failureDomains.size()); - uint64_t totalWeight = 0; - uint64_t maxFdWeight = 0; for (const auto& fdInfo : lsInfo.failureDomains) { uint64_t adjustedWeight = fdInfo.totalWeight; @@ -509,26 +398,30 @@ TernError BlockServicePicker::pick( break; } } - fdWeights.emplace_back(adjustedWeight); - totalWeight += adjustedWeight; - maxFdWeight = std::max(maxFdWeight, adjustedWeight); } for(const auto& blacklistEntry : blacklist) { auto svcIt = state->serviceToFdInfo.find(blacklistEntry.blockService.u64); if (svcIt != state->serviceToFdInfo.end()) { const auto& svcInfo = svcIt->second; - if (fdWeights[svcInfo.fdIndex] == 0) continue; // already blacklisted via FD + if (fdWeights[svcInfo.fdIndex] == 0) continue; if (svcInfo.lcKey == key) { actuallyBlacklistedServices.insert(blacklistEntry.blockService.u64); fdWeights[svcInfo.fdIndex] -= svcInfo.weight; - totalWeight -= svcInfo.weight; } } } - auto commitResults = [&](const std::vector& results, bool rescaled) { + size_t liveFdCount = 0; + for (uint64_t w : fdWeights) { + if (w > 0) liveFdCount++; + } + + if (liveFdCount >= needed) { + std::vector results; + sequentialWeightedPick(lsInfo.failureDomains, fdWeights, needed, + actuallyBlacklistedServices, _rng, results); out.clear(); out.reserve(needed); std::lock_guard lock(_statsMutex); @@ -539,57 +432,7 @@ TernError BlockServicePicker::pick( _failureDomainStats[fdStr].fetch_add(1, std::memory_order_relaxed); } _locStorageStats[key].totalPicks.fetch_add(needed, std::memory_order_relaxed); - if (rescaled) _locStorageStats[key].blacklistRescales.fetch_add(1, std::memory_order_relaxed); - }; - - // Count live FDs up front — if blacklisting left us with fewer FDs than - // `needed`, neither fast nor slow path can succeed; bail early. - size_t liveFdCount = 0; - for (uint64_t w : fdWeights) { - if (w > 0) liveFdCount++; - } - - if (totalWeight > 0 && needed > 0 && liveFdCount >= needed) { - // Fast path: stride pick on pre-scaled weights with blacklist adjustments - std::vector results; - if (stridePick(lsInfo.failureDomains, fdWeights, totalWeight, needed, - actuallyBlacklistedServices, _rng, results)) { - commitResults(results, false); - return TernError::NO_ERROR; - } - - // Slow path: blacklisting broke the stride invariant (maxFdWeight > step). - // just pick `needed` distinct FDs uniformly - // then weighted-sample one non-blacklisted service per FD. - std::vector liveFds; - liveFds.reserve(liveFdCount); - for (size_t i = 0; i < lsInfo.failureDomains.size(); i++) { - if (fdWeights[i] > 0) liveFds.push_back(i); - } - - results.clear(); - results.reserve(needed); - for (uint8_t k = 0; k < needed; k++) { - size_t j = k + static_cast(_rng.generate64() % (liveFds.size() - k)); - std::swap(liveFds[k], liveFds[j]); - - const auto& fd = lsInfo.failureDomains[liveFds[k]]; - uint64_t target = _rng.generate64() % fdWeights[liveFds[k]]; - uint64_t acc = 0; - for (const auto& svc : fd.services) { - if (actuallyBlacklistedServices.contains(svc.id.u64)) continue; - if (target < acc + svc.availableBytes) { - results.push_back({svc.id, fd.failureDomain.name.data}); - break; - } - acc += svc.availableBytes; - } - } - - if (results.size() == needed) { - commitResults(results, true); - return TernError::NO_ERROR; - } + return TernError::NO_ERROR; } } @@ -639,7 +482,6 @@ BlockServicePicker::StatsSnapshot BlockServicePicker::getStats() const { stats.writableBlockServices.load(std::memory_order_relaxed), stats.maxWeight.load(std::memory_order_relaxed), stats.minWeight.load(std::memory_order_relaxed), - stats.blacklistRescales.load(std::memory_order_relaxed), effectiveMaxRatio, lastEstimate }); @@ -677,7 +519,6 @@ void BlockServicePicker::resetStats() { for (auto& [key, stats] : _locStorageStats) { stats.totalPicks.store(0, std::memory_order_relaxed); - stats.blacklistRescales.store(0, std::memory_order_relaxed); } for (auto& [id, stats] : _blockServiceStats) { diff --git a/cpp/shard/BlockServicePicker.hpp b/cpp/shard/BlockServicePicker.hpp index 2435cb1f..73cf1fea 100644 --- a/cpp/shard/BlockServicePicker.hpp +++ b/cpp/shard/BlockServicePicker.hpp @@ -57,7 +57,6 @@ struct BlockServicePicker { std::atomic writableBlockServices{0}; std::atomic maxWeight{0}; std::atomic minWeight{0}; - std::atomic blacklistRescales{0}; // Throughput tracking for dynamic ratio recalc std::atomic throughputBytes{0}; std::atomic lastRecalcTimeNs{0}; @@ -76,13 +75,12 @@ struct BlockServicePicker { mutable std::unordered_map> _failureDomainStats; mutable std::mutex _statsMutex; // protects map structures (not atomic values) mutable std::mutex _recalcMutex; - const uint8_t _maxBlocksToPick; const Duration _writableDelay; const uint64_t _hddDriveThroughput; // bytes/sec per drive const uint64_t _flashDriveThroughput; // bytes/sec per drive const uint64_t _minSpaceRequiredForWrite; // min available bytes for a block service to be writable - BlockServicePicker(Logger& logger, std::shared_ptr& xmon, uint8_t maxBlocksToPick, Duration writableDelay, + BlockServicePicker(Logger& logger, std::shared_ptr& xmon, Duration writableDelay, uint64_t hddDriveThroughput, uint64_t flashDriveThroughput, uint64_t minSpaceRequiredForWrite); @@ -109,7 +107,6 @@ struct BlockServicePicker { uint64_t writableBlockServices; uint64_t maxWeight; uint64_t minWeight; - uint64_t blacklistRepicks; double effectiveMaxRatio; uint64_t throughputEstimate; }; diff --git a/cpp/shard/BlockServicesCacheDB.cpp b/cpp/shard/BlockServicesCacheDB.cpp index a880c1cb..397e160a 100644 --- a/cpp/shard/BlockServicesCacheDB.cpp +++ b/cpp/shard/BlockServicesCacheDB.cpp @@ -190,7 +190,7 @@ BlockServicesCacheDB::BlockServicesCacheDB(Logger& logger, std::shared_ptr testXmon; -static BlockServicePicker makePicker(uint8_t maxBlocksToPick = 15, - Duration writableDelay = 0_sec, +static BlockServicePicker makePicker(Duration writableDelay = 0_sec, uint64_t hddDriveThroughput = 0, uint64_t flashDriveThroughput = 0, uint64_t minSpaceRequiredForWrite = 0) { - return BlockServicePicker(testLogger, testXmon, maxBlocksToPick, writableDelay, + return BlockServicePicker(testLogger, testXmon, writableDelay, hddDriveThroughput, flashDriveThroughput, minSpaceRequiredForWrite); } @@ -142,8 +141,7 @@ TEST_CASE("picker concurrency update while picks") { } TEST_CASE("picker weighted distribution") { - // Use maxBlocksToPick=1 to avoid scaling effects on small clusters - auto p = makePicker(1); + auto p = makePicker(); // Create services with different weights // Service 1000: 10 MB available (weight 10M) @@ -264,8 +262,7 @@ TEST_CASE("picker blacklist enforcement") { } TEST_CASE("picker weighted distribution with blacklist") { - // Use maxBlocksToPick=1 to avoid scaling effects on small clusters - auto p = makePicker(1); + auto p = makePicker(); // Create services with varying weights across multiple failure domains // FD 1: service 100 (10MB), service 101 (10MB) - total 20MB @@ -439,15 +436,17 @@ TEST_CASE("picker multi-service weighted distribution") { // Verify that picks are distributed according to weights uint64_t totalPicks = (uint64_t)NUM_ITERATIONS * needed; - // Check failure domain distribution + // Check failure domain distribution. Marginal pick probability under + // sequential weighted-without-replacement isn't exactly proportional + // to weight when needed/numFDs is large (heavy FDs saturate towards + // P=1), so widen the tolerance for larger picks. for (uint8_t fdByte = 1; fdByte <= 20; ++fdByte) { double expectedRatio = (double)fdTotalWeights[fdByte] / totalSystemWeight; double expectedPicks = totalPicks * expectedRatio; int actualPicks = fdPickCounts[fdByte]; - // Allow 5% deviation for FD distribution - double tolerance = 0.05; - if (expectedPicks > 1000) { // Only check FDs with significant expected picks + double tolerance = (needed >= 10) ? 0.10 : 0.05; + if (expectedPicks > 1000) { CHECK(actualPicks > expectedPicks * (1.0 - tolerance)); CHECK(actualPicks < expectedPicks * (1.0 + tolerance)); } @@ -491,81 +490,6 @@ TEST_CASE("picker multi-service weighted distribution") { CHECK(allPickedServices.size() > serviceWeights.size() * 0.9); } -TEST_CASE("picker weight clamping for dominant failure domain") { - auto p = makePicker(); - std::vector catalog; - - // Scenario: - // FD 1: Dominant (10000) - // FD 2: Large (5000) - // FD 3-52: Small (100 each) - 50 FDs - // Total FDs: 52 - // - // With clamping (maxBlocksToPick=15): - // FD1 and FD2 are both above the cap and get clamped to the same weight. - // Small FDs keep their exact weights (100). - // cap = S_unclamped / (15 - 2) = 5000 / 13 = 384 - // newTotal = 2*384 + 50*100 = 5768 - - // Add FD 1 - catalog.push_back(bs(1, 1, FLASH_STORAGE, 1)); - // Add FD 2 - catalog.push_back(bs(2, 1, FLASH_STORAGE, 2)); - // Add FD 3-52 - for (int i = 3; i <= 52; ++i) { - catalog.push_back(bs(i, 1, FLASH_STORAGE, i)); - } - - auto cache = makeCatalog(catalog); - - // Set weights - cache[1].availableBytes = 10000; - cache[2].availableBytes = 5000; - for (int i = 3; i <= 52; ++i) { - cache[i].availableBytes = 100; - } - - p.update(cache); - - double cap = 384.0; - double total = 2 * cap + 50 * 100.0; - double probClamped = cap / total; // ~6.66% for FD1 and FD2 - double probSmall = 100.0 / total; // ~1.73% for each small FD - - // Run many iterations - const int NUM_ITERATIONS = 1000000; - std::unordered_map picks; - - for (int i = 0; i < NUM_ITERATIONS; ++i) { - std::vector out; - auto err = p.pick(1, FLASH_STORAGE, 1, {}, out); - REQUIRE(err == TernError::NO_ERROR); - REQUIRE(out.size() == 1); - picks[out[0].u64]++; - } - - // FD 1 and FD 2: both clamped to same weight, so approximately equal - double actualProb1 = (double)picks[1] / NUM_ITERATIONS; - double actualProb2 = (double)picks[2] / NUM_ITERATIONS; - CHECK(actualProb1 > probClamped * 0.85); - CHECK(actualProb1 < probClamped * 1.15); - CHECK(actualProb2 > probClamped * 0.85); - CHECK(actualProb2 < probClamped * 1.15); - - // Small FDs: keep exact weights - double totalSmallPicks = 0; - for (int i = 3; i <= 52; ++i) { - totalSmallPicks += picks[i]; - } - double actualProbSmallAvg = (totalSmallPicks / 50.0) / NUM_ITERATIONS; - CHECK(actualProbSmallAvg > probSmall * 0.85); - CHECK(actualProbSmallAvg < probSmall * 1.15); - - // Clamped FDs > small FDs - CHECK(picks[1] > (totalSmallPicks / 50.0)); - CHECK(picks[2] > (totalSmallPicks / 50.0)); -} - TEST_CASE("picker never picks same failure domain twice") { // Test various configurations: different FD counts, service counts, and needed values struct TestConfig { @@ -579,7 +503,7 @@ TEST_CASE("picker never picks same failure domain twice") { {3, 1, 2, 1000000}, // minimal: 3 FDs, 1 service each, pick 2 {3, 1, 3, 1000000}, // pick all 3 {5, 3, 4, 1000000}, // 5 FDs with 3 services each, pick 4 - {15, 1, 15, 1000000}, // exactly maxBlocksToPick FDs + {15, 1, 15, 1000000}, // pick every one of 15 FDs {20, 5, 10, 1000000}, // 20 FDs, pick 10 {3, 1, 2, 1}, // tiny weights (post-scaling edge case) {52, 1, 15, 100}, // many FDs, small weights @@ -626,67 +550,13 @@ TEST_CASE("picker never picks same failure domain twice") { } } -TEST_CASE("picker blacklist rescale metric is bumped") { - // Create a scenario where blacklisting a *service* (not FD) breaks the stride - // invariant, forcing the slow path (rescale). - // 3 FDs each with weight 100, maxBlocksToPick=3. step=100, maxFdWeight=100 → OK. - // Blacklisting a service worth 50 from FD3 → totalWeight=250, maxFdWeight=100, - // step=83 < 100 → invariant broken → slow path with rescale. - auto p = makePicker(3); - - std::unordered_map cache; - auto makeEntry = [](uint8_t fd, uint64_t avail) { - BlockServiceCache entry; - entry.locationId = 1; - entry.storageClass = FLASH_STORAGE; - entry.failureDomain = fdWith(fd).name.data; - entry.flags = BlockServiceFlags::EMPTY; - entry.availableBytes = avail; - entry.capacityBytes = avail * 10; - entry.blocks = 0; - entry.hasFiles = false; - return entry; - }; - - cache[1] = makeEntry(1, 100); - cache[2] = makeEntry(2, 100); - cache[3] = makeEntry(3, 50); - cache[4] = makeEntry(3, 50); - - p.update(cache); - p.resetStats(); - - // Blacklist service 3 (one of FD3's services, weight 50) - std::vector blacklist; - BlacklistEntry bl; - bl.blockService = BlockServiceId(3); - blacklist.push_back(bl); - - for (int i = 0; i < 100; i++) { - std::vector out; - auto err = p.pick(1, FLASH_STORAGE, 3, blacklist, out); - REQUIRE(err == TernError::NO_ERROR); - REQUIRE(out.size() == 3); - } - - auto stats = p.getStats(); - bool foundRescales = false; - for (const auto& ls : stats.locStorage) { - if (ls.blacklistRepicks > 0) { - foundRescales = true; - CHECK(ls.blacklistRepicks == 100); - } - } - CHECK(foundRescales); -} - TEST_CASE("picker extreme weight disparity uniform at max load") { // Scenario: 200 existing FDs (nearly full, 5GB/disk × 100 disks = 500GB/FD) // plus 5 brand-new FDs (empty, 20TB/disk × 100 disks = 2000TB/FD). // Weight ratio per FD: 2000TB / 500GB = 4000x. // At max load (default), ratio=1.0 clamps all FDs to minFdWeight, so distribution // should be near-uniform across all FDs regardless of capacity disparity. - auto p = makePicker(15, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); const uint64_t EXISTING_BYTES_PER_DISK = 5000000000ULL; // 5 GB const uint64_t NEW_BYTES_PER_DISK = 20000000000000ULL; // 20 TB @@ -763,10 +633,9 @@ TEST_CASE("picker extreme weight disparity uniform at max load") { TEST_CASE("picker throughput adaptation increases ratio at low load") { // Low observed throughput → high effectiveMaxRatio → capacity-proportional picks. - // Use maxBlocksToPick=1 so stride clamping doesn't interfere (only ratio clamping matters). _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); const int EXISTING_FDS = 4; const int NEW_FDS = 1; @@ -843,12 +712,11 @@ TEST_CASE("picker throughput adaptation increases ratio at low load") { TEST_CASE("picker clamping ratio varies with load") { // 2 FDs: FD1 = 1GB (10 drives), FD2 = 100GB (10 drives). 100x weight difference. - // maxBlocksToPick=1 so stride clamping doesn't interfere. - // At max load (initial): ratio=1.0 → all FDs clamped to minFdWeight → near-uniform. + // At max load (initial): ratio=1.0 → all disks clamped to minSvcWeight → near-uniform. // At low load (after recalc): high ratio → capacity-proportional → FD2 gets ~100x more. _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::unordered_map cache; std::unordered_map serviceToFd; @@ -907,8 +775,8 @@ TEST_CASE("picker clamping ratio varies with load") { // Max cluster throughput = 600MB/s * 20 drives = 12GB/s. // For ratio=5: cluster throughput = 12GB/s / 5 = 2.4GB/s. // Per-shard over 2s = 2.4GB/s * 2 / 256 ≈ 18.75MB → 1000 picks of ~18750 bytes. - // With ratio=5: ratioCap = minFdWeight(10GB) * 5 = 50GB. - // FD1 stays at 10GB, FD2 clamped from 1TB to 50GB → pick ratio ≈ 5x. + // With ratio=5: svcCap = minSvc(1GB) * 5 = 5GB per disk. + // FD1 disks stay at 1GB, FD2 disks clamped 100GB→5GB → FD weights 10GB vs 50GB → ~5x. for (int i = 0; i < 1000; i++) { std::vector out; p.pick(1, FLASH_STORAGE, 1, {}, out, 18750); @@ -948,71 +816,11 @@ TEST_CASE("picker insufficient live FDs returns error fast") { CHECK(out.size() == 0); } -TEST_CASE("picker slow path picks distinct FDs with service blacklist") { - // Force the slow path: 3 FDs, each with two services. Blacklist one service - // in the heaviest FD so stride invariant (maxFdWeight <= step) breaks. - auto p = makePicker(3); - - std::unordered_map cache; - auto mk = [](uint8_t fd, uint64_t avail) { - BlockServiceCache e; - e.locationId = 1; - e.storageClass = FLASH_STORAGE; - e.failureDomain = fdWith(fd).name.data; - e.flags = BlockServiceFlags::EMPTY; - e.availableBytes = avail; - e.capacityBytes = avail * 10; - e.blocks = 0; - e.hasFiles = false; - return e; - }; - // FD1: svcs 10, 11 (both weight 100) → FD total 200 - // FD2: svcs 20, 21 (both weight 100) → FD total 200 - // FD3: svcs 30, 31 (both weight 100) → FD total 200 - cache[10] = mk(1, 100); cache[11] = mk(1, 100); - cache[20] = mk(2, 100); cache[21] = mk(2, 100); - cache[30] = mk(3, 100); cache[31] = mk(3, 100); - - p.update(cache); - p.resetStats(); - - // Blacklist svc 30 from FD3 (weight 100): totalWeight=500, maxFdWeight=200, - // step = 500/3 = 166 < 200 → stride invariant broken, slow path taken. - std::vector bl; - BlacklistEntry e; e.blockService = BlockServiceId(30); bl.push_back(e); - - const int ITER = 2000; - for (int i = 0; i < ITER; i++) { - std::vector out; - auto err = p.pick(1, FLASH_STORAGE, 3, bl, out); - REQUIRE(err == TernError::NO_ERROR); - REQUIRE(out.size() == 3); - - std::unordered_set seenFds; - for (const auto& id : out) { - CHECK(id.u64 != 30); // blacklisted - uint8_t fd = (id.u64 < 20) ? 1 : (id.u64 < 30) ? 2 : 3; - CHECK(seenFds.insert(fd).second); // distinct FDs - } - } - - auto stats = p.getStats(); - bool sawRescale = false; - for (const auto& ls : stats.locStorage) { - if (ls.blacklistRepicks > 0) { - sawRescale = true; - // We took the slow path every iteration. - CHECK(ls.blacklistRepicks == (uint64_t)ITER); - } - } - CHECK(sawRescale); -} - TEST_CASE("picker drained lcKey resets throughput stats") { // if an lcKey loses all services, stale numDrives / // lastThroughputEstimate must be cleared so the spike path can't read them. _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::vector catalog{ bs(1, 1, FLASH_STORAGE, 1), @@ -1047,7 +855,7 @@ TEST_CASE("picker stale service id from dropped lcKey does not corrupt weights") // but still appears in a blacklist must not subtract bogus weight from // the current fdWeights. _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); // Phase 1: service 99 lives in location=1. std::vector catalog1{ @@ -1091,7 +899,7 @@ TEST_CASE("picker intra-FD clamp at max load spreads within heterogeneous FD") { // Phase 0 should clamp the fresh disk down to 10GB; each disk then receives ~1/10 of picks. // Without Phase 0, the fresh disk would receive ~99.5% of picks (2000 / 2090 of in-FD weight). _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::unordered_map cache; auto mk = [](uint64_t avail) { @@ -1145,7 +953,7 @@ TEST_CASE("picker intra-FD clamp no-op at low load preserves capacity-proportion // becomes large; Phase 0 becomes a no-op and fresh disk dominates in-FD picks // (this is the desirable "drain to fresh capacity when under-utilized" behaviour). _setCurrentTime(ternNow()); - auto p = makePicker(1, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::unordered_map cache; auto mk = [](uint64_t avail) { @@ -1207,7 +1015,7 @@ TEST_CASE("picker intra-FD clamp preserves consistency with service blacklist") // blacklisting a service that was clamped must still produce a valid pick // without corrupting remaining FD weights. _setCurrentTime(ternNow()); - auto p = makePicker(3, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::unordered_map cache; auto mk = [](uint8_t fd, uint64_t avail) { @@ -1247,14 +1055,12 @@ TEST_CASE("picker intra-FD clamp preserves consistency with service blacklist") _setCurrentTime(TernTime(0)); } -TEST_CASE("picker intra-FD clamp changes stride-cap outcome") { - // Without Phase 0: FD1 raw total = 2TB + 9×10GB ≈ 2.09TB dominates; cluster - // total ≈ 4.09TB; step = 1.36TB; FD1 > step → stride cap binds. - // With Phase 0: FD1 clamped to 10×10GB = 100GB; ratio cap equalises all 3 FDs - // to minFdWeight = 100GB → no stride cap needed; FD picks should be uniform - // and maxWeight == minWeight after update(). +TEST_CASE("picker global cap equalises heterogeneous FDs at max load") { + // Three FDs with very different per-disk capacity. Global cap clamps every + // disk to the global minimum (10GB), so every FD ends at 10 disks × 10GB = + // 100GB. maxWeight == minWeight, and picking 3 from 3 FDs always covers all. _setCurrentTime(ternNow()); - auto p = makePicker(3, 0_sec, 600'000'000, 600'000'000); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); std::unordered_map cache; std::unordered_map serviceToFd; @@ -1309,3 +1115,73 @@ TEST_CASE("picker intra-FD clamp changes stride-cap outcome") { _setCurrentTime(TernTime(0)); } + +TEST_CASE("picker uneven disk count per FD spreads per-disk load at max load") { + // Two FDs with the same total bytes but very different disk counts: + // FD1: 2 disks × 1TB = 2TB total, 2 disks + // FD2: 50 disks × 40GB = 2TB total, 50 disks + // Without a global per-disk cap, equal FD bytes make stride/weighted picks + // hit each FD ~50% of the time, hammering FD1's two disks at ~25% each + // while FD2's disks see ~1% each — a 25× per-disk imbalance. + // With the global cap at max load (ratio=1) every disk is capped to the + // global minimum (40GB), so FD weight becomes proportional to disk count + // and per-disk pick rate equalises across all 52 disks. + _setCurrentTime(ternNow()); + auto p = makePicker(0_sec, 600'000'000, 600'000'000); + + std::unordered_map cache; + std::unordered_map serviceToFd; + auto addSvc = [&](uint64_t id, uint8_t fdByte, uint64_t avail) { + BlockServiceCache e; + e.locationId = 1; + e.storageClass = FLASH_STORAGE; + e.failureDomain = fdWith(fdByte).name.data; + e.flags = BlockServiceFlags::EMPTY; + e.availableBytes = avail; + e.capacityBytes = avail * 10; + e.blocks = 0; + e.hasFiles = false; + cache[id] = e; + serviceToFd[id] = fdByte; + }; + + const uint64_t FD1_AVAIL = 1'000'000'000'000ULL; // 1 TB per disk + const uint64_t FD2_AVAIL = 40'000'000'000ULL; // 40 GB per disk + uint64_t id = 1; + for (int i = 0; i < 2; i++) addSvc(id++, 1, FD1_AVAIL); + for (int i = 0; i < 50; i++) addSvc(id++, 2, FD2_AVAIL); + + p.update(cache); + p.resetStats(); + + const int N = 200000; + for (int i = 0; i < N; i++) { + std::vector out; + auto err = p.pick(1, FLASH_STORAGE, 1, {}, out); + REQUIRE(err == TernError::NO_ERROR); + REQUIRE(out.size() == 1); + } + + auto stats = p.getStats(); + uint64_t totalDiskPicks = 0; + uint64_t maxDiskPicks = 0; + uint64_t minDiskPicks = UINT64_MAX; + for (const auto& b : stats.blockServices) { + totalDiskPicks += b.picks; + maxDiskPicks = std::max(maxDiskPicks, b.picks); + minDiskPicks = std::min(minDiskPicks, b.picks); + } + REQUIRE(totalDiskPicks == (uint64_t)N); + + // Per-disk pick rate should be uniform across all 52 disks. + double expectedPerDisk = (double)N / 52.0; + double maxDeviation = std::max( + std::abs((double)maxDiskPicks - expectedPerDisk) / expectedPerDisk, + std::abs((double)minDiskPicks - expectedPerDisk) / expectedPerDisk); + CHECK(maxDeviation < 0.20); + + // Sanity: max-to-min ratio should be near 1 — emphatically not 25x. + CHECK((double)maxDiskPicks / minDiskPicks < 1.5); + + _setCurrentTime(TernTime(0)); +}