From c8cda7e4db2d5f2e8d6cb5fa0d771b38340e01cb Mon Sep 17 00:00:00 2001 From: Miroslav Crnic Date: Mon, 18 Mar 2024 09:44:47 +0000 Subject: [PATCH] logsdb: periodically log status --- cpp/core/LogsDB.cpp | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/cpp/core/LogsDB.cpp b/cpp/core/LogsDB.cpp index 92bb71d9..8b5f1f91 100644 --- a/cpp/core/LogsDB.cpp +++ b/cpp/core/LogsDB.cpp @@ -455,7 +455,7 @@ public: } void setNomineeToken(LeaderToken token) { - if (LeaderToken(_leaderToken.replica(), ++_leaderToken.idx()) < _nomineeToken) { + if (++_leaderToken.idx() < _nomineeToken.idx()) { LOG_INFO(_env, "Got a nominee token for epoch %s, last leader epoch is %s, we must have skipped leader election.", _nomineeToken.idx(), _leaderToken.idx()); _data.dropEntriesAfterIdx(_lastReleased); } @@ -810,7 +810,7 @@ public: auto& response = _reqResp.newResponse(LogMessageKind::NEW_LEADER, fromReplicaId, requestId); auto& newLeaderResponse = response.responseContainer.setNewLeader(); - if (request.nomineeToken < _metadata.getLeaderToken() || request.nomineeToken < _metadata.getNomineeToken()) { + if (request.nomineeToken.idx() <= _metadata.getLeaderToken().idx() || request.nomineeToken < _metadata.getNomineeToken()) { newLeaderResponse.result = (uint16_t)EggsError::LEADER_PREEMPTED; return; } @@ -1264,6 +1264,10 @@ public: _data.writeLogEntry(entry); } + LogIdx lastRead() const { + return _lastRead; + } + private: void _findMissingEntries() { @@ -1541,10 +1545,13 @@ public: ALWAYS_ASSERT(lastRead <= _metadata.getLastReleased()); flush(true); _catchupReader.init(); + + LOG_INFO(_env,"LogsDB opened, leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); + _infoLoggedTime = eggsNow(); } void close() { - LOG_INFO(_env,"closing LogsDB"); + LOG_INFO(_env,"closing LogsDB, leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); } LogIdx appendLogEntries(std::vector& entries) { @@ -1645,6 +1652,7 @@ public: _appender.maybeMoveRelease(); _catchupReader.maybeCatchUp(); _reqResp.resendTimedOutRequests(); + _maybeLogStatus(); } void getOutgoingMessages(std::vector& requests, std::vector& responses) { @@ -1669,6 +1677,15 @@ public: } private: + + void _maybeLogStatus() { + auto now = eggsNow(); + if (now - _infoLoggedTime > 1_mins) { + LOG_INFO(_env,"LogsDB status: leaderToken(%s), lastReleased(%s), lastRead(%s)",_metadata.getLeaderToken(), _metadata.getLastReleased(), _catchupReader.lastRead()); + _infoLoggedTime = now; + } + } + Env _env; rocksdb::DB* _db; const ReplicaId _replicaId; @@ -1679,6 +1696,7 @@ private: BatchWriter _batchWriter; CatchupReader _catchupReader; Appender _appender; + EggsTime _infoLoggedTime; }; LogsDB::LogsDB(