diff --git a/snappea/__init__.py b/snappea/__init__.py index 4b9a5c2..34c8b2c 100644 --- a/snappea/__init__.py +++ b/snappea/__init__.py @@ -1,4 +1,7 @@ +import uuid import logging +import threading + logger = logging.getLogger("snappea.foreman") @@ -24,3 +27,22 @@ class Registry: registry = Registry() + +# We use a random filename for wakeup_file, but it is random only for the sending thread. This has the advantage that +# when many wakeup signals are sent but not consumed they will not fill up our wakeup_calls_dir in O(n) fashion. This +# filling-up could otherwise happen, because the Foreman takes on some chunk of work from the DB (currently: 100 +# records) which may take a while to be processed (especially if this value is larger than the number of workers) and +# the wakeup signals may flood the wakeup_calls_dir in that time. +# +# Using a single file per-client does not introduce race conditions, though this is much harder to see than for the +# file-per-task case. (TODO: copy the notes on why this holds from paper, including those for the previous case.) +# +# (The fact that this is hard to see could provide an argument for reverting to per-task-uuid; to keep the directory +# from overflowing we would have to make the batch-size (much) smaller. We cannot just put signal cleanups inside the +# worker-creation loop, because they always need to precede the querying for tasks.) +# +# Note that our current solution (less than one wakeup signal per task) has moved us away from "everything as files" +# (i.e. tied us more strongly to actually maintaining the queue in sqlite). +localStorage = threading.local() +localStorage.uuid = str(uuid.uuid4()) +thread_uuid = localStorage.uuid diff --git a/snappea/decorators.py b/snappea/decorators.py index 745fdd9..34a0682 100644 --- a/snappea/decorators.py +++ b/snappea/decorators.py @@ -1,9 +1,8 @@ -import uuid import os import json from django.conf import settings -from . import registry +from . 
import registry, thread_uuid from .models import Task @@ -20,9 +19,13 @@ def shared_task(function): # the non-eager case either). return - # notes on the lack of immediate_atomic go here + # No need for a transaction: we just write something (not connected to any other object, and we will never touch + # it again). Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs)) + wakeup_calls_dir = os.path.join('/tmp', 'snappea') + wakeup_file = os.path.join(wakeup_calls_dir, thread_uuid) + if not os.path.exists(wakeup_calls_dir): os.mkdir(wakeup_calls_dir, exist_ok=True) @@ -30,18 +33,6 @@ def shared_task(function): with open(wakeup_file, "w"): pass - # We use a random filename for wakeup_file, but because this variable is bound to the shared_task decorator it is - # not recalculated on every call to .delay(). This has the advantage that when many wakeup signals are sent but not - # consumed they will not fill up our wakeup_calls_dir in O(n) fashion. This filling-up could otherwise happen, - # because the Foreman takes on some chunk of work from the DB (currently: 100 records) which may take a while to be - # processed (especially if this value is larger than the number of workers) and the wake up signals may flood the - # wakeup_dir in that time. - # - # Because the call to uuid() is stored as a local variable of a function (namely: shared_task), it is by definition - # thread-local. (CHECK: is this really so? and is this even important?) - wakeup_calls_dir = os.path.join('/tmp', 'snappea') - wakeup_file = os.path.join(wakeup_calls_dir, str(uuid.uuid4())) - name = function.__module__ + "." + function.__name__ function.delay = delayed_function registry[name] = function