shard: always apply entries in proxy location

This commit is contained in:
Miroslav Crnic
2025-02-03 20:39:23 +00:00
parent 2b946f4efc
commit 1e2788f321
3 changed files with 140 additions and 130 deletions

View File

@@ -1245,6 +1245,10 @@ public:
_lastContinuousIdx(lastRead),
_lastMissingIdx(lastRead) {}
LogIdx getLastContinuous() const {
return _lastContinuousIdx;
}
void readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries) {
if (_lastRead == _lastContinuousIdx) {
update_atomic_stat_ema(_stats.entriesRead, (uint64_t)0);
@@ -1755,6 +1759,10 @@ public:
return _appender.appendEntries(entries);
}
LogIdx getLastContinuous() const {
return _catchupReader.getLastContinuous();
}
void readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries) {
_catchupReader.readEntries(entries, maxEntries);
}
@@ -1840,6 +1848,10 @@ EggsError LogsDB::appendEntries(std::vector<LogsDBLogEntry>& entries) {
return _impl->appendEntries(entries);
}
LogIdx LogsDB::getLastContinuous() const {
return _impl->getLastContinuous();
}
void LogsDB::readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries) {
_impl->readEntries(entries, maxEntries);
}

View File

@@ -124,6 +124,8 @@ public:
EggsError appendEntries(std::vector<LogsDBLogEntry>& entries);
// returns index of last entry available for read
LogIdx getLastContinuous() const;
void readEntries(std::vector<LogsDBLogEntry>& entries, size_t maxEntries = IN_FLIGHT_APPEND_WINDOW);
// Takes a sorted vector of log inxices and returns the corresponding entries
@@ -134,6 +136,7 @@ public:
LogIdx getLastReleased() const;
const LogsDBStats& getStats() const;
static std::vector<rocksdb::ColumnFamilyDescriptor> getColumnFamilyDescriptors();

View File

@@ -946,127 +946,133 @@ public:
}
void _applyLogEntries() {
for (auto& logsDBEntry : _logsDBEntries) {
++_currentLogIndex;
ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx);
ALWAYS_ASSERT(logsDBEntry.value.size() > 0);
BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size());
ShardLogEntry shardEntry;
shardEntry.unpack(buf);
ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx);
ALWAYS_ASSERT(_logsDBEntries.empty());
auto lastContinuousIdx = _logsDB.getLastContinuous();
while (_currentLogIndex < lastContinuousIdx.u64) {
_logsDB.readEntries(_logsDBEntries);
ALWAYS_ASSERT(!_logsDBEntries.empty());
for (auto& logsDBEntry : _logsDBEntries) {
++_currentLogIndex;
ALWAYS_ASSERT(_currentLogIndex == logsDBEntry.idx);
ALWAYS_ASSERT(logsDBEntry.value.size() > 0);
BincodeBuf buf((char*)&logsDBEntry.value.front(), logsDBEntry.value.size());
ShardLogEntry shardEntry;
shardEntry.unpack(buf);
ALWAYS_ASSERT(_currentLogIndex == shardEntry.idx);
if (_isLogsDBLeader) {
{
//this is sanity check confirming what we got through the log and deserialized
//exactly matches what we serialized and pushed to log
//this is only ever true in primary location
auto it = _inFlightEntries.find(shardEntry.idx.u64);
ALWAYS_ASSERT(it != _inFlightEntries.end());
ALWAYS_ASSERT(shardEntry == it->second);
ALWAYS_ASSERT(shardEntry.idx == it->second.idx);
_inFlightEntries.erase(it);
}
auto it = _logIdToShardRequest.find(shardEntry.idx.u64);
if (it == _logIdToShardRequest.end()) {
// if we are primary location, all writes should be triggered by requests
ALWAYS_ASSERT(_shared.options.isProxyLocation());
// we are proxy location there are writes not initiated by us and we behave like follower
// we are not leader, we can not do any checks and there is no response to send
if (_isLogsDBLeader) {
{
//this is sanity check confirming what we got through the log and deserialized
//exactly matches what we serialized and pushed to log
//this is only ever true in primary location
auto it = _inFlightEntries.find(shardEntry.idx.u64);
ALWAYS_ASSERT(it != _inFlightEntries.end());
ALWAYS_ASSERT(shardEntry == it->second);
ALWAYS_ASSERT(shardEntry.idx == it->second.idx);
_inFlightEntries.erase(it);
}
auto it = _logIdToShardRequest.find(shardEntry.idx.u64);
if (it == _logIdToShardRequest.end()) {
// if we are primary location, all writes should be triggered by requests
ALWAYS_ASSERT(_shared.options.isProxyLocation());
// we are proxy location there are writes not initiated by us and we behave like follower
// we are not leader, we can not do any checks and there is no response to send
ShardRespContainer _;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _);
continue;
}
auto& request = it->second;
if (likely(request.msg.id)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", request.msg.id, request.msg.body.kind(), request.clientAddr);
} else {
LOG_DEBUG(_env, "applying request-less log entry");
}
// first handle case where client does not care about response
if (request.msg.id == 0) {
ShardRespContainer resp;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp);
if (unlikely(resp.kind() == ShardMessageKind::ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError());
}
_logIdToShardRequest.erase(it);
continue;
}
// depending on protocol we need different kind of responses
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
switch(request.protocol){
case SHARD_REQ_PROTOCOL_VERSION:
{
ShardRespMsg resp;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body);
resp.id = request.msg.id;
auto it = _proxiedResponses.find(logsDBEntry.idx.u64);
if (it != _proxiedResponses.end()) {
ALWAYS_ASSERT(_shared.options.isProxyLocation());
it->second.second.finished = eggsNow();
logSlowProxyReq(it->second.second);
resp.body = std::move(it->second.first);
_proxiedResponses.erase(it);
}
if (resp.body.kind() == ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE) {
ShardRespContainer tmpResp;
switch (request.msg.body.kind()) {
case ShardMessageKind::ADD_SPAN_INITIATE:
{
auto& addResp = tmpResp.setAddSpanInitiate();
addResp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiate().blocks = std::move(addResp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE:
{
auto& addResp = tmpResp.setAddSpanInitiateWithReference();
addResp.resp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiateWithReference().resp.blocks = std::move(addResp.resp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE:
{
break;
}
default:
ALWAYS_ASSERT(false, "Unexpected reponse kind %s for requests kind %s", resp.body.kind(), request.msg.body.kind() );
}
}
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp);
}
break;
case CDC_TO_SHARD_REQ_PROTOCOL_VERSION:
{
CdcToShardRespMsg resp;
resp.body.checkPointIdx = shardEntry.idx;
resp.id = request.msg.id;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp);
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedCDCKey);
}
break;
case PROXY_SHARD_REQ_PROTOCOL_VERSION:
{
ProxyShardRespMsg resp;
resp.body.checkPointIdx = shardEntry.idx;
resp.id = request.msg.id;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp);
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedShardKey);
}
break;
}
ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{request.msg.id, request.clientAddr}) == 1);
_logIdToShardRequest.erase(it);
} else {
// we are not leader, we can not do any checks and there is no response to send
ShardRespContainer _;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _);
continue;
}
auto& request = it->second;
if (likely(request.msg.id)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", request.msg.id, request.msg.body.kind(), request.clientAddr);
} else {
LOG_DEBUG(_env, "applying request-less log entry");
}
// first handle case where client does not care about response
if (request.msg.id == 0) {
ShardRespContainer resp;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp);
if (unlikely(resp.kind() == ShardMessageKind::ERROR)) {
RAISE_ALERT(_env, "could not apply request-less log entry: %s", resp.getError());
}
_logIdToShardRequest.erase(it);
continue;
}
// depending on protocol we need different kind of responses
bool dropArtificially = wyhash64(&_packetDropRand) % 10'000 < _outgoingPacketDropProbability;
switch(request.protocol){
case SHARD_REQ_PROTOCOL_VERSION:
{
ShardRespMsg resp;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body);
resp.id = request.msg.id;
auto it = _proxiedResponses.find(logsDBEntry.idx.u64);
if (it != _proxiedResponses.end()) {
ALWAYS_ASSERT(_shared.options.isProxyLocation());
it->second.second.finished = eggsNow();
logSlowProxyReq(it->second.second);
resp.body = std::move(it->second.first);
_proxiedResponses.erase(it);
}
if (resp.body.kind() == ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE) {
ShardRespContainer tmpResp;
switch (request.msg.body.kind()) {
case ShardMessageKind::ADD_SPAN_INITIATE:
{
auto& addResp = tmpResp.setAddSpanInitiate();
addResp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiate().blocks = std::move(addResp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_INITIATE_WITH_REFERENCE:
{
auto& addResp = tmpResp.setAddSpanInitiateWithReference();
addResp.resp.blocks = std::move(resp.body.getAddSpanAtLocationInitiate().resp.blocks);
resp.body.setAddSpanInitiateWithReference().resp.blocks = std::move(addResp.resp.blocks);
break;
}
case ShardMessageKind::ADD_SPAN_AT_LOCATION_INITIATE:
{
break;
}
default:
ALWAYS_ASSERT(false, "Unexpected reponse kind %s for requests kind %s", resp.body.kind(), request.msg.body.kind() );
}
}
packShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp);
}
break;
case CDC_TO_SHARD_REQ_PROTOCOL_VERSION:
{
CdcToShardRespMsg resp;
resp.body.checkPointIdx = shardEntry.idx;
resp.id = request.msg.id;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp);
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedCDCKey);
}
break;
case PROXY_SHARD_REQ_PROTOCOL_VERSION:
{
ProxyShardRespMsg resp;
resp.body.checkPointIdx = shardEntry.idx;
resp.id = request.msg.id;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, resp.body.resp);
packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, request, resp, _expandedShardKey);
}
break;
}
ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{request.msg.id, request.clientAddr}) == 1);
_logIdToShardRequest.erase(it);
} else {
// we are not leader, we can not do any checks and there is no response to send
ShardRespContainer _;
_shared.shardDB.applyLogEntry(logsDBEntry.idx.u64, shardEntry, _);
}
// we send new LogsDB entry to leaders in other locations
_tryReplicateToOtherLocations();
_logsDBEntries.clear();
}
// we send new LogsDB entry to leaders in other locations
_tryReplicateToOtherLocations();
_logsDBEntries.clear();
}
void _processCathupReads() {
@@ -1186,11 +1192,7 @@ public:
ALWAYS_ASSERT(_proxiedResponses.empty());
// during leader election relase point might have moved and there could be entries we have not applied yet
ALWAYS_ASSERT(_logsDBEntries.empty());
do {
_logsDB.readEntries(_logsDBEntries);
_applyLogEntries();
} while(!_logsDBEntries.empty());
_applyLogEntries();
_knownLastReleased = _logsDB.getLastReleased();
} else {
LOG_INFO(_env, "We are no longer leader in LogsDB");
@@ -1222,11 +1224,7 @@ public:
}
// there could be log entries just released which we should apply
ALWAYS_ASSERT(_logsDBEntries.empty());
do {
_logsDB.readEntries(_logsDBEntries);
_applyLogEntries();
} while(!_logsDBEntries.empty());
_applyLogEntries();
// if we are leader we should alyways have latest state applied
ALWAYS_ASSERT(!_isLogsDBLeader || _currentLogIndex == _logsDB.getLastReleased());
@@ -1314,7 +1312,7 @@ public:
it->second.gotLogIdx = now;
// it is possible we already applied the log entry, forward the response
if (resp.body.checkPointIdx <= _logsDB.getLastReleased()) {
if (resp.body.checkPointIdx <= _logsDB.getLastContinuous()) {
if (likely(req.msg.id)) {
LOG_DEBUG(_env, "applying log entry for request %s kind %s from %s", req.msg.id, req.msg.body.kind(), req.clientAddr);
} else {
@@ -1477,10 +1475,7 @@ public:
_logsDBRequests.clear();
_logsDBResponses.clear();
_logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses);
do {
_logsDB.readEntries(_logsDBEntries);
_applyLogEntries();
} while(!_logsDBEntries.empty());
_applyLogEntries();
ALWAYS_ASSERT(_inFlightEntries.empty());
}
}