mirror of
https://github.com/bugsink/bugsink.git
synced 2026-05-04 05:50:24 -05:00
Enforce a single pc_registry for a single ingesting process
Using a pid-file that's implied by the ingestion directory. We do this in `get_pc_registry`, i.e. on the first request. This means failure surfaces in the first request handled by the 2nd process. Why not on startup? Because we don't have a configtest or generic on-startup location (yet). Making _that_ could be another source of fragility, and getting e.g. the number of processes might be non-trivial / config-dependent.
This commit is contained in:
@@ -96,6 +96,5 @@ BUGSINK = {
|
||||
# "MAX_ENVELOPE_SIZE": 100 * _MEBIBYTE,
|
||||
# "MAX_ENVELOPE_COMPRESSED_SIZE": 20 * _MEBIBYTE,
|
||||
|
||||
|
||||
"INGEST_STORE_BASE_DIR": "/home/bugsink/ingestion",
|
||||
}
|
||||
|
||||
+59
-5
@@ -1,14 +1,22 @@
|
||||
import contextlib
|
||||
import atexit
|
||||
import os
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from projects.models import Project
|
||||
from events.models import Event
|
||||
from issues.models import Issue
|
||||
from performance.context_managers import time_to_logger
|
||||
from snappea.settings import get_settings as get_snappea_settings
|
||||
|
||||
from .period_counter import PeriodCounter
|
||||
from .app_settings import get_settings
|
||||
|
||||
|
||||
ingest_logger = logging.getLogger("bugsink.ingest")
|
||||
performance_logger = logging.getLogger("bugsink.performance.registry")
|
||||
|
||||
|
||||
@@ -48,18 +56,63 @@ class PeriodCounterRegistry(object):
|
||||
return by_project, by_issue
|
||||
|
||||
|
||||
def get_pc_registry():
|
||||
# lazy initialization; this is TSTTCPW for 'run once when the server starts'; it has the obvious drawback that it
|
||||
# slows down the first (handled) event. When we get to the "real gunicorn server" we should probably just hook into
|
||||
# gunicorn's events to initialize this
|
||||
# (note once you do this: don't hook into app-initialization; there you cannot expect to be running DB stuff)
|
||||
class NotSingleton(Exception):
    """Raised when another live process already owns the pc_registry pid-file."""
|
||||
|
||||
|
||||
def _cleanup_pidfile():
    """Remove the pc_registry pid-file, ignoring the case where it is already gone.

    Registered as an atexit hook by `_ensure_singleton` so that a clean shutdown
    releases the "single ingesting process" claim.
    """
    path = os.path.join(get_settings().INGEST_STORE_BASE_DIR, "pc_registry.pid")
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
|
||||
|
||||
|
||||
def _ensure_singleton():
    """Enforce that only one process instantiates the pc_registry.

    The in-memory pc_registry only counts correctly when a single process
    receives all `.inc` calls, so we claim "the one ingesting process" via a
    pid-file derived from INGEST_STORE_BASE_DIR (a good place because it maps
    nicely to 'one place on the FS where ingestion-related stuff happens').

    Implementation copied with modifications from snappea/foreman.py.

    This is not supposed to be bullet-proof against race conditions (nor is it
    cross-platform: liveness is checked via /proc), but it guards against:

    * misconfigurations, in particular running multiple gunicorn workers when
      either TASK_ALWAYS_EAGER or DIGEST_IMMEDIATELY is True (in both cases
      ingestion happens right in the web process, and if there is more than
      one web process, we get multiple pc_registry instances)
    * programming errors by the bugsink developers (non-ingesting processes
      calling `get_pc_registry`)

    Raises NotSingleton when another live process already holds the pid-file.
    """
    pid_filename = os.path.join(get_settings().INGEST_STORE_BASE_DIR, "pc_registry.pid")
    pid = os.getpid()

    if os.path.exists(pid_filename):
        old_pid = None
        with open(pid_filename, "r") as f:
            try:
                old_pid = int(f.read())
            except ValueError:
                # empty or corrupt pid-file (e.g. a process died between open and
                # write); treat it as stale rather than crashing ingestion forever
                pass

        if old_pid is not None and os.path.exists(f"/proc/{old_pid}"):
            eager = "eager" if get_snappea_settings().TASK_ALWAYS_EAGER else "not eager"
            digest_immediately = "digest immediately" if get_settings().DIGEST_IMMEDIATELY else "digest later"
            running = settings.I_AM_RUNNING.lower()
            raise NotSingleton("Other pc_registry exists. I am '%s' in mode '%s, %s'" % (
                running, digest_immediately, eager))
        else:
            ingest_logger.warning("Stale pc_registry pid file found, removing %s", pid_filename)
            os.remove(pid_filename)

    # ensure the ingestion dir exists before claiming the pid-file
    os.makedirs(os.path.dirname(pid_filename), exist_ok=True)
    with open(pid_filename, "w") as f:
        f.write(str(pid))

    # release the claim on clean interpreter shutdown
    atexit.register(_cleanup_pidfile)
