Split CDC timings to distinguish queue time from exec time

This commit is contained in:
Francesco Mazzoli
2023-07-27 13:08:04 +00:00
parent fd132ca4b2
commit 6a52a961eb
4 changed files with 107 additions and 37 deletions

View File

@@ -37,7 +37,11 @@ struct CDCShared {
std::array<std::atomic<uint16_t>, 2> ownPorts;
std::mutex shardsMutex;
std::array<ShardInfo, 256> shards;
std::array<Timings, maxCDCMessageKind+1> timings;
// How long it took us to process the entire request, from parse to response.
std::array<Timings, maxCDCMessageKind+1> timingsTotal;
// How long it took to process the request, from when it exited the queue to
// when it finished executing.
std::array<Timings, maxCDCMessageKind+1> timingsProcess;
CDCShared(CDCDB& db_) : db(db_) {
for (auto& shard: shards) {
@@ -46,7 +50,8 @@ struct CDCShared {
ownPorts[0].store(0);
ownPorts[1].store(0);
for (CDCMessageKind kind : allCDCMessageKind) {
timings[(int)kind] = Timings::Standard();
timingsTotal[(int)kind] = Timings::Standard();
timingsProcess[(int)kind] = Timings::Standard();
}
}
};
@@ -87,6 +92,26 @@ private:
std::unordered_map<uint64_t, InFlightCDCRequest> _inFlightTxns;
// The _shard_ request we're currently waiting for, if any.
std::optional<InFlightShardRequest> _inFlightShardReq;
// This is just used to calculate the timings
uint64_t _runningTxn;
CDCMessageKind _runningTxnKind;
EggsTime _runningTxnStartedAt;
CDCStatus _status;
void _updateProcessTimings() {
if (_status.runningTxn != _runningTxn) { // we've got something new
EggsTime now = eggsNow();
if (_runningTxn != 0) { // something has finished running
_shared.timingsProcess[(int)_runningTxnKind].add(now - _runningTxnStartedAt);
_runningTxn = 0;
}
if (_status.runningTxn != 0) { // something has started running
_runningTxn = _status.runningTxn;
_runningTxnKind = _status.runningTxnKind;
_runningTxnStartedAt = now;
}
}
}
public:
CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared) :
@@ -96,7 +121,8 @@ public:
_recvBuf(DEFAULT_UDP_MTU),
_sendBuf(DEFAULT_UDP_MTU),
_shardRequestIdCounter(0),
_shardTimeout(options.shardTimeout)
_shardTimeout(options.shardTimeout),
_runningTxn(0)
{
_currentLogIndex = _shared.db.lastAppliedLogEntry();
memset(&_socks[0], 0, sizeof(_socks));
@@ -183,7 +209,8 @@ public:
LOG_INFO(_env, "running on ports %s and %s", _shared.ownPorts[0].load(), _shared.ownPorts[1].load());
// If we've got a dangling transaction, immediately start processing it
_shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step);
_shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status);
_updateProcessTimings();
_processStep(_step);
// Start processing CDC requests and shard responses
@@ -312,7 +339,8 @@ private:
if (err == NO_ERROR) {
// If things went well, process the request
LOG_DEBUG(_env, "CDC request %s successfully parsed, will now process", _cdcReqContainer.kind());
uint64_t txnId = _shared.db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step);
uint64_t txnId = _shared.db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step, _status);
_updateProcessTimings();
auto& inFlight = _inFlightTxns[txnId];
inFlight.cdcRequestId = reqHeader.requestId;
inFlight.clientAddr = clientAddr;
@@ -386,7 +414,8 @@ private:
// If all went well, advance with the newly received request
LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process", respHeader.requestId, respHeader.kind);
_shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step);
_shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step, _status);
_updateProcessTimings();
_processStep(_step);
}
}
@@ -408,7 +437,8 @@ private:
} else {
RAISE_ALERT(_env, "got shard error %s from shard %s", err, shid);
}
_shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step);
_shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step, _status);
_updateProcessTimings();
_processStep(_step);
}
@@ -421,7 +451,7 @@ private:
if (inFlight == _inFlightTxns.end()) {
LOG_INFO(_env, "Could not find in-flight request %s, this might be because the CDC was restarted in the middle of a transaction.", step.txnFinished);
} else {
_shared.timings[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
_shared.timingsTotal[(int)inFlight->second.kind].add(eggsNow() - inFlight->second.receivedAt);
if (step.err != NO_ERROR) {
RAISE_ALERT(_env, "txn %s, req id %s, finished with error %s", step.txnFinished, inFlight->second.cdcRequestId, step.err);
_sendError(inFlight->second.sock, inFlight->second.cdcRequestId, step.err, inFlight->second.clientAddr);
@@ -481,7 +511,8 @@ private:
}
if (step.nextTxn != 0) {
LOG_DEBUG(_env, "we have txn %s lined up, starting it", step.nextTxn);
_shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step);
_shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step, _status);
_updateProcessTimings();
_processStep(_step);
}
}
@@ -712,15 +743,23 @@ public:
const auto insertCDCStats = [this, &stats]() {
std::string err;
for (CDCMessageKind kind : allCDCMessageKind) {
std::ostringstream prefix;
prefix << "cdc." << kind;
_shared.timings[(int)kind].toStats(prefix.str(), stats);
{
std::ostringstream prefix;
prefix << "cdc." << kind;
_shared.timingsTotal[(int)kind].toStats(prefix.str(), stats);
}
{
std::ostringstream prefix;
prefix << "cdc." << kind << ".process";
_shared.timingsProcess[(int)kind].toStats(prefix.str(), stats);
}
}
err = insertStats(_shuckleHost, _shucklePort, 10_sec, stats);
stats.clear();
if (err.empty()) {
for (CDCMessageKind kind : allCDCMessageKind) {
_shared.timings[(int)kind].reset();
_shared.timingsTotal[(int)kind].reset();
_shared.timingsProcess[(int)kind].reset();
}
}
return err;