shard: cdc lookup strongly consistent

Miroslav Crnic
2025-12-09 10:33:06 +00:00
parent f4ca4d226f
commit d6db8c6cc8


@@ -521,7 +521,7 @@ private:
         LOG_DEBUG(_env, "parsed request: %s", req);
     }
-    auto& entry = readOnlyShardReq(req.body.kind()) ? _readRequests.emplace_back() : _writeReqs.emplace_back();
+    auto& entry = _requestNeedsConsistency(req.body.kind(), protocol) ? _writeReqs.emplace_back() : _readRequests.emplace_back();
     entry.sockIx = msg.socketIx;
     entry.clientAddr = msg.clientAddr;
     entry.receivedAt = t0;
@@ -529,6 +529,15 @@ private:
     entry.msg = std::move(req);
 }
+// All write requests fall into this category. Some read requests issued by the CDC also need
+// cross-region consistency, which is handled only on the write thread.
+bool _requestNeedsConsistency(ShardMessageKind kind, uint32_t protocol) {
+    if (protocol == CDC_TO_SHARD_REQ_PROTOCOL_VERSION && kind == ShardMessageKind::LOOKUP) {
+        return true;
+    }
+    return !readOnlyShardReq(kind);
+}
 void _handleShardResponse(UDPMessage& msg, uint32_t protocol) {
     LOG_DEBUG(_env, "received message from %s", msg.clientAddr);
     if (unlikely(!_shared.options.isLeader())) {
@@ -745,6 +754,7 @@ private:
 std::vector<LogsDBRequest> _logsDBRequests; // requests from other replicas
 std::vector<LogsDBResponse> _logsDBResponses; // responses from other replicas
 std::vector<ShardReq> _shardRequests; // requests from clients or proxy locations
+std::vector<ShardReq> _strongConsistencyReadRequests; // read requests that need strong cross-region consistency
 std::vector<ProxyShardRespMsg> _shardResponses; // responses from shard leader in primary location
 std::vector<ProxyLogsDBRequest> _proxyLogsDBRequests; // catchup requests from logs db leader in proxy location
                                                       // or write requests from logs db leader in primary location
@@ -1447,6 +1457,7 @@ public:
 }
 _linkReadRequests.clear();
+_strongConsistencyReadRequests.clear();
 if (!_shared.options.isProxyLocation()) {
     // we don't expect and should not process any shard responses if in primary location
@@ -1467,6 +1478,13 @@ public:
     // we already have a request in flight with this id from this client
     continue;
 }
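+// Read-only requests that reached the write path are the CDC lookups needing strong
+// cross-region consistency; queue them separately and answer them further down on this thread.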
+if (readOnlyShardReq(req.msg.body.kind())) {
+    if (req.msg.id != 0) {
+        _inFlightRequestKeys.insert(InFlightRequestKey{req.msg.id, req.clientAddr});
+    }
+    _strongConsistencyReadRequests.emplace_back(std::move(req));
+    continue;
+}
 switch (req.msg.body.kind()) {
     case ShardMessageKind::SHARD_SNAPSHOT:
         snapshotReq = std::move(req);
@@ -1554,6 +1572,71 @@ public:
     _logIdToShardRequest.insert({entry.idx.u64, std::move(req)});
 }
+for(auto& req : _strongConsistencyReadRequests) {
+    ALWAYS_ASSERT(req.protocol == CDC_TO_SHARD_REQ_PROTOCOL_VERSION);
+    CdcToShardRespMsg resp;
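+    // The lookup is served from the local ShardDB; read() also returns the checkpoint (log)
+    // index the read was executed at, which the secondaries must reach before we respond.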
+    resp.body.checkPointIdx = _shared.shardDB.read(req.msg.body, resp.body.resp);
+    resp.id = req.msg.id;
+    // For CDC requests, wait for all secondary leaders to apply
+    // the state before sending the response.
+    auto now = ternNow();
+    auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations();
+    bool shouldWait = false;
+    if ((!leadersAtOtherLocations->empty()) && resp.body.resp.kind() != ShardMessageKind::ERROR) {
+        CrossRegionWaitInfo waitInfo;
+        waitInfo.idx = resp.body.checkPointIdx;
+        waitInfo.createdAt = now;
+        uint64_t responseId = resp.id;
+        for (const auto& leader : *leadersAtOtherLocations) {
+            if (leader.locationId == 0) continue;
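+            // Don't block the read on a secondary whose leader looks down; it will catch up on its own.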
+            auto timeSinceLastSeen = now - leader.lastSeen;
+            if (timeSinceLastSeen >= SHARD_DOWN_THRESHOLD) {
+                LOG_INFO(_env, "Skipping wait for secondary leader at location %s (down for %s)",
+                    leader.locationId, timeSinceLastSeen);
+                continue;
+            }
+            shouldWait = true;
+            uint64_t waitReqId = ++_requestIdCounter;
+            CrossRegionLocationState locState;
+            locState.lastSent = now;
+            locState.waitReqId = waitReqId;
+            waitInfo.locationStates[leader.locationId] = locState;
+            _waitReqIdToResponseId[waitReqId] = responseId;
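+            // Ask the secondary leader to report back once it has applied the log up to waitInfo.idx.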
+            ShardReqMsg waitReq;
+            waitReq.id = waitReqId;
+            auto& waitBody = waitReq.body.setWaitStateApplied();
+            waitBody.idx = waitInfo.idx;
+            _sender.prepareOutgoingMessage(
+                _env,
+                _shared.sock().addr(),
+                leader.addrs,
+                [&waitReq](BincodeBuf& buf) {
+                    waitReq.pack(buf);
+                });
+            LOG_DEBUG(_env, "Sent WaitStateAppliedReq (id=%s) for LogIdx %s to location %s",
+                waitReqId, waitInfo.idx, leader.locationId);
+        }
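+        // Park both the original request and the already-computed response; they are released
+        // from the WAIT_STATE_APPLIED response loop below once the secondaries have confirmed.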
+        if (shouldWait) {
+            waitInfo.originalRequest = std::move(req);
+            waitInfo.responseToSend = std::move(resp);
+            _crossRegionWaitResponses[responseId] = std::move(waitInfo);
+        }
+    }
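+    // Nothing to wait for (no live secondary leaders, or an error response): reply immediately.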
+    if (!shouldWait) {
+        bool dropArtificially = _packetDropRand.generate64() % 10'000 < _outgoingPacketDropProbability;
+        packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, req, resp, _expandedCDCKey);
+        ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{req.msg.id, req.clientAddr}) == 1);
+    }
+}
 for(auto& waitResp : _waitResponses) {
     ALWAYS_ASSERT(waitResp.msg.body.kind() == ShardMessageKind::WAIT_STATE_APPLIED);
     auto waitReqIt = _waitReqIdToResponseId.find(waitResp.msg.id);
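
For orientation, the bookkeeping this commit adds can be reduced to a small sketch: each deferred CDC read is parked under its response id, one outstanding wait-request id is recorded per secondary location, and the response is released once every tracked location has acknowledged applying the checkpoint index. The standalone C++17 program below illustrates that flow under those assumptions; the names (WaitInfo, onWaitStateApplied, crossRegionWaits, waitReqToResponse) are simplified stand-ins for the shard's real types, not the actual implementation.

    #include <cstdint>
    #include <iostream>
    #include <unordered_map>

    // Wait state for one parked CDC read.
    struct WaitInfo {
        uint64_t idx;                                      // checkpoint (log) index to wait for
        std::unordered_map<uint8_t, bool> locationApplied; // locationId -> acknowledged?
    };

    std::unordered_map<uint64_t, WaitInfo> crossRegionWaits;  // responseId -> wait state
    std::unordered_map<uint64_t, uint64_t> waitReqToResponse; // waitReqId -> responseId

    // Called when a WAIT_STATE_APPLIED response with id waitReqId arrives from locationId.
    void onWaitStateApplied(uint64_t waitReqId, uint8_t locationId) {
        auto reqIt = waitReqToResponse.find(waitReqId);
        if (reqIt == waitReqToResponse.end()) return; // unknown or duplicate ack
        auto waitIt = crossRegionWaits.find(reqIt->second);
        if (waitIt == crossRegionWaits.end()) return; // response already released
        waitIt->second.locationApplied[locationId] = true;
        for (const auto& entry : waitIt->second.locationApplied) {
            if (!entry.second) return; // still waiting on at least one location
        }
        std::cout << "all locations applied idx " << waitIt->second.idx
                  << ", releasing response " << reqIt->second << "\n";
        crossRegionWaits.erase(waitIt);
        waitReqToResponse.erase(reqIt);
    }

    int main() {
        // Response 42 must wait for locations 1 and 2 to apply checkpoint index 100.
        crossRegionWaits[42] = WaitInfo{100, {{1, false}, {2, false}}};
        waitReqToResponse[7] = 42; // wait request 7 went to location 1
        waitReqToResponse[8] = 42; // wait request 8 went to location 2
        onWaitStateApplied(7, 1);  // first ack: still waiting on location 2
        onWaitStateApplied(8, 2);  // second ack: response 42 is released
    }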