|
||||
|
||||
|
||||
def get_pc_registry():
    """Return the process-wide pc_registry, lazily creating it on first call.

    note: must be run inside a transaction to ensure consistency because we use .iterator()
    https://docs.djangoproject.com/en/5.0/ref/databases/#sqlite-isolation
    """
    global _registry
    if _registry is not None:
        return _registry

    with time_to_logger(performance_logger, "period counter registry initialization"):
        _ensure_singleton()
        _registry = PeriodCounterRegistry()
    return _registry
|
||||
|
||||
@@ -68,3 +121,4 @@ def reset_pc_registry():
|
||||
# needed for tests
|
||||
global _registry
|
||||
_registry = None
|
||||
_cleanup_pidfile()
|
||||
|
||||
@@ -16,6 +16,8 @@ if sys.argv[1:2] == ['runsnappea']:
|
||||
I_AM_RUNNING = "SNAPPEA"
|
||||
elif sys.argv[1:2] == ['test']:
|
||||
I_AM_RUNNING = "TEST"
|
||||
elif [s.endswith("gunicorn") for s in sys.argv[:1]] == [True]:
|
||||
I_AM_RUNNING = "GUNICORN"
|
||||
else:
|
||||
I_AM_RUNNING = "OTHER"
|
||||
|
||||
|
||||
@@ -131,7 +131,7 @@ The recommended way to run Bugsink is using Gunicorn, a WSGI server.
|
||||
You can start the Bugsink server by running:
|
||||
|
||||
```bash
|
||||
PYTHONUNBUFFERED=1 gunicorn --bind="127.0.0.1:9000" --access-logfile - --capture-output --error-logfile - bugsink.wsgi
|
||||
PYTHONUNBUFFERED=1 gunicorn --bind="127.0.0.1:9000" --workers=1 --access-logfile - --capture-output --error-logfile - bugsink.wsgi
|
||||
```
|
||||
|
||||
You should see output indicating that the server is running. You can now access Bugsink by visiting
|
||||
|
||||
+1
-2
@@ -126,8 +126,7 @@ class BaseIngestAPIView(View):
|
||||
|
||||
# leave this at the top -- the point is to trigger load_from_scratch if needed, which may involve reading from
|
||||
# the DB which should come before any DB writing. A note on locking the PC: because period_counter accesses are
|
||||
# inside an immediate transaction, they are serialized "for free", so threading will "just work". Even inside
|
||||
# snappea and the django dev server.
|
||||
# inside an immediate transaction, they are serialized already, so threading will "just work".
|
||||
get_pc_registry()
|
||||
|
||||
# NOTE: we don't do anything with project-period-counting yet; we'll revisit this bit, and its desired location,
|
||||
|
||||
+1
-3
@@ -120,9 +120,7 @@ class Foreman:
|
||||
self.worker_semaphore = threading.Semaphore(self.settings.NUM_WORKERS)
|
||||
|
||||
# "initialize" the application.
|
||||
# this is likely in the wrong place, but I want to have it "somewhere" (before the workers start). And thinking
|
||||
# of the right place is something I want to postpone (it's also something we need to do for gunicorn, where it's
|
||||
# even harder because multi-process means the n (number of workers) don't share the same instance.
|
||||
# this might be in the wrong place, but I want to have it "somewhere" (before the workers start).
|
||||
from bugsink.registry import get_pc_registry
|
||||
with durable_atomic():
|
||||
get_pc_registry()
|
||||
|
||||
Reference in New Issue
Block a user