See comments

This commit is contained in:
Francesco Mazzoli
2023-08-01 14:49:16 +00:00
committed by Francesco Mazzoli
parent a5eb12a262
commit fe2ce7aa17

View File

@@ -12,6 +12,7 @@
#include <thread>
#include <unordered_map>
#include <arpa/inet.h>
#include <unordered_set>
#include "Bincode.hpp"
#include "CDC.hpp"
@@ -71,6 +72,32 @@ struct InFlightCDCRequest {
int sock;
};
// these can happen through normal user interaction
static bool innocuousShardError(EggsError err) {
return err == EggsError::NAME_NOT_FOUND || err == EggsError::EDGE_NOT_FOUND;
}
struct InFlightCDCRequestKey {
uint64_t requestId;
uint32_t ip;
uint16_t port;
InFlightCDCRequestKey(uint64_t requestId_, struct sockaddr_in clientAddr_) :
requestId(requestId_), ip(clientAddr_.sin_addr.s_addr), port(clientAddr_.sin_port)
{}
bool operator==(const InFlightCDCRequestKey& other) const {
return requestId == other.requestId && ip == other.ip && port == other.port;
}
};
template <>
struct std::hash<InFlightCDCRequestKey> {
std::size_t operator()(const InFlightCDCRequestKey& key) const {
return key.requestId ^ (((uint64_t)key.port << 32) | ((uint64_t)key.ip));
}
};
struct CDCServer : Undertaker::Reapable {
private:
Env _env;
@@ -90,6 +117,13 @@ private:
// The requests we've enqueued, but haven't completed yet, with
// where to send the response. Indexed by txn id.
std::unordered_map<uint64_t, InFlightCDCRequest> _inFlightTxns;
// The enqueued requests, but indexed by req id + ip + port. We
// store this so that we can drop repeated requests which are
// still queued, and which will therefore be processed in due
// time anyway. This relies on clients having unique req ids. It's
// kinda unsafe anyway (if clients get restarted), but it's such
// a useful optimization for now that we live with it.
std::unordered_set<InFlightCDCRequestKey> _inFlightCDCReqs;
// The _shard_ request we're currently waiting for, if any.
std::optional<InFlightShardRequest> _inFlightShardReq;
// This is just used to calculate the timings
@@ -316,6 +350,12 @@ private:
LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
auto receivedAt = eggsNow();
// If we're already processing this request, drop it to try to not clog the queue
if (_inFlightCDCReqs.contains(InFlightCDCRequestKey(reqHeader.requestId, clientAddr))) {
LOG_DEBUG(_env, "dropping req id %s from %s since it's already being processed", reqHeader.requestId, clientAddr);
continue;
}
// If this will be filled in with an actual code, it means that we couldn't process
// the request.
EggsError err = NO_ERROR;
@@ -347,6 +387,7 @@ private:
inFlight.kind = reqHeader.kind;
inFlight.sock = sock;
inFlight.receivedAt = receivedAt;
_inFlightCDCReqs.insert(InFlightCDCRequestKey(reqHeader.requestId, clientAddr));
// Go forward
_processStep(_step);
} else {
@@ -421,8 +462,7 @@ private:
}
void _handleShardError(ShardId shid, EggsError err) {
if (err == EggsError::NAME_NOT_FOUND || err == EggsError::EDGE_NOT_FOUND) {
// these can happen through normal user interaction
if (innocuousShardError(err)) {
LOG_DEBUG(_env, "got innocuous shard error %s from shard %s", err, shid);
} else if (err == EggsError::DIRECTORY_HAS_OWNER || err == EggsError::TIMEOUT || err == EggsError::MISMATCHING_CREATION_TIME || err == EggsError::EDGE_NOT_FOUND) {
// These can happen but should be rare.
@@ -453,7 +493,9 @@ private:
} else {
_shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
if (step.err != NO_ERROR) {
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
if (!innocuousShardError(step.err)) {
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
}
_sendError(inFlight->second.sock, inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr);
} else {
LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
@@ -463,6 +505,7 @@ private:
step.resp.pack(bbuf);
_send(inFlight->second.sock, inFlight->second.clientAddr, (const char*)bbuf.data, bbuf.len());
}
_inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr));
_inFlightTxns.erase(inFlight);
}
}