mirror of
https://github.com/XTXMarkets/ternfs.git
synced 2026-01-06 02:49:45 -06:00
cdc: differentiate replicas in xmon and in metrics
This commit is contained in:
@@ -1050,14 +1050,16 @@ public:
|
||||
struct CDCMetricsInserter : PeriodicLoop {
|
||||
private:
|
||||
CDCShared& _shared;
|
||||
ReplicaId _replicaId;
|
||||
XmonNCAlert _sendMetricsAlert;
|
||||
MetricsBuilder _metricsBuilder;
|
||||
std::unordered_map<std::string, uint64_t> _rocksDBStats;
|
||||
XmonNCAlert _updateSizeAlert;
|
||||
public:
|
||||
CDCMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCShared& shared):
|
||||
CDCMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCShared& shared, ReplicaId replicaId):
|
||||
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
|
||||
_shared(shared),
|
||||
_replicaId(replicaId),
|
||||
_sendMetricsAlert(XmonAppType::DAYTIME, 1_mins),
|
||||
_updateSizeAlert(XmonAppType::NEVER)
|
||||
{}
|
||||
@@ -1078,6 +1080,7 @@ public:
|
||||
if (count == 0) { continue; }
|
||||
_metricsBuilder.measurement("eggsfs_cdc_requests");
|
||||
_metricsBuilder.tag("kind", kind);
|
||||
_metricsBuilder.tag("replica", _replicaId);
|
||||
if (i == 0) {
|
||||
_metricsBuilder.tag("error", "NO_ERROR");
|
||||
} else {
|
||||
@@ -1089,11 +1092,13 @@ public:
|
||||
}
|
||||
{
|
||||
_metricsBuilder.measurement("eggsfs_cdc_in_flight_txns");
|
||||
_metricsBuilder.tag("replica", _replicaId);
|
||||
_metricsBuilder.fieldFloat("count", _shared.inFlightTxns);
|
||||
_metricsBuilder.timestamp(now);
|
||||
}
|
||||
{
|
||||
_metricsBuilder.measurement("eggsfs_cdc_update");
|
||||
_metricsBuilder.tag("replica", _replicaId);
|
||||
_metricsBuilder.fieldFloat("size", _shared.updateSize);
|
||||
_metricsBuilder.timestamp(now);
|
||||
}
|
||||
@@ -1101,6 +1106,7 @@ public:
|
||||
uint64_t count = _shared.shardErrors.count[i].load();
|
||||
if (count == 0) { continue; }
|
||||
_metricsBuilder.measurement("eggsfs_cdc_shard_requests");
|
||||
_metricsBuilder.tag("replica", _replicaId);
|
||||
if (i == 0) {
|
||||
_metricsBuilder.tag("error", "NO_ERROR");
|
||||
} else {
|
||||
@@ -1114,6 +1120,7 @@ public:
|
||||
_shared.sharedDb.rocksDBMetrics(_rocksDBStats);
|
||||
for (const auto& [name, value]: _rocksDBStats) {
|
||||
_metricsBuilder.measurement("eggsfs_cdc_rocksdb");
|
||||
_metricsBuilder.tag("replica", _replicaId);
|
||||
_metricsBuilder.fieldU64(name, value);
|
||||
_metricsBuilder.timestamp(now);
|
||||
}
|
||||
@@ -1170,7 +1177,11 @@ void runCDC(const std::string& dbDir, CDCOptions& options) {
|
||||
// xmon first, so that by the time it shuts down it'll have all the leftover requests
|
||||
if (xmon) {
|
||||
XmonConfig config;
|
||||
config.appInstance = "eggscdc";
|
||||
{
|
||||
std::ostringstream ss;
|
||||
ss << "eggscdc" << options.replicaId;
|
||||
config.appInstance = ss.str();
|
||||
}
|
||||
config.appType = XmonAppType::CRITICAL;
|
||||
config.prod = options.xmonProd;
|
||||
|
||||
@@ -1205,7 +1216,7 @@ void runCDC(const std::string& dbDir, CDCOptions& options) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCStatsInserter>(logger, xmon, options, shared)));
|
||||
}
|
||||
if (options.metrics) {
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, shared)));
|
||||
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, shared, options.replicaId)));
|
||||
}
|
||||
|
||||
LoopThread::waitUntilStopped(threads);
|
||||
|
||||
Reference in New Issue
Block a user