diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 3dd93b57..e7e89823 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -521,7 +521,7 @@ private: LOG_DEBUG(_env, "parsed request: %s", req); } - auto& entry = readOnlyShardReq(req.body.kind()) ? _readRequests.emplace_back() : _writeReqs.emplace_back(); + auto& entry = _requestNeedsConsistency(req.body.kind(), protocol) ? _writeReqs.emplace_back() : _readRequests.emplace_back(); entry.sockIx = msg.socketIx; entry.clientAddr = msg.clientAddr; entry.receivedAt = t0; @@ -529,6 +529,15 @@ private: entry.msg = std::move(req); } + // All write requests fall into this category. Some read requests issued by CDC also need cross-regional consistency + // which are handled only on the write thread + bool _requestNeedsConsistency(ShardMessageKind kind, uint32_t protocol) { + if (protocol == CDC_TO_SHARD_REQ_PROTOCOL_VERSION && kind == ShardMessageKind::LOOKUP) { + return true; + } + return !readOnlyShardReq(kind); + } + void _handleShardResponse(UDPMessage& msg, uint32_t protocol) { LOG_DEBUG(_env, "received message from %s", msg.clientAddr); if (unlikely(!_shared.options.isLeader())) { @@ -745,6 +754,7 @@ private: std::vector _logsDBRequests; // requests from other replicas std::vector _logsDBResponses; // responses from other replicas std::vector _shardRequests; // requests from clients or proxy locations + std::vector _strongConsistencyReadRequests; // read requests that need strong cross-region consistency (CDC LOOKUPs diverted here by _requestNeedsConsistency) std::vector _shardResponses; // responses from shard leader in primary location std::vector _proxyLogsDBRequests; // catchup requests from logs db leader in proxy location // or write requests from logs db leader in primary location @@ -1447,6 +1457,7 @@ public: } _linkReadRequests.clear(); + _strongConsistencyReadRequests.clear(); if (!_shared.options.isProxyLocation()) { // we don't expect and should not process any shard responses if in primary location @@ -1467,6 +1478,13 @@ public: // we already have a 
request in flight with this id from this client continue; } + if (readOnlyShardReq(req.msg.body.kind())) { + if (req.msg.id != 0) { + _inFlightRequestKeys.insert(InFlightRequestKey{req.msg.id, req.clientAddr}); + } + _strongConsistencyReadRequests.emplace_back(std::move(req)); + continue; + } switch (req.msg.body.kind()) { case ShardMessageKind::SHARD_SNAPSHOT: snapshotReq = std::move(req); @@ -1554,6 +1572,71 @@ public: _logIdToShardRequest.insert({entry.idx.u64, std::move(req)}); } + for(auto& req : _strongConsistencyReadRequests) { + ALWAYS_ASSERT(req.protocol == CDC_TO_SHARD_REQ_PROTOCOL_VERSION); + CdcToShardRespMsg resp; + resp.body.checkPointIdx = _shared.shardDB.read(req.msg.body, resp.body.resp); + resp.id = req.msg.id; + + // For CDC requests, wait for all secondary leaders to apply + // before sending the response. NOTE(review): the !shouldWait branch below asserts that erasing the in-flight key succeeds, but the key was only inserted when req.msg.id != 0 — confirm CDC request ids are never 0 here + auto now = ternNow(); + auto leadersAtOtherLocations = _shared.getLeadersAtOtherLocations(); + bool shouldWait = false; + if ((!leadersAtOtherLocations->empty()) && resp.body.resp.kind() != ShardMessageKind::ERROR) { + + CrossRegionWaitInfo waitInfo; + waitInfo.idx = resp.body.checkPointIdx; + waitInfo.createdAt = now; + + uint64_t responseId = resp.id; + + for (const auto& leader : *leadersAtOtherLocations) { + if (leader.locationId == 0) continue; + auto timeSinceLastSeen = now - leader.lastSeen; + if (timeSinceLastSeen >= SHARD_DOWN_THRESHOLD) { + LOG_INFO(_env, "Skipping wait for secondary leader at location %s (down for %s)", + leader.locationId, timeSinceLastSeen); + continue; + } + shouldWait = true; + + uint64_t waitReqId = ++_requestIdCounter; + CrossRegionLocationState locState; + locState.lastSent = now; + locState.waitReqId = waitReqId; + waitInfo.locationStates[leader.locationId] = locState; + _waitReqIdToResponseId[waitReqId] = responseId; + + ShardReqMsg waitReq; + waitReq.id = waitReqId; + auto& waitBody = waitReq.body.setWaitStateApplied(); + waitBody.idx = waitInfo.idx; + + _sender.prepareOutgoingMessage( + _env, 
_shared.sock().addr(), + leader.addrs, + [&waitReq](BincodeBuf& buf) { + waitReq.pack(buf); + }); + + LOG_DEBUG(_env, "Sent WaitStateAppliedReq (id=%s) for LogIdx %s to location %s", + waitReqId, waitInfo.idx, leader.locationId); + } + if (shouldWait) { + waitInfo.originalRequest = std::move(req); + waitInfo.responseToSend = std::move(resp); + _crossRegionWaitResponses[responseId] = std::move(waitInfo); + } + } + if (!shouldWait) { + bool dropArtificially = _packetDropRand.generate64() % 10'000 < _outgoingPacketDropProbability; + packCheckPointedShardResponse(_env, _shared, _shared.sock().addr(), _sender, dropArtificially, req, resp, _expandedCDCKey); + ALWAYS_ASSERT(_inFlightRequestKeys.erase(InFlightRequestKey{req.msg.id, req.clientAddr}) == 1); + } + } + for(auto& waitResp : _waitResponses) { ALWAYS_ASSERT(waitResp.msg.body.kind() == ShardMessageKind::WAIT_STATE_APPLIED); auto waitReqIt = _waitReqIdToResponseId.find(waitResp.msg.id);