Use an EMA for the in-flight CDC txns as well

This commit is contained in:
Francesco Mazzoli
2023-11-29 16:28:11 +00:00
parent d7da3e82c8
commit 3eae5bbf9b
+5 -17
View File
@@ -43,12 +43,10 @@ struct CDCShared {
// How long it took us to process the entire request, from parse to response.
std::array<Timings, maxCDCMessageKind+1> timingsTotal;
std::array<ErrorCount, maxCDCMessageKind+1> errors;
// right now we have a max of 200req/s and we send the metrics every minute or so, so
// this should cover us for at least 1.5mins. Power of two is good for mod.
std::array<std::atomic<size_t>, 0x3FFF> inFlightTxnsWindow;
std::atomic<double> inFlightTxns;
ErrorCount shardErrors;
CDCShared(CDCDB& db_) : db(db_) {
CDCShared(CDCDB& db_) : db(db_), inFlightTxns(0) {
for (auto& shard: shards) {
memset(&shard, 0, sizeof(shard));
}
@@ -57,9 +55,6 @@ struct CDCShared {
for (CDCMessageKind kind : allCDCMessageKind) {
timingsTotal[(int)kind] = Timings::Standard();
}
for (int i = 0; i < inFlightTxnsWindow.size(); i++) {
inFlightTxnsWindow[i] = 0;
}
}
};
@@ -201,7 +196,6 @@ private:
// The requests we've enqueued, but haven't completed yet, with
// where to send the response. Indexed by txn id.
std::unordered_map<CDCTxnId, InFlightCDCRequest> _inFlightTxns;
uint64_t _inFlightTxnsWindowCursor;
// The enqueued requests, but indexed by req id + ip + port. We
// store this so that we can drop repeated requests which are
// still queued, and which will therefore be processed in due
@@ -223,8 +217,7 @@ public:
_maxUpdateSize(500),
// important to not catch stray requests from previous executions
_shardRequestIdCounter(wyhash64_rand()),
_shardTimeout(options.shardTimeout),
_inFlightTxnsWindowCursor(0)
_shardTimeout(options.shardTimeout)
{
_currentLogIndex = _shared.db.lastAppliedLogEntry();
expandKey(CDCKey, _expandedCDCKey);
@@ -308,8 +301,7 @@ public:
private:
void _updateInFlightTxns() {
_shared.inFlightTxnsWindow[_inFlightTxnsWindowCursor%_shared.inFlightTxnsWindow.size()].store(_inFlightTxns.size());
_inFlightTxnsWindowCursor++;
_shared.inFlightTxns = _shared.inFlightTxns*0.95 + ((double)_inFlightTxns.size())*0.05;
}
bool _waitForShards() {
@@ -855,11 +847,7 @@ public:
}
{
_metricsBuilder.measurement("eggsfs_cdc_in_flight_txns");
uint64_t sum = 0;
for (size_t x: _shared.inFlightTxnsWindow) {
sum += x;
}
_metricsBuilder.fieldFloat("size", (double)sum / (double)_shared.inFlightTxnsWindow.size());
_metricsBuilder.fieldFloat("count", _shared.inFlightTxns);
_metricsBuilder.timestamp(now);
}
for (int i = 0; i < _shared.shardErrors.count.size(); i++) {