diff --git a/cpp/core/Msgs.cpp b/cpp/core/Msgs.cpp index a326a01b..7b53f229 100644 --- a/cpp/core/Msgs.cpp +++ b/cpp/core/Msgs.cpp @@ -13,7 +13,7 @@ std::ostream& operator<<(std::ostream& out, ReplicaId replica) { } std::ostream& operator<<(std::ostream& out, ShardReplicaId shrid) { - out << shrid.shardId() << ":" << shrid.replicaId(); + out << std::setw(3) << std::setfill('0') << shrid.shardId() << ":" << shrid.replicaId(); return out; } diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 707b074f..faade80a 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -170,7 +170,7 @@ struct ShardServer : Loop { private: // init data ShardShared& _shared; - ShardId _shid; + ShardReplicaId _shrid; std::array _ipPorts; uint64_t _packetDropRand; uint64_t _incomingPacketDropProbability; // probability * 10,000 @@ -196,10 +196,10 @@ private: MultiplexedChannel<3, std::array{SHARD_REQ_PROTOCOL_VERSION, LOG_REQ_PROTOCOL_VERSION, LOG_RESP_PROTOCOL_VERSION}> _channel; public: - ShardServer(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) : + ShardServer(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "server"), _shared(shared), - _shid(shid), + _shrid(shrid), _ipPorts(options.ipPorts), _packetDropRand(eggsNow().ns), _incomingPacketDropProbability(0), @@ -263,7 +263,7 @@ private: _shared.ports[i].store(ntohs(serverAddr.sin_port), std::memory_order_release); _shared.socks[i].fd = sock; _shared.socks[i].events = POLL_IN; - LOG_INFO(_env, "Bound shard %s to %s", _shid, serverAddr); + LOG_INFO(_env, "Bound shard %s to %s", _shrid, serverAddr); _recvBuf[i].resize(DEFAULT_UDP_MTU*MAX_RECV_MSGS); _recvHdrs[i].resize(MAX_RECV_MSGS); @@ -705,17 +705,17 @@ public: struct ShardBlockServiceUpdater : PeriodicLoop { private: ShardShared& _shared; - ShardId _shid; + ShardReplicaId _shrid; std::string _shuckleHost; uint16_t _shucklePort; XmonNCAlert _alert; ShardRespContainer _respContainer; std::vector _logEntries; public: - ShardBlockServiceUpdater(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared): + ShardBlockServiceUpdater(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared): PeriodicLoop(logger, xmon, "bs_updater", {1_sec, 1_mins}), _shared(shared), - _shid(shid), + _shrid(shrid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort) { @@ -728,7 +728,7 @@ public: auto& logEntry = _logEntries.emplace_back(); logEntry.logEntry.time = eggsNow(); auto& blockServicesEntry = logEntry.logEntry.body.setUpdateBlockServices(); - std::string err = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shid, blockServicesEntry); + std::string err = fetchBlockServices(_shuckleHost, _shucklePort, 10_sec, _shrid.shardId(), blockServicesEntry); if (!err.empty()) { _env.updateAlert(_alert, "could not reach shuckle: %s", err); return false; @@ -787,16 +787,16 @@ public: struct ShardStatsInserter : PeriodicLoop { private: ShardShared& _shared; - ShardId _shid; + ShardReplicaId _shrid; std::string _shuckleHost; uint16_t _shucklePort; XmonNCAlert _alert; std::vector _stats; public: - ShardStatsInserter(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared): + ShardStatsInserter(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, const ShardOptions& options, ShardShared& shared): PeriodicLoop(logger, xmon, "stats", {1_mins, 1_hours}), _shared(shared), - _shid(shid), + _shrid(shrid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort), _alert(XmonAppType::DAYTIME) @@ -807,7 +807,7 @@ public: virtual bool periodicStep() override { for (ShardMessageKind kind : allShardMessageKind) { std::ostringstream prefix; - prefix << "shard." << std::setw(3) << std::setfill('0') << _shid << "." << kind; + prefix << "shard." << _shrid << "." << kind; _shared.timings[(int)kind].toStats(prefix.str(), _stats); _shared.errors[(int)kind].toStats(prefix.str(), _stats); } @@ -837,17 +837,17 @@ public: struct ShardMetricsInserter : PeriodicLoop { private: ShardShared& _shared; - ShardId _shid; + ShardReplicaId _shrid; XmonNCAlert _sendMetricsAlert; MetricsBuilder _metricsBuilder; std::unordered_map _rocksDBStats; std::array _sockQueueAlerts; XmonNCAlert _writeQueueAlert; public: - ShardMetricsInserter(Logger& logger, std::shared_ptr& xmon, ShardId shid, ShardShared& shared): + ShardMetricsInserter(Logger& logger, std::shared_ptr& xmon, ShardReplicaId shrid, ShardShared& shared): PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}), _shared(shared), - _shid(shid), + _shrid(shrid), _sendMetricsAlert(XmonAppType::DAYTIME), _sockQueueAlerts({XmonAppType::NEVER, XmonAppType::NEVER}), _writeQueueAlert(XmonAppType::NEVER) @@ -876,7 +876,7 @@ public: uint64_t count = errs.count[i].load(); if (count == 0) { continue; } _metricsBuilder.measurement("eggsfs_shard_requests"); - _metricsBuilder.tag("shard", _shid); + _metricsBuilder.tag("shard", _shrid); _metricsBuilder.tag("kind", kind); _metricsBuilder.tag("write", !readOnlyShardReq(kind)); if (i == 0) { @@ -890,20 +890,20 @@ public: } { _metricsBuilder.measurement("eggsfs_shard_write_queue"); - _metricsBuilder.tag("shard", _shid); + _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldFloat("size", _shared.logEntriesQueueSize); _metricsBuilder.timestamp(now); } for (int i = 0; i < _shared.receivedRequests.size(); i++) { _metricsBuilder.measurement("eggsfs_shard_received_requests"); - _metricsBuilder.tag("shard", _shid); + _metricsBuilder.tag("shard", _shrid); _metricsBuilder.tag("socket", i); _metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]); _metricsBuilder.timestamp(now); } { _metricsBuilder.measurement("eggsfs_shard_pulled_write_requests"); - _metricsBuilder.tag("shard", _shid); + _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests); _metricsBuilder.timestamp(now); } @@ -912,7 +912,7 @@ public: _shared.sharedDB.rocksDBMetrics(_rocksDBStats); for (const auto& [name, value]: _rocksDBStats) { _metricsBuilder.measurement("eggsfs_shard_rocksdb"); - _metricsBuilder.tag("shard", _shid); + _metricsBuilder.tag("shard", _shrid); _metricsBuilder.fieldU64(name, value); _metricsBuilder.timestamp(now); } @@ -982,7 +982,7 @@ void runShard(ShardReplicaId shrid, const std::string& dbDir, const ShardOptions XmonConfig config; { std::ostringstream ss; - ss << std::setw(5) << std::setfill('0') << shrid; + ss << shrid; config.appInstance = "eggsshard" + ss.str(); } config.prod = options.xmonProd; @@ -1014,13 +1014,13 @@ void runShard(ShardReplicaId shrid, const std::string& dbDir, const ShardOptions env.clearAlert(dbInitAlert); ShardShared shared(sharedDB, shardDB); - threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid.shardId(), options, shared))); + threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, options, shared))); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, options, shared))); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, options, shared))); - threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid.shardId(), options, shared))); - threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid.shardId(), options, shared))); + threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, options, shared))); + threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, options, shared))); if (options.metrics) { - threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid.shardId(), shared))); + threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shrid, shared))); } // from this point on termination on SIGINT/SIGTERM will be graceful