From 99aa882575cd4ff79e4137224570db801a39d2e6 Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Thu, 20 Jun 2024 15:19:16 +0000 Subject: [PATCH] gcmigrate: support sharding --- cpp/shard/Shard.cpp | 2 ++ go/eggsgc/eggsgc.go | 7 +++++++ 2 files changed, 9 insertions(+) diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 164c2fbe..d5cc4832 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -856,6 +856,7 @@ public: _outgoingLogEntries.clear(); _replicaInfo = _shared.replicas; uint32_t pulled = _shared.writerRequestsQueue.pull(_requests, _maxWritesAtOnce, _logsDB.getNextTimeout()); + auto start = eggsNow(); if (likely(pulled > 0)) { LOG_DEBUG(_env, "pulled %s requests from write queue", pulled); _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05; @@ -883,6 +884,7 @@ public: } } logsDBStep(); + auto loopTime = eggsNow() - start; } }; diff --git a/go/eggsgc/eggsgc.go b/go/eggsgc/eggsgc.go index c2135d08..e6561745 100644 --- a/go/eggsgc/eggsgc.go +++ b/go/eggsgc/eggsgc.go @@ -96,6 +96,8 @@ func main() { metrics := flag.Bool("metrics", false, "Send metrics") countMetrics := flag.Bool("count-metrics", false, "Compute and send count metrics") migrate := flag.Bool("migrate", false, "migrate") + numMigrators := flag.Int("num-migrators", 1, "How many migrate instances are running. 1 by default") + migratorIdx := flag.Int("migrator-idx", 0, "Which migrate instance is this. should be less than num-migrators. 0 by default") scrub := flag.Bool("scrub", false, "scrub") scrubWorkersPerShard := flag.Int("scrub-workers-per-shard", 10, "") scrubWorkersQueueSize := flag.Int("scrub-workers-queue-size", 50, "") @@ -343,6 +345,9 @@ func main() { blockServices := blockServicesResp.(*msgs.AllBlockServicesResp) blockServicesToMigrate := make(map[string]*[]msgs.BlockServiceId) // by failure domain for _, bs := range blockServices.BlockServices { + if uint64(bs.Id)%uint64(*numMigrators) != uint64(*migratorIdx) { + continue + } if bs.Flags.HasAny(msgs.EGGSFS_BLOCK_SERVICE_DECOMMISSIONED) && bs.HasFiles { bss := blockServicesToMigrate[bs.FailureDomain.String()] if bss == nil { @@ -372,6 +377,8 @@ func main() { time.Sleep(time.Minute) } }() + } else { + } defragStats := &cleanup.DefragStats{}