Convert snappea 'wakeup' signal to inotify-based

see https://github.com/python/cpython/issues/118143
This commit is contained in:
Klaas van Schelven
2024-04-22 15:34:33 +02:00
parent 46f34370f4
commit 4fbc283f3e
5 changed files with 117 additions and 75 deletions

View File

@@ -6,6 +6,7 @@ jsonschema==4.19.*
semver==3.0.*
django-admin-autocomplete-filter==0.7.*
pygments==2.16.*
inotify_simple
# testing/development only:
requests

View File

@@ -1,24 +1,29 @@
import threading
class SafeDict:
class Workers:
"""Python's dict is 'probably thread safe', but since this is hard to reason about, and in fact .items() may be
unsafe, we just use a lock. See e.g.
* https://stackoverflow.com/questions/6953351/thread-safety-in-pythons-dictionary
* https://stackoverflow.com/questions/66556511/is-listdict-items-thread-safe
Furthermore: we need a way to do a Thread-safe update of what we actually have running, so worker_thread.start() is
tied to the dict-update in a lock.
"""
def __init__(self):
self.d = {}
self.lock = threading.Lock()
def set(self, k, v):
def start(self, task_id, worker_thread):
with self.lock:
self.d[k] = v
self.d[task_id] = worker_thread
worker_thread.start()
def unset(self, k):
def stopped(self, task_id):
with self.lock:
del self.d[k]
del self.d[task_id]
def list(self):
with self.lock:

View File

@@ -1,5 +1,5 @@
import uuid
import os
import signal
import json
from django.conf import settings
@@ -23,11 +23,12 @@ def shared_task(function):
# notes on the lack of immediate_atomic go here
Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs))
with open("/tmp/snappea.pid", "r") as f:
# NOTE perhaps we can let systemd take a role here, it seems to do pids
# TODO: handling of [1] no such file [2] no such process
foreman_pid = int(f.read())
os.kill(foreman_pid, signal.SIGUSR1)
wakeup_calls_dir = os.path.join('/tmp', 'snappea')
if not os.path.exists(wakeup_calls_dir):
os.mkdir(wakeup_calls_dir, exist_ok=True)
with open(os.path.join(wakeup_calls_dir, str(uuid.uuid4())), "w"):
pass
name = function.__module__ + "." + function.__name__
function.delay = delayed_function

View File

@@ -19,3 +19,8 @@ def example_worker():
@shared_task
def example_failing_worker():
raise Exception("I am failing")
@shared_task
def fast_task():
pass

View File

