Snappea connection_close

This commit is contained in:
Klaas van Schelven
2024-07-05 16:28:23 +02:00
parent 1228199d96
commit c453ca00e5
2 changed files with 19 additions and 5 deletions

View File

@@ -28,6 +28,9 @@ def shared_task(function):
# lock" and "actually write".
Task.objects.create(task_name=name, args=json.dumps(args), kwargs=json.dumps(kwargs))
# not necessary: `connections["snappea"].close()`; Django does this at the end of the request and the
# foreman's thread cleanup code does it for worker threads.
wakeup_server()
name = function.__module__ + "." + function.__name__

View File

@@ -12,7 +12,7 @@ from inotify_simple import INotify, flags
from sentry_sdk import capture_exception
from django.conf import settings
from django.db import connection
from django.db import connections
from performance.context_managers import time_to_logger
from bugsink.transaction import durable_atomic
@@ -127,9 +127,9 @@ class Foreman:
with durable_atomic():
get_pc_registry()
self.connection_close()
self.connection_close() # close the connection that was created as part of `get_pc_registry()`.
def connection_close(self):
def connection_close(self, using="default"):
# (as a method to allow for a single point of documentation)
# As per Django's doc: https://docs.djangoproject.com/en/5.0/ref/databases/#caveats
@@ -162,7 +162,7 @@ class Foreman:
# user-visible consequence of not closing "the db" before the application exits should be precisely the same as
# if it was closed before exiting'. https://sqlite.org/forum/info/fc8d6e0ee055fef88a08f5092061108261445d17153457
# I have checked this myself (using `kill -9`) and it seems correct.
connection.close()
connections[using].close()
def run_in_thread(self, task_id, function, *args, **kwargs):
# NOTE: we expose args & kwargs in the logs; as it stands no sensitive stuff lives there in our case, but this
@@ -180,7 +180,11 @@ class Foreman:
logger.warning("Snappea caught Exception: %s", str(e))
capture_exception(e)
finally:
self.connection_close()
# equivalent to the below (in both cases nothing happens with already-closed/never opened connections)
# self.connection_close()
# self.connection_close(using="snappea") # for tasks delaying other tasks (not used currently)
for connection in connections.all():
connection.close()
logger.info("Worker done in %.3fs", time.time() - t0)
self.workers.stopped(task_id)
@@ -295,6 +299,7 @@ class Foreman:
logger.error('Create workers: can\'t execute "%s": %s', task.task_name, e)
with time_to_logger(performance_logger, "Snappea delete Task"):
task.delete() # we delete the task because we can't do anything with it, and we don't want to hang
self.connection_close(using="snappea")
capture_exception(e)
self.worker_semaphore.release()
continue
@@ -307,10 +312,16 @@ class Foreman:
# * delete-before-run is the implementation of our at-most-once guarantee
with time_to_logger(performance_logger, "Snappea Task.delete()"):
task.delete()
self.run_in_thread(task_id, function, *args, **kwargs)
task_count = task_i + 1
if task_count == 0:
# We occassionally want to close the connection to the DB-as-MQ, to allow for cleanups of the WAL file. (It
# is reopened automatically as needed). A good time for this is: when we had no work to do.
self.connection_close(using="snappea")
# Seeing "0 tasks..." is expected, both on Startup and right before we go into "Wait for wakeup". See also
# above, starting with 'We get "a lot" of Tasks at once'
logger.debug("Create workers: %s tasks popped from queue", task_count)