core: common option parsing

This commit is contained in:
Miroslav Crnic
2025-09-11 22:15:06 +00:00
parent 8c75dd0d89
commit c29f3191d8
13 changed files with 676 additions and 637 deletions
+35 -40
View File
@@ -254,7 +254,6 @@ private:
// The _shard_ request we're currently waiting for, if any.
InFlightShardRequests _inFlightShardReqs;
const bool _noReplication;
LogsDB& _logsDB;
std::vector<LogsDBRequest> _logsDBRequests;
std::vector<LogsDBResponse> _logsDBResponses;
@@ -267,7 +266,7 @@ public:
CDCServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, CDCOptions& options, CDCShared& shared) :
Loop(logger, xmon, "req_server"),
_shared(shared),
_basePath(options.dbDir),
_basePath(options.logsDBOptions.dbDir),
_seenShards(false),
_currentLogIndex(_shared.db.lastAppliedLogEntry()),
// important to not catch stray requests from previous executions
@@ -275,7 +274,6 @@ public:
_shardTimeout(options.shardTimeout),
_receiver({.perSockMaxRecvMsg = MAX_MSG_RECEIVE, .maxMsgSize = MAX_UDP_MTU}),
_cdcSender({.maxMsgSize = MAX_UDP_MTU}),
_noReplication(options.noReplication),
_logsDB(shared.logsDB)
{
expandKey(CDCKey, _expandedCDCKey);
@@ -418,9 +416,7 @@ public:
for (auto request : _logsDBOutRequests) {
_packLogsDBRequest(*request);
}
if (_noReplication) {
_logsDB.processIncomingMessages(_logsDBRequests, _logsDBResponses);
}
_logsDB.readEntries(entries);
// Apply replicated log entries
@@ -845,8 +841,8 @@ public:
CDCShardUpdater(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "shard_updater", {1_sec, 1_mins}),
_shared(shared),
_registryHost(options.registryHost),
_registryPort(options.registryPort),
_registryHost(options.registryClientOptions.host),
_registryPort(options.registryClientOptions.port),
_alert(10_sec)
{
_env.updateAlert(_alert, "Waiting to get shards");
@@ -899,12 +895,12 @@ public:
CDCRegisterer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const CDCOptions& options, CDCShared& shared):
PeriodicLoop(logger, xmon, "registerer", { 1_sec, 1_mins }),
_shared(shared),
_replicaId(options.replicaId),
_location(options.location),
_noReplication(options.noReplication),
_avoidBeingLeader(options.avoidBeingLeader),
_registryHost(options.registryHost),
_registryPort(options.registryPort),
_replicaId(options.logsDBOptions.replicaId),
_location(options.logsDBOptions.location),
_noReplication(options.logsDBOptions.noReplication),
_avoidBeingLeader(options.logsDBOptions.avoidBeingLeader),
_registryHost(options.registryClientOptions.host),
_registryPort(options.registryClientOptions.port),
_alert(10_sec)
{}
@@ -1162,50 +1158,49 @@ public:
void runCDC(CDCOptions& options) {
int logOutFd = STDOUT_FILENO;
if (!options.logFile.empty()) {
logOutFd = open(options.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
if (!options.logOptions.logFile.empty()) {
logOutFd = open(options.logOptions.logFile.c_str(), O_WRONLY|O_CREAT|O_APPEND, 0644);
if (logOutFd < 0) {
throw SYSCALL_EXCEPTION("open");
}
}
Logger logger(options.logLevel, logOutFd, options.syslog, true);
Logger logger(options.logOptions.logLevel, logOutFd, options.logOptions.syslog, true);
std::shared_ptr<XmonAgent> xmon;
if (!options.xmonAddr.empty()) {
if (!options.xmonOptions.addr.empty()) {
xmon = std::make_shared<XmonAgent>();
}
Env env(logger, xmon, "startup");
LOG_INFO(env, "Running CDC with options:");
LOG_INFO(env, " level = %s", options.logLevel);
LOG_INFO(env, " logFile = '%s'", options.logFile);
LOG_INFO(env, " replicaId = %s", options.replicaId);
LOG_INFO(env, " port = %s", options.port);
LOG_INFO(env, " registryHost = '%s'", options.registryHost);
LOG_INFO(env, " registryPort = %s", options.registryPort);
LOG_INFO(env, " cdcAddrs = %s", options.cdcAddrs);
LOG_INFO(env, " syslog = %s", (int)options.syslog);
LOG_INFO(env, " level = %s", options.logOptions.logLevel);
LOG_INFO(env, " logFile = '%s'", options.logOptions.logFile);
LOG_INFO(env, " replicaId = %s", options.logsDBOptions.replicaId);
LOG_INFO(env, " addrs = %s", options.serverOptions.addrs);
LOG_INFO(env, " registryHost = '%s'", options.registryClientOptions.host);
LOG_INFO(env, " registryPort = %s", options.registryClientOptions.port);
LOG_INFO(env, " cdcAddrs = %s", options.serverOptions.addrs);
LOG_INFO(env, " syslog = %s", (int)options.logOptions.syslog);
LOG_INFO(env, "Using LogsDB with options:");
LOG_INFO(env, " avoidBeingLeader = '%s'", (int)options.avoidBeingLeader);
LOG_INFO(env, " noReplication = '%s'", (int)options.noReplication);
LOG_INFO(env, " avoidBeingLeader = '%s'", (int)options.logsDBOptions.avoidBeingLeader);
LOG_INFO(env, " noReplication = '%s'", (int)options.logsDBOptions.noReplication);
std::vector<std::unique_ptr<LoopThread>> threads;
LoopThreads threads;
// xmon first, so that by the time it shuts down it'll have all the leftover requests
if (xmon) {
XmonConfig config;
{
std::ostringstream ss;
ss << "eggscdc_" << options.replicaId;
config.appInstance = ss.str();
ss << "eggscdc_" << options.logsDBOptions.replicaId;
options.xmonOptions.appInstance = ss.str();
}
config.appType = XmonAppType::CRITICAL;
config.addr = options.xmonAddr;
options.xmonOptions.appType = XmonAppType::CRITICAL;
threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, config)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<Xmon>(logger, xmon, options.xmonOptions)));
}
SharedRocksDB sharedDb(logger, xmon, options.dbDir + "/db", options.dbDir + "/db-statistics.txt");
SharedRocksDB sharedDb(logger, xmon, options.logsDBOptions.dbDir + "/db", options.logsDBOptions.dbDir + "/db-statistics.txt");
sharedDb.registerCFDescriptors(LogsDB::getColumnFamilyDescriptors());
sharedDb.registerCFDescriptors(CDCDB::getColumnFamilyDescriptors());
@@ -1219,10 +1214,10 @@ void runCDC(CDCOptions& options) {
sharedDb.openTransactionDB(dbOptions);
CDCDB db(logger, xmon, sharedDb);
LogsDB logsDB(logger, xmon, sharedDb, options.replicaId, db.lastAppliedLogEntry(), options.noReplication, options.avoidBeingLeader);
LogsDB logsDB(logger, xmon, sharedDb, options.logsDBOptions.replicaId, db.lastAppliedLogEntry(), options.logsDBOptions.noReplication, options.logsDBOptions.avoidBeingLeader);
CDCShared shared(
sharedDb, db, logsDB,
std::array<UDPSocketPair, 2>({UDPSocketPair(env, options.cdcAddrs), UDPSocketPair(env, options.cdcToShardAddress)})
std::array<UDPSocketPair, 2>({UDPSocketPair(env, options.serverOptions.addrs), UDPSocketPair(env, options.cdcToShardAddress)})
);
LOG_INFO(env, "Spawning server threads");
@@ -1230,8 +1225,8 @@ void runCDC(CDCOptions& options) {
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCShardUpdater>(logger, xmon, options, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCServer>(logger, xmon, options, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCRegisterer>(logger, xmon, options, shared)));
if (options.influxDB) {
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, *options.influxDB, shared, options.replicaId)));
if (!options.metricsOptions.origin.empty()) {
threads.emplace_back(LoopThread::Spawn(std::make_unique<CDCMetricsInserter>(logger, xmon, options.metricsOptions, shared, options.logsDBOptions.replicaId)));
}
LoopThread::waitUntilStopped(threads);