@@ -1,15 +1,20 @@
import os
import glob
import uuid
import sys
import json
import os
import logging
import time
import signal
import threading
from inotify_simple import INotify, flags
from sentry_sdk import capture_exception
from . import registry
from .models import Task
from .datastructures import SafeDict
from .datastructures import Workers
from .example_tasks import * # noqa FOR NOW UNTIL WE implement task discovery
GRACEFUL_TIMEOUT = 10
@@ -45,24 +50,26 @@ class Foreman:
and 1 read are needed per task). I do not expect this to be in any way limiting (TODO check)
The main idea is this endless loop of checking for new work and doing it. This leaves the question of how we "go to
sleep" when there is no more work and how we wake up from that. This is implemented using [1] OS signals across
processes (SIGUSR1 is sent from the 'client' after a Task object is created) and [2] a semaphore (to implement the
wake-up on the main loop). Note that this idea is somewhat incidental though, I also considered polling in a busy
loop or sending characters over a unix socket.
Python signal handlers suspend whatever is currenlty going on (even other signal handlers). I find it hard to reason
about that. This makes reasoning about what happens if they were to be interrupted both more likely and harder if
they contain a lot of code. Solution: release a semaphore, that some other (sequentially looping) piece of code is
waiting for.
sleep" when there is no more work and how we wake up from that. This is implemented using inotify on a directory
created specifically for that purpose (for each Task a file is dropped there) (and a blocking read on the INotify
object). Note that this idea is somewhat incidental though (0MQ or polling the DB in a busy loop are some
alternatives).
"""
def __init__(self):
self.workers = SafeDict()
self.workers = Workers()
self.stopping = False
signal.signal(signal.SIGINT, self.handle_sigint)
signal.signal(signal.SIGUSR1, self.handle_sigusr1)
# We use inotify to wake up the Foreman when a new Task is created.
self.wakeup_calls_dir = os.path.join('/tmp', 'snappea')
if not os.path.exists(self.wakeup_calls_dir):
os.makedirs(self.wakeup_calls_dir, exist_ok=True)
self.wakeup_calls = INotify()
self.wakeup_calls.add_watch(self.wakeup_calls_dir, flags.CREATE)
# Pid stuff
pid = os.getpid()
logger.info("Foreman created, my pid is %s", pid)
with open("/tmp/snappea.pid", "w") as f: # TODO configurable location
@@ -72,7 +79,7 @@ class Foreman:
# this is 0
self.signal_semaphore = threading.Semaphore(0)
# Counts the number of available worker threads. When this is 0, create_worker will first wait until a worker
# Counts the number of available worker threads. When this is 0, create_workers will first wait until a worker
# stops. (the value of this semaphore is implicitly NUM_WORKERS - active_workers)
self.worker_semaphore = threading.Semaphore(NUM_WORKERS)
@@ -88,7 +95,7 @@ class Foreman:
capture_exception(e)
finally:
logger.info("worker done: %s", task_id)
self.workers.unset(task_id)
self.workers.stopped(task_id)
self.worker_semaphore.release()
worker_thread = threading.Thread(target=non_failing_function, args=args, kwargs=kwargs)
@@ -97,79 +104,102 @@ class Foreman:
# we can, however, set deamon=True which will ensure that an exit of the main thread is the end of the program
# (we have implemented manual waiting for GRACEFUL_TIMEOUT separately).
worker_thread.daemon = True
worker_thread.start()
self.workers.set(task_id, worker_thread)
self.workers.start(task_id, worker_thread)
return worker_thread
def handle_sigusr1(self, sig, frame):
logger.debug("Received SIGUSR1")
self.signal_semaphore.release()
def handle_sigint(self, signal, frame):
# We take the same approach as with handle SIGUSR1: we set a flag and release a semaphore. The advantage is that
# we don't have to think about e.g. handle_sigint being called while we're handling a previous call to it. The
# (slight) disadvantage is that we need to sprinkle calls to check_for_stopping() in more locations (at least
# after every semaphore is acquired)
# We set a flag and release a semaphore. The advantage is that we don't have to think about e.g. handle_sigint
# being called while we're handling a previous call to it. The (slight) disadvantage is that we need to sprinkle
# calls to check_for_stopping() in more locations (at least after every semaphore is acquired)
logger.debug("Received SIGINT")
if not self.stopping: # without this if-statement, repeated signals would extend the deadline
self.stopping = True
self.stop_deadline = time.time() + GRACEFUL_TIMEOUT
# Ensure that anything we might be waiting for is unblocked. A single .release call is enough because after
# every acquire call in our codebase check_for_stopping() is the first thing we do, so the release cannot be
# inadvertently be "used up" by something else.
self.signal_semaphore.release()
# Ensure that anything we might be waiting for is unblocked. A single notification file and .release call is
# enough because after every wakeup_calls.read() / acquire call in our codebase the first thing we do is
# check_for_stopping(), so the release cannot be inadvertently be "used up" by something else.
with open(os.path.join(self.wakeup_calls_dir, str(uuid.uuid4())), "w"):
pass
self.worker_semaphore.release()
def run_forever(self):
# Before we do our regular sleep-wake-check-do loop, we clear any outstanding work. sigusr1 signals coming in
pre_existing_wakeup_notifications = glob.glob(os.path.join(self.wakeup_calls_dir, "*"))
if len(pre_existing_wakeup_notifications) > 0:
# We clear the wakeup_calls_dir on startup. Not strictly necessary because such files would be cleared by in
# the loop anyway, but it's more efficient to do it first.
logger.info("Clearing %s pre-existing wakeup notifications", len(pre_existing_wakeup_notifications))
for filename in pre_existing_wakeup_notifications:
os.remove(filename)
# Before we do our regular sleep-wake-check-do loop, we clear any outstanding work. wake up signals coming in
# during this time-period will simply "count up" the semaphore even though the work is already being done. This
# is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into
# deep sleep after.
logger.info("Checking Task backlog")
while self.create_worker():
self.check_for_stopping()
while self.create_workers() > 0:
pass
logger.info("Task backlog empty now, proceeding to main loop")
while True:
logger.debug("Checking (potentially waiting) for SIGUSR1")
self.signal_semaphore.acquire()
self.check_for_stopping()
self.create_worker()
for event in self.wakeup_calls.read():
# I think we can just do os.unlink(), without being afraid of an error either here or on the side where
# we write the file. I don't have a link to the man page to back this up, but when running "many" calls
# (using 2 processes with each simple tight loop, one creating the files and one deleting them, I did
# not get any errors)
os.unlink(os.path.join(self.wakeup_calls_dir, event.name))
def create_worker(self):
"""returns a boolean 'was anything done?'"""
self.check_for_stopping() # always check after .read()
while self.create_workers() > 0:
pass
logger.debug("Checking (potentially waiting) for available worker slots")
self.worker_semaphore.acquire()
self.check_for_stopping() # always check after .acquire()
def create_workers(self):
"""returns the number of workers created (AKA tasks done)"""
task = Task.objects.first()
if task is None:
# Seeing this is expected on-bootup (one after all Peas are dealt with, and once for each SIGUSR1 that was
# received while clearing the initial backlog, but before we went into sleeping mode). If you see it later,
# it's odd. (We could even assert for it)
logger.debug("Querying for tasks")
# We get "a lot" of Tasks at once, rather than one by one. We assume (but did not test) this is more efficient
# than getting the Tasks one by one. It also has consequences for the case where many Tasks (and thus
# wakeup notifications) come in at the same time: in such cases, we may deal with more than one Task for a
# single iteration through run_forever's while loop. The final loop before sleeping will then have a "No task
# found" (and associated useless READ on the database). Why we do this: the .all() approach works with how we
# deal with wake up notifications, namely: whenever we get some, we .read() (and delete) all of them away in one
# swoop. This means a number of notifications will fold into a single iteration through our main run_forever
# loop and thus we need to do more than a single Task. Also, the waisted READ is precisely when there is nothing
# to do (i.e. it's waisting time when we have a lot of time).
# (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there
# is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been
# built up in the backlog; we want to at least be resilient for that case.)
tasks = Task.objects.all()[:100]
task_i = -1
for task_i, task in enumerate(tasks):
logger.debug("Creating worker for with task", task.id)
logger.debug("Checking (potentially waiting) for available worker slots")
self.worker_semaphore.acquire()
self.check_for_stopping() # always check after .acquire()
# TODO note on why no transactions are needed (it's just a single call anyway)
# TODO note on the guarantees we provide (not many)
# TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB
# write and the business of looking up tasks by name.
task_id = task.id
function = registry[task.task_name]
args = json.loads(task.args)
kwargs = json.loads(task.kwargs)
self.check_for_stopping() # check_for_stopping() right before taking on the work
task.delete()
self.run_in_thread(task_id, function, *args, **kwargs)
if task_i == -1:
# Seeing this is expected on-bootup (one after all Peas are dealt with, and once for each noification that
# was received while clearing the initial backlog, but before we went into "sleeping" (wait for
# notification) mode). See also above, starting with 'We get "a lot" of Tasks at once'
logger.info("No task found")
# We acquired the worker_semaphore at the start of this method, but we're not using it. Release immediately!
self.worker_semaphore.release()
return False
# TODO note on why no transactions are needed (it's just a single call anyway)
# TODO note on the guarantees we provide (not many)
# TODO this bit is the main bit where an exception handler is missing: for both the (potentially failing) DB
# write and the business of looking up tasks by name.
task_id = task.id
function = registry[task.task_name]
args = json.loads(task.args)
kwargs = json.loads(task.kwargs)
self.check_for_stopping()
task.delete()
self.run_in_thread(task_id, function, *args, **kwargs)
return True
return task_i + 1
def check_for_stopping(self):
if not self.stopping: