shard: support multiple reader threads

This commit is contained in:
Miroslav Crnic
2025-10-16 12:39:11 +01:00
committed by GitHub
parent 8cec8bcf6b
commit 924e75674f
6 changed files with 178 additions and 110 deletions

View File

@@ -36,7 +36,7 @@ public:
++_currentArg;
return *this;
}
std::string getArg() {
auto res = peekArg();
next();
@@ -87,7 +87,7 @@ static inline bool parseLogOptions(CommandLineArgs& args, LogOptions& options) {
args.dieWithUsage();
}
return true;
}
if (arg == "-log-file") {
options.logFile = args.next().getArg();
return true;
@@ -105,13 +105,13 @@ static inline void printLogOptionsUsage() {
fprintf(stderr, " Same as '-log-level debug'.\n");
}
static inline bool validateLogOptions(const LogOptions& options) {
#ifndef TERN_DEBUG
if (options.logLevel <= LogLevel::LOG_TRACE) {
fprintf(stderr,"Cannot use trace for non-debug builds (it won't work).\n");
return false;
}
#endif
return true;
}
@@ -165,8 +165,8 @@ static inline void printMetricsOptionsUsage() {
fprintf(stderr, " Enable metrics.\n");
}
static inline bool validateMetricsOptions(const MetricsOptions& options) {
if (options.origin.empty() != options.org.empty() ||
options.origin.empty() != options.bucket.empty())
{
fprintf(stderr, "either all or none of the -influx-db flags must be provided\n");
@@ -175,7 +175,7 @@ static inline bool validateMetricsOptions(const MetricsOptions& options) {
return true;
}
// RegistryClientOptions
struct RegistryClientOptions {
std::string host;
@@ -201,7 +201,7 @@ static inline void printRegistryClientOptionsUsage() {
fprintf(stderr, " How to reach registry\n");
}
static inline bool validateRegistryClientOptions(const RegistryClientOptions& options) {
if (options.host.empty()) {
fprintf(stderr, "-registry needs to be set\n");
return false;
@@ -284,7 +284,7 @@ static inline void printLogsDBOptionsUsage() {
fprintf(stderr, " Which location we are running as [0-255]. Default is 0\n");
}
static inline bool validateLogsDBOptions(const LogsDBOptions& options) {
if (options.dbDir.empty()) {
fprintf(stderr, "-db-dir needs to be set\n");
return false;
@@ -404,7 +404,7 @@ static inline void printServerOptionsUsage() {
fprintf(stderr, " Drop given ratio of packets after processing them.\n");
}
static inline bool validateServerOptions(const ServerOptions& options) {
if (options.numAddressesFound == 0) {
fprintf(stderr, "at least one -addr needs to be defined\n");
return false;
@@ -425,3 +425,13 @@ static inline uint32_t parseUint32(CommandLineArgs& args) {
}
return static_cast<uint32_t>(x);
}
static inline uint16_t parseUint16(CommandLineArgs& args) {
size_t processed;
auto arg = args.getArg();
uint64_t x = std::stoull(arg, &processed);
if (processed != arg.size() || x > std::numeric_limits<uint16_t>::max()) {
fprintf(stderr, "Invalid argument '%s', expecting an unsigned 16-bit integer\n", arg.c_str());
args.dieWithUsage();
}
return static_cast<uint16_t>(x);
}
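As an aside, the helper above follows a reusable pattern: std::stoull plus a full-consumption check catches both trailing garbage and out-of-range values. A minimal, self-contained sketch of the same pattern (tryParseUint16 is a hypothetical name; it reports failure instead of calling dieWithUsage, which only exists on CommandLineArgs):

#include <cstdint>
#include <limits>
#include <stdexcept>
#include <string>

// Hedged sketch: parse an unsigned 16-bit integer, rejecting trailing
// characters and out-of-range values. Returns false on any failure.
static bool tryParseUint16(const std::string& arg, uint16_t& out) {
    size_t processed = 0;
    uint64_t x = 0;
    try {
        x = std::stoull(arg, &processed);
    } catch (const std::exception&) {
        return false; // not a number at all, or out of uint64_t range
    }
    if (processed != arg.size() || x > std::numeric_limits<uint16_t>::max()) {
        return false; // trailing garbage or does not fit in 16 bits
    }
    out = static_cast<uint16_t>(x);
    return true;
}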

View File

@@ -7,6 +7,7 @@
#include <cerrno>
#include <cstdint>
#include <ctime>
#include <iterator>
#include <vector>
#include <stdint.h>
#include <atomic>
@@ -125,15 +126,24 @@ public:
// pushed. First element in `els` gets pushed first. Returns 0
// if the queue is closed.
uint32_t push(std::vector<A>& els) {
auto begin = els.begin();
auto end = els.end();
return std::distance(begin, push(begin,end));
}
// Tries to push all elements in [begin, end). Returns an iterator one past
// the last pushed element; returns `begin` if nothing was pushed or the queue is closed.
typename std::vector<A>::iterator push(typename std::vector<A>::iterator begin, typename std::vector<A>::iterator end) {
uint32_t sz = _size.load(std::memory_order_relaxed);
// queue is closed
if (unlikely(sz & (1ull<<31))) { return 0; }
if (unlikely(sz & (1ull<<31))) { return begin; }
// push as much as possible
uint32_t toPush = std::min<uint64_t>(els.size(), _maxSize-sz);
uint32_t toPush = std::min<uint64_t>(std::distance(begin,end), _maxSize-sz);
for (uint32_t i = 0; i < toPush; i++, _head++) {
_elements[_head&_sizeMask] = std::move(els[i]);
_elements[_head&_sizeMask] = std::move(*begin);
++begin;
}
// update size and wake up puller, if necessary
@@ -152,7 +162,7 @@ public:
}
}
return toPush;
return begin;
}
// Drains at least one element, blocking if there are no elements,
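The iterator overload is the piece that enables fan-out across several queues: a caller pushes a batch into one queue and carries the unpushed tail into the next. A usage sketch (a hedged fragment, not standalone: SPSC is the queue type above; Item, queueA, queueB, and makeBatch are placeholders):

std::vector<Item> batch = makeBatch();              // hypothetical producer
auto it = queueA.push(batch.begin(), batch.end());  // one past the last pushed element
it = queueB.push(it, batch.end());                  // spill the remainder into the next queue
size_t dropped = std::distance(it, batch.end());    // whatever neither queue accepted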

View File

@@ -87,7 +87,7 @@ struct ShardShared {
SPSC<ProxyLogsDBResponse, true> proxyLogsDBResponseQueue;
SPSC<ShardReq, true> writerShardReqQueue;
SPSC<ProxyShardRespMsg, true> writerProxyShardRespQueue;
SPSC<ShardReq> readerRequestsQueue;
std::vector<std::unique_ptr<SPSC<ShardReq>>> readerRequestsQueues;
// databases and caches
SharedRocksDB& sharedDB;
@@ -110,15 +110,25 @@ struct ShardShared {
std::atomic<double> writerProxyRespQueueSize;
std::atomic<double> readerRequestQueueSize;
std::array<std::atomic<double>, 2> receivedRequests; // how many requests we got at once from each socket
// how many requests we got from write queue
// how many requests we pulled from queues
std::atomic<double> pulledLogsDBRequests;
std::atomic<double> pulledLogsDBResponses;
std::atomic<double> pulledProxyLogsDBRequests;
std::atomic<double> pulledProxyLogsDBResponses;
std::atomic<double> pulledWriteRequests;
std::atomic<double> pulledProxyWriteResponses;
std::atomic<double> pulledReadRequests; // how many requests we got from read queue
std::atomic<double> pulledReadRequests;
// how many requests we dropped
std::atomic<double> droppedLogsDBRequests;
std::atomic<double> droppedLogsDBResponses;
std::atomic<double> droppedProxyLogsDBRequests;
std::atomic<double> droppedProxyLogsDBResponses;
std::atomic<double> droppedWriteRequests;
std::atomic<double> droppedProxyWriteResponses;
std::atomic<double> droppedReadRequests;
// we should get up-to-date information from the registry before we start serving any requests
// this is populated by ShardRegisterer
@@ -135,7 +145,6 @@ struct ShardShared {
proxyLogsDBResponseQueue(WRITER_QUEUE_SIZE, writeQueuesWaiter),
writerShardReqQueue(WRITER_QUEUE_SIZE, writeQueuesWaiter),
writerProxyShardRespQueue(WRITER_QUEUE_SIZE, writeQueuesWaiter),
readerRequestsQueue(READER_QUEUE_SIZE),
sharedDB(sharedDB_),
logsDB(logsDB_),
shardDB(shardDB_),
@@ -156,9 +165,19 @@ struct ShardShared {
pulledWriteRequests(0),
pulledProxyWriteResponses(0),
pulledReadRequests(0),
droppedLogsDBRequests(0),
droppedLogsDBResponses(0),
droppedProxyLogsDBRequests(0),
droppedProxyLogsDBResponses(0),
droppedWriteRequests(0),
droppedProxyWriteResponses(0),
droppedReadRequests(0),
isInitiated(false),
isBlockServiceCacheInitiated(false)
{
for (uint16_t i = 0; i < options_.numReaders; ++i) {
readerRequestsQueues.emplace_back(std::make_unique<SPSC<ShardReq>>(READER_QUEUE_SIZE));
}
for (ShardMessageKind kind : allShardMessageKind) {
timings[(int)kind] = Timings::Standard();
}
@@ -307,12 +326,15 @@ private:
// read requests buffer
std::vector<ShardReq> _readRequests;
std::vector<std::unique_ptr<SPSC<ShardReq>>>::iterator _readQueueIt;
std::unique_ptr<UDPReceiver<1>> _receiver;
std::unique_ptr<ShardChannel> _channel;
public:
ShardServer(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared) :
Loop(logger, xmon, "server"),
_shared(shared)
_shared(shared),
_readQueueIt(_shared.readerRequestsQueues.begin())
{
auto convertProb = [this](const std::string& what, double prob, uint64_t& iprob) {
if (prob != 0.0) {
@@ -366,7 +388,7 @@ private:
LOG_ERROR(_env, "Could not parse LogsDBResponse: %s", err.what());
if (proxyLocation) {
_proxyLogsDBResponses.pop_back();
} else {
_logsDBResponses.pop_back();
}
return;
@@ -411,7 +433,7 @@ private:
LOG_ERROR(_env, "Could not parse LogsDBRequest: %s", err.what());
if (proxyLocation) {
_proxyLogsDBRequests.pop_back();
} else {
_logsDBRequests.pop_back();
}
return;
@@ -571,24 +593,50 @@ public:
}
// write out write requests to queues
_shared.logsDBRequestQueue.push(_logsDBRequests);
auto pushed = _shared.logsDBRequestQueue.push(_logsDBRequests);
_shared.logsDBRequestQueueSize = _shared.logsDBRequestQueueSize*0.95 + _shared.logsDBRequestQueue.size()*0.05;
_shared.logsDBResponseQueue.push(_logsDBResponses);
_shared.droppedLogsDBRequests = _shared.droppedLogsDBRequests*0.95 + (_logsDBRequests.size()-pushed)*0.05;
pushed = _shared.logsDBResponseQueue.push(_logsDBResponses);
_shared.logsDBResponseQueueSize = _shared.logsDBResponseQueueSize*0.95 + _shared.logsDBResponseQueue.size()*0.05;
_shared.droppedLogsDBResponses = _shared.droppedLogsDBResponses*0.95 + (_logsDBResponses.size()-pushed)*0.05;
_shared.proxyLogsDBRequestQueue.push(_proxyLogsDBRequests);
pushed = _shared.proxyLogsDBRequestQueue.push(_proxyLogsDBRequests);
_shared.proxyLogsDBRequestQueueSize = _shared.proxyLogsDBRequestQueueSize*0.95 + _shared.proxyLogsDBRequestQueue.size()*0.05;
_shared.droppedProxyLogsDBRequests = _shared.droppedProxyLogsDBRequests*0.95 + (_proxyLogsDBRequests.size()-pushed)*0.05;
_shared.proxyLogsDBResponseQueue.push(_proxyLogsDBResponses);
pushed = _shared.proxyLogsDBResponseQueue.push(_proxyLogsDBResponses);
_shared.proxyLogsDBResponseQueueSize = _shared.proxyLogsDBResponseQueueSize*0.95 + _shared.proxyLogsDBResponseQueue.size()*0.05;
_shared.droppedProxyLogsDBResponses = _shared.droppedProxyLogsDBResponses*0.95 + (_proxyLogsDBResponses.size()-pushed)*0.05;
_shared.writerShardReqQueue.push(_writeReqs);
pushed = _shared.writerShardReqQueue.push(_writeReqs);
_shared.writerRequestQueueSize = _shared.writerRequestQueueSize*0.95 + _shared.writerShardReqQueue.size()*0.05;
_shared.droppedWriteRequests = _shared.droppedWriteRequests*0.95 + (_writeReqs.size()-pushed)*0.05;
// write out read requests to queue
_shared.readerRequestsQueue.push(_readRequests);
_shared.readerRequestQueueSize = _shared.readerRequestQueueSize*0.95 + _shared.readerRequestsQueue.size()*0.05;
auto readIt = _readRequests.begin();
auto readQueueIt = _readQueueIt;
for(;;) {
readIt = (*readQueueIt)->push(readIt, _readRequests.end());
if (readIt == _readRequests.end()) {
break;
}
if (++readQueueIt == _shared.readerRequestsQueues.end()) {
readQueueIt = _shared.readerRequestsQueues.begin();
}
if (readQueueIt == _readQueueIt) {
break;
}
}
// remember where we stopped so the next batch resumes from this queue
_readQueueIt = readQueueIt;
auto dropped = std::distance(readIt, _readRequests.end());
size_t readQueuesSize = 0;
for (const auto& q : _shared.readerRequestsQueues) {
readQueuesSize += q->size();
}
_shared.readerRequestQueueSize = _shared.readerRequestQueueSize*0.95 + readQueuesSize*0.05;
_shared.droppedReadRequests = _shared.droppedReadRequests*0.95 + dropped*0.05;
}
};
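The round-robin distribution above boils down to: start at a saved cursor, spill leftovers into successive queues, and give up after one full lap. A freestanding sketch of the same loop under the same assumptions (SPSC with the iterator push; Req is a placeholder type):

#include <iterator>
#include <memory>
#include <vector>

// Hedged sketch mirroring the loop over readerRequestsQueues. Returns the
// number of dropped requests and advances `cursor` for the next batch.
template <typename Req>
size_t fanOut(std::vector<std::unique_ptr<SPSC<Req>>>& queues,
              typename std::vector<std::unique_ptr<SPSC<Req>>>::iterator& cursor,
              std::vector<Req>& batch) {
    auto it = batch.begin();
    auto q = cursor;
    for (;;) {
        it = (*q)->push(it, batch.end());
        if (it == batch.end()) { break; }                // everything placed
        if (++q == queues.end()) { q = queues.begin(); } // wrap around
        if (q == cursor) { break; }                      // full lap: drop the rest
    }
    cursor = q;
    return std::distance(it, batch.end());
}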
@@ -1489,7 +1537,7 @@ public:
auto start = ternNow();
// the order here determines priority of processing when there is more work than we can handle at once
// we prioritize LogsDB requests and responses as they make us progress state
remainingPullBudget -= _shared.logsDBResponseQueue.pull(_logsDBResponses, remainingPullBudget);
_shared.pulledLogsDBResponses = _shared.pulledLogsDBResponses*0.95 + ((double)_logsDBResponses.size())*0.05;
@@ -1501,7 +1549,7 @@ public:
_shared.pulledProxyLogsDBResponses = _shared.pulledProxyLogsDBResponses*0.95 + ((double)_proxyLogsDBResponses.size())*0.05;
remainingPullBudget -= _shared.proxyLogsDBRequestQueue.pull(_proxyLogsDBRequests, remainingPullBudget);
_shared.pulledProxyLogsDBRequests = _shared.pulledProxyLogsDBRequests*0.95 + ((double)_proxyLogsDBRequests.size())*0.05;
// then shard responses as these are responses for requests we sent to primary location
remainingPullBudget -= _shared.writerProxyShardRespQueue.pull(_shardResponses, remainingPullBudget);
_shared.pulledProxyWriteResponses = _shared.pulledProxyWriteResponses*0.95 + ((double)_shardResponses.size())*0.05;
@@ -1509,7 +1557,7 @@ public:
// last are new requests from local clients and then proxy locations
remainingPullBudget -= _shared.writerShardReqQueue.pull(_shardRequests, remainingPullBudget);
_shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)_shardRequests.size())*0.05;
logsDBStep();
auto loopTime = ternNow() - start;
}
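All of the gauge updates in these loops use the same exponential moving average, new = 0.95*old + 0.05*sample, i.e. roughly a 20-step smoothing window. A tiny runnable illustration:

#include <atomic>
#include <cstdio>

// Hedged sketch of the smoothing used for the queue/pulled/dropped gauges.
// The read-modify-write is not atomic as a whole, matching the
// single-writer usage in the shard loops.
static void emaUpdate(std::atomic<double>& gauge, double sample) {
    gauge = gauge * 0.95 + sample * 0.05;
}

int main() {
    std::atomic<double> gauge{0.0};
    for (int i = 0; i < 100; ++i) emaUpdate(gauge, 10.0);
    std::printf("%f\n", gauge.load()); // converges toward 10
}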
@@ -1519,6 +1567,7 @@ struct ShardReader : Loop {
private:
ShardShared& _shared;
SPSC<ShardReq>& _queue;
AES128Key _expandedShardKey;
AES128Key _expandedCDCKey;
ShardRespMsg _respContainer;
@@ -1530,13 +1579,14 @@ private:
uint64_t _outgoingPacketDropProbability; // probability * 10,000
virtual void sendStop() override {
_shared.readerRequestsQueue.close();
_queue.close();
}
public:
ShardReader(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared) :
ShardReader(Logger& logger, std::shared_ptr<XmonAgent>& xmon, ShardShared& shared, SPSC<ShardReq>& queue) :
Loop(logger, xmon, "reader"),
_shared(shared),
_queue(queue),
_sender(UDPSenderConfig{.maxMsgSize = MAX_UDP_MTU}),
_packetDropRand(ternNow().ns),
_outgoingPacketDropProbability(0)
@@ -1558,13 +1608,13 @@ public:
virtual void step() override {
_requests.clear();
uint32_t pulled = _shared.readerRequestsQueue.pull(_requests, MAX_RECV_MSGS * 2);
uint32_t pulled = _queue.pull(_requests, MAX_RECV_MSGS * 2);
auto start = ternNow();
if (likely(pulled > 0)) {
LOG_DEBUG(_env, "pulled %s requests from read queue", pulled);
_shared.pulledReadRequests = _shared.pulledReadRequests*0.95 + ((double)pulled)*0.05;
}
if (unlikely(_shared.readerRequestsQueue.isClosed())) {
if (unlikely(_queue.isClosed())) {
// queue is closed, stop
stop();
return;
@@ -1882,6 +1932,12 @@ static void logsDBstatsToMetrics(struct MetricsBuilder& metricsBuilder, const Lo
}
}
struct Metric {
std::string name;
std::string fieldName;
std::atomic<double>* value;
};
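This small struct is what lets the metrics inserter below replace dozens of hand-written measurement blocks with one data-driven loop. An isolated illustration of the pattern (metric names here are placeholders, and printf stands in for the InfluxDB metrics builder):

#include <atomic>
#include <cstdio>
#include <string>
#include <vector>

struct Metric {
    std::string name;
    std::string fieldName;
    std::atomic<double>* value;
};

int main() {
    std::atomic<double> queueSize{3.5}, dropped{0.25};
    std::vector<Metric> stats = {
        {"example_queue", "size", &queueSize},
        {"example_dropped", "count", &dropped},
    };
    for (const auto& s : stats) {
        // stands in for measurement()/fieldFloat()/timestamp()
        std::printf("%s %s=%.2f\n", s.name.c_str(), s.fieldName.c_str(), s.value->load());
    }
}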
struct ShardMetricsInserter : PeriodicLoop {
private:
InfluxDB _influxDB;
@@ -1894,6 +1950,7 @@ private:
std::array<XmonNCAlert, 2> _sockQueueAlerts;
XmonNCAlert _writeQueuesAlert;
XmonNCAlert _readerQueueAlert;
std::vector<Metric> _stats;
public:
ShardMetricsInserter(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const InfluxDB& influxDB, ShardShared& shared):
PeriodicLoop(logger, xmon, "metrics", {1_sec, 1.0, 1_mins, 0.1}),
@@ -1905,7 +1962,34 @@ public:
_sockQueueAlerts({XmonAppType::NEVER, XmonAppType::NEVER}),
_writeQueuesAlert(XmonAppType::NEVER),
_readerQueueAlert(XmonAppType::NEVER)
{}
{
_stats = {
// queue sizes
{"eggsfs_shard_logsdb_request_queue", "size", &_shared.logsDBRequestQueueSize},
{"eggsfs_shard_logsdb_response_queue", "size", &_shared.logsDBResponseQueueSize},
{"eggsfs_shard_proxy_logsdb_request_queue", "size", &_shared.proxyLogsDBRequestQueueSize},
{"eggsfs_shard_proxy_logsdb_response_queue", "size", &_shared.proxyLogsDBResponseQueueSize},
{"eggsfs_shard_writer_request_queue", "size", &_shared.writerRequestQueueSize},
{"eggsfs_shard_writer_proxy_response_queue", "size", &_shared.writerProxyRespQueueSize},
{"eggsfs_shard_read_queue", "size", &_shared.readerRequestQueueSize},
// pulled counts
{"eggsfs_shard_pulled_logsdb_requests", "count", &_shared.pulledLogsDBRequests},
{"eggsfs_shard_pulled_logsdb_responses", "count", &_shared.pulledLogsDBResponses},
{"eggsfs_shard_pulled_proxy_logsdb_requests", "count", &_shared.pulledProxyLogsDBRequests},
{"eggsfs_shard_pulled_proxy_logsdb_responses", "count", &_shared.pulledProxyLogsDBResponses},
{"eggsfs_shard_pulled_writer_requests", "count", &_shared.pulledWriteRequests},
{"eggsfs_shard_pulled_writer_proxy_responses", "count", &_shared.pulledProxyWriteResponses},
{"eggsfs_shard_pulled_read_requests", "count", &_shared.pulledReadRequests},
// dropped counts
{"eggsfs_shard_dropped_logsdb_requests", "count", &_shared.droppedLogsDBRequests},
{"eggsfs_shard_dropped_logsdb_responses", "count", &_shared.droppedLogsDBResponses},
{"eggsfs_shard_dropped_proxy_logsdb_requests", "count", &_shared.droppedLogsDBRequests},
{"eggsfs_shard_dropped_proxy_logsdb_responses", "count", &_shared.droppedProxyLogsDBResponses},
{"eggsfs_shard_dropped_writer_requests", "count", &_shared.droppedWriteRequests},
{"eggsfs_shard_dropped_writer_proxy_responses", "count", &_shared.droppedProxyWriteResponses},
{"eggsfs_shard_dropped_read_requests", "count", &_shared.droppedReadRequests},
};
}
virtual ~ShardMetricsInserter() = default;
@@ -1943,15 +2027,18 @@ public:
} else {
_env.clearAlert(_readerQueueAlert);
}
auto newMeasurement = [this](const std::string& name) {
_metricsBuilder.measurement(name);
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
};
auto now = ternNow();
for (ShardMessageKind kind : allShardMessageKind) {
const ErrorCount& errs = _shared.errors[(int)kind];
for (int i = 0; i < errs.count.size(); i++) {
uint64_t count = errs.count[i].load();
if (count == 0) { continue; }
_metricsBuilder.measurement("eggsfs_shard_requests");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
newMeassurement("eggsfs_shard_requests");
_metricsBuilder.tag("kind", kind);
_metricsBuilder.tag("write", !readOnlyShardReq(kind));
if (i == 0) {
@@ -1963,55 +2050,7 @@ public:
_metricsBuilder.timestamp(now);
}
}
{
_metricsBuilder.measurement("eggsfs_shard_logsdb_request_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.logsDBRequestQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_logsdb_response_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.logsDBResponseQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_proxy_logsdb_request_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.proxyLogsDBRequestQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_proxy_logsdb_response_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.proxyLogsDBResponseQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_writer_request_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.writerRequestQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_writer_proxy_response_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.writerProxyRespQueueSize);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_read_queue");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("size", _shared.readerRequestQueueSize);
_metricsBuilder.timestamp(now);
}
for (int i = 0; i < _shared.receivedRequests.size(); i++) {
_metricsBuilder.measurement("eggsfs_shard_received_requests");
_metricsBuilder.tag("shard", _shrid);
@@ -2020,27 +2059,18 @@ public:
_metricsBuilder.fieldFloat("count", _shared.receivedRequests[i]);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_pulled_write_requests");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("count", _shared.pulledWriteRequests);
_metricsBuilder.timestamp(now);
}
{
_metricsBuilder.measurement("eggsfs_shard_pulled_read_requests");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
_metricsBuilder.fieldFloat("count", _shared.pulledReadRequests);
for (const auto& stat : _stats) {
newMeasurement(stat.name);
_metricsBuilder.fieldFloat(stat.fieldName, *stat.value);
_metricsBuilder.timestamp(now);
}
{
_rocksDBStats.clear();
_shared.sharedDB.rocksDBMetrics(_rocksDBStats);
for (const auto& [name, value]: _rocksDBStats) {
_metricsBuilder.measurement("eggsfs_shard_rocksdb");
_metricsBuilder.tag("shard", _shrid);
_metricsBuilder.tag("location", int(_location));
newMeassurement("eggsfs_shard_rocksdb");
_metricsBuilder.fieldU64(name, value);
_metricsBuilder.timestamp(now);
}
@@ -2134,7 +2164,9 @@ void runShard(ShardOptions& options) {
ShardShared shared(options, sharedDB, blockServicesCache, shardDB, logsDB, UDPSocketPair(env, options.serverOptions.addrs));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardServer>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardReader>(logger, xmon, shared)));
for (uint16_t i = 0; i < options.numReaders; ++i) {
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardReader>(logger, xmon, shared, *shared.readerRequestsQueues[i])));
}
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardWriter>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardRegisterer>(logger, xmon, shared)));
threads.emplace_back(LoopThread::Spawn(std::make_unique<ShardBlockServiceUpdater>(logger, xmon, shared)));

View File

@@ -14,11 +14,13 @@ struct ShardOptions {
RegistryClientOptions registryClientOptions;
LogsDBOptions logsDBOptions;
ServerOptions serverOptions;
Duration transientDeadlineInterval = DEFAULT_DEADLINE_INTERVAL;
ShardId shardId;
bool shardIdSet = false;
uint16_t numReaders = 1;
// implicit options
bool isLeader() const { return !logsDBOptions.avoidBeingLeader; }
bool isProxyLocation() const { return logsDBOptions.location != 0; }

View File

@@ -13,7 +13,7 @@
static bool parseShardOptions(CommandLineArgs& args, ShardOptions& options) {
while(!args.done()) {
if (parseLogOptions(args, options.logOptions) ||
parseXmonOptions(args, options.xmonOptions) ||
parseMetricsOptions(args, options.metricsOptions) ||
parseRegistryClientOptions(args, options.registryClientOptions) ||
@@ -32,6 +32,14 @@ static bool parseShardOptions(CommandLineArgs& args, ShardOptions& options) {
options.shardIdSet = true;
continue;
}
if (arg == "-num-readers") {
options.numReaders = parseUint16(args.next());
if (options.numReaders == 0) {
fprintf(stderr, "-num-readers must be bigger than 0\n");
return false;
}
continue;
}
fprintf(stderr, "unknown argument %s\n", args.peekArg().c_str());
return false;
}
@@ -46,18 +54,20 @@ static void printShardOptionsUsage() {
printLogsDBOptionsUsage();
printServerOptionsUsage();
fprintf(stderr, "ShardOptions:\n");
fprintf(stderr, " -num-readers\n");
fprintf(stderr, " Number of reader threads. Default: 1\n");
fprintf(stderr, " -shard\n");
fprintf(stderr, " Which shard we are running as [0-255]\n");
fprintf(stderr, " -transient-deadline-interval\n");
fprintf(stderr, " Tweaks the interval with which the deadline for transient file gets bumped.\n");
}
static bool validateShardOptions(const ShardOptions& options) {
static bool validateShardOptions(const ShardOptions& options) {
if (!options.shardIdSet) {
fprintf(stderr, "-shard needs to be set\n");
return false;
}
return (validateLogOptions(options.logOptions) &&
validateXmonOptions(options.xmonOptions) &&
validateMetricsOptions(options.metricsOptions) &&
validateRegistryClientOptions(options.registryClientOptions) &&