Reapply "cdc: apply serialized entries from other leaders"

This reverts commit 5eb556e35b.
This commit is contained in:
Francesco Mazzoli
2026-02-27 14:14:14 +00:00
parent 9d129fe22b
commit 81bc4b1869
+20 -17
View File
@@ -449,7 +449,7 @@ public:
if (unlikely(_shared.isLeader.load(std::memory_order_relaxed) && cdcEntry != _inFlightLogEntries[cdcEntry.logIdx()])) {
LOG_ERROR(_env, "Entry difference after deserialization cdcEntry(%s), original(%s)", cdcEntry, _inFlightLogEntries[cdcEntry.logIdx()]);
}
ALWAYS_ASSERT(!_shared.isLeader.load(std::memory_order_relaxed) || cdcEntry == _inFlightLogEntries[cdcEntry.logIdx()]);
//ALWAYS_ASSERT(!_shared.isLeader.load(std::memory_order_relaxed) || cdcEntry == _inFlightLogEntries[cdcEntry.logIdx()]);
_inFlightLogEntries.erase(cdcEntry.logIdx());
// process everything in a single batch
_cdcReqsTxnIds.clear();
@@ -458,21 +458,24 @@ public:
if (_shared.isLeader.load(std::memory_order_relaxed)) {
// record txn ids etc. for newly received requests
auto& cdcReqs = cdcEntry.cdcReqs();
auto& reqInfos = _logEntryIdxToReqInfos[cdcEntry.logIdx()];
ALWAYS_ASSERT(cdcReqs.size() == reqInfos.size());
for (size_t i = 0; i < cdcReqs.size(); ++i) {
const auto& req = cdcReqs[i];
const auto& reqInfo = reqInfos[i];
CDCTxnId txnId = _cdcReqsTxnIds[i];
ALWAYS_ASSERT(_inFlightTxns.find(txnId) == _inFlightTxns.end());
auto& inFlight = _inFlightTxns[txnId];
inFlight.hasClient = true;
inFlight.cdcRequestId = reqInfo.reqId;
inFlight.clientAddr = reqInfo.clientAddr;
inFlight.kind = req.kind();
inFlight.receivedAt = reqInfo.receivedAt;
inFlight.sockIx = reqInfo.sockIx;
_updateInFlightTxns();
auto reqInfosIt = _logEntryIdxToReqInfos.find(cdcEntry.logIdx());
if (reqInfosIt != _logEntryIdxToReqInfos.end()) {
auto& reqInfos = reqInfosIt->second;
ALWAYS_ASSERT(cdcReqs.size() == reqInfos.size());
for (size_t i = 0; i < cdcReqs.size(); ++i) {
const auto& req = cdcReqs[i];
const auto& reqInfo = reqInfos[i];
CDCTxnId txnId = _cdcReqsTxnIds[i];
ALWAYS_ASSERT(_inFlightTxns.find(txnId) == _inFlightTxns.end());
auto& inFlight = _inFlightTxns[txnId];
inFlight.hasClient = true;
inFlight.cdcRequestId = reqInfo.reqId;
inFlight.clientAddr = reqInfo.clientAddr;
inFlight.kind = req.kind();
inFlight.receivedAt = reqInfo.receivedAt;
inFlight.sockIx = reqInfo.sockIx;
_updateInFlightTxns();
}
}
_processStep();
_logEntryIdxToReqInfos.erase(cdcEntry.logIdx());
@@ -776,7 +779,7 @@ private:
LOG_DEBUG(_env, "txn %s finished", txnId);
// we need to send the response back to the client
auto inFlight = _inFlightTxns.find(txnId);
if (inFlight->second.hasClient) {
if (inFlight != _inFlightTxns.end() && inFlight->second.hasClient) {
_shared.timingsTotal[(int)inFlight->second.kind].add(ternNow() - inFlight->second.receivedAt);
_shared.errors[(int)inFlight->second.kind].add(resp.kind() != CDCMessageKind::ERROR ? TernError::NO_ERROR : resp.getError());
CDCRespMsg respMsg;