diff --git a/cpp/core/LogsDB.cpp b/cpp/core/LogsDB.cpp index 26feb12f..b4a3bab4 100644 --- a/cpp/core/LogsDB.cpp +++ b/cpp/core/LogsDB.cpp @@ -1245,6 +1245,10 @@ public: _lastContinuousIdx(lastRead), _lastMissingIdx(lastRead) {} + LogIdx getLastContinuous() const { + return _lastContinuousIdx; + } + void readEntries(std::vector& entries, size_t maxEntries) { if (_lastRead == _lastContinuousIdx) { update_atomic_stat_ema(_stats.entriesRead, (uint64_t)0); @@ -1755,6 +1759,10 @@ public: return _appender.appendEntries(entries); } + LogIdx getLastContinuous() const { + return _catchupReader.getLastContinuous(); + } + void readEntries(std::vector& entries, size_t maxEntries) { _catchupReader.readEntries(entries, maxEntries); } @@ -1840,6 +1848,10 @@ EggsError LogsDB::appendEntries(std::vector& entries) { return _impl->appendEntries(entries); } +LogIdx LogsDB::getLastContinuous() const { + return _impl->getLastContinuous(); +} + void LogsDB::readEntries(std::vector& entries, size_t maxEntries) { _impl->readEntries(entries, maxEntries); } diff --git a/cpp/core/LogsDB.hpp b/cpp/core/LogsDB.hpp index 3fde3b41..228b9dbf 100644 --- a/cpp/core/LogsDB.hpp +++ b/cpp/core/LogsDB.hpp @@ -124,6 +124,8 @@ public: EggsError appendEntries(std::vector& entries); + // returns index of last entry available for read + LogIdx getLastContinuous() const; void readEntries(std::vector& entries, size_t maxEntries = IN_FLIGHT_APPEND_WINDOW); // Takes a sorted vector of log inxices and returns the corresponding entries @@ -134,6 +136,7 @@ public: LogIdx getLastReleased() const; + const LogsDBStats& getStats() const; static std::vector getColumnFamilyDescriptors(); diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 843b6cc5..98b6b38d 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -946,127 +946,133 @@ public: } void _applyLogEntries() { - for (auto& logsDBEntry : _logsDBEntries) { - ++_currentLogIndex; - ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx); - ALWAYS_ASSERT(logsDBEntry.value.size() > 0); - BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size()); - ShardLogEntry shardEntry; - shardEntry.unpack(buf); - ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx); + ALWAYS_ASSERT(_logsDBEntries.empty()); + auto lastContinuousIdx = _logsDB.getLastContinuous(); + while (_currentLogIndex < lastContinuousIdx.u64) { + _logsDB.readEntries(_logsDBEntries); + ALWAYS_ASSERT(!_logsDBEntries.empty()); + for (auto& logsDBEntry : _logsDBEntries) { + ++_currentLogIndex; + ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx); + ALWAYS_ASSERT(logsDBEntry.value.size() > 0); + BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size()); + ShardLogEntry shardEntry; + shardEntry.unpack(buf); + ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx); - if (_isLogsDBLeader) { - { - //this is sanity check confirming what we got through the log and deserialized - //exactly matches what we serialized and pushed to log - //this is only ever true in primary location - auto it = _inFlightEntries.find(shardEntry.idx.u64); - ALWAYS_ASSERT(it != _inFlightEntries.end()); - ALWAYS_ASSERT(shardEntry == it->second); - ALWAYS_ASSERT(shardEntry.idx == it->second.idx); - _inFlightEntries.erase(it); - } - auto it = _logIdToShardRequest.find(shardEntry.idx.u64); - if (it == _logIdToShardRequest.end()) { - // if we are primary location, all writes should be triggered by requests - ALWAYS_ASSERT(_shared.options.isProxyLocation()); - // we are proxy location there are writes not initiated by us and we behave like follower - // we are not leader, we can not do any checks and there is no response to send + if (_isLogsDBLeader) { + { + //this is sanity check confirming what we got through the log and deserialized + //exactly matches what we serialized and pushed to log + //this is only ever true in primary location + auto it = _inFlightEntries.find(shardEntry.idx.u64); + ALWAYS_ASSERT(it != _inFlightEntries.end()); + ALWAYS_ASSERT(shardEntry == it->second); + ALWAYS_ASSERT(shardEntry.idx == it->second.idx); + _inFlightEntries.erase(it); + } + auto it = _logIdToShardRequest.find(shardEntry.idx.u64); + if (it == _logIdToShardRequest.end()) { + // if we are primary location, all writes should be triggered by requests + ALWAYS_ASSERT(_shared.options.isProxyLocation()); + // we are proxy location there are writes not initiated by us and we behave like follower + // we are not leader, we can not do any checks and there is no response to send + ShardRespContainer _; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _); + continue; + } + auto& request = it->second; + if (likely(request.msg.id)) { + LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", request.msg.id, request.msg.body.kind(), request.clientAddr); + } else { + LOG_DEBUG(_env, "applying request-less log entry"); + } + // first handle case where client does not care about response + if (request.msg.id == 0) { + ShardRespContainer resp; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp); + if (unlikely(resp.kind() == ShardMessageKind::ERROR)) { + RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError()); + } + _logIdToShardRequest.erase(it); + continue; + } + + // depending on protocol we need different kind of responses + bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; + switch(request.protocol){ + case SHARD_REQ_PROTOCOL_VERSION: + { + ShardRespMsg resp; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body); + resp.id = request.msg.id; + auto it = _proxiedResponses.find(logsDBEntry.idx.u64); + if (it != _proxiedResponses.end()) { + ALWAYS_ASSERT(_shared.options.isProxyLocation()); + it->second.second.finished = eggsNow(); + logSlowProxyReq(it->second.second); + resp.body = std::move(it->second.first); + _proxiedResponses.erase(it); + } + if (resp.body.kind() == ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE) { + ShardRespContainer tmpResp; + switch (request.msg.body.kind()) { + case ShardMessageKind::ADD_SPAN_INITIATE: + { + auto& addResp = tmpResp.setAddSpanInitiate(); + addResp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks); + resp.body.setAddSpanInitiate().blocks = std::move(addResp.blocks); + break; + } + case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE: + { + auto& addResp = tmpResp.setAddSpanInitiateWithReference(); + addResp.resp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks); + resp.body.setAddSpanInitiateWithReference().resp.blocks = std::move(addResp.resp.blocks); + break; + } + case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE: + { + break; + } + default: + ALWAYS_ASSERT(false, "Unexpected reponse kind %s for requests kind %s", resp.body.kind(), request.msg.body.kind() ); + } + } + packShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp); + } + break; + case CDC_TO_SHARD_REQ_PROTOCOL_VERSION: + { + CdcToShardRespMsg resp; + resp.body.checkPointIdx = shardEntry.idx; + resp.id = request.msg.id; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp); + packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedCDCKey); + } + break; + case PROXY_SHARD_REQ_PROTOCOL_VERSION: + { + ProxyShardRespMsg resp; + resp.body.checkPointIdx = shardEntry.idx; + resp.id = request.msg.id; + _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp); + packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedShardKey); + } + break; + } + ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{request.msg.id, request.clientAddr}) == 1); + _logIdToShardRequest.erase(it); + } else { + // we are not leader, we can not do any checks and there is no response to send ShardRespContainer _; _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _); - continue; - } - auto& request = it->second; - if (likely(request.msg.id)) { - LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", request.msg.id, request.msg.body.kind(), request.clientAddr); - } else { - LOG_DEBUG(_env, "applying request-less log entry"); } - // first handle case where client does not care about response - if (request.msg.id == 0) { - ShardRespContainer resp; - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp); - if (unlikely(resp.kind() == ShardMessageKind::ERROR)) { - RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError()); - } - _logIdToShardRequest.erase(it); - continue; - } - - // depending on protocol we need different kind of responses - bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability; - switch(request.protocol){ - case SHARD_REQ_PROTOCOL_VERSION: - { - ShardRespMsg resp; - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body); - resp.id = request.msg.id; - auto it = _proxiedResponses.find(logsDBEntry.idx.u64); - if (it != _proxiedResponses.end()) { - ALWAYS_ASSERT(_shared.options.isProxyLocation()); - it->second.second.finished = eggsNow(); - logSlowProxyReq(it->second.second); - resp.body = std::move(it->second.first); - _proxiedResponses.erase(it); - } - if (resp.body.kind() == ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE) { - ShardRespContainer tmpResp; - switch (request.msg.body.kind()) { - case ShardMessageKind::ADD_SPAN_INITIATE: - { - auto& addResp = tmpResp.setAddSpanInitiate(); - addResp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks); - resp.body.setAddSpanInitiate().blocks = std::move(addResp.blocks); - break; - } - case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE: - { - auto& addResp = tmpResp.setAddSpanInitiateWithReference(); - addResp.resp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks); - resp.body.setAddSpanInitiateWithReference().resp.blocks = std::move(addResp.resp.blocks); - break; - } - case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE: - { - break; - } - default: - ALWAYS_ASSERT(false, "Unexpected reponse kind %s for requests kind %s", resp.body.kind(), request.msg.body.kind() ); - } - } - packShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp); - } - break; - case CDC_TO_SHARD_REQ_PROTOCOL_VERSION: - { - CdcToShardRespMsg resp; - resp.body.checkPointIdx = shardEntry.idx; - resp.id = request.msg.id; - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp); - packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedCDCKey); - } - break; - case PROXY_SHARD_REQ_PROTOCOL_VERSION: - { - ProxyShardRespMsg resp; - resp.body.checkPointIdx = shardEntry.idx; - resp.id = request.msg.id; - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp); - packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedShardKey); - } - break; - } - ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{request.msg.id, request.clientAddr}) == 1); - _logIdToShardRequest.erase(it); - } else { - // we are not leader, we can not do any checks and there is no response to send - ShardRespContainer _; - _shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _); } + // we send new LogsDB entry to leaders in other locations + _tryReplicateToOtherLocations(); + _logsDBEntries.clear(); } - // we send new LogsDB entry to leaders in other locations - _tryReplicateToOtherLocations(); - _logsDBEntries.clear(); } void _processCathupReads() { @@ -1186,11 +1192,7 @@ public: ALWAYS_ASSERT(_proxiedResponses.empty()); // during leader election relase point might have moved and there could be entries we have not applied yet - ALWAYS_ASSERT(_logsDBEntries.empty()); - do { - _logsDB.readEntries(_logsDBEntries); - _applyLogEntries(); - } while(!_logsDBEntries.empty()); + _applyLogEntries(); _knownLastReleased = _logsDB.getLastReleased(); } else { LOG_INFO(_env, "We are no longer leader in LogsDB"); @@ -1222,11 +1224,7 @@ public: } // there could be log entries just released which we should apply - ALWAYS_ASSERT(_logsDBEntries.empty()); - do { - _logsDB.readEntries(_logsDBEntries); - _applyLogEntries(); - } while(!_logsDBEntries.empty()); + _applyLogEntries(); // if we are leader we should alyways have latest state applied ALWAYS_ASSERT(!_isLogsDBLeader || _currentLogIndex == _logsDB.getLastReleased()); @@ -1314,7 +1312,7 @@ public: it->second.gotLogIdx = now; // it is possible we already applied the log entry, forward the response - if (resp.body.checkPointIdx <= _logsDB.getLastReleased()) { + if (resp.body.checkPointIdx <= _logsDB.getLastContinuous()) { if (likely(req.msg.id)) { LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", req.msg.id, req.msg.body.kind(), req.clientAddr); } else { @@ -1477,10 +1475,7 @@ public: _logsDBRequests.clear(); _logsDBResponses.clear(); _logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses); - do { - _logsDB.readEntries(_logsDBEntries); - _applyLogEntries(); - } while(!_logsDBEntries.empty()); + _applyLogEntries(); ALWAYS_ASSERT(_inFlightEntries.empty()); } }