From 08330dd274c22ff30fecd4bf71934cb215cac26e Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Tue, 23 Apr 2024 09:42:22 +0200 Subject: [PATCH] Use proper thread-local for wakeup_file uuid part Stackoverflow said "In Python, everything is shared, except for function-local variables (because each function call gets its own set of locals, and threads are always separate function calls.) " whatever was meant exactly by that, it's not true for us, because our functions are called on-import, and the vars-in-closure are shared between threads. i.e.: >>> from threading import Thread >>> def closure(): ... l = [] ... def inner(): ... l.append("x") ... print(len(l)) ... return inner ... >>> inner = closure() >>> thread = Thread(target=inner) >>> thread.start() 1 >>> thread = Thread(target=inner) >>> thread.start() 2 --- snappea/__init__.py | 22 ++++++++++++++++++++++ snappea/decorators.py | 21 ++++++--------------- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/snappea/__init__.py b/snappea/__init__.py index 4b9a5c2..34c8b2c 100644 --- a/snappea/__init__.py +++ b/snappea/__init__.py @@ -1,4 +1,7 @@ +import uuid import logging +import threading + logger = logging.getLogger("snappea.foreman") @@ -24,3 +27,22 @@ class Registry: registry = Registry() + +# We use a random filename for wakeup_file, but it is random only for the sending thread. This has the advantage that +# when many wakeup signals are sent but not consumed they will not fill up our wakeup_calls_dir in O(n) fashion. This +# filling-up could otherwise happen, because the Foreman takes on some chunk of work from the DB (currently: 100 +# records) which may take a while to be processed (especially if this value is larger than the number of workers) and +# the wake up signals may flood the wakeup_dir in that time. +# +# Using a single file per-client does not introduce race conditions, though this is much harder to see than for the +# file-per-task case. 
To see why this is the case (TODO copy notes from paper, including those for the previous case) +# +# (The fact that this is hard to see could provide an argument for reverting to per-task-uuid; to keep the directory +# from overflowing we would have to make the batch-size (much) smaller; we cannot just put signal cleanups inside the +# worker-creation loop, because they always need to precede the querying for tasks.) +# +# Note that our current solution (less than one wake-up signal per task) has moved us away from "everything as files" +# (i.e. tied us more strongly to actually maintaining the queue in sqlite) +localStorage = threading.local() +localStorage.uuid = str(uuid.uuid4()) +thread_uuid = localStorage.uuid diff --git a/snappea/decorators.py b/snappea/decorators.py index 745fdd9..34a0682 100644 --- a/snappea/decorators.py +++ b/snappea/decorators.py @@ -1,9 +1,8 @@ -import uuid import os import json from django.conf import settings -from . import registry +from . import registry, thread_uuid from .models import Task @@ -20,9 +19,13 @@ def shared_task(function): # the non-eager case either). return - # notes on the lack of immediate_atomic go here + # No need for a transaction: we just write something (not connected to any other object, and we will never touch + # it again). Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs)) + wakeup_calls_dir = os.path.join('/tmp', 'snappea') + wakeup_file = os.path.join(wakeup_calls_dir, thread_uuid) + if not os.path.exists(wakeup_calls_dir): os.mkdir(wakeup_calls_dir, exist_ok=True) @@ -30,18 +33,6 @@ def shared_task(function): with open(wakeup_file, "w"): pass - # We use a random filename for wakeup_file, but because this variable is bound to the shared_task decorator it is - # not recalculated on every call to .delay(). This has the advantage that when many wakeup signals are sent but not - # consumed they will not fill up our wakeup_calls_dir in O(n) fashion. 
This filling-up could otherwise happen, - # because the Foreman takes on some chunk of work from the DB (currently: 100 records) which may take a while to be - # processed (especially if this value is larger than the number of workers) and the wake up signals may flood the - # wakeup_dir in that time. - # - # Because the call to uuid() is stored as a local variable of a function (namely: shared_task), it is by definition - # thread-local. (CHECK: is this really so? and is this even important?) - wakeup_calls_dir = os.path.join('/tmp', 'snappea') - wakeup_file = os.path.join(wakeup_calls_dir, str(uuid.uuid4())) - name = function.__module__ + "." + function.__name__ function.delay = delayed_function registry[name] = function