logsdb: enable partial leader election
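
Collapse the separate forceLeader and dontDoReplication flags into a
single noReplication flag, and drop the forcedLastReleased override
from LogMetadata::init and the LogsDB constructor. Recovery now skips
straight to the leader-confirm phase when there are no log entries to
write, nominee and confirm responses are matched against request.msg.id
instead of request.replicaId, two self-comparisons (replicaId ==
replicaId) that should have tested _replicaId are fixed, and
setLastReleased now asserts that the release point never moves
backwards.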

This commit is contained in:
Miroslav Crnic
2024-10-11 09:52:18 +01:00
committed by GitHub Enterprise
parent 8372115b49
commit 48c3aa7d4a
17 changed files with 165 additions and 197 deletions

@@ -395,7 +395,7 @@ public:
return !it->Valid();
}
bool init(bool initialStart, LogIdx forcedLastReleased) {
bool init(bool initialStart) {
bool initSuccess = true;
std::string value;
if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), value)) {
@@ -428,11 +428,6 @@ public:
initSuccess = false;
LOG_ERROR(_env, "Last released time not found! Possible DB corruption!");
}
if (forcedLastReleased != 0) {
//ALWAYS_ASSERT(_lastReleased <= forcedLastReleased, "Forcing last released to go backwards is not possible. It would cause data inconsistency");
LOG_INFO(_env, "Forcing last released to %s", forcedLastReleased);
setLastReleased(forcedLastReleased);
}
_stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed);
return initSuccess;
}
@@ -495,6 +490,7 @@ public:
}
void setLastReleased(LogIdx lastReleased) {
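// The release point is monotonic: once an index is released it may have been read,
// so moving it backwards would expose inconsistent state.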
ALWAYS_ASSERT(_lastReleased <= lastReleased, "Moving release point backwards is not possible. It would cause data inconsistency");
auto now = eggsNow();
rocksdb::WriteBatch batch;
batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), U64Value::Static(lastReleased.u64).toSlice());
@@ -679,23 +675,26 @@ struct LeaderElectionState {
class LeaderElection {
public:
LeaderElection(Env& env, LogsDBStats& stats, bool forceLeader, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp) :
LeaderElection(Env& env, LogsDBStats& stats, bool noReplication, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp) :
_env(env),
_stats(stats),
_forceLeader(forceLeader),
_noReplication(noReplication),
_avoidBeingLeader(avoidBeingLeader),
_replicaId(replicaId),
_metadata(metadata),
_data(data),
_reqResp(reqResp),
_state(LeadershipState::FOLLOWER),
_leaderLastActive(_forceLeader ? 0 :eggsNow()) {}
_leaderLastActive(_noReplication ? 0 : eggsNow()) {}
bool isLeader() const {
return _state == LeadershipState::LEADER;
}
void maybeStartLeaderElection() {
if (unlikely(_avoidBeingLeader)) {
return;
}
auto now = eggsNow();
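// Only a follower that has not heard from the leader within LEADER_INACTIVE_TIMEOUT
// starts an election.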
if (_state != LeadershipState::FOLLOWER ||
(_leaderLastActive + LogsDB::LEADER_INACTIVE_TIMEOUT > now)) {
@@ -707,17 +706,11 @@ public:
_metadata.setNomineeToken(nomineeToken);
_state = LeadershipState::BECOMING_NOMINEE;
if (unlikely(_avoidBeingLeader)) {
LOG_INFO(_env, "AvoidBeingLeader set, resetting leader election");
resetLeaderElection();
return;
}
_electionState.reset(new LeaderElectionState());
_electionState->lastReleased = _metadata.getLastReleased();
_leaderLastActive = now;
if (unlikely(_forceLeader)) {
if (unlikely(_noReplication)) {
LOG_INFO(_env,"ForceLeader set, skipping to confirming leader phase");
_electionState->requestIds.fill(ReqResp::CONFIRMED_REQ_ID);
_tryBecomeLeader();
@@ -738,13 +731,14 @@ public:
}
void proccessNewLeaderResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderResp& response) {
LOG_DEBUG(_env, "Received NEW_LEADER response %s from replicaId %s", response, fromReplicaId);
ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE, "In state %s Received NEW_LEADER response %s", _state, response);
auto& state = *_electionState;
ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.replicaId);
ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id);
auto result = EggsError(response.result);
switch (result) {
case EggsError::NO_ERROR:
_electionState->requestIds[request.replicaId.u8] = 0;
_electionState->requestIds[request.replicaId.u8] = ReqResp::CONFIRMED_REQ_ID;
_electionState->lastReleased = std::max(_electionState->lastReleased, response.lastReleased);
_reqResp.eraseRequest(request.msg.id);
_tryProgressToDigest();
@@ -760,13 +754,14 @@ public:
void proccessNewLeaderConfirmResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderConfirmResp& response) {
ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_LEADERSHIP, "In state %s Received NEW_LEADER_CONFIRM response %s", _state, response);
ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.replicaId);
ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id);
auto result = EggsError(response.result);
switch (result) {
case EggsError::NO_ERROR:
_electionState->requestIds[request.replicaId.u8] = 0;
_reqResp.eraseRequest(request.msg.id);
LOG_DEBUG(_env,"trying to become leader");
_tryBecomeLeader();
break;
case EggsError::LEADER_PREEMPTED:
@@ -803,6 +798,7 @@ public:
break;
}
case EggsError::LEADER_PREEMPTED:
LOG_DEBUG(_env, "Got preempted during recovery by replica %s",fromReplicaId);
resetLeaderElection();
break;
default:
@@ -948,6 +944,7 @@ private:
void _tryProgressToDigest() {
ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE);
LOG_DEBUG(_env, "trying to progress to digest");
if (!ReqResp::isQuorum(_electionState->requestIds)) {
return;
}
@@ -982,7 +979,7 @@ private:
auto& requestIds = _electionState->recoveryRequests[i];
auto& participatingReplicas = _electionState->requestIds;
for(ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) {
if (replicaId == replicaId) {
if (replicaId == _replicaId) {
requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID;
continue;
}
@@ -1045,7 +1042,11 @@ private:
requestIds[replicaId.u8] = request.msg.id;
}
}
_data.writeLogEntries(entries);
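// If recovery produced no entries there is nothing to replay;
// go straight to the confirm phase instead of issuing an empty write.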
if (entries.empty()) {
_tryProgressToLeaderConfirm();
} else {
_data.writeLogEntries(entries);
}
}
void _tryProgressToLeaderConfirm() {
@@ -1070,10 +1071,11 @@ private:
// It is safe to move last released for us even if we don't become leader
// if we do become leader we guarantee state up here was readable
_metadata.setLastReleased(newLastReleased);
_state = LeadershipState::CONFIRMING_LEADERSHIP;
auto& requestIds = _electionState->requestIds;
for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) {
if (replicaId == replicaId) {
if (replicaId == _replicaId) {
requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID;
continue;
}
@@ -1084,6 +1086,7 @@ private:
auto& recoveryConfirm = request.msg.body.setNewLeaderConfirm();
recoveryConfirm.nomineeToken = _metadata.getNomineeToken();
recoveryConfirm.releasedIdx = _metadata.getLastReleased();
requestIds[replicaId.u8] = request.msg.id;
}
}
@@ -1117,7 +1120,7 @@ private:
Env& _env;
LogsDBStats& _stats;
const bool _forceLeader;
const bool _noReplication;
const bool _avoidBeingLeader;
const ReplicaId _replicaId;
LogMetadata& _metadata;
@@ -1378,12 +1381,12 @@ class Appender {
static constexpr size_t IN_FLIGHT_MASK = LogsDB::IN_FLIGHT_APPEND_WINDOW - 1;
static_assert((IN_FLIGHT_MASK & LogsDB::IN_FLIGHT_APPEND_WINDOW) == 0);
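// (window - 1) & window == 0 holds only for powers of two, so ANDing with
// IN_FLIGHT_MASK wraps ring-buffer offsets without a modulo.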
public:
Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool dontDoReplication) :
Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool noReplication) :
_env(env),
_reqResp(reqResp),
_metadata(metadata),
_leaderElection(leaderElection),
_dontDoReplication(dontDoReplication),
_noReplication(noReplication),
_currentIsLeader(false),
_entriesStart(0),
_entriesEnd(0) { }
@@ -1407,7 +1410,7 @@ public:
for (; _entriesStart < _entriesEnd; ++_entriesStart) {
auto offset = _entriesStart & IN_FLIGHT_MASK;
auto& requestIds = _requestIds[offset];
if (_dontDoReplication || ReqResp::isQuorum(requestIds)) {
if (_noReplication || ReqResp::isQuorum(requestIds)) {
++newRelease;
entriesToWrite.emplace_back(std::move(_entries[offset]));
ALWAYS_ASSERT(newRelease == entriesToWrite.back().idx);
@@ -1450,7 +1453,7 @@ public:
requestIds[replicaId.u8] = 0;
continue;
}
if (unlikely(_dontDoReplication)) {
if (unlikely(_noReplication)) {
requestIds[replicaId.u8] = 0;
continue;
}
@@ -1535,7 +1538,7 @@ private:
LogMetadata& _metadata;
LeaderElection& _leaderElection;
const bool _dontDoReplication;
const bool _noReplication;
bool _currentIsLeader;
uint64_t _entriesStart;
uint64_t _entriesEnd;
@@ -1555,10 +1558,8 @@ public:
SharedRocksDB& sharedDB,
ReplicaId replicaId,
LogIdx lastRead,
bool dontDoReplication,
bool forceLeader,
bool avoidBeingLeader,
LogIdx forcedLastReleased)
bool noReplication,
bool avoidBeingLeader)
:
_env(logger, xmon, "LogsDB"),
_db(sharedDB.db()),
@@ -1567,10 +1568,10 @@ public:
_partitions(_env,sharedDB),
_metadata(_env,_stats, sharedDB, replicaId, _partitions),
_reqResp(_stats),
_leaderElection(_env, _stats, forceLeader, avoidBeingLeader, replicaId, _metadata, _partitions, _reqResp),
_leaderElection(_env, _stats, noReplication, avoidBeingLeader, replicaId, _metadata, _partitions, _reqResp),
_batchWriter(_env,_reqResp, _leaderElection),
_catchupReader(_stats, _reqResp, _metadata, _partitions, replicaId, lastRead),
_appender(_env, _stats, _reqResp, _metadata, _leaderElection, dontDoReplication)
_appender(_env, _stats, _reqResp, _metadata, _leaderElection, noReplication)
{
LOG_INFO(_env, "Initializing LogsDB");
auto initialStart = _metadata.isInitialStart() && _partitions.isInitialStart();
@@ -1578,7 +1579,7 @@ public:
LOG_INFO(_env, "Initial start of LogsDB");
}
auto initSuccess = _metadata.init(initialStart, forcedLastReleased);
auto initSuccess = _metadata.init(initialStart);
initSuccess = _partitions.init(initialStart) && initSuccess;
auto now = eggsNow();
@@ -1643,6 +1644,7 @@ public:
LOG_ERROR(_env, "Expected response of type %s, got type %s. Response: %s", request->msg.body.kind(), resp.msg.body.kind(), resp);
continue;
}
LOG_DEBUG(_env, "processing %s", resp);
switch(resp.msg.body.kind()) {
case LogMessageKind::RELEASE:
@@ -1780,12 +1782,10 @@ LogsDB::LogsDB(
SharedRocksDB& sharedDB,
ReplicaId replicaId,
LogIdx lastRead,
bool dontDoReplication,
bool forceLeader,
bool avoidBeingLeader,
LogIdx forcedLastReleased)
bool noReplication,
bool avoidBeingLeader)
{
_impl = new LogsDBImpl(logger, xmon, sharedDB, replicaId, lastRead, dontDoReplication, forceLeader, avoidBeingLeader, forcedLastReleased);
_impl = new LogsDBImpl(logger, xmon, sharedDB, replicaId, lastRead, noReplication, avoidBeingLeader);
}
LogsDB::~LogsDB() {
@@ -1838,7 +1838,7 @@ void LogsDB::_getUnreleasedLogEntries(Env& env, SharedRocksDB& sharedDB, LogIdx&
bool initSuccess = data.init(false);
LogsDBStats stats;
LogMetadata metadata(env, stats, sharedDB, 0, data);
initSuccess = initSuccess && metadata.init(false, 0);
initSuccess = initSuccess && metadata.init(false);
ALWAYS_ASSERT(initSuccess, "Failed to init LogsDB, check if you need to run with \"initialStart\" flag!");
lastReleasedOut = metadata.getLastReleased();