diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index c5a2f485..7ab3b1f3 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -29,19 +29,16 @@ #include "wyhash.h" #include "Xmon.hpp" #include "Timings.hpp" +#include "Stopper.hpp" struct CDCShared { CDCDB& db; - std::atomic stop; std::array, 2> ownPorts; std::mutex shardsMutex; std::array shards; std::array timings; - CDCShared(CDCDB& db_) : - db(db_), - stop(false) - { + CDCShared(CDCDB& db_) : db(db_) { for (auto& shard: shards) { memset(&shard, 0, sizeof(shard)); } @@ -72,6 +69,7 @@ struct CDCServer : Undertaker::Reapable { private: Env _env; CDCShared& _shared; + Stopper _stopper; std::array _ipPorts; uint64_t _currentLogIndex; std::vector _recvBuf; @@ -108,7 +106,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -189,8 +187,8 @@ public: // Start processing CDC requests and shard responses for (;;) { - if (_shared.stop.load()) { - LOG_DEBUG(_env, "got told to stop, stopping"); + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); break; } @@ -220,6 +218,7 @@ public: } _shared.db.close(); + _stopper.stopDone(); } private: @@ -228,7 +227,8 @@ private: EggsTime t0 = eggsNow(); Duration maxWait = 1_mins; for (;;) { - if (_shared.stop.load()) { + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); return; } if (eggsNow() - t0 > maxWait) { @@ -490,11 +490,23 @@ private: } void _send(int sock, struct sockaddr_in& dest, const char* data, size_t len) { - // TODO we might very well get EAGAIN here since these are non-blocking sockets. - // We should probably come up with a better strategy regarding writing stuff out, - // but, being a bit lazy for now. - if (sendto(sock, data, len, 0, (struct sockaddr*)&dest, sizeof(dest)) != len) { - throw SYSCALL_EXCEPTION("sendto"); + // We need to handle EAGAIN/EPERM when trying to send. Here we take a ... + // lazy approach and just loop with a delay. This seems to happen when + // we restart everything while under load, it's not great to block here + // but it's probably OK to do so in those cases. We should also automatically + // clear the alert when done with this. + for (;;) { + if (sendto(sock, data, len, 0, (struct sockaddr*)&dest, sizeof(dest)) == len) { + break; + } + int err = errno; + // Note that we get EPERM on `sendto` when nf drops packets. + if (err == EAGAIN || err == EPERM) { + RAISE_ALERT(_env, "we got %s/%s=%s when trying to send shard message, will wait and retry", err, translateErrno(err), safe_strerror(err)); + sleepFor(100_ms); + } else { + throw EXPLICIT_SYSCALL_EXCEPTION(err, "sendto"); + } } } @@ -511,6 +523,7 @@ static void* runCDCServer(void* server) { struct CDCShardUpdater : Undertaker::Reapable { Env _env; CDCShared& _shared; + Stopper _stopper; std::string _shuckleHost; uint16_t _shucklePort; public: @@ -525,7 +538,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -537,7 +550,9 @@ public: auto shards = std::make_unique>(); for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(10)); - if (_shared.stop.load()) { + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); + _stopper.stopDone(); return; } auto now = eggsNow(); @@ -582,6 +597,7 @@ static void* runCDCShardUpdater(void* server) { struct CDCRegisterer : Undertaker::Reapable { Env _env; CDCShared& _shared; + Stopper _stopper; uint32_t _ownIp1; uint32_t _ownIp2; std::string _shuckleHost; @@ -602,7 +618,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -614,7 +630,9 @@ public: EggsTime nextRegister = 0; // when 0, it means that the last one wasn't successful for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(100 + (wyhash64(&rand)%100))); // fuzz the startup busy loop - if (_shared.stop.load()) { + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); + _stopper.stopDone(); return; } if (eggsNow() < nextRegister) { @@ -654,6 +672,7 @@ struct CDCStatsInserter : Undertaker::Reapable { private: Env _env; CDCShared& _shared; + Stopper _stopper; std::string _shuckleHost; uint16_t _shucklePort; public: @@ -668,7 +687,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -704,11 +723,12 @@ public: continue; \ for (;;) { - if (_shared.stop.load()) { + if (_stopper.shouldStop()) { LOG_INFO(_env, "got told to stop, trying to insert stats before stopping"); insertCDCStats(); LOG_INFO(_env, "done, goodbye."); - break; + _stopper.stopDone(); + return; } EggsTime t = eggsNow(); @@ -827,7 +847,7 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) { if (xmon) { XmonConfig config; config.appInstance = "cdc"; - auto xmonRunner = std::make_unique(logger, xmon, config, shared->stop); + auto xmonRunner = std::make_unique(logger, xmon, config); pthread_t tid; if (pthread_create(&tid, nullptr, &runXmon, &*xmonRunner) != 0) { throw SYSCALL_EXCEPTION("pthread_create"); diff --git a/cpp/core/Stopper.hpp b/cpp/core/Stopper.hpp new file mode 100644 index 00000000..616dbf2b --- /dev/null +++ b/cpp/core/Stopper.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include + +struct Stopper { +private: + std::atomic _stop; + std::mutex _stopped; + +public: + Stopper() : _stop(false) { + _stopped.lock(); + } + + bool shouldStop() const { + return _stop.load(); + } + + void stopDone() { + _stopped.unlock(); + } + + void stop() { + _stop.store(true); + _stopped.lock(); + } +}; \ No newline at end of file diff --git a/cpp/core/Xmon.cpp b/cpp/core/Xmon.cpp index 18685775..df08b533 100644 --- a/cpp/core/Xmon.cpp +++ b/cpp/core/Xmon.cpp @@ -19,11 +19,9 @@ static std::string generateErrString(const std::string& what, int err) { Xmon::Xmon( Logger& logger, std::shared_ptr& agent, - const XmonConfig& config, - std::atomic& stop + const XmonConfig& config ) : _env(logger, agent, "xmon"), - _stop(stop), _agent(agent), _appType("restech.info"), _xmonHost(config.prod ? "REDACTED" : "REDACTED"), @@ -196,7 +194,7 @@ reconnect: // Start recv loop bool gotHeartbeat = false; for (;;) { - bool shutDown = _stop.load(); + bool shutDown = _stopper.shouldStop(); // send all requests if (gotHeartbeat) { @@ -230,6 +228,7 @@ reconnect: if (shutDown) { LOG_DEBUG(_env, "got told to stop, stopping"); close(sock); + _stopper.stopDone(); return; } diff --git a/cpp/core/Xmon.hpp b/cpp/core/Xmon.hpp index ee944c53..bfef8b13 100644 --- a/cpp/core/Xmon.hpp +++ b/cpp/core/Xmon.hpp @@ -11,6 +11,7 @@ #include "Env.hpp" #include "XmonAgent.hpp" #include "Undertaker.hpp" +#include "Stopper.hpp" struct XmonConfig { bool prod = false; @@ -105,7 +106,7 @@ enum struct XmonMood : int32_t { struct Xmon : Undertaker::Reapable { private: Env _env; - std::atomic& _stop; + Stopper _stopper; std::shared_ptr _agent; std::string _hostname; std::string _appType; @@ -120,15 +121,14 @@ public: Xmon( Logger& logger, std::shared_ptr& agent, - const XmonConfig& config, - std::atomic& stop + const XmonConfig& config ); virtual ~Xmon() = default; virtual void terminate() override { _env.flush(); - _stop.store(true); + _stopper.stop(); } virtual void onAbort() override { diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 32a4da1b..347c424d 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -25,6 +25,7 @@ #include "wyhash.h" #include "Xmon.hpp" #include "Timings.hpp" +#include "Stopper.hpp" // Data needed to synchronize between the different threads struct ShardShared { @@ -33,7 +34,6 @@ private: std::mutex _applyLock; public: ShardDB& db; - std::atomic stop; std::atomic ip1; std::atomic port1; std::atomic ip2; @@ -41,7 +41,7 @@ public: std::array timings; ShardShared() = delete; - ShardShared(ShardDB& db_): db(db_), stop(false), ip1(0), port1(0), ip2(0), port2(0) { + ShardShared(ShardDB& db_): db(db_), ip1(0), port1(0), ip2(0), port2(0) { _currentLogIndex = db.lastAppliedLogEntry(); for (ShardMessageKind kind : allShardMessageKind) { timings[(int)kind] = Timings::Standard(); @@ -99,6 +99,7 @@ struct ShardServer : Undertaker::Reapable { private: Env _env; ShardShared& _shared; + Stopper _stopper; ShardId _shid; int _ipPortIx; uint32_t _ownIp; @@ -133,7 +134,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -190,8 +191,8 @@ public: auto logEntry = std::make_unique(); for (;;) { - if (_shared.stop.load()) { - LOG_DEBUG(_env, "got told to stop, stopping"); + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); break; } @@ -315,6 +316,7 @@ public: if (_ipPortIx == 0) { _shared.db.close(); } + _stopper.stopDone(); } }; @@ -327,6 +329,7 @@ struct ShardRegisterer : Undertaker::Reapable { private: Env _env; ShardShared& _shared; + Stopper _stopper; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; @@ -345,7 +348,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -357,9 +360,10 @@ public: EggsTime nextRegister = 0; // when 0, it means that the last one wasn't successful for (;;) { std::this_thread::sleep_for(std::chrono::milliseconds(100 + (wyhash64(&rand)%100))); // fuzz the startup busy loop - if (_shared.stop.load()) { - LOG_DEBUG(_env, "got told to stop, stopping"); - break; + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); + _stopper.stopDone(); + return; } if (eggsNow() < nextRegister) { continue; @@ -401,6 +405,7 @@ struct ShardBlockServiceUpdater : Undertaker::Reapable { private: Env _env; ShardShared& _shared; + Stopper _stopper; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; @@ -417,7 +422,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -436,9 +441,10 @@ public: continue; \ for (;;) { - if (_shared.stop.load()) { - LOG_DEBUG(_env, "got told to stop, stopping"); - break; + if (_stopper.shouldStop()) { + LOG_INFO(_env, "got told to stop, stopping"); + _stopper.stopDone(); + return; } EggsTime t = eggsNow(); @@ -490,6 +496,7 @@ struct ShardStatsInserter : Undertaker::Reapable { private: Env _env; ShardShared& _shared; + Stopper _stopper; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; @@ -506,7 +513,7 @@ public: virtual void terminate() override { _env.flush(); - _shared.stop.store(true); + _stopper.stop(); } virtual void onAbort() override { @@ -541,11 +548,12 @@ public: continue; \ for (;;) { - if (_shared.stop.load()) { + if (_stopper.shouldStop()) { LOG_INFO(_env, "got told to stop, trying to insert stats before stopping"); insertShardStats(); LOG_INFO(_env, "done, goodbye."); - break; + _stopper.stopDone(); + return; } EggsTime t = eggsNow(); @@ -679,7 +687,7 @@ void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& option ss << std::setw(3) << std::setfill('0') << shid; config.appInstance = "shard:" + ss.str(); } - auto xmonRunner = std::make_unique(logger, xmon, config, shared.stop); + auto xmonRunner = std::make_unique(logger, xmon, config); pthread_t tid; if (pthread_create(&tid, nullptr, &runXmon, &*xmonRunner) != 0) { throw SYSCALL_EXCEPTION("pthread_create");