Use proper thread-local for wakeup_file uuid part

Stackoverflow said "In Python, everything is shared, except for function-local
variables (because each function call gets its own set of locals, and threads
are always separate function calls.) "

Whatever exactly was meant by that, it's not true for us, because our functions
are called on import, and the variables in the closure are shared between threads, e.g.:

>>> from threading import Thread
>>> def closure():
...   l = []
...   def inner():
...     l.append("x")
...     print(len(l))
...   return inner
...
>>> inner = closure()
>>> thread = Thread(target=inner)
>>> thread.start()
1
>>> thread = Thread(target=inner)
>>> thread.start()
2
This commit is contained in:
Klaas van Schelven
2024-04-23 09:42:22 +02:00
parent c7d96c362e
commit 08330dd274
2 changed files with 28 additions and 15 deletions

View File

@@ -1,4 +1,7 @@
import uuid
import logging
import threading
logger = logging.getLogger("snappea.foreman")
@@ -24,3 +27,22 @@ class Registry:
registry = Registry()
# We use a random filename for wakeup_file, but it is random only for the sending thread. This has the advantage that
# when many wakeup signals are sent but not consumed they will not fill up our wakeup_calls_dir in O(n) fashion. This
# filling-up could otherwise happen, because the Foreman takes on some chunk of work from the DB (currently: 100
# records) which may take a while to be processed (especially if this value is larger than the number of workers) and
# the wakeup signals may flood the wakeup_calls_dir in that time.
#
# Using a single file per client does not introduce race conditions, though this is much harder to see than for the
# file-per-task case. (TODO: copy the notes from paper that explain why, including those for the previous case.)
#
# (The fact that this is hard to see could provide an argument for reverting to per-task-uuid; to keep the directory
# from overflowing we would have to make the batch-size (much) smaller. We cannot just put signal cleanups inside the
# worker-creation loop, because they always need to precede the querying for tasks.)
#
# Note that our current solution (less than one wake-up signal per task) has moved us away from "everything as files"
# (i.e. tied us more strongly to actually maintaining the queue in sqlite).
# Thread-local container for the uuid that is used as the wakeup_file name.
localStorage = threading.local()
# Attributes set on a threading.local object are only visible in the thread that set them; this assignment runs in
# whichever thread executes this module's top-level code.
localStorage.uuid = str(uuid.uuid4())
# NOTE(review): thread_uuid is a plain module-level string, bound once when this line executes. Any thread that later
# reads `thread_uuid` sees this same value, so it does not look like the value actually differs per thread -- confirm
# whether a per-thread lookup (e.g. a function that lazily sets and reads localStorage.uuid) was intended.
thread_uuid = localStorage.uuid

View File

@@ -1,9 +1,8 @@
import uuid
import os
import json
from django.conf import settings
from . import registry
from . import registry, thread_uuid
from .models import Task
@@ -20,9 +19,13 @@ def shared_task(function):
# the non-eager case either).
return
# notes on the lack of immediate_atomic go here
# No need for a transaction: we just write something (not connected to any other object, and we will never touch
# it again).
Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs))
wakeup_calls_dir = os.path.join('/tmp', 'snappea')
wakeup_file = os.path.join(wakeup_calls_dir, thread_uuid)
if not os.path.exists(wakeup_calls_dir):
os.mkdir(wakeup_calls_dir, exist_ok=True)
@@ -30,18 +33,6 @@ def shared_task(function):
with open(wakeup_file, "w"):
pass
# We use a random filename for wakeup_file, but because this variable is bound to the shared_task decorator it is
# not recalculated on every call to .delay(). This has the advantage that when many wakeup signals are sent but not
# consumed they will not fill up our wakeup_calls_dir in O(n) fashion. This filling-up could otherwise happen,
# because the Foreman takes on some chunk of work from the DB (currently: 100 records) which may take a while to be
# processed (especially if this value is larger than the number of workers) and the wake up signals may flood the
# wakeup_dir in that time.
#
# Because the call to uuid() is stored as a local variable of a function (namely: shared_task), it is by definition
# thread-local. (CHECK: is this really so? and is this even important?)
wakeup_calls_dir = os.path.join('/tmp', 'snappea')
wakeup_file = os.path.join(wakeup_calls_dir, str(uuid.uuid4()))
name = function.__module__ + "." + function.__name__
function.delay = delayed_function
registry[name] = function