From c453ca00e5f01473102cf485e7c9566e0cdff232 Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Fri, 5 Jul 2024 16:28:23 +0200 Subject: [PATCH] Snappea connection_close --- snappea/decorators.py | 3 +++ snappea/foreman.py | 21 ++++++++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/snappea/decorators.py b/snappea/decorators.py index 57ca49f..d0c5cee 100644 --- a/snappea/decorators.py +++ b/snappea/decorators.py @@ -28,6 +28,9 @@ def shared_task(function): # lock" and "actually write". Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs)) + # not necessary: `connections["snappea"].close()`; Django does this at the end of the request and the + # foreman's thread cleanup code does it for worker threads. + wakeup_server() name = function.__module__ + "." + function.__name__ diff --git a/snappea/foreman.py b/snappea/foreman.py index ca46366..1cf9c4e 100644 --- a/snappea/foreman.py +++ b/snappea/foreman.py @@ -12,7 +12,7 @@ from inotify_simple import INotify, flags from sentry_sdk import capture_exception from django.conf import settings -from django.db import connection +from django.db import connections from performance.context_managers import time_to_logger from bugsink.transaction import durable_atomic @@ -127,9 +127,9 @@ class Foreman: with durable_atomic(): get_pc_registry() - self.connection_close() + self.connection_close() # close the connection that was created as part of `get_pc_registry()`. - def connection_close(self): + def connection_close(self, using="default"): # (as a method to allow for a single point of documentation) # As per Django's doc: https://docs.djangoproject.com/en/5.0/ref/databases/#caveats @@ -162,7 +162,7 @@ class Foreman: # user-visible consequence of not closing "the db" before the application exits should be precisely the same as # if it was closed before exiting'. https://sqlite.org/forum/info/fc8d6e0ee055fef88a08f5092061108261445d17153457 # I have checked this myself (using `kill -9`) and it seems correct. - connection.close() + connections[using].close() def run_in_thread(self, task_id, function, *args, **kwargs): # NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this @@ -180,7 +180,11 @@ class Foreman: logger.warning("Snappea caught Exception: %s", str(e)) capture_exception(e) finally: - self.connection_close() + # equivalent to the below (in both cases nothing happens with already-closed/never opened connections) + # self.connection_close() + # self.connection_close(using="snappea") # for tasks delaying other tasks (not used currently) + for connection in connections.all(): + connection.close() logger.info("Worker done in %.3fs", time.time() - t0) self.workers.stopped(task_id) @@ -295,6 +299,7 @@ class Foreman: logger.error('Create workers: can\'t execute "%s": %s', task.task_name, e) with time_to_logger(performance_logger, "Snappea delete Task"): task.delete() # we delete the task because we can't do anything with it, and we don't want to hang + self.connection_close(using="snappea") capture_exception(e) self.worker_semaphore.release() continue @@ -307,10 +312,16 @@ class Foreman: # * delete-before-run is the implementation of our at-most-once guarantee with time_to_logger(performance_logger, "Snappea Task.delete()"): task.delete() + self.run_in_thread(task_id, function, *args, **kwargs) task_count = task_i + 1 + if task_count == 0: + # We occassionally want to close the connection to the DB-as-MQ, to allow for cleanups of the WAL file. (It + # is reopened automatically as needed). A good time for this is: when we had no work to do. + self.connection_close(using="snappea") + # Seeing "0 tasks..." is expected, both on Startup and right before we go into "Wait for wakeup". See also # above, starting with 'We get "a lot" of Tasks at once' logger.debug("Create workers: %s tasks popped from queue", task_count)