mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-01-07 19:40:56 -06:00
core: various protocol changes
This commit is contained in:
committed by
GitHub Enterprise
parent
7aac745457
commit
2cd15fc0be
320
cpp/cdc/CDC.cpp
320
cpp/cdc/CDC.cpp
@@ -1,45 +1,33 @@
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <cstddef>
|
||||
#include <cstring>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <netinet/in.h>
|
||||
#include <netinet/ip.h>
|
||||
#include <sys/socket.h>
|
||||
#include <atomic>
|
||||
#include <fcntl.h>
|
||||
#include <sys/types.h>
|
||||
#include <unordered_map>
|
||||
#include <arpa/inet.h>
|
||||
#include <unordered_set>
|
||||
#include <map>
|
||||
#include <poll.h>
|
||||
|
||||
#include "Assert.hpp"
|
||||
#include "Bincode.hpp"
|
||||
#include "CDC.hpp"
|
||||
#include "CDCDB.hpp"
|
||||
#include "CDCKey.hpp"
|
||||
#include "Crypto.hpp"
|
||||
#include "Env.hpp"
|
||||
#include "ErrorCount.hpp"
|
||||
#include "Exception.hpp"
|
||||
#include "LogsDB.hpp"
|
||||
#include "Msgs.hpp"
|
||||
#include "MsgsGen.hpp"
|
||||
#include "Loop.hpp"
|
||||
#include "MultiplexedChannel.hpp"
|
||||
#include "Shard.hpp"
|
||||
#include "PeriodicLoop.hpp"
|
||||
#include "Protocol.hpp"
|
||||
#include "SharedRocksDB.hpp"
|
||||
#include "Time.hpp"
|
||||
#include "CDCDB.hpp"
|
||||
#include "Crypto.hpp"
|
||||
#include "CDCKey.hpp"
|
||||
#include "Shuckle.hpp"
|
||||
#include "Time.hpp"
|
||||
#include "Timings.hpp"
|
||||
#include "UDPSocketPair.hpp"
|
||||
#include "XmonAgent.hpp"
|
||||
#include "wyhash.h"
|
||||
#include "Xmon.hpp"
|
||||
#include "Timings.hpp"
|
||||
#include "PeriodicLoop.hpp"
|
||||
#include "Loop.hpp"
|
||||
#include "ErrorCount.hpp"
|
||||
#include "XmonAgent.hpp"
|
||||
|
||||
static constexpr uint8_t CDC_SOCK = 0;
|
||||
static constexpr uint8_t SHARD_SOCK = 1;
|
||||
@@ -332,7 +320,8 @@ public:
|
||||
uint64_t requestId = oldest->first;
|
||||
auto resp = _prepareCDCShardResp(requestId);
|
||||
ALWAYS_ASSERT(resp != nullptr); // must be there, we've just timed it out
|
||||
_recordCDCShardRespError(requestId, *resp, EggsError::TIMEOUT);
|
||||
resp->resp.setError() = EggsError::TIMEOUT;
|
||||
_recordCDCShardResp(requestId, *resp);
|
||||
++oldest;
|
||||
}
|
||||
}
|
||||
@@ -377,7 +366,7 @@ public:
|
||||
|
||||
if (_logsDB.isLeader()) {
|
||||
auto err = _logsDB.appendEntries(entries);
|
||||
ALWAYS_ASSERT(err == NO_ERROR);
|
||||
ALWAYS_ASSERT(err == EggsError::NO_ERROR);
|
||||
// we need to drop information about entries which might have been dropped due to append window being full
|
||||
bool foundLastInserted = false;
|
||||
for (auto it = entries.rbegin(); it != entries.rend(); ++it) {
|
||||
@@ -478,12 +467,10 @@ private:
|
||||
_shared.socks[CDC_SOCK].addr(),
|
||||
addrInfo,
|
||||
[&response,this](BincodeBuf& buf) {
|
||||
response.header.pack(buf);
|
||||
response.responseContainer.pack(buf);
|
||||
buf.packFixedBytes<8>({cbcmac(_expandedCDCKey, buf.data, buf.cursor - buf.data)});
|
||||
response.msg.pack(buf, _expandedCDCKey);
|
||||
});
|
||||
|
||||
LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.header.requestId, response.header.kind, addrInfo);
|
||||
LOG_DEBUG(_env, "will send response for req id %s kind %s to %s", response.msg.id, response.msg.body.kind(), addrInfo);
|
||||
}
|
||||
|
||||
void _packLogsDBRequest(LogsDBRequest& request) {
|
||||
@@ -499,12 +486,10 @@ private:
|
||||
_shared.socks[CDC_SOCK].addr(),
|
||||
addrInfo,
|
||||
[&request,this](BincodeBuf& buf) {
|
||||
request.header.pack(buf);
|
||||
request.requestContainer.pack(buf);
|
||||
buf.packFixedBytes<8>({cbcmac(_expandedCDCKey, buf.data, buf.cursor - buf.data)});
|
||||
request.msg.pack(buf, _expandedCDCKey);
|
||||
});
|
||||
|
||||
LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.header.requestId, request.header.kind, addrInfo);
|
||||
LOG_DEBUG(_env, "will send request for req id %s kind %s to %s", request.msg.id, request.msg.body.kind(), addrInfo);
|
||||
}
|
||||
void _updateInFlightTxns() {
|
||||
_shared.inFlightTxns = _shared.inFlightTxns*0.95 + ((double)_inFlightTxns.size())*0.05;
|
||||
@@ -557,10 +542,13 @@ private:
|
||||
return &resp;
|
||||
}
|
||||
|
||||
void _recordCDCShardRespError(uint64_t requestId, CDCShardResp& resp, EggsError err) {
|
||||
resp.err = err;
|
||||
_shared.shardErrors.add(err);
|
||||
if (resp.err == EggsError::TIMEOUT) {
|
||||
void _recordCDCShardResp(uint64_t requestId, CDCShardResp& resp) {
|
||||
resp.err = resp.resp.kind() != ShardMessageKind::ERROR ? EggsError::NO_ERROR : resp.resp.getError();
|
||||
_shared.shardErrors.add(resp.err);
|
||||
if (resp.err == EggsError::NO_ERROR) {
|
||||
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", requestId, resp.resp.kind());
|
||||
return;
|
||||
} else if (resp.err == EggsError::TIMEOUT) {
|
||||
LOG_DEBUG(_env, "txn %s shard req %s, timed out", resp.txnId, requestId);
|
||||
} else if (innocuousShardError(resp.err)) {
|
||||
LOG_DEBUG(_env, "txn %s shard req %s, finished with innocuous error %s", resp.txnId, requestId, resp.err);
|
||||
@@ -617,35 +605,14 @@ private:
|
||||
auto& req = requests.emplace_back();
|
||||
req.replicaId = replicaId;
|
||||
try {
|
||||
req.header.unpack(msg.buf);
|
||||
req.requestContainer.unpack(msg.buf, req.header.kind);
|
||||
req.msg.unpack(msg.buf, _expandedCDCKey);
|
||||
} catch (const BincodeException& err) {
|
||||
LOG_ERROR(_env, "could not parse: %s", err.what());
|
||||
RAISE_ALERT(_env, "could not parse LogsDBRequest from %s, dropping it.", msg.clientAddr);
|
||||
requests.pop_back();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (unlikely(msg.buf.remaining() < 8)) {
|
||||
LOG_ERROR(_env, "Could not parse LogsDBRequest of kind %s from %s, message signature is 8 bytes, only %s remaining", req.header.kind, msg.clientAddr, msg.buf.remaining());
|
||||
requests.pop_back();
|
||||
return;
|
||||
}
|
||||
|
||||
auto expectedMac = cbcmac(_expandedCDCKey, msg.buf.data, msg.buf.cursor - msg.buf.data);
|
||||
BincodeFixedBytes<8> receivedMac;
|
||||
msg.buf.unpackFixedBytes<8>(receivedMac);
|
||||
if (unlikely(expectedMac != receivedMac.data)) {
|
||||
LOG_ERROR(_env, "Incorrect signature for LogsDBRequest %s from %s", req.header.kind, msg.clientAddr);
|
||||
requests.pop_back();
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(msg.buf.remaining())) {
|
||||
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBRequest %s from %s", msg.buf.remaining(), req.header.kind, msg.clientAddr);
|
||||
requests.pop_back();
|
||||
}
|
||||
LOG_DEBUG(_env, "Received request %s with requests id %s from replica id %s", req.header.kind, req.header.requestId, req.replicaId);
|
||||
LOG_DEBUG(_env, "Received request %s with requests id %s from replica id %s", req.msg.body.kind(), req.msg.id, req.replicaId);
|
||||
}
|
||||
for (auto& msg : responseMessages) {
|
||||
auto replicaId = _getReplicaId(msg.clientAddr);
|
||||
@@ -656,35 +623,14 @@ private:
|
||||
auto& resp = responses.emplace_back();
|
||||
resp.replicaId = replicaId;
|
||||
try {
|
||||
resp.header.unpack(msg.buf);
|
||||
resp.responseContainer.unpack(msg.buf, resp.header.kind);
|
||||
resp.msg.unpack(msg.buf, _expandedCDCKey);
|
||||
} catch (const BincodeException& err) {
|
||||
LOG_ERROR(_env, "could not parse: %s", err.what());
|
||||
RAISE_ALERT(_env, "could not parse LogsDBResponse from %s, dropping it.", msg.clientAddr);
|
||||
requests.pop_back();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (unlikely(msg.buf.remaining() < 8)) {
|
||||
LOG_ERROR(_env, "Could not parse LogsDBResponse of kind %s from %s, message signature is 8 bytes, only %s remaining", resp.header.kind, msg.clientAddr, msg.buf.remaining());
|
||||
requests.pop_back();
|
||||
return;
|
||||
}
|
||||
|
||||
auto expectedMac = cbcmac(_expandedCDCKey, msg.buf.data, msg.buf.cursor - msg.buf.data);
|
||||
BincodeFixedBytes<8> receivedMac;
|
||||
msg.buf.unpackFixedBytes<8>(receivedMac);
|
||||
if (unlikely(expectedMac != receivedMac.data)) {
|
||||
LOG_ERROR(_env, "Incorrect signature for LogsDBResponse %s from %s", resp.header.kind, msg.clientAddr);
|
||||
requests.pop_back();
|
||||
return;
|
||||
}
|
||||
|
||||
if (unlikely(msg.buf.remaining())) {
|
||||
LOG_ERROR(_env, "Malformed message. Extra %s bytes for LogsDBResponse %s from %s", msg.buf.remaining(), resp.header.kind, msg.clientAddr);
|
||||
requests.pop_back();
|
||||
}
|
||||
LOG_DEBUG(_env, "Received response %s with requests id %s from replica id %s", resp.header.kind, resp.header.requestId, resp.replicaId);
|
||||
LOG_DEBUG(_env, "Received response %s with requests id %s from replica id %s", resp.msg.body.kind(), resp.msg.id, resp.replicaId);
|
||||
}
|
||||
_logsDB.processIncomingMessages(requests, responses);
|
||||
}
|
||||
@@ -693,65 +639,38 @@ private:
|
||||
int startUpdateSize = _updateSize();
|
||||
for (auto& msg: _channel.protocolMessages(CDC_REQ_PROTOCOL_VERSION)) {
|
||||
// First, try to parse the header
|
||||
CDCRequestHeader reqHeader;
|
||||
CDCReqMsg cdcMsg;
|
||||
try {
|
||||
reqHeader.unpack(msg.buf);
|
||||
cdcMsg.unpack(msg.buf);
|
||||
} catch (const BincodeException& err) {
|
||||
LOG_ERROR(_env, "could not parse: %s", err.what());
|
||||
RAISE_ALERT(_env, "could not parse request header from %s, dropping it.", msg.clientAddr);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received request id %s, kind %s", reqHeader.requestId, reqHeader.kind);
|
||||
|
||||
if (unlikely(reqHeader.kind == CDCMessageKind::CDC_SNAPSHOT)) {
|
||||
_processCDCSnapshotMessage(reqHeader, msg);
|
||||
RAISE_ALERT(_env, "could not parse request from %s, dropping it.", msg.clientAddr);
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received request id %s, kind %s", cdcMsg.id, cdcMsg.body.kind());
|
||||
auto receivedAt = eggsNow();
|
||||
|
||||
// If we're already processing this request, drop it to try to not clog the queue
|
||||
if (_inFlightCDCReqs.contains(InFlightCDCRequestKey(reqHeader.requestId, msg.clientAddr))) {
|
||||
LOG_DEBUG(_env, "dropping req id %s from %s since it's already being processed", reqHeader.requestId, msg.clientAddr);
|
||||
if (unlikely(cdcMsg.body.kind() == CDCMessageKind::CDC_SNAPSHOT)) {
|
||||
_processCDCSnapshotMessage(cdcMsg, msg);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If this will be filled in with an actual code, it means that we couldn't process
|
||||
// the request.
|
||||
EggsError err = NO_ERROR;
|
||||
|
||||
// Now, try to parse the body
|
||||
auto& cdcReq = _cdcReqs.emplace_back();
|
||||
try {
|
||||
cdcReq.unpack(msg.buf, reqHeader.kind);
|
||||
LOG_DEBUG(_env, "parsed request: %s", cdcReq);
|
||||
} catch (const BincodeException& exc) {
|
||||
LOG_ERROR(_env, "could not parse: %s", exc.what());
|
||||
RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, msg.clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
// If we're already processing this request, drop it to try to not clog the queue
|
||||
if (_inFlightCDCReqs.contains(InFlightCDCRequestKey(cdcMsg.id, msg.clientAddr))) {
|
||||
LOG_DEBUG(_env, "dropping req id %s from %s since it's already being processed", cdcMsg.id, msg.clientAddr);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Make sure nothing is left
|
||||
if (err == NO_ERROR && msg.buf.remaining() != 0) {
|
||||
RAISE_ALERT(_env, "%s bytes remaining after parsing CDC request of kind %s from %s, will reply with error", msg.buf.remaining(), reqHeader.kind, msg.clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
}
|
||||
auto& cdcReq = _cdcReqs.emplace_back(std::move(cdcMsg.body));
|
||||
|
||||
if (err == NO_ERROR) {
|
||||
LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind());
|
||||
_cdcReqsInfo.emplace_back(CDCReqInfo{
|
||||
.reqId = reqHeader.requestId,
|
||||
.clientAddr = msg.clientAddr,
|
||||
.receivedAt = receivedAt,
|
||||
.sockIx = msg.socketIx,
|
||||
});
|
||||
} else {
|
||||
// We couldn't parse, reply immediately with an error
|
||||
RAISE_ALERT(_env, "request %s failed before enqueue with error %s", cdcReq.kind(), err);
|
||||
_packCDCResponseError(msg.socketIx, msg.clientAddr, reqHeader, err);
|
||||
_cdcReqs.pop_back(); // let's just forget all about this
|
||||
}
|
||||
LOG_DEBUG(_env, "CDC request %s successfully parsed, will process soon", cdcReq.kind());
|
||||
_cdcReqsInfo.emplace_back(CDCReqInfo{
|
||||
.reqId = cdcMsg.id,
|
||||
.clientAddr = msg.clientAddr,
|
||||
.receivedAt = receivedAt,
|
||||
.sockIx = msg.socketIx,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -759,80 +678,38 @@ private:
|
||||
for (auto& msg : _channel.protocolMessages(SHARD_RESP_PROTOCOL_VERSION)) {
|
||||
LOG_DEBUG(_env, "received response from shard");
|
||||
|
||||
ShardResponseHeader respHeader;
|
||||
ShardRespMsg respMsg;
|
||||
try {
|
||||
respHeader.unpack(msg.buf);
|
||||
respMsg.unpack(msg.buf);
|
||||
} catch (BincodeException err) {
|
||||
LOG_ERROR(_env, "could not parse: %s", err.what());
|
||||
RAISE_ALERT(_env, "could not parse response header, dropping response");
|
||||
RAISE_ALERT(_env, "could not parse response, dropping response");
|
||||
continue;
|
||||
}
|
||||
|
||||
LOG_DEBUG(_env, "received response id %s, kind %s", respHeader.requestId, respHeader.kind);
|
||||
LOG_DEBUG(_env, "received response %s", respMsg);
|
||||
|
||||
// Note that below we just let the BincodeExceptions propagate upwards since we
|
||||
// control all the code in this codebase, and the header is good, and we're a
|
||||
// bit lazy.
|
||||
|
||||
auto shardResp = _prepareCDCShardResp(respHeader.requestId);
|
||||
auto shardResp = _prepareCDCShardResp(respMsg.id);
|
||||
if (shardResp == nullptr) {
|
||||
// we couldn't find it
|
||||
continue;
|
||||
}
|
||||
shardResp->resp = std::move(respMsg.body);
|
||||
|
||||
// We got an error
|
||||
if (respHeader.kind == (ShardMessageKind)0) {
|
||||
_recordCDCShardRespError(respHeader.requestId, *shardResp, msg.buf.unpackScalar<EggsError>());
|
||||
LOG_DEBUG(_env, "got error %s for response id %s", shardResp->err, respHeader.requestId);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Otherwise, parse the body
|
||||
shardResp->resp.unpack(msg.buf, respHeader.kind);
|
||||
LOG_DEBUG(_env, "parsed shard response: %s", shardResp->resp);
|
||||
ALWAYS_ASSERT(msg.buf.remaining() == 0);
|
||||
_shared.shardErrors.add(NO_ERROR);
|
||||
|
||||
// If all went well, advance with the newly received request
|
||||
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, process soon", respHeader.requestId, respHeader.kind);
|
||||
_recordCDCShardResp(respMsg.id, *shardResp);
|
||||
}
|
||||
}
|
||||
|
||||
void _processCDCSnapshotMessage(CDCRequestHeader& reqHeader, UDPMessage& msg) {
|
||||
// If this will be filled in with an actual code, it means that we couldn't process
|
||||
// the request.
|
||||
EggsError err = NO_ERROR;
|
||||
|
||||
// Now, try to parse the body
|
||||
CDCReqContainer req;
|
||||
try {
|
||||
req.unpack(msg.buf, reqHeader.kind);
|
||||
LOG_DEBUG(_env, "parsed request: %s", req);
|
||||
} catch (const BincodeException& exc) {
|
||||
LOG_ERROR(_env, "could not parse: %s", exc.what());
|
||||
RAISE_ALERT(_env, "could not parse CDC request of kind %s from %s, will reply with error.", reqHeader.kind, msg.clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
}
|
||||
|
||||
// Make sure nothing is left
|
||||
if (err == NO_ERROR && msg.buf.remaining() != 0) {
|
||||
RAISE_ALERT(_env, "%s bytes remaining after parsing CDC request of kind %s from %s, will reply with error", msg.buf.remaining(), reqHeader.kind, msg.clientAddr);
|
||||
err = EggsError::MALFORMED_REQUEST;
|
||||
}
|
||||
|
||||
if (err == NO_ERROR) {
|
||||
LOG_DEBUG(_env, "CDC request %s successfully parsed, processing", req.kind());
|
||||
err = _shared.sharedDb.snapshot(_basePath +"/snapshot-" + std::to_string(req.getCdcSnapshot().snapshotId));
|
||||
}
|
||||
|
||||
if (err == NO_ERROR) {
|
||||
CDCResponseHeader respHeader{reqHeader.requestId, reqHeader.kind};
|
||||
CDCRespContainer resp;
|
||||
resp.setCdcSnapshot();
|
||||
_packCDCResponse(msg.socketIx, msg.clientAddr, respHeader, resp);
|
||||
void _processCDCSnapshotMessage(CDCReqMsg& msg, const UDPMessage& udpMsg) {
|
||||
auto err = _shared.sharedDb.snapshot(_basePath +"/snapshot-" + std::to_string(msg.body.getCdcSnapshot().snapshotId));
|
||||
CDCRespMsg respMsg;
|
||||
respMsg.id = msg.id;
|
||||
if (err == EggsError::NO_ERROR) {
|
||||
respMsg.body.setCdcSnapshot();
|
||||
} else {
|
||||
_packCDCResponseError(msg.socketIx, msg.clientAddr, reqHeader, err);
|
||||
respMsg.body.setError() = err;
|
||||
}
|
||||
_packCDCResponse(udpMsg.socketIx, udpMsg.clientAddr, CDCMessageKind::CDC_SNAPSHOT, respMsg);
|
||||
}
|
||||
|
||||
#ifdef __clang__
|
||||
@@ -852,14 +729,12 @@ private:
|
||||
auto inFlight = _inFlightTxns.find(txnId);
|
||||
if (inFlight->second.hasClient) {
|
||||
_shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
|
||||
_shared.errors[(int)inFlight->second.kind].add(resp.err);
|
||||
CDCResponseHeader respHeader(inFlight->second.cdcRequestId, inFlight->second.kind);
|
||||
if (resp.err != NO_ERROR) {
|
||||
_packCDCResponseError(inFlight->second.sockIx, inFlight->second.clientAddr, CDCRequestHeader(respHeader.requestId, respHeader.kind), resp.err);
|
||||
} else {
|
||||
LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
|
||||
_packCDCResponse(inFlight->second.sockIx, inFlight->second.clientAddr, respHeader, resp.resp);
|
||||
}
|
||||
_shared.errors[(int)inFlight->second.kind].add(resp.kind() != CDCMessageKind::ERROR ? EggsError::NO_ERROR : resp.getError());
|
||||
CDCRespMsg respMsg;
|
||||
respMsg.id = inFlight->second.cdcRequestId;
|
||||
respMsg.body = std::move(resp);
|
||||
LOG_DEBUG(_env, "sending response with req id %s, kind %s, back to %s", inFlight->second.cdcRequestId, inFlight->second.kind, inFlight->second.clientAddr);
|
||||
_packCDCResponse(inFlight->second.sockIx, inFlight->second.clientAddr, inFlight->second.kind, respMsg);
|
||||
_inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr));
|
||||
}
|
||||
_inFlightTxns.erase(inFlight);
|
||||
@@ -869,8 +744,8 @@ private:
|
||||
for (const auto& [txnId, shardReq]: _step.runningTxns) {
|
||||
CDCShardReq prevReq;
|
||||
LOG_TRACE(_env, "txn %s needs shard %s, req %s", txnId, shardReq.shid, shardReq.req);
|
||||
// Header
|
||||
ShardRequestHeader shardReqHeader;
|
||||
ShardReqMsg shardReqMsg;
|
||||
|
||||
// Do not allocate new req id for repeated requests, so that we'll just accept
|
||||
// the first one that comes back. There's a chance for the txnId to not be here
|
||||
// yet: if we have just restarted the CDC. In this case we fill it in here, but
|
||||
@@ -882,55 +757,52 @@ private:
|
||||
req.hasClient = false;
|
||||
req.lastSentRequestId = _freshShardReqId();
|
||||
inFlightTxn = _inFlightTxns.emplace(txnId, req).first;
|
||||
shardReqHeader.requestId = req.lastSentRequestId;
|
||||
shardReqMsg.id = req.lastSentRequestId;
|
||||
_updateInFlightTxns();
|
||||
} else if (shardReq.repeated) {
|
||||
shardReqHeader.requestId = inFlightTxn->second.lastSentRequestId;
|
||||
shardReqMsg.id = inFlightTxn->second.lastSentRequestId;
|
||||
} else {
|
||||
shardReqHeader.requestId = _freshShardReqId();
|
||||
shardReqMsg.id = _freshShardReqId();
|
||||
}
|
||||
shardReqHeader.kind = shardReq.req.kind();
|
||||
shardReqMsg.body = shardReq.req;
|
||||
// Pack
|
||||
_shared.shardsMutex.lock();
|
||||
ShardInfo shardInfo = _shared.shards[shardReq.shid.u8];
|
||||
_shared.shardsMutex.unlock();
|
||||
|
||||
LOG_DEBUG(_env, "sending request for txn %s with req id %s to shard %s (%s)", txnId, shardReqHeader.requestId, shardReq.shid, shardInfo.addrs);
|
||||
auto& req = shardReq.req;
|
||||
_shardSender.prepareOutgoingMessage(_env, _shared.socks[SHARD_SOCK].addr(), shardInfo.addrs, [this, &shardReqHeader, &req](BincodeBuf& bbuf) {
|
||||
shardReqHeader.pack(bbuf);
|
||||
req.pack(bbuf);
|
||||
if (isPrivilegedRequestKind(req.kind())) {
|
||||
bbuf.packFixedBytes<8>({cbcmac(_expandedCDCKey, bbuf.data, bbuf.len())});
|
||||
LOG_DEBUG(_env, "sending request for txn %s with req id %s to shard %s (%s)", txnId, shardReqMsg.id, shardReq.shid, shardInfo.addrs);
|
||||
_shardSender.prepareOutgoingMessage(_env, _shared.socks[SHARD_SOCK].addr(), shardInfo.addrs, [this, &shardReqMsg](BincodeBuf& bbuf) {
|
||||
if (isPrivilegedRequestKind((uint8_t)shardReqMsg.body.kind())) {
|
||||
SignedShardReqMsg signedReq;
|
||||
signedReq.id = shardReqMsg.id;
|
||||
signedReq.body = std::move(shardReqMsg.body);
|
||||
signedReq.pack(bbuf, _expandedCDCKey);
|
||||
} else {
|
||||
shardReqMsg.pack(bbuf);
|
||||
}
|
||||
});
|
||||
// Record the in-flight req
|
||||
_inFlightShardReqs.insert(shardReqHeader.requestId, InFlightShardRequest{
|
||||
_inFlightShardReqs.insert(shardReqMsg.id, InFlightShardRequest{
|
||||
.txnId = txnId,
|
||||
.sentAt = eggsNow(),
|
||||
.shid = shardReq.shid,
|
||||
});
|
||||
inFlightTxn->second.lastSentRequestId = shardReqHeader.requestId;
|
||||
inFlightTxn->second.lastSentRequestId = shardReqMsg.id;
|
||||
}
|
||||
}
|
||||
|
||||
void _packCDCResponseError(int sockIx, const IpPort& clientAddr, const CDCRequestHeader& reqHeader, EggsError err) {
|
||||
LOG_DEBUG(_env, "will send error %s to %s", err, clientAddr);
|
||||
if (err != EggsError::DIRECTORY_NOT_EMPTY && err != EggsError::EDGE_NOT_FOUND) {
|
||||
RAISE_ALERT(_env, "request %s of kind %s from client %s failed with err %s", reqHeader.requestId, reqHeader.kind, clientAddr, err);
|
||||
void _packCDCResponse(int sockIx, const IpPort& clientAddr, CDCMessageKind reqKind, const CDCRespMsg& respMsg) {
|
||||
if (unlikely(respMsg.body.kind() == CDCMessageKind::ERROR)) {
|
||||
auto err = respMsg.body.getError();
|
||||
LOG_DEBUG(_env, "will send error %s to %s", err, clientAddr);
|
||||
if (err != EggsError::DIRECTORY_NOT_EMPTY && err != EggsError::EDGE_NOT_FOUND) {
|
||||
RAISE_ALERT(_env, "request %s of kind %s from client %s failed with err %s", respMsg.id, reqKind, clientAddr, err);
|
||||
}
|
||||
} else {
|
||||
LOG_DEBUG(_env, "will send response to CDC req %s, kind %s, to %s", respMsg.id, reqKind, clientAddr);
|
||||
}
|
||||
_cdcSender.prepareOutgoingMessage(_env, _shared.socks[CDC_SOCK].addr(), sockIx, clientAddr, [&reqHeader, err](BincodeBuf& respBbuf) {
|
||||
CDCResponseHeader(reqHeader.requestId, CDCMessageKind::ERROR).pack(respBbuf);
|
||||
respBbuf.packScalar<uint16_t>((uint16_t)err);
|
||||
respBbuf.len();
|
||||
});
|
||||
}
|
||||
|
||||
void _packCDCResponse(int sockIx, const IpPort& clientAddr, const CDCResponseHeader& respHeader, const CDCRespContainer& resp) {
|
||||
LOG_DEBUG(_env, "will send response to CDC req %s, kind %s, to %s", respHeader.requestId, respHeader.kind, clientAddr);
|
||||
_cdcSender.prepareOutgoingMessage(_env, _shared.socks[CDC_SOCK].addr(), sockIx, clientAddr, [&respHeader, &resp](BincodeBuf& respBbuf) {
|
||||
respHeader.pack(respBbuf);
|
||||
resp.pack(respBbuf);
|
||||
_cdcSender.prepareOutgoingMessage(_env, _shared.socks[CDC_SOCK].addr(), sockIx, clientAddr, [&respMsg](BincodeBuf& respBbuf) {
|
||||
respMsg.pack(respBbuf);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user