From fe2ce7aa17dfe2bfd29cdb50a5385014257191f1 Mon Sep 17 00:00:00 2001
From: Francesco Mazzoli
Date: Tue, 1 Aug 2023 14:49:16 +0000
Subject: [PATCH] See comments

---
Reviewer note: this copy was rebuilt from a whitespace-collapsed extraction.
Line breaks follow the hunk headers' line counts; indentation is assumed
(4 spaces per level) and must be checked against cpp/cdc/CDC.cpp.
The extraction also dropped every <...> token. Restored from usage in the
added lines: "#include <unordered_set>", "std::hash<InFlightCDCRequestKey>",
"std::unordered_set<InFlightCDCRequestKey>". Not recoverable from this copy
and left as found: the From: address, the three context #include targets,
and the template arguments of std::unordered_map / std::optional in
context lines. TODO: restore those from the target file before applying.

 cpp/cdc/CDC.cpp | 49 ++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 46 insertions(+), 3 deletions(-)

diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp
index 1873a449..89557369 100644
--- a/cpp/cdc/CDC.cpp
+++ b/cpp/cdc/CDC.cpp
@@ -12,6 +12,7 @@
 #include
 #include
 #include
+#include <unordered_set>
 
 #include "Bincode.hpp"
 #include "CDC.hpp"
@@ -71,6 +72,32 @@ struct InFlightCDCRequest {
     int sock;
 };
 
+// these can happen through normal user interaction
+static bool innocuousShardError(EggsError err) {
+    return err == EggsError::NAME_NOT_FOUND || err == EggsError::EDGE_NOT_FOUND;
+}
+
+struct InFlightCDCRequestKey {
+    uint64_t requestId;
+    uint32_t ip;
+    uint16_t port;
+
+    InFlightCDCRequestKey(uint64_t requestId_, struct sockaddr_in clientAddr_) :
+        requestId(requestId_), ip(clientAddr_.sin_addr.s_addr), port(clientAddr_.sin_port)
+    {}
+
+    bool operator==(const InFlightCDCRequestKey& other) const {
+        return requestId == other.requestId && ip == other.ip && port == other.port;
+    }
+};
+
+template <>
+struct std::hash<InFlightCDCRequestKey> {
+    std::size_t operator()(const InFlightCDCRequestKey& key) const {
+        return key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip));
+    }
+};
+
 struct CDCServer : Undertaker::Reapable {
 private:
     Env _env;
@@ -90,6 +117,13 @@ private:
     // The requests we've enqueued, but haven't completed yet, with
     // where to send the response. Indexed by txn id.
     std::unordered_map _inFlightTxns;
+    // The enqueued requests, but indexed by req id + ip + port. We
+    // store this so that we can drop repeated requests which are
+    // still queued, and which will therefore be processed in due
+    // time anyway. This relies on clients having unique req ids. It's
+    // kinda unsafe anyway (if clients get restarted), but it's such
+    // a useful optimization for now that we live with it.
+    std::unordered_set<InFlightCDCRequestKey> _inFlightCDCReqs;
     // The _shard_ request we're currently waiting for, if any.
     std::optional _inFlightShardReq;
     // This is just used to calculate the timings
@@ -316,6 +350,12 @@ private:
             LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
             auto receivedAt = eggsNow();
 
+            // If we're already processing this request, drop it to try to not clog the queue
+            if (_inFlightCDCReqs.contains(InFlightCDCRequestKey(reqHeader.requestId, clientAddr))) {
+                LOG_DEBUG(_env, "dropping req id %s from %s since it's already being processed", reqHeader.requestId, clientAddr);
+                continue;
+            }
+
             // If this will be filled in with an actual code, it means that we couldn't process
             // the request.
             EggsError err = NO_ERROR;
@@ -347,6 +387,7 @@ private:
                 inFlight.kind = reqHeader.kind;
                 inFlight.sock = sock;
                 inFlight.receivedAt = receivedAt;
+                _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqHeader.requestId, clientAddr));
                 // Go forward
                 _processStep(_step);
             } else {
@@ -421,8 +462,7 @@ private:
     }
 
     void _handleShardError(ShardId shid, EggsError err) {
-        if (err == EggsError::NAME_NOT_FOUND || err == EggsError::EDGE_NOT_FOUND) {
-            // these can happen through normal user interaction
+        if (innocuousShardError(err)) {
             LOG_DEBUG(_env, "got innocuous shard error %s from shard %s", err, shid);
         } else if (err == EggsError::DIRECTORY_HAS_OWNER || err == EggsError::TIMEOUT || err == EggsError::MISMATCHING_CREATION_TIME || err == EggsError::EDGE_NOT_FOUND) {
             // These can happen but should be rare.
@@ -453,7 +493,9 @@ private:
             } else {
                 _shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
                 if (step.err != NO_ERROR) {
-                    RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
+                    if (!innocuousShardError(step.err)) {
+                        RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
+                    }
                     _sendError(inFlight->second.sock, inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr);
                 } else {
                     LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
@@ -463,6 +505,7 @@ private:
                     step.resp.pack(bbuf);
                     _send(inFlight->second.sock, inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len());
                 }
+                _inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr));
                 _inFlightTxns.erase(inFlight);
             }
         }