From 1ef7458619efe3fa00a70175bfa6de7f0b6a47da Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Tue, 23 Apr 2024 12:07:03 +0200 Subject: [PATCH] Snappea: Only do another Task query when the previous result was perhaps limited by the TASK_QS_SIZE --- snappea/foreman.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/snappea/foreman.py b/snappea/foreman.py index 9dc49f1..5a3738c 100644 --- a/snappea/foreman.py +++ b/snappea/foreman.py @@ -16,6 +16,7 @@ from .models import Task from .datastructures import Workers +TASK_QS_SIZE = 100 GRACEFUL_TIMEOUT = .1 NUM_WORKERS = 4 @@ -157,7 +158,9 @@ class Foreman: # is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into # deep sleep after. logger.info("Startup: Clearing Task backlog") - while self.create_workers() > 0: + while self.create_workers() == TASK_QS_SIZE: + # `== TASK_QS_SIZE` means we may have more work to do, because the [:TASK_QS_SIZE] slice may be the reason + # no more work is found. We keep doing this until we're sure there is no more work to do. pass logger.info("Startup: Task backlog empty now, proceeding to main loop") @@ -172,8 +175,8 @@ class Foreman: os.unlink(os.path.join(self.wakeup_calls_dir, event.name)) self.check_for_stopping() # always check after .read() - while self.create_workers() > 0: - pass + while self.create_workers() == TASK_QS_SIZE: + pass # `== TASK_QS_SIZE`: as documented above def create_workers(self): """returns the number of workers created (AKA tasks done)""" @@ -192,13 +195,14 @@ class Foreman: # (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there # is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been # built up in the backlog; we want to at least be resilient for that case.) - tasks = Task.objects.all()[:100] + tasks = Task.objects.all()[:TASK_QS_SIZE] task_i = -1 for task_i, task in enumerate(tasks): logger.debug("Main loop: Creating worker for with task %s", short_id(task.id)) - logger.debug("Main loop: Checking (potentially waiting) for available worker slots") + logger.debug("Main loop: Checking (maybe waiting) for available worker slots") self.worker_semaphore.acquire() + logger.debug("Main loop: Worker slot available") self.check_for_stopping() # always check after .acquire() # TODO note on why no transactions are needed (it's just a single call anyway) # TODO note on the guarantees we provide (not many) @@ -215,9 +219,9 @@ class Foreman: task_count = task_i + 1 - # Seeing "0 tasks handled" is expected, both on Startup and right before we go into "Wait for wakeup". See also + # Seeing "0 tasks..." is expected, both on Startup and right before we go into "Wait for wakeup". See also # above, starting with 'We get "a lot" of Tasks at once' - logger.debug("Main loop: %s tasks handled", task_count) + logger.debug("Main loop: %s tasks popped from queue and started as workers ", task_count) return task_count def check_for_stopping(self):