From 8075e99bb6f5b641639e008aac019ef2bf9a4856 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Mon, 8 Jan 2024 14:56:12 +0000 Subject: [PATCH] Graceful shard teardown See for tradeoffs regarding how to terminate threads gracefully. The goal of this work was for valgrind to work correctly, which in turn was to investigate #141. It looks like I have succeeded: ==2715080== Warning: unimplemented fcntl command: 1036 ==2715080== 20,052 bytes in 5,013 blocks are definitely lost in loss record 133 of 135 ==2715080== at 0x483F013: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so) ==2715080== by 0x3B708E: allocate (new_allocator.h:121) ==2715080== by 0x3B708E: allocate (allocator.h:173) ==2715080== by 0x3B708E: allocate (alloc_traits.h:460) ==2715080== by 0x3B708E: _M_allocate (stl_vector.h:346) ==2715080== by 0x3B708E: std::vector >::_M_default_append(unsigned long) (vector.tcc:635) ==2715080== by 0x42BF1C: resize (stl_vector.h:940) ==2715080== by 0x42BF1C: ShardDBImpl::_fileSpans(rocksdb::ReadOptions&, FileSpansReq const&, FileSpansResp&) (shard/ShardDB.cpp:921) ==2715080== by 0x420867: ShardDBImpl::read(ShardReqContainer const&, ShardRespContainer&) (shard/ShardDB.cpp:1034) ==2715080== by 0x3CB3EE: ShardServer::_handleRequest(int, sockaddr_in*, char*, unsigned long) (shard/Shard.cpp:347) ==2715080== by 0x3C8A39: ShardServer::step() (shard/Shard.cpp:405) ==2715080== by 0x40B1E8: run (core/Loop.cpp:67) ==2715080== by 0x40B1E8: startLoop(void*) (core/Loop.cpp:37) ==2715080== by 0x4BEA258: start_thread (in /usr/lib/libpthread-2.33.so) ==2715080== by 0x4D005E2: clone (in /usr/lib/libc-2.33.so) ==2715080== ==2715080== ==2715080== Exit program on first error (--exit-on-first-error=yes) --- cpp/cdc/CDC.cpp | 55 ++++++++------- cpp/cdc/CDC.hpp | 2 +- cpp/cdc/CDCDB.cpp | 26 ++++---- cpp/cdc/CDCDB.hpp | 1 + cpp/core/Loop.cpp | 136 ++++++++++++++++++++++++++++++++++++++ cpp/core/Loop.hpp | 51 ++++++++++---- cpp/core/PeriodicLoop.hpp | 6 +- 
cpp/core/SPSC.hpp | 26 +++++++- cpp/core/Xmon.cpp | 3 +- cpp/core/Xmon.hpp | 2 +- cpp/shard/Shard.cpp | 101 +++++++++++++++++----------- cpp/shard/Shard.hpp | 2 +- cpp/shard/ShardDB.cpp | 27 ++++---- cpp/shard/ShardDB.hpp | 1 + 14 files changed, 328 insertions(+), 111 deletions(-) create mode 100644 cpp/core/Loop.cpp diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index f6ad5c58..e954bc7f 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -275,7 +275,8 @@ public: } LOG_DEBUG(_env, "Blocking to wait for readable sockets"); - if (unlikely(poll(_socks.data(), _socks.size(), -1) < 0)) { + if (unlikely(poll(_socks.data(), _socks.size()) < 0)) { + if (errno == EINTR) { return; } throw SYSCALL_EXCEPTION("poll"); } @@ -791,6 +792,8 @@ public: _env.updateAlert(_alert, "Waiting to get shards"); } + virtual ~CDCShardUpdater() = default; + virtual bool periodicStep() override { LOG_INFO(_env, "Fetching shards"); std::string err = fetchShards(_shuckleHost, _shucklePort, 10_sec, _shards); @@ -842,6 +845,8 @@ public: _alert(10_sec) {} + virtual ~CDCRegisterer() = default; + virtual bool periodicStep() override { uint16_t port1 = _shared.ownPorts[0].load(); uint16_t port2 = _shared.ownPorts[1].load(); @@ -876,6 +881,8 @@ public: _alert(10_sec) {} + virtual ~CDCStatsInserter() = default; + virtual bool periodicStep() override { std::string err; for (CDCMessageKind kind : allCDCMessageKind) { @@ -921,6 +928,8 @@ public: _alert(10_sec) {} + virtual ~CDCMetricsInserter() = default; + virtual bool periodicStep() { auto now = eggsNow(); for (CDCMessageKind kind : allCDCMessageKind) { @@ -1011,36 +1020,34 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) { } LOG_INFO(env, " syslog = %s", (int)options.syslog); - std::vector threads; + std::vector> threads; // xmon first, so that by the time it shuts down it'll have all the leftover requests if (xmon) { - threads.emplace_back([&logger, xmon, &options]() mutable { - XmonConfig config; - config.appInstance = "eggscdc"; - 
config.appType = "restech_eggsfs.critical"; - config.prod = options.xmonProd; - Xmon(logger, xmon, config).run(); - }); + XmonConfig config; + config.appInstance = "eggscdc"; + config.appType = "restech_eggsfs.critical"; + config.prod = options.xmonProd; + + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, config))); } CDCDB db(logger, xmon, dbDir); CDCShared shared(db); - threads.emplace_back([&logger, xmon, &options, &shared]() mutable { - CDCShardUpdater(logger, xmon, options, shared).run(); - }); - threads.emplace_back([&logger, xmon, &options, &shared]() mutable { - CDCRegisterer(logger, xmon, options, shared).run(); - }); - threads.emplace_back([&logger, xmon, &options, &shared]() mutable { - CDCStatsInserter(logger, xmon, options, shared).run(); - }); - if (options.metrics) { - threads.emplace_back([&logger, xmon, &shared]() mutable { - CDCMetricsInserter(logger, xmon, shared).run(); - }); - } + LOG_INFO(env, "Spawning server threads"); - CDCServer(logger, xmon, options, shared).run(); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, options, shared))); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, options, shared))); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, options, shared))); + if (options.metrics) { + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shared))); + } + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, options, shared))); + + waitUntilStopped(threads); + + db.close(); + + LOG_INFO(env, "CDC terminating gracefully, bye."); } diff --git a/cpp/cdc/CDC.hpp b/cpp/cdc/CDC.hpp index b1c7f051..1940c17f 100644 --- a/cpp/cdc/CDC.hpp +++ b/cpp/cdc/CDC.hpp @@ -19,4 +19,4 @@ struct CDCOptions { bool metrics = false; }; -[[noreturn]] void runCDC(const std::string& dbDir, const CDCOptions& options); \ No newline at end of file +void runCDC(const std::string& dbDir, const CDCOptions& options); \ No newline at end of file diff --git a/cpp/cdc/CDCDB.cpp 
b/cpp/cdc/CDCDB.cpp index cf8f0ce4..3afd28c7 100644 --- a/cpp/cdc/CDCDB.cpp +++ b/cpp/cdc/CDCDB.cpp @@ -1370,22 +1370,16 @@ struct CDCDBImpl { _initDb(); } - ~CDCDBImpl() { + void close() { LOG_INFO(_env, "destroying column families and closing database"); - const auto gentleRocksDBChecked = [this](const std::string& what, rocksdb::Status status) { - if (!status.ok()) { - LOG_INFO(_env, "Could not %s: %s", what, status.ToString()); - } - }; - gentleRocksDBChecked("destroy default CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_defaultCf)); - gentleRocksDBChecked("destroy req queue CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_reqQueueCfLegacy)); - gentleRocksDBChecked("destroy parent CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_parentCf)); - gentleRocksDBChecked("destroy enqueued CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_enqueuedCf)); - gentleRocksDBChecked("destroy executing CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_executingCf)); - gentleRocksDBChecked("destroy dirs to txns CF", _dbDontUseDirectly->DestroyColumnFamilyHandle(_dirsToTxnsCf)); - - gentleRocksDBChecked("close DB", _dbDontUseDirectly->Close()); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_defaultCf)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_reqQueueCfLegacy)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_parentCf)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_enqueuedCf)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_executingCf)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->DestroyColumnFamilyHandle(_dirsToTxnsCf)); + ROCKS_DB_CHECKED(_dbDontUseDirectly->Close()); delete _dbDontUseDirectly; } @@ -1918,6 +1912,10 @@ CDCDB::CDCDB(Logger& logger, std::shared_ptr& xmon, const std::string _impl = new CDCDBImpl(logger, xmon, path); } +void CDCDB::close() { + ((CDCDBImpl*)_impl)->close(); +} + CDCDB::~CDCDB() { delete ((CDCDBImpl*)_impl); } diff --git 
a/cpp/cdc/CDCDB.hpp b/cpp/cdc/CDCDB.hpp index 0a74eda8..0fcd60da 100644 --- a/cpp/cdc/CDCDB.hpp +++ b/cpp/cdc/CDCDB.hpp @@ -85,6 +85,7 @@ public: CDCDB(Logger& env, std::shared_ptr& xmon, const std::string& path); ~CDCDB(); + void close(); // Unlike with ShardDB, we don't have an explicit log preparation step here, // because at least for now logs are simply either CDC requests, or shard diff --git a/cpp/core/Loop.cpp b/cpp/core/Loop.cpp new file mode 100644 index 00000000..f77d5fee --- /dev/null +++ b/cpp/core/Loop.cpp @@ -0,0 +1,136 @@ +#include +#include +#include +#include + +#include "Assert.hpp" +#include "Loop.hpp" +#include "Exception.hpp" + +// The sigset we run with normally, with SIGINT/SIGTERM masked. +static sigset_t baseSigset; +// The sigset to run when running blocking syscalls, with SIGINT/SIGTERM unmasked. +static sigset_t blockingSigset; + +__attribute__((constructor)) +static void setupSigsets() { + sigemptyset(&baseSigset); + blockingSigset = baseSigset; + sigaddset(&baseSigset, SIGINT); + sigaddset(&baseSigset, SIGTERM); +} + +thread_local std::atomic stopLoop; + +static void stopLoopHandler(int signum) { + stopLoop.store(true, std::memory_order_release); +} + +void LoopThread::stop() { + pthread_kill(thread, SIGTERM); +} + +Loop::Loop(Logger& logger, std::shared_ptr& xmon, const std::string& name) : _env(logger, xmon, name), _name(name) {} + +static void* startLoop(void* rawLoop) { + std::unique_ptr loop((Loop*)rawLoop); + loop->run(); + return nullptr; +} + +std::unique_ptr Loop::Spawn(std::unique_ptr&& loop) { + Loop* rawLoop = loop.release(); + pthread_t thr; + { + pthread_attr_t attr; + int res = pthread_attr_init(&attr); + if (res != 0) { + throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_init"); + } + res = pthread_attr_setsigmask_np(&attr, &baseSigset); + if (res != 0) { + throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_setsigmask_np"); + } + res = pthread_create(&thr, &attr, &startLoop, rawLoop); + if (res != 0) { + throw 
EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_create"); } + res = pthread_attr_destroy(&attr); + if (res != 0) { + throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_destroy"); + } + } + return std::make_unique(thr); +} + +void Loop::run() { + while (!stopLoop.load()) { step(); } +} + +int Loop::poll(struct pollfd* fds, nfds_t nfds) { + return ppoll(fds, nfds, nullptr, &blockingSigset); +} + +void Loop::stop() { + stopLoop.store(true, std::memory_order_release); +} + +int Loop::sleep(Duration d) { + auto tspec = d.timespec(); + return ppoll(nullptr, 0, &tspec, &blockingSigset); +} + +void waitUntilStopped(std::vector>& loops) { + ALWAYS_ASSERT(getpid() == gettid(), "You can only run this function from the main thread"); + + // mask signals here too + { + int ret = pthread_sigmask(SIG_SETMASK, &baseSigset, nullptr); + if (ret != 0) { // pthread functions return a positive error number, never -1 + throw EXPLICIT_SYSCALL_EXCEPTION(ret, "pthread_sigmask"); + } + } + + // setup signal handler + { + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = &stopLoopHandler; + if (sigaction(SIGTERM, &sa, nullptr) < 0) { + throw SYSCALL_EXCEPTION("sigaction"); + } + if (sigaction(SIGINT, &sa, nullptr) < 0) { + throw SYSCALL_EXCEPTION("sigaction"); + } + } + + // start waiting + while (!stopLoop.load()) { + struct timespec timeout { .tv_sec = 60*60*24 }; + int ret = ppoll(nullptr, 0, &timeout, &blockingSigset); + if (ret < 0 && errno != EINTR) { + throw SYSCALL_EXCEPTION("ppoll"); + } + } + + // we've been told to stop, tear down all threads + for (int i = loops.size()-1; i >= 0; i--) { + loops[i]->stop(); + struct timespec timeout; + if (clock_gettime(CLOCK_REALTIME, &timeout) < 0) { + throw SYSCALL_EXCEPTION("clock_gettime"); + } + timeout.tv_sec += 10; + int ret = pthread_timedjoin_np(loops[i]->thread, nullptr, &timeout); + if (ret == ETIMEDOUT) { + char name[16]; + { + int ret = pthread_getname_np(loops[i]->thread, name, sizeof(name)); + if (ret != 0) { + throw EXPLICIT_SYSCALL_EXCEPTION(ret,
"pthread_getname_np"); + } + } + throw EGGS_EXCEPTION("loop %s has not terminated in time, aborting", name); + } + } +} \ No newline at end of file diff --git a/cpp/core/Loop.hpp b/cpp/core/Loop.hpp index 09f9f01a..5ec9dc1b 100644 --- a/cpp/core/Loop.hpp +++ b/cpp/core/Loop.hpp @@ -1,10 +1,26 @@ #pragma once #include +#include +#include +#include #include "Env.hpp" #include "Exception.hpp" +struct LoopThread { + pthread_t thread; + + LoopThread(pthread_t thread_) : thread(thread_) {} + virtual ~LoopThread() = default; + LoopThread(const LoopThread&) = delete; + + virtual void stop(); +}; + +// Each loop runs with SIGINT/SIGTERM blocked. It's expected that any +// non-time-bounded syscalls which step runs has SIGINT/SIGTERM unmasked +// (e.g. using ppoll). struct Loop { protected: Env _env; @@ -12,21 +28,30 @@ protected: private: std::string _name; -public: - Loop(Logger& logger, std::shared_ptr& xmon, const std::string& name) : _env(logger, xmon, name), _name(name) { - int ret = pthread_setname_np(pthread_self(), name.c_str()); - if (ret != 0) { - throw EXPLICIT_SYSCALL_EXCEPTION(ret, "pthreat_setname_np %s", name); - } - } +protected: + // can be used to stop from within the thread + void stop(); - const std::string& name() const { - return _name; - } +public: + Loop(Logger& logger, std::shared_ptr& xmon, const std::string& name); + virtual ~Loop() = default; + + // This will remember the sigmask that it's been called with, and use it + // to unmask stuff in poll/sleep below. + static std::unique_ptr Spawn(std::unique_ptr&& loop); + + const std::string& name() const { return _name; } virtual void step() = 0; - [[noreturn]] void run() { - for (;;) { step(); } - } + void run(); + + // Polls forever with SIGINT/SIGTERM unmasked. + int poll(struct pollfd* fds, nfds_t nfds); + + // Sleeps with SIGINT/SIGTERM unmasked. 
+ int sleep(Duration d); }; + +void setupSigmask(); +void waitUntilStopped(std::vector>& loops); \ No newline at end of file diff --git a/cpp/core/PeriodicLoop.hpp b/cpp/core/PeriodicLoop.hpp index c91ff3dc..2aea6411 100644 --- a/cpp/core/PeriodicLoop.hpp +++ b/cpp/core/PeriodicLoop.hpp @@ -1,5 +1,6 @@ #pragma once +#include "Exception.hpp" #include "Time.hpp" #include "Env.hpp" #include "Loop.hpp" @@ -45,7 +46,10 @@ public: pause = _config.failureInterval + Duration((double)_config.failureInterval.ns * (_config.failureIntervalJitter * wyhash64_double(&_rand))); LOG_DEBUG(_env, "periodic step failed, next step at %s", t + pause); } - pause.sleepRetry(); + if (sleep(pause) < 0) { + if (errno == EINTR) { return; } + throw SYSCALL_EXCEPTION("sleep"); + } _lastSucceded = periodicStep(); } }; \ No newline at end of file diff --git a/cpp/core/SPSC.hpp b/cpp/core/SPSC.hpp index b2bbdba3..c7f36132 100644 --- a/cpp/core/SPSC.hpp +++ b/cpp/core/SPSC.hpp @@ -21,6 +21,8 @@ struct SPSC { private: uint32_t _maxSize; uint32_t _sizeMask; + // If the highest bit of size is set, then the queue is closed, + // and push/pull will always return zero. alignas(64) std::atomic _size; alignas(64) uint64_t _head; alignas(64) uint64_t _tail; @@ -36,13 +38,27 @@ public: { ALWAYS_ASSERT(_maxSize > 0); ALWAYS_ASSERT((_maxSize&_sizeMask) == 0); + ALWAYS_ASSERT(_maxSize < (1ull<<31)); + } + + // This will interrupt pullers. + void close() { + _size.fetch_or(1ull<<31, std::memory_order_release); + long ret = syscall(SYS_futex, &_size, FUTEX_WAKE_PRIVATE, 1, nullptr, nullptr, 0); + if (unlikely(ret < 0)) { + throw SYSCALL_EXCEPTION("futex"); + } } // Tries to push all the elements. Returns how many were actually - // pushed. First element in `els` gets pushed first. + // pushed. First element in `els` gets pushed first. Returns 0 + // if the queue is closed. 
uint32_t push(std::vector& els) { uint32_t sz = _size.load(std::memory_order_acquire); + // queue is closed + if (unlikely(sz & (1ull<<31))) { return 0; } + // push as much as possible uint32_t toPush = std::min(els.size(), _maxSize-sz); for (uint32_t i = 0; i < toPush; i++, _head++) { @@ -61,7 +77,8 @@ return toPush; } - // Drains at least one element, blocking if there are no elements. + // Drains at least one element, blocking if there are no elements, + // unless the queue is closed, in which case it'll return 0. // Returns how many we've drained. uint32_t pull(std::vector& els, uint32_t max) { for (;;) { @@ -73,6 +90,8 @@ continue; // try again } throw SYSCALL_EXCEPTION("futex"); + } else if (unlikely(sz & (1ull<<31))) { // queue is closed + return 0; } else { // we've got something to drain uint32_t toDrain = std::min(sz, max); for (uint64_t i = _tail; i < _tail+toDrain; i++) { @@ -85,7 +104,8 @@ } } + // don't return misleading numbers for a closed queue uint32_t size() const { - return _size.load(); + return _size.load() & ~(1ull<<31); } }; \ No newline at end of file diff --git a/cpp/core/Xmon.cpp b/cpp/core/Xmon.cpp index 2fdb8c0c..bd4509ae 100644 --- a/cpp/core/Xmon.cpp +++ b/cpp/core/Xmon.cpp @@ -251,7 +251,8 @@ EggsTime Xmon::_stepNextWakeup() { nextWakeup = std::min(nextWakeup, eggsNow() + HEARTBEAT_INTERVAL*2); } - if (poll(_fds, NUM_FDS, -1) < 0) { + if (poll(_fds, NUM_FDS) < 0) { + if (errno == EINTR) { return nextWakeup; } throw SYSCALL_EXCEPTION("poll"); } diff --git a/cpp/core/Xmon.hpp b/cpp/core/Xmon.hpp index a5c70659..a333107b 100644 --- a/cpp/core/Xmon.hpp +++ b/cpp/core/Xmon.hpp @@ -145,7 +145,7 @@ public: std::shared_ptr& agent, const XmonConfig& config ); - ~Xmon(); + virtual ~Xmon(); virtual void step() override; }; diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 7bb5094c..f3e8553a 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -168,7 +168,6 @@ struct ShardServer : Loop { private: //
init data ShardShared& _shared; - bool _initialized; ShardId _shid; std::array _ipPorts; uint64_t _packetDropRand; @@ -197,7 +196,6 @@ public: ShardServer(Logger& logger, std::shared_ptr& xmon, ShardId shid, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "server"), _shared(shared), - _initialized(false), _shid(shid), _ipPorts(options.ipPorts), _packetDropRand(eggsNow().ns), @@ -213,8 +211,12 @@ public: }; convertProb("incoming", options.simulateIncomingPacketDrop, _incomingPacketDropProbability); convertProb("outgoing", options.simulateOutgoingPacketDrop, _outgoingPacketDropProbability); + + _init(); } + virtual ~ShardServer() = default; + private: void _init() { LOG_INFO(_env, "initializing server sockets"); @@ -367,11 +369,6 @@ private: public: virtual void step() override { - if (unlikely(!_initialized)) { - _init(); - _initialized = true; - } - if (unlikely(!_shared.blockServicesWritten)) { (100_ms).sleepRetry(); return; @@ -384,7 +381,8 @@ public: _sendVecs[i].clear(); } - if (unlikely(poll(_shared.socks.data(), 1 + (_shared.socks[1].fd != 0), -1) < 0)) { + if (unlikely(poll(_shared.socks.data(), 1 + (_shared.socks[1].fd != 0)) < 0)) { + if (errno == EINTR) { return; } throw SYSCALL_EXCEPTION("poll"); } @@ -465,6 +463,19 @@ private: uint64_t _incomingPacketDropProbability; // probability * 10,000 uint64_t _outgoingPacketDropProbability; // probability * 10,000 + struct WriterThread : LoopThread { + ShardShared& shared; + + WriterThread(pthread_t thread_, ShardShared& shared_) : LoopThread(thread_), shared(shared_) {} + virtual ~WriterThread() = default; + WriterThread(const WriterThread&) = delete; + + virtual void stop() { + LoopThread::stop(); + shared.logEntriesQueue.close(); + } + }; + public: ShardWriter(Logger& logger, std::shared_ptr& xmon, const ShardOptions& options, ShardShared& shared) : Loop(logger, xmon, "writer"), @@ -486,6 +497,14 @@ public: _logEntries.reserve(MAX_WRITES_AT_ONCE); } + virtual ~ShardWriter() = default; + 
+ // We need a special one since we're waiting on a futex, not with poll + static std::unique_ptr SpawnWriter(std::unique_ptr&& loop) { + ShardShared& shared = loop->_shared; + return std::make_unique(std::move(Loop::Spawn(std::move(loop))->thread), shared); + } + virtual void step() override { _logEntries.clear(); _sendBuf.clear(); @@ -494,9 +513,12 @@ public: _sendVecs[i].clear(); } uint32_t pulled = _shared.logEntriesQueue.pull(_logEntries, MAX_WRITES_AT_ONCE); - if (pulled > 0) { + if (likely(pulled > 0)) { LOG_DEBUG(_env, "pulled %s requests from write queue", pulled); _shared.pulledWriteRequests = _shared.pulledWriteRequests*0.95 + ((double)pulled)*0.05; + } else { + // queue is closed, stop + stop(); } for (auto& logEntry : _logEntries) { @@ -569,6 +591,8 @@ public: _hasSecondIp(options.ipPorts[1].port != 0) {} + virtual ~ShardRegisterer() = default; + void init() { _env.updateAlert(_alert, "Waiting to register ourselves for the first time"); } @@ -694,6 +718,8 @@ public: _shucklePort(options.shucklePort) {} + virtual ~ShardStatsInserter() = default; + virtual bool periodicStep() override { for (ShardMessageKind kind : allShardMessageKind) { std::ostringstream prefix; @@ -738,6 +764,8 @@ public: _shid(shid) {} + virtual ~ShardMetricsInserter() = default; + virtual bool periodicStep() { _shared.db.dumpRocksDBStatistics(); auto now = eggsNow(); @@ -837,46 +865,43 @@ void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& option LOG_INFO(env, " syslog = %s", (int)options.syslog); } - std::vector threads; + // Immediately start xmon: we want the database initializing update to + // be there. 
+ std::vector> threads; if (xmon) { - threads.emplace_back([&logger, xmon, shid, &options]() mutable { - XmonConfig config; - { - std::ostringstream ss; - ss << std::setw(3) << std::setfill('0') << shid; - config.appInstance = "eggsshard" + ss.str(); - } - config.prod = options.xmonProd; - config.appType = "restech_eggsfs.critical"; - Xmon(logger, xmon, config).run(); - }); + XmonConfig config; + { + std::ostringstream ss; + ss << std::setw(3) << std::setfill('0') << shid; + config.appInstance = "eggsshard" + ss.str(); + } + config.prod = options.xmonProd; + config.appType = "restech_eggsfs.critical"; + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, config))); } + // then everything else + XmonNCAlert dbInitAlert; env.updateAlert(dbInitAlert, "initializing database"); ShardDB db(logger, xmon, shid, options.transientDeadlineInterval, dbDir); env.clearAlert(dbInitAlert); - ShardShared shared(db); - threads.emplace_back([&logger, xmon, &options, &shared]() mutable { - ShardWriter(logger, xmon, options, shared).run(); - }); - threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { - ShardRegisterer(logger, xmon, shid, options, shared).run(); - }); - threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { - ShardBlockServiceUpdater(logger, xmon, shid, options, shared).run(); - }); - threads.emplace_back([&logger, xmon, shid, &options, &shared]() mutable { - ShardStatsInserter(logger, xmon, shid, options, shared).run(); - }); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shid, options, shared))); + threads.emplace_back(ShardWriter::SpawnWriter(std::make_unique(logger, xmon, options, shared))); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shid, options, shared))); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shid, options, shared))); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shid, options, shared))); if (options.metrics) { - 
threads.emplace_back([&logger, xmon, shid, &shared]() mutable { - ShardMetricsInserter(logger, xmon, shid, shared).run(); - }); + threads.emplace_back(Loop::Spawn(std::make_unique(logger, xmon, shid, shared))); } - ShardServer(logger, xmon, shid, options, shared).run(); + // from this point on termination on SIGINT/SIGTERM will be graceful + waitUntilStopped(threads); + + db.close(); + + LOG_INFO(env, "Shard terminating gracefully, bye."); } \ No newline at end of file diff --git a/cpp/shard/Shard.hpp b/cpp/shard/Shard.hpp index f8485fa3..33ac3752 100644 --- a/cpp/shard/Shard.hpp +++ b/cpp/shard/Shard.hpp @@ -30,4 +30,4 @@ struct ShardOptions { Duration transientDeadlineInterval = DEFAULT_DEADLINE_INTERVAL; }; -[[noreturn]] void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& options); +void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& options); diff --git a/cpp/shard/ShardDB.cpp b/cpp/shard/ShardDB.cpp index fad4d74f..9bba3dbd 100644 --- a/cpp/shard/ShardDB.cpp +++ b/cpp/shard/ShardDB.cpp @@ -288,25 +288,20 @@ struct ShardDBImpl { _updateCurrentReadSnapshot(); } - ~ShardDBImpl() { + void close() { LOG_INFO(_env, "destroying read snapshot, column families and closing database"); _currentReadSnapshot.reset(); - const auto gentleRocksDBChecked = [this](const std::string& what, rocksdb::Status status) { - if (!status.ok()) { - LOG_INFO(_env, "Could not %s: %s", what, status.ToString()); - } - }; - gentleRocksDBChecked("destroy default CF", _db->DestroyColumnFamilyHandle(_defaultCf)); - gentleRocksDBChecked("destroy files CF", _db->DestroyColumnFamilyHandle(_filesCf)); - gentleRocksDBChecked("destroy spans CF", _db->DestroyColumnFamilyHandle(_spansCf)); - gentleRocksDBChecked("destroy transient CF", _db->DestroyColumnFamilyHandle(_transientCf)); - gentleRocksDBChecked("destroy directories CF", _db->DestroyColumnFamilyHandle(_directoriesCf)); - gentleRocksDBChecked("destroy edges CF", 
_db->DestroyColumnFamilyHandle(_edgesCf)); - gentleRocksDBChecked("destroy block services CF", _db->DestroyColumnFamilyHandle(_blockServicesToFilesCf)); - gentleRocksDBChecked("close DB", _db->Close()); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_defaultCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_filesCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_spansCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_transientCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_directoriesCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_edgesCf)); + ROCKS_DB_CHECKED(_db->DestroyColumnFamilyHandle(_blockServicesToFilesCf)); + ROCKS_DB_CHECKED(_db->Close()); LOG_INFO(_env, "database closed"); @@ -3851,6 +3846,10 @@ ShardDB::ShardDB(Logger& logger, std::shared_ptr& agent, ShardId shid _impl = new ShardDBImpl(logger, agent, shid, deadlineInterval, path); } +void ShardDB::close() { + ((ShardDBImpl*)_impl)->close(); +} + ShardDB::~ShardDB() { delete (ShardDBImpl*)_impl; _impl = nullptr; diff --git a/cpp/shard/ShardDB.hpp b/cpp/shard/ShardDB.hpp index c517197a..0a575d93 100644 --- a/cpp/shard/ShardDB.hpp +++ b/cpp/shard/ShardDB.hpp @@ -38,6 +38,7 @@ public: // init/teardown ShardDB(Logger& logger, std::shared_ptr& xmon, ShardId shid, Duration deadlineInterval, const std::string& path); ~ShardDB(); + void close(); // Performs a read-only request, responding immediately. If an error is returned, // the contents of `resp` should be ignored.