diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index 4ab4db8e..82053ade 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -147,6 +147,12 @@ public: const std::map& _pq; std::map::const_iterator _it; }; + + void clear() { + _reqs.clear(); + _pq.clear(); + } + size_t size() const { return _reqs.size(); } @@ -242,7 +248,7 @@ private: // The _shard_ request we're currently waiting for, if any. InFlightShardRequests _inFlightShardReqs; - const bool _dontDoReplication; + const bool _noReplication; LogsDB& _logsDB; std::vector _logsDBRequests; std::vector _logsDBResponses; @@ -263,15 +269,10 @@ public: _shardTimeout(options.shardTimeout), _receiver({.perSockMaxRecvMsg = MAX_MSG_RECEIVE, .maxMsgSize = MAX_UDP_MTU}), _cdcSender({.maxMsgSize = MAX_UDP_MTU}), - _dontDoReplication(options.dontDoReplication), + _noReplication(options.noReplication), _logsDB(shared.logsDB) { expandKey(CDCKey, _expandedCDCKey); - - if (options.forceLeader) { - // In case of force leader it immediately detects and promotes us to leader - _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); - } _shared.isLeader.store(_logsDB.isLeader(), std::memory_order_relaxed); _logsDBLogIndex = _logsDB.getLastReleased(); LOG_INFO(_env, "Waiting for shard info to be filled in"); @@ -284,17 +285,6 @@ public: return; } _seenShards = true; - if (_shared.isLeader.load(std::memory_order_relaxed)) { - // If we've got dangling transactions, immediately start processing it - auto bootstrap = CDCLogEntry::prepareBootstrapEntry(); - auto& entry = entries.emplace_back(); - entry.value.resize(bootstrap.packedSize()); - BincodeBuf bbuf((char*) entry.value.data(), entry.value.size()); - bootstrap.pack(bbuf); - ALWAYS_ASSERT(bbuf.len() == entry.value.size()); - bootstrap.logIdx(++_logsDBLogIndex.u64); - _inFlightLogEntries[bootstrap.logIdx()] = std::move(bootstrap); - } } // clear internal buffers @@ -365,6 +355,19 @@ public: } } + if (!_shared.isLeader.load(std::memory_order_relaxed) && _logsDB.isLeader()) { + // If we've got dangling transactions, immediately start processing it + auto bootstrap = CDCLogEntry::prepareBootstrapEntry(); + auto& entry = entries.emplace_back(); + entry.value.resize(bootstrap.packedSize()); + BincodeBuf bbuf((char*) entry.value.data(), entry.value.size()); + bootstrap.pack(bbuf); + ALWAYS_ASSERT(bbuf.len() == entry.value.size()); + bootstrap.logIdx(++_logsDBLogIndex.u64); + _inFlightLogEntries[bootstrap.logIdx()] = std::move(bootstrap); + } + _shared.isLeader.store(_logsDB.isLeader(), std::memory_order_relaxed); + if (_logsDB.isLeader()) { auto err = _logsDB.appendEntries(entries); ALWAYS_ASSERT(err == EggsError::NO_ERROR); @@ -390,6 +393,13 @@ public: } } entries.clear(); + } else { + _inFlightEntries.clear(); + _inFlightLogEntries.clear(); + _logEntryIdxToReqInfos.clear(); + _inFlightCDCReqs.clear(); + _inFlightShardReqs.clear(); + _inFlightTxns.clear(); } // Log if not active is not chaty but it's messages are higher priority as they make us progress state under high load. // We want to have priority when sending out @@ -402,7 +412,7 @@ public: for (auto request : _logsDBOutRequests) { _packLogsDBRequest(*request); } - if (_dontDoReplication) { + if (_noReplication) { _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); } _logsDB.readEntries(entries); @@ -663,6 +673,11 @@ private: continue; } + if (unlikely(_shared.isLeader.load(std::memory_order_relaxed) == false)) { + LOG_DEBUG(_env, "dropping request since we're not the leader %s", cdcMsg); + continue; + } + auto& cdcReq = _cdcReqs.emplace_back(std::move(cdcMsg.body)); LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind()); @@ -688,6 +703,10 @@ private: continue; } + if (unlikely(_shared.isLeader.load(std::memory_order_relaxed) == false)) { + LOG_DEBUG(_env, "dropping response since we're not the leader %s", respMsg); + continue; + } LOG_DEBUG(_env, "received response %s", respMsg); auto shardResp = _prepareCDCShardResp(respMsg.id); @@ -863,7 +882,8 @@ struct CDCRegisterer : PeriodicLoop { CDCShared& _shared; const ReplicaId _replicaId; const uint8_t _location; - const bool _dontDoReplication; + const bool _noReplication; + const bool _avoidBeingLeader; const std::string _shuckleHost; const uint16_t _shucklePort; XmonNCAlert _alert; @@ -873,7 +893,8 @@ public: _shared(shared), _replicaId(options.replicaId), _location(options.location), - _dontDoReplication(options.dontDoReplication), + _noReplication(options.noReplication), + _avoidBeingLeader(options.avoidBeingLeader), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort), _alert(10_sec) @@ -884,7 +905,8 @@ public: virtual bool periodicStep() override { LOG_DEBUG(_env, "Registering ourselves (CDC %s, location %s, %s) with shuckle", _replicaId, (int)_location, _shared.socks[CDC_SOCK].addr()); { - const auto [err, errStr] = registerCDCReplica(_shuckleHost, _shucklePort, 10_sec, _replicaId, _location, _shared.isLeader.load(std::memory_order_relaxed), _shared.socks[CDC_SOCK].addr()); + // TODO: report _shared.isleader instead of command line flag once leader election is enabled + const auto [err, errStr] = registerCDCReplica(_shuckleHost, _shucklePort, 10_sec, _replicaId, _location, !_avoidBeingLeader, _shared.socks[CDC_SOCK].addr()); if (err == EINTR) { return false; } if (err) { _env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", errStr); @@ -913,7 +935,7 @@ public: ++emptyReplicas; } } - if (!_dontDoReplication && emptyReplicas > 0 ) { + if (!_noReplication && emptyReplicas > 0 ) { _env.updateAlert(_alert, "Didn't get enough replicas with known addresses from shuckle"); return false; } @@ -1154,9 +1176,8 @@ void runCDC(CDCOptions& options) { LOG_INFO(env, " cdcAddrs = %s", options.cdcAddrs); LOG_INFO(env, " syslog = %s", (int)options.syslog); LOG_INFO(env, "Using LogsDB with options:"); - LOG_INFO(env, " dontDoReplication = '%s'", (int)options.dontDoReplication); - LOG_INFO(env, " forceLeader = '%s'", (int)options.forceLeader); - LOG_INFO(env, " forcedLastReleased = '%s'", options.forcedLastReleased); + LOG_INFO(env, " avoidBeingLeader = '%s'", (int)options.avoidBeingLeader); + LOG_INFO(env, " noReplication = '%s'", (int)options.noReplication); std::vector> threads; @@ -1188,7 +1209,7 @@ void runCDC(CDCOptions& options) { sharedDb.openTransactionDB(dbOptions); CDCDB db(logger, xmon, sharedDb); - LogsDB logsDB(logger, xmon, sharedDb, options.replicaId, db.lastAppliedLogEntry(), options.dontDoReplication, options.forceLeader, !options.forceLeader, db.lastAppliedLogEntry()); + LogsDB logsDB(logger, xmon, sharedDb, options.replicaId, db.lastAppliedLogEntry(), options.noReplication, options.avoidBeingLeader); CDCShared shared( sharedDb, db, logsDB, std::array({UDPSocketPair(env, options.cdcAddrs), UDPSocketPair(env, options.cdcToShardAddress)}) diff --git a/cpp/cdc/CDC.hpp b/cpp/cdc/CDC.hpp index 0d11f72d..63a02fdd 100644 --- a/cpp/cdc/CDC.hpp +++ b/cpp/cdc/CDC.hpp @@ -28,9 +28,8 @@ struct CDCOptions { // LogsDB settings - bool dontDoReplication = false; - bool forceLeader = false; - LogIdx forcedLastReleased = 0; + bool avoidBeingLeader = true; + bool noReplication = false; }; void runCDC(CDCOptions& options); diff --git a/cpp/cdc/eggscdc.cpp b/cpp/cdc/eggscdc.cpp index 6343e51a..83107b57 100644 --- a/cpp/cdc/eggscdc.cpp +++ b/cpp/cdc/eggscdc.cpp @@ -29,10 +29,11 @@ void usage(const char* binary) { fprintf(stderr, " Enable Xmon alerts.\n"); fprintf(stderr, " -metrics\n"); fprintf(stderr, " Enable metrics.\n"); - fprintf(stderr, " -use-logsdb LEADER|LEADER_NO_FOLLOWERS|FOLLOWER\n"); - fprintf(stderr, " Specify in which mode to use LogsDB, as LEADER|LEADER_NO_FOLLOWERS|FOLLOWER. Default is FOLLOWER.\n"); - fprintf(stderr, " -force-last-released LogIdx\n"); - fprintf(stderr, " Force forward last released. Used for manual leader election. Can not be combined with starting in any LEADER mode\n"); + fprintf(stderr, " -logsdb-leader\n"); + fprintf(stderr, " Allow replica to become leader. Default is false\n"); + fprintf(stderr, " -logsdb-no-replication\n"); + fprintf(stderr, " Don't wait for acks from other replicas when becoming leader or replicating.\n"); + fprintf(stderr, " Can only be set if -logsdb-leader is also set. Default is false\n"); fprintf(stderr, " -app-name-suffix suffix\n"); fprintf(stderr, " Suffix to use in app name, app name format is 'eggscdc{suffix}_shardId_replicaId'\n"); } @@ -94,15 +95,6 @@ static uint8_t parseReplicaId(const std::string& arg) { return replicaId; } -static LogIdx parseLogIdx(const std::string& arg) { - size_t idx; - uint64_t x = std::stoull(arg, &idx); - if (idx != arg.size()) { - die("Runoff character in LogIdx %s", arg.c_str()); - } - return x; -} - int main(int argc, char** argv) { namespace fs = std::filesystem; @@ -181,20 +173,10 @@ int main(int argc, char** argv) { } } else if (arg == "-metrics") { options.metrics = true; - } else if (arg == "-use-logsdb") { - std::string logsDBMode = getNextArg(); - if (logsDBMode == "LEADER") { - options.forceLeader = true; - } else if (logsDBMode == "LEADER_NO_FOLLOWERS") { - options.forceLeader = true; - options.dontDoReplication = true; - } else if (logsDBMode == "FOLLOWER") { - } else { - fprintf(stderr, "Invalid logsDB mode %s", logsDBMode.c_str()); - dieWithUsage(); - } - } else if (arg == "-force-last-released") { - options.forcedLastReleased = parseLogIdx(getNextArg()); + } else if (arg == "-logsdb-leader") { + options.avoidBeingLeader = false; + } else if (arg == "-logsdb-no-replication") { + options.noReplication = true; } else if (arg == "-app-name-suffix") { options.appNameSuffix = getNextArg(); } else { @@ -210,9 +192,9 @@ int main(int argc, char** argv) { args.emplace_back("0"); } - if (options.forceLeader && options.forcedLastReleased != 0) { - fprintf(stderr, "You can not forward release point on a LEADER replica."); - dieWithUsage(); + if (options.noReplication && options.avoidBeingLeader) { + fprintf(stderr, "-logsdb-leader needs to be set if -logsdb-no-replication is set\n"); + dieWithUsage(); } #ifndef EGGS_DEBUG diff --git a/cpp/core/LogsDB.cpp b/cpp/core/LogsDB.cpp index 7e96d676..d7be6df0 100644 --- a/cpp/core/LogsDB.cpp +++ b/cpp/core/LogsDB.cpp @@ -395,7 +395,7 @@ public: return !it->Valid(); } - bool init(bool initialStart, LogIdx forcedLastReleased) { + bool init(bool initialStart) { bool initSuccess = true; std::string value; if (tryGet(_sharedDb.db(), _cf, logsDBMetadataKey(LEADER_TOKEN_KEY), value)) { @@ -428,11 +428,6 @@ public: initSuccess = false; LOG_ERROR(_env, "Last released time not found! Possible DB corruption!"); } - if (forcedLastReleased != 0) { - //ALWAYS_ASSERT(_lastReleased <= forcedLastReleased, "Forcing last released to go backwards is not possible. It would cause data inconsistency"); - LOG_INFO(_env, "Forcing last released to %s", forcedLastReleased); - setLastReleased(forcedLastReleased); - } _stats.currentEpoch.store(_leaderToken.idx().u64, std::memory_order_relaxed); return initSuccess; } @@ -495,6 +490,7 @@ public: } void setLastReleased(LogIdx lastReleased) { + ALWAYS_ASSERT(_lastReleased <= lastReleased, "Moving release point backwards is not possible. It would cause data inconsistency"); auto now = eggsNow(); rocksdb::WriteBatch batch; batch.Put(_cf, logsDBMetadataKey(LAST_RELEASED_IDX_KEY), U64Value::Static(lastReleased.u64).toSlice()); @@ -679,23 +675,26 @@ struct LeaderElectionState { class LeaderElection { public: - LeaderElection(Env& env, LogsDBStats& stats, bool forceLeader, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp) : + LeaderElection(Env& env, LogsDBStats& stats, bool noReplication, bool avoidBeingLeader, ReplicaId replicaId, LogMetadata& metadata, DataPartitions& data, ReqResp& reqResp) : _env(env), _stats(stats), - _forceLeader(forceLeader), + _noReplication(noReplication), _avoidBeingLeader(avoidBeingLeader), _replicaId(replicaId), _metadata(metadata), _data(data), _reqResp(reqResp), _state(LeadershipState::FOLLOWER), - _leaderLastActive(_forceLeader ? 0 :eggsNow()) {} + _leaderLastActive(_noReplication ? 0 :eggsNow()) {} bool isLeader() const { return _state == LeadershipState::LEADER; } void maybeStartLeaderElection() { + if (unlikely(_avoidBeingLeader)) { + return; + } auto now = eggsNow(); if (_state != LeadershipState::FOLLOWER || (_leaderLastActive + LogsDB::LEADER_INACTIVE_TIMEOUT > now)) { @@ -707,17 +706,11 @@ public: _metadata.setNomineeToken(nomineeToken); _state = LeadershipState::BECOMING_NOMINEE; - if (unlikely(_avoidBeingLeader)) { - LOG_INFO(_env, "AvoidBeingLeader set, resetting leader election"); - resetLeaderElection(); - return; - } - _electionState.reset(new LeaderElectionState()); _electionState->lastReleased = _metadata.getLastReleased(); _leaderLastActive = now; - if (unlikely(_forceLeader)) { + if (unlikely(_noReplication)) { LOG_INFO(_env,"ForceLeader set, skipping to confirming leader phase"); _electionState->requestIds.fill(ReqResp::CONFIRMED_REQ_ID); _tryBecomeLeader(); @@ -738,13 +731,14 @@ public: } void proccessNewLeaderResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderResp& response) { + LOG_DEBUG(_env, "Received NEW_LEADER response %s from replicaId %s", response, fromReplicaId); ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE, "In state %s Received NEW_LEADER response %s", _state, response); auto& state = *_electionState; - ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.replicaId); + ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); auto result = EggsError(response.result); switch (result) { case EggsError::NO_ERROR: - _electionState->requestIds[request.replicaId.u8] = 0; + _electionState->requestIds[request.replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; _electionState->lastReleased = std::max(_electionState->lastReleased, response.lastReleased); _reqResp.eraseRequest(request.msg.id); _tryProgressToDigest(); @@ -760,13 +754,14 @@ public: void proccessNewLeaderConfirmResponse(ReplicaId fromReplicaId, LogsDBRequest& request, const NewLeaderConfirmResp& response) { ALWAYS_ASSERT(_state == LeadershipState::CONFIRMING_LEADERSHIP, "In state %s Received NEW_LEADER_CONFIRM response %s", _state, response); - ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.replicaId); + ALWAYS_ASSERT(_electionState->requestIds[fromReplicaId.u8] == request.msg.id); auto result = EggsError(response.result); switch (result) { case EggsError::NO_ERROR: _electionState->requestIds[request.replicaId.u8] = 0; _reqResp.eraseRequest(request.msg.id); + LOG_DEBUG(_env,"trying to become leader"); _tryBecomeLeader(); break; case EggsError::LEADER_PREEMPTED: @@ -803,6 +798,7 @@ public: break; } case EggsError::LEADER_PREEMPTED: + LOG_DEBUG(_env, "Got preempted during recovery by replica %s",fromReplicaId); resetLeaderElection(); break; default: @@ -948,6 +944,7 @@ private: void _tryProgressToDigest() { ALWAYS_ASSERT(_state == LeadershipState::BECOMING_NOMINEE); + LOG_DEBUG(_env, "trying to progress to digest"); if (!ReqResp::isQuorum(_electionState->requestIds)) { return; } @@ -982,7 +979,7 @@ private: auto& requestIds = _electionState->recoveryRequests[i]; auto& participatingReplicas = _electionState->requestIds; for(ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == replicaId) { + if (replicaId == _replicaId) { requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; continue; } @@ -1045,7 +1042,11 @@ private: requestIds[replicaId.u8] = request.msg.id; } } - _data.writeLogEntries(entries); + if (entries.empty()) { + _tryProgressToLeaderConfirm(); + } else { + _data.writeLogEntries(entries); + } } void _tryProgressToLeaderConfirm() { @@ -1070,10 +1071,11 @@ private: // It is safe to move last released for us even if we don't become leader // if we do become leader we guarantee state up here was readable _metadata.setLastReleased(newLastReleased); + _state = LeadershipState::CONFIRMING_LEADERSHIP; auto& requestIds = _electionState->requestIds; for (ReplicaId replicaId = 0; replicaId.u8 < LogsDB::REPLICA_COUNT; ++replicaId.u8) { - if (replicaId == replicaId) { + if (replicaId == _replicaId) { requestIds[replicaId.u8] = ReqResp::CONFIRMED_REQ_ID; continue; } @@ -1084,6 +1086,7 @@ private: auto& recoveryConfirm = request.msg.body.setNewLeaderConfirm(); recoveryConfirm.nomineeToken = _metadata.getNomineeToken(); recoveryConfirm.releasedIdx = _metadata.getLastReleased(); + requestIds[replicaId.u8] = request.msg.id; } } @@ -1117,7 +1120,7 @@ private: Env& _env; LogsDBStats& _stats; - const bool _forceLeader; + const bool _noReplication; const bool _avoidBeingLeader; const ReplicaId _replicaId; LogMetadata& _metadata; @@ -1378,12 +1381,12 @@ class Appender { static constexpr size_t IN_FLIGHT_MASK = LogsDB::IN_FLIGHT_APPEND_WINDOW - 1; static_assert((IN_FLIGHT_MASK & LogsDB::IN_FLIGHT_APPEND_WINDOW) == 0); public: - Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool dontDoReplication) : + Appender(Env& env, LogsDBStats& stats, ReqResp& reqResp, LogMetadata& metadata, LeaderElection& leaderElection, bool noReplication) : _env(env), _reqResp(reqResp), _metadata(metadata), _leaderElection(leaderElection), - _dontDoReplication(dontDoReplication), + _noReplication(noReplication), _currentIsLeader(false), _entriesStart(0), _entriesEnd(0) { } @@ -1407,7 +1410,7 @@ public: for (; _entriesStart < _entriesEnd; ++_entriesStart) { auto offset = _entriesStart & IN_FLIGHT_MASK; auto& requestIds = _requestIds[offset]; - if (_dontDoReplication || ReqResp::isQuorum(requestIds)) { + if (_noReplication || ReqResp::isQuorum(requestIds)) { ++newRelease; entriesToWrite.emplace_back(std::move(_entries[offset])); ALWAYS_ASSERT(newRelease == entriesToWrite.back().idx); @@ -1450,7 +1453,7 @@ public: requestIds[replicaId.u8] = 0; continue; } - if (unlikely(_dontDoReplication)) { + if (unlikely(_noReplication)) { requestIds[replicaId.u8] = 0; continue; } @@ -1535,7 +1538,7 @@ private: LogMetadata& _metadata; LeaderElection& _leaderElection; - const bool _dontDoReplication; + const bool _noReplication; bool _currentIsLeader; uint64_t _entriesStart; uint64_t _entriesEnd; @@ -1555,10 +1558,8 @@ public: SharedRocksDB& sharedDB, ReplicaId replicaId, LogIdx lastRead, - bool dontDoReplication, - bool forceLeader, - bool avoidBeingLeader, - LogIdx forcedLastReleased) + bool noReplication, + bool avoidBeingLeader) : _env(logger, xmon, "LogsDB"), _db(sharedDB.db()), @@ -1567,10 +1568,10 @@ public: _partitions(_env,sharedDB), _metadata(_env,_stats, sharedDB, replicaId, _partitions), _reqResp(_stats), - _leaderElection(_env, _stats, forceLeader, avoidBeingLeader, replicaId, _metadata, _partitions, _reqResp), + _leaderElection(_env, _stats, noReplication, avoidBeingLeader, replicaId, _metadata, _partitions, _reqResp), _batchWriter(_env,_reqResp, _leaderElection), _catchupReader(_stats, _reqResp, _metadata, _partitions, replicaId, lastRead), - _appender(_env, _stats, _reqResp, _metadata, _leaderElection, dontDoReplication) + _appender(_env, _stats, _reqResp, _metadata, _leaderElection, noReplication) { LOG_INFO(_env, "Initializing LogsDB"); auto initialStart = _metadata.isInitialStart() && _partitions.isInitialStart(); @@ -1578,7 +1579,7 @@ public: LOG_INFO(_env, "Initial start of LogsDB"); } - auto initSuccess = _metadata.init(initialStart, forcedLastReleased); + auto initSuccess = _metadata.init(initialStart); initSuccess = _partitions.init(initialStart) && initSuccess; auto now = eggsNow(); @@ -1643,6 +1644,7 @@ public: LOG_ERROR(_env, "Expected response of type %s, got type %s. Response: %s", request->msg.body.kind(), resp.msg.body.kind(), resp); continue; } + LOG_DEBUG(_env, "processing %s", resp); switch(resp.msg.body.kind()) { case LogMessageKind::RELEASE: @@ -1780,12 +1782,10 @@ LogsDB::LogsDB( SharedRocksDB& sharedDB, ReplicaId replicaId, LogIdx lastRead, - bool dontDoReplication, - bool forceLeader, - bool avoidBeingLeader, - LogIdx forcedLastReleased) + bool noReplication, + bool avoidBeingLeader) { - _impl = new LogsDBImpl(logger, xmon, sharedDB, replicaId, lastRead, dontDoReplication, forceLeader, avoidBeingLeader, forcedLastReleased); + _impl = new LogsDBImpl(logger, xmon, sharedDB, replicaId, lastRead, noReplication, avoidBeingLeader); } LogsDB::~LogsDB() { @@ -1838,7 +1838,7 @@ void LogsDB::_getUnreleasedLogEntries(Env& env, SharedRocksDB& sharedDB, LogIdx& bool initSuccess = data.init(false); LogsDBStats stats; LogMetadata metadata(env, stats, sharedDB, 0, data); - initSuccess = initSuccess && metadata.init(false, 0); + initSuccess = initSuccess && metadata.init(false); ALWAYS_ASSERT(initSuccess, "Failed to init LogsDB, check if you need to run with \"initialStart\" flag!"); lastReleasedOut = metadata.getLastReleased(); diff --git a/cpp/core/LogsDB.hpp b/cpp/core/LogsDB.hpp index 7ccf6d53..35e5f281 100644 --- a/cpp/core/LogsDB.hpp +++ b/cpp/core/LogsDB.hpp @@ -108,10 +108,8 @@ public: SharedRocksDB& sharedDB, ReplicaId replicaId, LogIdx lastRead, - bool dontDoReplication, - bool forceLeader, - bool avoidBeingLeader, - LogIdx forcedLastReleased); + bool noReplication, + bool avoidBeingLeader); ~LogsDB(); diff --git a/cpp/core/Msgs.hpp b/cpp/core/Msgs.hpp index e7ba19d8..f44f817e 100644 --- a/cpp/core/Msgs.hpp +++ b/cpp/core/Msgs.hpp @@ -126,7 +126,7 @@ struct ReplicaId { constexpr ReplicaId(): u8(0) {} constexpr ReplicaId(uint8_t id): u8(id) { - ALWAYS_ASSERT(valid()); + ALWAYS_ASSERT(valid(), "Invalid replica id %s", int(u8)); } bool operator==(ReplicaId rhs) const { diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 650e6f3d..80b33018 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -595,7 +595,7 @@ private: SnapshotRequest _snapshotRequest; LogsDB& _logsDB; - const bool _dontDoReplication; + const bool _noReplication; std::vector _logsDBRequests; std::vector _logsDBResponses; std::vector _logsDBOutRequests; @@ -620,7 +620,7 @@ public: _maxWritesAtOnce(LogsDB::IN_FLIGHT_APPEND_WINDOW * 10), _snapshotRequest({0,0,0,{},0}), _logsDB(shared.logsDB), - _dontDoReplication(_shared.options.dontDoReplication), + _noReplication(_shared.options.noReplication), _sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}), _packetDropRand(eggsNow().ns), _outgoingPacketDropProbability(0) @@ -637,9 +637,6 @@ public: }; convertProb("outgoing", _shared.options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); _requests.reserve(_maxWritesAtOnce); - - // In case of force leader it immediately detects and promotes us to leader - _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); _shared.isLeader.store(_logsDB.isLeader(), std::memory_order_relaxed); } @@ -684,7 +681,7 @@ public: for (size_t i = 0; i < _logEntries.size(); ++i) { if (logsDBEntries[i].idx == 0) { // if we don't wait for replication the window gets cleared immediately - ALWAYS_ASSERT(!_dontDoReplication); + ALWAYS_ASSERT(!_noReplication); droppedDueToInFlightWindow = true; LOG_INFO(_env, "Appended %s out of %s shard write requests. Log in flight windows is full. Dropping other entries", i, _logEntries.size()); break; @@ -694,7 +691,7 @@ public: } logsDBEntries.clear(); - if (_dontDoReplication) { + if (_noReplication) { // usually the state machine is moved by responses if we don't expect any we move it manually _logsDBRequests.clear(); _logsDBResponses.clear(); @@ -716,7 +713,7 @@ public: _logsDB.readEntries(logsDBEntries); _outgoingLogEntries.reserve(logsDBEntries.size()); - if (_dontDoReplication && _logsDB.isLeader()) { + if (_noReplication && _logsDB.isLeader()) { ALWAYS_ASSERT(_inFlightEntries.size() == logsDBEntries.size()); } @@ -730,7 +727,7 @@ public: ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx); auto it = _inFlightEntries.find(shardEntry.idx.u64); - if (_dontDoReplication && _logsDB.isLeader()) { + if (_noReplication && _logsDB.isLeader()) { ALWAYS_ASSERT(it != _inFlightEntries.end()); ALWAYS_ASSERT(shardEntry == it->second.logEntry); } @@ -896,7 +893,7 @@ private: ShardShared& _shared; const ShardReplicaId _shrid; const uint8_t _location; - const bool _dontDoReplication; + const bool _noReplication; const std::string _shuckleHost; const uint16_t _shucklePort; XmonNCAlert _alert; @@ -906,7 +903,7 @@ public: _shared(shared), _shrid(_shared.options.shrid), _location(_shared.options.location), - _dontDoReplication(_shared.options.dontDoReplication), + _noReplication(_shared.options.noReplication), _shuckleHost(_shared.options.shuckleHost), _shucklePort(_shared.options.shucklePort) {} @@ -920,7 +917,8 @@ public: virtual bool periodicStep() { { LOG_INFO(_env, "Registering ourselves (shard %s, location %s, %s) with shuckle", _shrid, (int)_location, _shared.sock().addr()); - const auto [err, errStr] = registerShard(_shuckleHost, _shucklePort, 10_sec, _shrid, _location, _shared.isLeader.load(std::memory_order_relaxed), _shared.sock().addr()); + // ToDO: once leader election is fully enabled report or leader status instead of value of flag passed on startup + const auto [err, errStr] = registerShard(_shuckleHost, _shucklePort, 10_sec, _shrid, _location, !_shared.options.avoidBeingLeader, _shared.sock().addr()); if (err == EINTR) { return false; } if (err) { _env.updateAlert(_alert, "Couldn't register ourselves with shuckle: %s", errStr); @@ -948,7 +946,7 @@ public: ++emptyReplicas; } } - if (emptyReplicas > 0 && !_dontDoReplication) { + if (emptyReplicas > 0 && !_noReplication) { _env.updateAlert(_alert, "Didn't get enough replicas with known addresses from shuckle"); return false; } @@ -1251,9 +1249,8 @@ void runShard(ShardOptions& options) { LOG_INFO(env, " simulateOutgoingPacketDrop = %s", options.simulateOutgoingPacketDrop); LOG_INFO(env, " syslog = %s", (int)options.syslog); LOG_INFO(env, "Using LogsDB with options:"); - LOG_INFO(env, " dontDoReplication = '%s'", (int)options.dontDoReplication); - LOG_INFO(env, " forceLeader = '%s'", (int)options.forceLeader); - LOG_INFO(env, " forcedLastReleased = '%s'", options.forcedLastReleased); + LOG_INFO(env, " noReplication = '%s'", (int)options.noReplication); + LOG_INFO(env, " avoidBeingLeader = '%s'", (int)options.avoidBeingLeader); } // Immediately start xmon: we want the database initializing update to @@ -1296,13 +1293,9 @@ void runShard(ShardOptions& options) { BlockServicesCacheDB blockServicesCache(logger, xmon, sharedDB); ShardDB shardDB(logger, xmon, options.shrid.shardId(), options.transientDeadlineInterval, sharedDB, blockServicesCache); - LogsDB logsDB(logger, xmon, sharedDB, options.shrid.replicaId(), shardDB.lastAppliedLogEntry(), options.dontDoReplication, options.forceLeader, !options.forceLeader, options.forcedLastReleased); + LogsDB logsDB(logger, xmon, sharedDB, options.shrid.replicaId(), shardDB.lastAppliedLogEntry(), options.noReplication, options.avoidBeingLeader); env.clearAlert(dbInitAlert); - if (options.dontDoReplication) { - options.forcedLastReleased.u64 = shardDB.lastAppliedLogEntry(); - } - ShardShared shared(options, sharedDB, blockServicesCache, shardDB, logsDB, UDPSocketPair(env, options.shardAddrs)); threads.emplace_back(LoopThread::Spawn(std::make_unique(logger, xmon, shared))); diff --git a/cpp/shard/Shard.hpp b/cpp/shard/Shard.hpp index 504a5847..22a9c25c 100644 --- a/cpp/shard/Shard.hpp +++ b/cpp/shard/Shard.hpp @@ -29,9 +29,8 @@ struct ShardOptions { Duration transientDeadlineInterval = DEFAULT_DEADLINE_INTERVAL; // LogsDB settings - bool dontDoReplication = false; - bool forceLeader = false; - LogIdx forcedLastReleased = 0; + bool avoidBeingLeader = true; + bool noReplication = false; }; void runShard(ShardOptions& options); diff --git a/cpp/shard/eggsshard.cpp b/cpp/shard/eggsshard.cpp index 10e80480..3a5c436e 100644 --- a/cpp/shard/eggsshard.cpp +++ b/cpp/shard/eggsshard.cpp @@ -38,10 +38,11 @@ static void usage(const char* binary) { fprintf(stderr, " Enable metrics.\n"); fprintf(stderr, " -transient-deadline-interval\n"); fprintf(stderr, " Tweaks the interval with wich the deadline for transient file gets bumped.\n"); - fprintf(stderr, " -use-logsdb LEADER|LEADER_NO_FOLLOWERS|FOLLOWER\n"); - fprintf(stderr, " Specify in which mode to use LogsDB, as LEADER|LEADER_NO_FOLLOWERS|FOLLOWER. Default is FOLLOWER.\n"); - fprintf(stderr, " -force-last-released LogIdx\n"); - fprintf(stderr, " Force forward last released. Used for manual leader election. Can not be combined with starting in any LEADER mode\n"); + fprintf(stderr, " -logsdb-leader\n"); + fprintf(stderr, " Allow replica to become leader. Default is false\n"); + fprintf(stderr, " -logsdb-no-replication\n"); + fprintf(stderr, " Don't wait for acks from other replicas when becoming leader or replicating.\n"); + fprintf(stderr, " Can only be set if -logsdb-leader is also set. Default is false\n"); fprintf(stderr, " -app-name-suffix suffix\n"); fprintf(stderr, " Suffix to use in app name, app name format is 'eggsshard{suffix}_shardId_replicaId'\n"); } @@ -124,15 +125,6 @@ static Duration parseDuration(const std::string& arg) { return x * durationUnitMap.at(arg.substr(idx)); } -static LogIdx parseLogIdx(const std::string& arg) { - size_t idx; - uint64_t x = std::stoull(arg, &idx); - if (idx != arg.size()) { - die("Runoff character in LogIdx %s", arg.c_str()); - } - return x; -} - static uint8_t parseReplicaId(const std::string& arg) { size_t idx; unsigned long replicaId = std::stoul(arg, &idx); @@ -215,20 +207,10 @@ int main(int argc, char** argv) { options.metrics = true; } else if (arg == "-transient-deadline-interval") { options.transientDeadlineInterval = parseDuration(getNextArg()); - } else if (arg == "-use-logsdb") { - std::string logsDBMode = getNextArg(); - if (logsDBMode == "LEADER") { - options.forceLeader = true; - } else if (logsDBMode == "LEADER_NO_FOLLOWERS") { - options.forceLeader = true; - options.dontDoReplication = true; - } else if (logsDBMode == "FOLLOWER") { - } else { - fprintf(stderr, "Invalid logsDB mode %s", logsDBMode.c_str()); - dieWithUsage(); - } - } else if (arg == "-force-last-released") { - options.forcedLastReleased = parseLogIdx(getNextArg()); + } else if (arg == "-logsdb-leader") { + options.avoidBeingLeader = false; + } else if (arg == "-logsdb-no-replication") { + options.noReplication = true; } else if (arg == "-app-name-suffix") { options.appNameSuffix = getNextArg(); } else { @@ -236,9 +218,9 @@ int main(int argc, char** argv) { } } - if (options.forceLeader && options.forcedLastReleased != 0) { - fprintf(stderr, "You can not forward release point on a LEADER replica."); - dieWithUsage(); + if (options.noReplication && options.avoidBeingLeader) { + fprintf(stderr, "-logsdb-leader needs to be set if -logsdb-no-replication is set\n"); + dieWithUsage(); } if (args.size() < 3 || args.size() > 4) { diff --git a/cpp/tests/logsdbtests.cpp b/cpp/tests/logsdbtests.cpp index 3e46d192..1dc7220f 100644 --- a/cpp/tests/logsdbtests.cpp +++ b/cpp/tests/logsdbtests.cpp @@ -117,8 +117,8 @@ TEST_CASE("EmptyLogsDBNoOverrides") { TEST_CASE("LogsDBStandAloneLeader") { _setCurrentTime(eggsNow()); - LogIdx readUpTo = 100; - TempLogsDB db(LogLevel::LOG_ERROR, 0, readUpTo, true, true, false, readUpTo); + LogIdx readUpTo = 0; + TempLogsDB db(LogLevel::LOG_ERROR, 0, readUpTo,true,false); std::vector inReq; std::vector inResp; @@ -126,6 +126,8 @@ TEST_CASE("LogsDBStandAloneLeader") { std::vector outReq; std::vector outResp; db->processIncomingMessages(inReq, inResp); + _setCurrentTime(eggsNow() + LogsDB::LEADER_INACTIVE_TIMEOUT + 1_ms); + db->processIncomingMessages(inReq, inResp); REQUIRE(db->isLeader()); std::vector entries{ @@ -147,7 +149,7 @@ TEST_CASE("LogsDBStandAloneLeader") { TEST_CASE("LogsDBAvoidBeingLeader") { _setCurrentTime(eggsNow()); - TempLogsDB db(LogLevel::LOG_ERROR, 0, 0, false, false, true); + TempLogsDB db(LogLevel::LOG_ERROR, 0, 0, true, true); REQUIRE_FALSE(db->isLeader()); std::vector inReq; std::vector inResp; diff --git a/cpp/tests/utils/TempLogsDB.hpp b/cpp/tests/utils/TempLogsDB.hpp index 591350b8..003a8900 100644 --- a/cpp/tests/utils/TempLogsDB.hpp +++ b/cpp/tests/utils/TempLogsDB.hpp @@ -18,10 +18,8 @@ struct TempLogsDB { LogLevel level, ReplicaId replicaId = 0, LogIdx lastRead = 0, - bool dontDoReplication = false, - bool forceLeader = false, - bool avoidBeingLeader = false, - LogIdx forcedLastReleased = 0): logger(level, STDERR_FILENO, false, false) + bool noReplication = false, + bool avoidBeingLeader = false): logger(level, STDERR_FILENO, false, false) { dbDir = std::string("temp-logs-db.XXXXXX"); if (mkdtemp(dbDir.data()) == nullptr) { @@ -31,22 +29,20 @@ struct TempLogsDB { sharedDB = std::make_unique(logger, xmon, dbDir + "/db", dbDir + "/db-statistics.txt"); initSharedDB(); - db = std::make_unique(logger, xmon, *sharedDB, replicaId, lastRead, dontDoReplication, forceLeader, avoidBeingLeader, forcedLastReleased); + db = std::make_unique(logger, xmon, *sharedDB, replicaId, lastRead, noReplication, avoidBeingLeader); } // useful to test recovery void restart( ReplicaId replicaId = 0, LogIdx lastRead = 0, - bool dontDoReplication = false, - bool forceLeader = false, - bool avoidBeingLeader = false, - LogIdx forcedLastReleased = 0) + bool noReplication = false, + bool avoidBeingLeader = false) { db->close(); sharedDB = std::make_unique(logger, xmon, dbDir + "/db", dbDir + "/db-statistics.txt"); initSharedDB(); - db = std::make_unique(logger, xmon, *sharedDB, replicaId, lastRead, dontDoReplication, forceLeader, avoidBeingLeader, forcedLastReleased); + db = std::make_unique(logger, xmon, *sharedDB, replicaId, lastRead, noReplication, avoidBeingLeader); } ~TempLogsDB() { diff --git a/go/cleanup/migrate.go b/go/cleanup/migrate.go index adfa8564..bc20442e 100644 --- a/go/cleanup/migrate.go +++ b/go/cleanup/migrate.go @@ -900,7 +900,7 @@ func (m *migrator) runFileMigrators(wg *sync.WaitGroup) { break } if err != msgs.BLOCK_NOT_FOUND { - m.log.Info("could not migrate file in shard %v: %v", file, shid, err) + m.log.Info("could not migrate file %v in shard %v: %v", file, shid, err) break } m.log.RaiseNC(blockNotFoundAlert, "could not migrate blocks in file %v because a block was not found in it. this is probably due to conflicts with other migrations or scrubbing. will retry in one second.", file) diff --git a/go/client/client.go b/go/client/client.go index e92db4c9..3413bc30 100644 --- a/go/client/client.go +++ b/go/client/client.go @@ -893,7 +893,7 @@ func (procs *blocksProcessors) send( args *sendArgs, completionChan chan *blockCompletion, ) error { - if args.addrs.Addr1.Port == 0 && args.addrs.Addr1.Port == 0 { + if args.addrs.Addr1.Port == 0 && args.addrs.Addr2.Port == 0 { panic(fmt.Errorf("got zero ports for both addresses for block service %v: %v:%v %v:%v", args.blockService, args.addrs.Addr1.Addrs, args.addrs.Addr1.Port, args.addrs.Addr2.Addrs, args.addrs.Addr2.Port)) } resp := &clientBlockResponse{ diff --git a/go/eggsrun/eggsrun.go b/go/eggsrun/eggsrun.go index 32e25919..65ff3308 100644 --- a/go/eggsrun/eggsrun.go +++ b/go/eggsrun/eggsrun.go @@ -195,12 +195,10 @@ func main() { } if r == 0 { if *leaderOnly { - opts.UseLogsDB = "LEADER_NO_FOLLOWERS" + opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"} } else { - opts.UseLogsDB = "LEADER" + opts.LogsDBFlags = []string{"-logsdb-leader"} } - } else { - opts.UseLogsDB = "FOLLOWER" } if *startingPort != 0 { opts.Addr1 = fmt.Sprintf("127.0.0.1:%v", uint16(*startingPort)+uint16(r)) @@ -224,16 +222,16 @@ func main() { ShuckleAddress: shuckleAddress, Perf: *profile, Xmon: *xmon, - UseLogsDB: "", + LogsDBFlags: nil, } if *leaderOnly && r > 0 { continue } if r == 0 { if *leaderOnly { - opts.UseLogsDB = "LEADER_NO_FOLLOWERS" + opts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"} } else { - opts.UseLogsDB = "LEADER" + opts.LogsDBFlags = []string{"-logsdb-leader"} } } if *startingPort != 0 { diff --git a/go/eggsshuckle/eggsshuckle.go b/go/eggsshuckle/eggsshuckle.go index 5ab72920..b84e5c0f 100644 --- a/go/eggsshuckle/eggsshuckle.go +++ b/go/eggsshuckle/eggsshuckle.go @@ -1191,7 +1191,7 @@ func handleMoveCDCLeader(log *lib.Logger, s *state, req *msgs.MoveCdcLeaderReq) panic(err) } if rowsAffected != 5 { - panic(fmt.Errorf("unusual number of rows affected (%d) when changing CDC leader to %s", rowsAffected, req.Replica)) + panic(fmt.Errorf("unusual number of rows affected (%d) when changing CDC leader to %v", rowsAffected, req.Replica)) } return &msgs.MoveCdcLeaderResp{}, nil @@ -1241,7 +1241,7 @@ func handleClearCDCInfo(log *lib.Logger, s *state, req *msgs.ClearCdcInfoReq) (* panic(err) } if rowsAffected != 1 { - panic(fmt.Errorf("unusual number of rows affected (%d) when executing ClearCDCInfoReq for %s", rowsAffected, req.Replica)) + panic(fmt.Errorf("unusual number of rows affected (%d) when executing ClearCDCInfoReq for %v", rowsAffected, req.Replica)) } return &msgs.ClearCdcInfoResp{}, nil diff --git a/go/eggstests/eggstests.go b/go/eggstests/eggstests.go index 19375528..d6372a09 100644 --- a/go/eggstests/eggstests.go +++ b/go/eggstests/eggstests.go @@ -1151,12 +1151,10 @@ func main() { } if r == 0 { if *leaderOnly { - cdcOpts.UseLogsDB = "LEADER_NO_FOLLOWERS" + cdcOpts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"} } else { - cdcOpts.UseLogsDB = "LEADER" + cdcOpts.LogsDBFlags = []string{"-logsdb-leader"} } - } else { - cdcOpts.UseLogsDB = "FOLLOWER" } if *buildType == "valgrind" { // apparently 100ms is too little when running with valgrind @@ -1182,16 +1180,16 @@ func main() { Addr1: "127.0.0.1:0", Addr2: "127.0.0.1:0", TransientDeadlineInterval: &testTransientDeadlineInterval, - UseLogsDB: "", + LogsDBFlags: nil, } if *leaderOnly && r > 0 { continue } if r == 0 { if *leaderOnly { - shopts.UseLogsDB = "LEADER_NO_FOLLOWERS" + shopts.LogsDBFlags = []string{"-logsdb-leader", "-logsdb-no-replication"} } else { - shopts.UseLogsDB = "LEADER" + shopts.LogsDBFlags = []string{"-logsdb-leader"} } } procs.StartShard(log, *repoDir, &shopts) diff --git a/go/managedprocess/managedprocess.go b/go/managedprocess/managedprocess.go index 54d50ed7..7315ed28 100644 --- a/go/managedprocess/managedprocess.go +++ b/go/managedprocess/managedprocess.go @@ -477,7 +477,7 @@ type ShardOpts struct { Addr2 string TransientDeadlineInterval *time.Duration Xmon string - UseLogsDB string + LogsDBFlags []string } func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts *ShardOpts) { @@ -500,8 +500,8 @@ func (procs *ManagedProcesses) StartShard(ll *lib.Logger, repoDir string, opts * if opts.Xmon != "" { args = append(args, "-xmon", opts.Xmon) } - if opts.UseLogsDB != "" { - args = append(args, "-use-logsdb", opts.UseLogsDB) + if opts.LogsDBFlags != nil { + args = append(args, opts.LogsDBFlags...) } switch opts.LogLevel { case lib.TRACE: @@ -571,7 +571,7 @@ type CDCOpts struct { Addr2 string ShardTimeout time.Duration Xmon string - UseLogsDB string + LogsDBFlags []string } func (procs *ManagedProcesses) StartCDC(ll *lib.Logger, repoDir string, opts *CDCOpts) { @@ -593,8 +593,8 @@ func (procs *ManagedProcesses) StartCDC(ll *lib.Logger, repoDir string, opts *CD if opts.Xmon != "" { args = append(args, "-xmon", opts.Xmon) } - if opts.UseLogsDB != "" { - args = append(args, "-use-logsdb", opts.UseLogsDB) + if opts.LogsDBFlags != nil { + args = append(args, opts.LogsDBFlags...) } switch opts.LogLevel { case lib.TRACE: