From df9efa481dcbf360ea657e933a730efdd3f01f58 Mon Sep 17 00:00:00 2001 From: Francesco Mazzoli Date: Mon, 30 Jan 2023 12:13:12 +0000 Subject: [PATCH] Keep pinging shuckle --- cpp/cdc/CDC.cpp | 285 +++++++++++++++++++++++++++----------- cpp/core/Shuckle.cpp | 9 +- cpp/core/Shuckle.hpp | 2 +- cpp/shard/Shard.cpp | 144 ++++++++++++------- go/eggs/managedprocess.go | 4 +- 5 files changed, 309 insertions(+), 135 deletions(-) diff --git a/cpp/cdc/CDC.cpp b/cpp/cdc/CDC.cpp index b7d72b41..7741855e 100644 --- a/cpp/cdc/CDC.cpp +++ b/cpp/cdc/CDC.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -27,6 +28,24 @@ #include "splitmix64.hpp" #include "Shuckle.hpp" +struct CDCShared { + CDCDB& db; + std::atomic stop; + std::atomic ownPort; + std::mutex shardsMutex; + std::array shards; + + CDCShared(CDCDB& db_) : + db(db_), + stop(false), + ownPort(0) + { + for (auto& shard: shards) { + memset(&shard, 0, sizeof(shard)); + } + } +}; + struct InFlightShardRequest { uint64_t txnId; // the txn id that requested this shard request EggsTime sentAt; @@ -43,13 +62,8 @@ struct InFlightCDCRequest { struct CDCServer : Undertaker::Reapable { private: Env _env; - std::atomic _stop; - std::string _shuckleHost; - uint16_t _shucklePort; - uint16_t _port; - std::array _ownIp; - std::vector _shards; - CDCDB& _db; + CDCShared& _shared; + uint16_t _desiredPort; uint64_t _currentLogIndex; std::vector _recvBuf; std::vector _sendBuf; @@ -66,20 +80,15 @@ private: std::optional _inFlightShardReq; public: - CDCServer(Logger& logger, const CDCOptions& options, std::vector&& shards, CDCDB& db) : + CDCServer(Logger& logger, const CDCOptions& options, CDCShared& shared) : _env(logger, "req_server"), - _stop(false), - _shuckleHost(options.shuckleHost), - _shucklePort(options.shucklePort), - _port(options.port), - _ownIp(options.ownIp), - _shards(std::move(shards)), - _db(db), + _shared(shared), + _desiredPort(options.port), _recvBuf(UDP_MTU), _sendBuf(UDP_MTU), _shardRequestIdCounter(0) { - _currentLogIndex = _db.lastAppliedLogEntry(); + _currentLogIndex = _shared.db.lastAppliedLogEntry(); memset(&_socks[0], 0, sizeof(_socks)); expandKey(CDCKey, _expandedCDCKey); } @@ -88,7 +97,7 @@ public: virtual void terminate() override { _env.flush(); - _stop.store(true); + _shared.stop.store(true); } virtual void onAbort() override { @@ -96,6 +105,8 @@ public: } void run() { + _waitForShards(); + // Create sockets // First sock: the CDC sock // Next 256 socks: the socks we use to communicate with the shards @@ -111,14 +122,14 @@ public: struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); - if (i == 0 && _port != 0) { // CDC - addr.sin_port = htons(_port); - } else { // just to communicate with the shard + if (i == 0 && _desiredPort != 0) { // CDC with specified port + addr.sin_port = htons(_desiredPort); + } else { // automatically assigned port addr.sin_port = 0; } if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) { - if (i == 0 && _port != 0) { - throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _port); + if (i == 0 && _desiredPort != 0) { + throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _desiredPort); } else { throw SYSCALL_EXCEPTION("cannot bind socket"); } @@ -131,14 +142,12 @@ public: } if (i == 0) { LOG_DEBUG(_env, "bound CDC sock to port %s", ntohs(addr.sin_port)); - _port = ntohs(addr.sin_port); + _shared.ownPort.store(ntohs(addr.sin_port)); } else { LOG_DEBUG(_env, "bound shard %s sock to port %s", i-1, ntohs(addr.sin_port)); } } - _registerWithShuckle(); - // create epoll structure int epoll = epoll_create1(0); if (epoll < 0) { @@ -154,15 +163,15 @@ public: } } - LOG_INFO(_env, "running on port %s", _port); + LOG_INFO(_env, "running on port %s", _desiredPort); // If we've got a dangling transaction, immediately start processing it - _db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step); + _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step); _processStep(_step); // Start processing CDC requests and shard responses for (;;) { - if (_stop.load()) { + if (_shared.stop.load()) { LOG_DEBUG(_env, "got told to stop, stopping"); break; } @@ -189,27 +198,43 @@ public: } } - _db.close(); + _shared.db.close(); } private: - void _registerWithShuckle() { - ALWAYS_ASSERT(_port != 0); + void _waitForShards() { + LOG_INFO(_env, "Waiting for shard info to be filled in"); + EggsTime t0 = eggsNow(); + Duration maxWait = 1_mins; for (;;) { - if (_stop.load()) { + if (_shared.stop.load()) { return; } - std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, _port); - if (!err.empty()) { - RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + if (eggsNow() - t0 > maxWait) { + throw EGGS_EXCEPTION("could not reach shuckle to get shards after %s, giving up", maxWait); + } + + bool badShard = false; + { + const std::lock_guard lock(_shared.shardsMutex); + for (int i = 0; i < _shared.shards.size(); i++) { + const auto sh = _shared.shards[i]; + if (sh.port == 0) { + LOG_DEBUG(_env, "Shard %s isn't ready yet", i); + badShard = true; + break; + } + } + } + if (badShard) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); continue; } - break; - } - LOG_INFO(_env, "Successfully registered with shuckle"); - } + LOG_INFO(_env, "shards found, proceeding"); + return; + } + } void _drainCDCSock() { int sock = _socks[0]; @@ -266,7 +291,7 @@ private: if (err == NO_ERROR) { // If things went well, process the request LOG_DEBUG(_env, "CDC request %s successfully parsed, will now process", _cdcReqContainer.kind()); - uint64_t txnId = _db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step); + uint64_t txnId = _shared.db.processCDCReq(true, eggsNow(), _advanceLogIndex(), _cdcReqContainer, _step); auto& inFlight = _inFlightTxns[txnId]; inFlight.cdcRequestId = reqHeader.requestId; inFlight.clientAddr = clientAddr; @@ -339,14 +364,14 @@ private: // If all went well, advance with the newly received request LOG_DEBUG(_env, "successfully parsed shard response %s with kind %s, will now process: %s", respHeader.requestId, respHeader.kind, _shardRespContainer); - _db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step); + _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), NO_ERROR, &_shardRespContainer, _step); _processStep(_step); } } void _handleShardError(ShardId shid, EggsError err) { RAISE_ALERT(_env, "got shard error %s from shard %s", err, shid); - _db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step); + _shared.db.processShardResp(true, eggsNow(), _advanceLogIndex(), err, nullptr, _step); _processStep(_step); } @@ -391,7 +416,9 @@ private: // Send struct sockaddr_in shardAddr; memset(&shardAddr, 0, sizeof(shardAddr)); - const auto& shardInfo = _shards[step.shardReq.shid.u8]; + _shared.shardsMutex.lock(); + ShardInfo shardInfo = _shared.shards[step.shardReq.shid.u8]; + _shared.shardsMutex.unlock(); shardAddr.sin_family = AF_INET; shardAddr.sin_port = htons(shardInfo.port); static_assert(sizeof(shardAddr.sin_addr) == sizeof(shardInfo.ip)); @@ -408,7 +435,7 @@ private: } if (step.nextTxn != 0) { LOG_DEBUG(_env, "we have txn %s lined up, starting it", step.nextTxn); - _db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step); + _shared.db.startNextTransaction(true, eggsNow(), _advanceLogIndex(), _step); _processStep(_step); } } @@ -441,39 +468,124 @@ static void* runCDCServer(void* server) { return nullptr; } -std::vector lookupShardInfo(Logger& logger, const CDCOptions& options) { - Env env(logger, "lookup_shard_info"); - auto startT = eggsNow(); - std::vector shards; - for (;;) { - if (eggsNow() - startT > 20_sec) { - throw EGGS_EXCEPTION("could not reach shuckle to get shards after 20 seconds, giving up"); - } +struct CDCShardUpdater : Undertaker::Reapable { + Env _env; + CDCShared& _shared; + std::string _shuckleHost; + uint16_t _shucklePort; +public: + CDCShardUpdater(Logger& logger, const CDCOptions& options, CDCShared& shared): + _env(logger, "shard_updater"), + _shared(shared), + _shuckleHost(options.shuckleHost), + _shucklePort(options.shucklePort) + {} - std::string err = fetchShards(options.shuckleHost, options.shucklePort, 100_ms, shards); - if (!err.empty()) { - LOG_INFO(env, "failed to reach shuckle at %s:%s to fetch shards, might retry: %s", options.shuckleHost, options.shucklePort, err); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } + virtual ~CDCShardUpdater() = default; - bool badShard = false; - for (int i = 0; i < 256; i++) { - const auto& shard = shards.at(i); - if (shard.port == 0) { - LOG_INFO(env, "shard %s is not registered in shuckle yet, might retry", i); - badShard = true; - break; - } - } - if (badShard) { - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - continue; - } - - LOG_INFO(env, "sucessfully fetched shards from shuckle"); - return shards; + virtual void terminate() override { + _env.flush(); + _shared.stop.store(true); } + + virtual void onAbort() override { + _env.flush(); + } + + void run() { + auto shards = std::make_unique>(); + for (;;) { + if (_shared.stop.load()) { + return; + } + std::string err = fetchShards(_shuckleHost, _shucklePort, 100_ms, *shards); + if (!err.empty()) { + LOG_INFO(_env, "failed to reach shuckle at %s:%s to fetch shards, might retry: %s", _shuckleHost, _shucklePort, err); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + bool badShard = false; + for (int i = 0; i < shards->size(); i++) { + if (shards->at(i).port == 0) { + LOG_DEBUG(_env, "Shard %s not ready yet", i); + badShard = true; + break; + } + } + if (badShard) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + { + const std::lock_guard lock(_shared.shardsMutex); + for (int i = 0; i < shards->size(); i++) { + _shared.shards[i] = shards->at(i); + } + } + LOG_INFO(_env, "successfully fetched all shards from shuckle, will wait one minute"); + std::this_thread::sleep_for(std::chrono::minutes(1)); + } + } +}; + +static void* runCDCShardUpdater(void* server) { + ((CDCShardUpdater*)server)->run(); + return nullptr; +} + +struct CDCRegisterer : Undertaker::Reapable { + Env _env; + CDCShared& _shared; + std::array _ownIp; + std::string _shuckleHost; + uint16_t _shucklePort; +public: + CDCRegisterer(Logger& logger, const CDCOptions& options, CDCShared& shared): + _env(logger, "registerer"), + _shared(shared), + _ownIp(options.ownIp), + _shuckleHost(options.shuckleHost), + _shucklePort(options.shucklePort) + {} + + virtual ~CDCRegisterer() = default; + + virtual void terminate() override { + _env.flush(); + _shared.stop.store(true); + } + + virtual void onAbort() override { + _env.flush(); + } + + void run() { + for (;;) { + if (_shared.stop.load()) { + return; + } + uint16_t port = _shared.ownPort.load(); + if (port == 0) { + // shard server isn't up yet + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + LOG_INFO(_env, "Registering ourselves (CDC, port %s) with shuckle", port); + std::string err = registerCDC(_shuckleHost, _shucklePort, 100_ms, _ownIp, port); + if (!err.empty()) { + RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + LOG_INFO(_env, "Successfully registered with shuckle, will register again in one minute"); + std::this_thread::sleep_for(std::chrono::minutes(1)); + } + } +}; + +static void* runCDCRegisterer(void* server) { + ((CDCRegisterer*)server)->run(); + return nullptr; } void runCDC(const std::string& dbDir, const CDCOptions& options) { @@ -490,12 +602,11 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) { } Logger logger(options.level, *logOut); - auto shards = lookupShardInfo(logger, options); - CDCDB db(logger, dbDir); + auto shared = std::make_unique(db); { - auto server = std::make_unique(logger, options, std::move(shards), db); + auto server = std::make_unique(logger, options, *shared); pthread_t tid; if (pthread_create(&tid, nullptr, &runCDCServer, &*server) != 0) { throw SYSCALL_EXCEPTION("pthread_create"); @@ -503,5 +614,23 @@ void runCDC(const std::string& dbDir, const CDCOptions& options) { undertaker->checkin(std::move(server), tid, "server"); } + { + auto server = std::make_unique(logger, options, *shared); + pthread_t tid; + if (pthread_create(&tid, nullptr, &runCDCShardUpdater, &*server) != 0) { + throw SYSCALL_EXCEPTION("pthread_create"); + } + undertaker->checkin(std::move(server), tid, "shard_updater"); + } + + { + auto server = std::make_unique(logger, options, *shared); + pthread_t tid; + if (pthread_create(&tid, nullptr, &runCDCRegisterer, &*server) != 0) { + throw SYSCALL_EXCEPTION("pthread_create"); + } + undertaker->checkin(std::move(server), tid, "registerer"); + } + undertaker->reap(); } \ No newline at end of file diff --git a/cpp/core/Shuckle.cpp b/cpp/core/Shuckle.cpp index 8104772f..21da57ce 100644 --- a/cpp/core/Shuckle.cpp +++ b/cpp/core/Shuckle.cpp @@ -212,7 +212,7 @@ std::string registerCDC(const std::string& host, uint16_t port, Duration timeout return {}; } -std::string fetchShards(const std::string& host, uint16_t port, Duration timeout, std::vector& shards) { +std::string fetchShards(const std::string& host, uint16_t port, Duration timeout, std::array& shards) { std::string errString; int sock = shuckleSock(host, port, timeout, errString); if (sock < 0) { @@ -231,7 +231,12 @@ std::string fetchShards(const std::string& host, uint16_t port, Duration timeout if (!errString.empty()) { return errString; } - shards = respContainer.getShards().shards.els; + if (respContainer.getShards().shards.els.size() != shards.size()) { + throw EGGS_EXCEPTION("expecting %s shards, got %s", shards.size(), respContainer.getShards().shards.els.size()); + } + for (int i = 0; i < shards.size(); i++) { + shards[i] = respContainer.getShards().shards.els[i]; + } return {}; } diff --git a/cpp/core/Shuckle.hpp b/cpp/core/Shuckle.hpp index 57fbc37e..a7d53061 100644 --- a/cpp/core/Shuckle.hpp +++ b/cpp/core/Shuckle.hpp @@ -39,7 +39,7 @@ std::string fetchShards( const std::string& shuckleHost, uint16_t shucklePort, Duration timeout, - std::vector& shards + std::array& shards ); const std::string defaultShuckleAddress = "REDACTED"; diff --git a/cpp/shard/Shard.cpp b/cpp/shard/Shard.cpp index 2d11fd55..a35d2818 100644 --- a/cpp/shard/Shard.cpp +++ b/cpp/shard/Shard.cpp @@ -21,6 +21,7 @@ #include "Time.hpp" #include "Undertaker.hpp" #include "splitmix64.hpp" +#include "Time.hpp" // Data needed to synchronize between the different threads struct ShardShared { @@ -29,9 +30,11 @@ private: std::mutex _applyLock; public: ShardDB& db; + std::atomic stop; + std::atomic ownPort; ShardShared() = delete; - ShardShared(ShardDB& db_): db(db_) { + ShardShared(ShardDB& db_): db(db_), stop(false), ownPort(0) { _currentLogIndex = db.lastAppliedLogEntry(); } @@ -43,29 +46,22 @@ public: } }; + struct ShardServer : Undertaker::Reapable { private: Env _env; ShardShared& _shared; ShardId _shid; - uint16_t _port; - std::array _ownIp; - std::atomic _stop; - std::string _shuckleHost; - uint16_t _shucklePort; + uint16_t _desiredPort; uint64_t _packetDropRand; uint64_t _incomingPacketDropProbability; // probability * 10,000 uint64_t _outgoingPacketDropProbability; // probability * 10,000 public: - ShardServer(Logger& logger, ShardShared& shared, ShardId shid, const ShardOptions& options): + ShardServer(Logger& logger, ShardId shid, const ShardOptions& options, ShardShared& shared): _env(logger, "server"), _shared(shared), _shid(shid), - _port(options.port), - _ownIp(options.ownIp), - _stop(false), - _shuckleHost(options.shuckleHost), - _shucklePort(options.shucklePort), + _desiredPort(options.port), _packetDropRand((int)shid.u8 + 1), // CDC is 0 _incomingPacketDropProbability(0), _outgoingPacketDropProbability(0) @@ -85,7 +81,7 @@ public: virtual void terminate() override { _env.flush(); - _stop.store(true); + _shared.stop.store(true); } virtual void onAbort() override { @@ -105,7 +101,7 @@ public: serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); serverAddr.sin_port = htons(0); if (bind(sock, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) != 0) { - throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _port); + throw SYSCALL_EXCEPTION("cannot bind socket to port %s", _desiredPort); } { socklen_t addrLen = sizeof(serverAddr); @@ -123,10 +119,8 @@ public: } } - _port = ntohs(serverAddr.sin_port); - LOG_INFO(_env, "Bound shard %s to port %s", _shid, ntohs(serverAddr.sin_port)); - - _registerWithShuckle(); + _shared.ownPort.store(ntohs(serverAddr.sin_port)); + LOG_INFO(_env, "Bound shard %s to port %s", _shid, _shared.ownPort.load()); struct sockaddr_in clientAddr; std::vector recvBuf(UDP_MTU); @@ -136,7 +130,7 @@ public: auto logEntry = std::make_unique(); for (;;) { - if (_stop.load()) { + if (_shared.stop.load()) { LOG_DEBUG(_env, "got told to stop, stopping"); break; } @@ -241,25 +235,6 @@ public: // If we're terminating gracefully we're the last ones, close the db nicely _shared.db.close(); } - -private: - void _registerWithShuckle() { - ALWAYS_ASSERT(_port != 0); - for (;;) { - if (_stop.load()) { - return; - } - LOG_INFO(_env, "Registering ourselves (shard %s, port %s) with shuckle", _shid, _port); - std::string err = registerShard(_shuckleHost, _shucklePort, 100_ms, _shid, _ownIp, _port); - if (!err.empty()) { - RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - continue; - } - break; - } - LOG_INFO(_env, "Successfully registered with shuckle"); - } }; static void* runShardServer(void* server) { @@ -267,29 +242,85 @@ static void* runShardServer(void* server) { return nullptr; } -struct ShardShuckleUpdater : Undertaker::Reapable { +struct ShardRegisterer : Undertaker::Reapable { +private: + Env _env; + ShardShared& _shared; + ShardId _shid; + std::array _ownIp; + std::string _shuckleHost; + uint16_t _shucklePort; +public: + ShardRegisterer(Logger& logger, ShardId shid, const ShardOptions& options, ShardShared& shared): + _env(logger, "registerer"), + _shared(shared), + _shid(shid), + _ownIp(options.ownIp), + _shuckleHost(options.shuckleHost), + _shucklePort(options.shucklePort) + {} + + virtual ~ShardRegisterer() = default; + + virtual void terminate() override { + _env.flush(); + _shared.stop.store(true); + } + + virtual void onAbort() override { + _env.flush(); + } + + void run() { + for (;;) { + if (_shared.stop.load()) { + return; + } + uint16_t port = _shared.ownPort.load(); + if (port == 0) { + // shard server isn't up yet + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + LOG_INFO(_env, "Registering ourselves (shard %s, port %s) with shuckle", _shid, port); + std::string err = registerShard(_shuckleHost, _shucklePort, 100_ms, _shid, _ownIp, port); + if (!err.empty()) { + RAISE_ALERT(_env, "Couldn't register ourselves with shuckle: %s", err); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + continue; + } + LOG_INFO(_env, "Successfully registered with shuckle, will register again in one minute"); + std::this_thread::sleep_for(std::chrono::minutes(1)); + } + } +}; + +static void* runShardRegisterer(void* server) { + ((ShardRegisterer*)server)->run(); + return nullptr; +} + +struct ShardBlockServiceUpdater : Undertaker::Reapable { private: Env _env; ShardShared& _shared; - std::atomic _stop; ShardId _shid; std::string _shuckleHost; uint16_t _shucklePort; public: - ShardShuckleUpdater(Logger& logger, ShardShared& shared, ShardId shid, const ShardOptions& options): - _env(logger, "shuckle_updater"), + ShardBlockServiceUpdater(Logger& logger, ShardId shid, const ShardOptions& options, ShardShared& shared): + _env(logger, "block_service_updater"), _shared(shared), - _stop(false), _shid(shid), _shuckleHost(options.shuckleHost), _shucklePort(options.shucklePort) {} - virtual ~ShardShuckleUpdater() = default; + virtual ~ShardBlockServiceUpdater() = default; virtual void terminate() override { _env.flush(); - _stop.store(true); + _shared.stop.store(true); } virtual void onAbort() override { @@ -308,7 +339,7 @@ public: continue; \ for (;;) { - if (_stop.load()) { + if (_shared.stop.load()) { LOG_DEBUG(_env, "got told to stop, stopping"); break; } @@ -353,8 +384,8 @@ public: } }; -static void* runShardShuckleUpdater(void* server) { - ((ShardShuckleUpdater*)server)->run(); +static void* runShardBlockServiceUpdater(void* server) { + ((ShardBlockServiceUpdater*)server)->run(); return nullptr; } @@ -377,7 +408,7 @@ void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& option ShardShared shared(db); { - auto server = std::make_unique(logger, shared, shid, options); + auto server = std::make_unique(logger, shid, options, shared); pthread_t tid; if (pthread_create(&tid, nullptr, &runShardServer, &*server) != 0) { throw SYSCALL_EXCEPTION("pthread_create"); @@ -386,12 +417,21 @@ void runShard(ShardId shid, const std::string& dbDir, const ShardOptions& option } { - auto shuckleUpdater = std::make_unique(logger, shared, shid, options); + auto shuckleRegisterer = std::make_unique(logger, shid, options, shared); pthread_t tid; - if (pthread_create(&tid, nullptr, &runShardShuckleUpdater, &*shuckleUpdater) != 0) { + if (pthread_create(&tid, nullptr, &runShardRegisterer, &*shuckleRegisterer) != 0) { throw SYSCALL_EXCEPTION("pthread_create"); } - undertaker->checkin(std::move(shuckleUpdater), tid, "shuckle_updater"); + undertaker->checkin(std::move(shuckleRegisterer), tid, "registerer"); + } + + { + auto shuckleUpdater = std::make_unique(logger, shid, options, shared); + pthread_t tid; + if (pthread_create(&tid, nullptr, &runShardBlockServiceUpdater, &*shuckleUpdater) != 0) { + throw SYSCALL_EXCEPTION("pthread_create"); + } + undertaker->checkin(std::move(shuckleUpdater), tid, "block_service_updater"); } undertaker->reap(); diff --git a/go/eggs/managedprocess.go b/go/eggs/managedprocess.go index c6b5faf1..5cf803e6 100644 --- a/go/eggs/managedprocess.go +++ b/go/eggs/managedprocess.go @@ -212,9 +212,9 @@ func (procs *ManagedProcesses) Close() { } proc.cmd.Process.Signal(syscall.SIGTERM) terminated := uint64(0) - // wait at most 5 seconds for process to come down + // wait at most 20 seconds for process to come down go func() { - time.Sleep(5 * time.Second) + time.Sleep(20 * time.Second) if atomic.LoadUint64(&terminated) == 0 { fmt.Printf("process %s not terminating, killing it\n", proc.name) proc.cmd.Process.Kill() // ignoring error on purpose, there isn't much to do by now