diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index e76bb595..3e038543 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -45,6 +45,9 @@ struct CDCShared { // when it finished executing. std::array timingsProcess; std::array errors; + // right now we have a max of 200req/s and we send the metrics every minute or so, so + // this should cover us for at least 1.5mins. Power of two is good for mod. + std::array, 0x3FFF> inFlightTxnsWindow; CDCShared(CDCDB& db_) : db(db_) { for (auto& shard: shards) { @@ -56,6 +59,9 @@ struct CDCShared { timingsTotal[(int)kind] = Timings::Standard(); timingsProcess[(int)kind] = Timings::Standard(); } + for (int i = 0; i < inFlightTxnsWindow.size(); i++) { + inFlightTxnsWindow[i] = 0; + } } }; @@ -133,6 +139,7 @@ private: // The requests we've enqueued, but haven't completed yet, with // where to send the response. Indexed by txn id. std::unordered_map _inFlightTxns; + uint64_t _inFlightTxnsWindowCursor; // The enqueued requests, but indexed by req id + ip + port. We // store this so that we can drop repeated requests which are // still queued, and which will therefore be processed in due @@ -174,6 +181,7 @@ public: _shardRequestIdCounter(0), _shardTimeout(options.shardTimeout), _maximumEnqueuedRequests(options.maximumEnqueuedRequests), + _inFlightTxnsWindowCursor(0), _runningTxn(0) { _currentLogIndex = _shared.db.lastAppliedLogEntry(); @@ -226,6 +234,11 @@ public: } private: + void _updateInFlightTxns() { + _shared.inFlightTxnsWindow[_inFlightTxnsWindowCursor%_shared.inFlightTxnsWindow.size()].store(_inFlightTxns.size()); + _inFlightTxnsWindowCursor++; + } + bool _waitForShards() { bool badShard = false; { @@ -394,6 +407,7 @@ private: inFlight.kind = reqHeader.kind; inFlight.sock = sock; inFlight.receivedAt = receivedAt; + _updateInFlightTxns(); _inFlightCDCReqs.insert(InFlightCDCRequestKey(reqHeader.requestId, clientAddr)); // Go forward _processStep(_step); @@ -509,6 +523,7 @@ private: } _inFlightCDCReqs.erase(InFlightCDCRequestKey(inFlight->second.cdcRequestId, inFlight->second.clientAddr)); _inFlightTxns.erase(inFlight); + _updateInFlightTxns(); } } if (step.txnNeedsShard != 0) { @@ -772,6 +787,15 @@ public: _metricsBuilder.timestamp(now); } } + { + _metricsBuilder.measurement("eggsfs_cdc_queue"); + uint64_t sum = 0; + for (size_t x: _shared.inFlightTxnsWindow) { + sum += x; + } + _metricsBuilder.fieldFloat("size", (double)sum / (double)_shared.inFlightTxnsWindow.size()); + _metricsBuilder.timestamp(now); + } std::string err = sendMetrics(10_sec, _metricsBuilder.payload()); _metricsBuilder.reset(); if (err.empty()) { diff --git a/cpp/core/Metrics.hpp b/cpp/core/Metrics.hpp index 783fb6f2..6df1fa6b 100644 --- a/cpp/core/Metrics.hpp +++ b/cpp/core/Metrics.hpp @@ -45,6 +45,12 @@ public: fieldRaw(name, ss.str()); } + void fieldFloat(const std::string& name, double x) { + std::ostringstream ss; + ss << x; + fieldRaw(name, ss.str()); + } + void timestamp(EggsTime t); };