Files
ternfs-XTXMarkets/cpp/core/Loop.cpp
Francesco Mazzoli 8075e99bb6 Graceful shard teardown
See <https://mazzo.li/posts/stopping-linux-threads.html> for tradeoffs
regarding how to terminate threads gracefully.

The goal of this work was for valgrind to work correctly, which in turn
was to investigate #141. It looks like I have succeeded:

    ==2715080== Warning: unimplemented fcntl command: 1036
    ==2715080== 20,052 bytes in 5,013 blocks are definitely lost in loss record 133 of 135
    ==2715080==    at 0x483F013: operator new(unsigned long) (in /usr/lib/valgrind/vgpreload_memcheck-amd64-linux.so)
    ==2715080==    by 0x3B708E: allocate (new_allocator.h:121)
    ==2715080==    by 0x3B708E: allocate (allocator.h:173)
    ==2715080==    by 0x3B708E: allocate (alloc_traits.h:460)
    ==2715080==    by 0x3B708E: _M_allocate (stl_vector.h:346)
    ==2715080==    by 0x3B708E: std::vector<Crc, std::allocator<Crc> >::_M_default_append(unsigned long) (vector.tcc:635)
    ==2715080==    by 0x42BF1C: resize (stl_vector.h:940)
    ==2715080==    by 0x42BF1C: ShardDBImpl::_fileSpans(rocksdb::ReadOptions&, FileSpansReq const&, FileSpansResp&) (shard/ShardDB.cpp:921)
    ==2715080==    by 0x420867: ShardDBImpl::read(ShardReqContainer const&, ShardRespContainer&) (shard/ShardDB.cpp:1034)
    ==2715080==    by 0x3CB3EE: ShardServer::_handleRequest(int, sockaddr_in*, char*, unsigned long) (shard/Shard.cpp:347)
    ==2715080==    by 0x3C8A39: ShardServer::step() (shard/Shard.cpp:405)
    ==2715080==    by 0x40B1E8: run (core/Loop.cpp:67)
    ==2715080==    by 0x40B1E8: startLoop(void*) (core/Loop.cpp:37)
    ==2715080==    by 0x4BEA258: start_thread (in /usr/lib/libpthread-2.33.so)
    ==2715080==    by 0x4D005E2: clone (in /usr/lib/libc-2.33.so)
    ==2715080==
    ==2715080==
    ==2715080== Exit program on first error (--exit-on-first-error=yes)
2024-01-08 15:41:22 +00:00

136 lines
4.0 KiB
C++

#include <atomic>
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include "Assert.hpp"
#include "Loop.hpp"
#include "Exception.hpp"
// The sigset we run with normally, with SIGINT/SIGTERM masked.
static sigset_t baseSigset;
// The sigset to run when running blocking syscalls, with SIGINT/SIGTERM unmasked.
static sigset_t blockingSigset;
__attribute__((constructor))
static void setupSigsets() {
sigemptyset(&baseSigset);
blockingSigset = baseSigset;
sigaddset(&baseSigset, SIGINT);
sigaddset(&baseSigset, SIGTERM);
}
thread_local std::atomic<bool> stopLoop;
static void stopLoopHandler(int signum) {
stopLoop.store(true, std::memory_order_release);
}
void LoopThread::stop() {
pthread_kill(thread, SIGTERM);
}
Loop::Loop(Logger& logger, std::shared_ptr<XmonAgent>& xmon, const std::string& name) : _env(logger, xmon, name), _name(name) {}
static void* startLoop(void* rawLoop) {
std::unique_ptr<Loop> loop((Loop*)rawLoop);
loop->run();
return nullptr;
}
std::unique_ptr<LoopThread> Loop::Spawn(std::unique_ptr<Loop>&& loop) {
Loop* rawLoop = loop.release();
pthread_t thr;
{
pthread_attr_t attr;
int res = pthread_attr_init(&attr);
if (res != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_init");
}
res = pthread_attr_setsigmask_np(&attr, &baseSigset);
if (res != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_setsigmask_np");
}
res = pthread_create(&thr, &attr, &startLoop, rawLoop);
if (res != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_create");
}
res = pthread_attr_destroy(&attr);
if (res != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(res, "pthread_attr_destroy");
}
}
return std::make_unique<LoopThread>(thr);
}
void Loop::run() {
while (!stopLoop.load()) { step(); }
}
int Loop::poll(struct pollfd* fds, nfds_t nfds) {
return ppoll(fds, nfds, nullptr, &blockingSigset);
}
void Loop::stop() {
stopLoop.store(true, std::memory_order_release);
}
int Loop::sleep(Duration d) {
auto tspec = d.timespec();
return ppoll(nullptr, 0, &tspec, &blockingSigset);
}
void waitUntilStopped(std::vector<std::unique_ptr<LoopThread>>& loops) {
ALWAYS_ASSERT(getpid() == gettid(), "You can only run this function from the main thread");
// mask signals here too
{
int ret = pthread_sigmask(SIG_SETMASK, &baseSigset, nullptr);
if (ret < 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(ret, "pthread_sigmask");
}
}
// setup signal handler
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = &stopLoopHandler;
if (sigaction(SIGTERM, &sa, nullptr) < 0) {
throw SYSCALL_EXCEPTION("sigaction");
}
if (sigaction(SIGINT, &sa, nullptr) < 0) {
throw SYSCALL_EXCEPTION("sigaction");
}
}
// start waiting
while (!stopLoop.load()) {
struct timespec timeout { .tv_sec = 60*60*24 };
int ret = ppoll(nullptr, 0, &timeout, &blockingSigset);
if (ret < 0 && errno != EINTR) {
throw SYSCALL_EXCEPTION("ppoll");
}
}
// we've been told to stop, tear down all threads
for (int i = loops.size()-1; i >= 0; i--) {
loops[i]->stop();
struct timespec timeout;
if (clock_gettime(CLOCK_REALTIME, &timeout) < 0) {
throw SYSCALL_EXCEPTION("clock_gettime");
}
timeout.tv_sec += 10;
int ret = pthread_timedjoin_np(loops[i]->thread, nullptr, &timeout);
if (ret != 0 && ret == ETIMEDOUT) {
char name[16];
{
int ret = pthread_getname_np(loops[i]->thread, name, sizeof(name));
if (ret != 0) {
throw EXPLICIT_SYSCALL_EXCEPTION(ret, "pthread_getname_np");
}
}
throw EGGS_EXCEPTION("loop %s has not terminated in time, aborting", name);
}
}
}