Snappea: Only do another Task query when the previous result was perhaps limited by the TASK_QS_SIZE

This commit is contained in:
Klaas van Schelven
2024-04-23 12:07:03 +02:00
parent a7d484f070
commit 1ef7458619

View File

@@ -16,6 +16,7 @@ from .models import Task
from .datastructures import Workers
TASK_QS_SIZE = 100
GRACEFUL_TIMEOUT = .1
NUM_WORKERS = 4
@@ -157,7 +158,9 @@ class Foreman:
# is not really a problem, we'll just notice that there is "No task found" an equal amount of times and go into
# deep sleep after.
logger.info("Startup: Clearing Task backlog")
while self.create_workers() > 0:
while self.create_workers() == TASK_QS_SIZE:
# `== TASK_QS_SIZE` means we may have more work to do, because the [:TASK_QS_SIZE] slice may be the reason
# no more work is found. We keep doing this until we're sure there is no more work to do.
pass
logger.info("Startup: Task backlog empty now, proceeding to main loop")
@@ -172,8 +175,8 @@ class Foreman:
os.unlink(os.path.join(self.wakeup_calls_dir, event.name))
self.check_for_stopping() # always check after .read()
while self.create_workers() > 0:
pass
while self.create_workers() == TASK_QS_SIZE:
pass # `== TASK_QS_SIZE`: as documented above
def create_workers(self):
"""returns the number of workers created (AKA tasks done)"""
@@ -192,13 +195,14 @@ class Foreman:
# (we've put _some_ limit on the amount of tasks to get in a single query to avoid memory overflows when there
# is a lot of work. the expected case is: when the snappeaserver has been gone for a while, and work has been
# built up in the backlog; we want to at least be resilient for that case.)
tasks = Task.objects.all()[:100]
tasks = Task.objects.all()[:TASK_QS_SIZE]
task_i = -1
for task_i, task in enumerate(tasks):
logger.debug("Main loop: Creating worker for with task %s", short_id(task.id))
logger.debug("Main loop: Checking (potentially waiting) for available worker slots")
logger.debug("Main loop: Checking (maybe waiting) for available worker slots")
self.worker_semaphore.acquire()
logger.debug("Main loop: Worker slot available")
self.check_for_stopping() # always check after .acquire()
# TODO note on why no transactions are needed (it's just a single call anyway)
# TODO note on the guarantees we provide (not many)
@@ -215,9 +219,9 @@ class Foreman:
task_count = task_i + 1
# Seeing "0 tasks handled" is expected, both on Startup and right before we go into "Wait for wakeup". See also
# Seeing "0 tasks..." is expected, both on Startup and right before we go into "Wait for wakeup". See also
# above, starting with 'We get "a lot" of Tasks at once'
logger.debug("Main loop: %s tasks handled", task_count)
logger.debug("Main loop: %s tasks popped from queue and started as workers ", task_count)
return task_count
def check_for_stopping(self):