Files
bugsink/snappea/stats.py
Klaas van Schelven 53d4be8183 Fix 'different_runtime_limit' race conditions
This commit fixes 3 related issues with the way runtime_limit was administered;
which could lead to race conditions (and hence: the wrong runtime_limit
applying at some point in time). Post-fix, the following holds:

1. We use thread_locals to store this info, since there are at least 2 sources of
    threaded code that touch this (snappea's workers and the django debugserver)

2. We distinguish between the "from connection settings" timeout and the
    "temporarily overridden" ones, since we cannot assume
    connection-initialization happens first (as per the comment in base.py)

3. We store runtime-limits per alias ('using'). Needed for [2] (each connection
    may have a different moment-of-initialization, clobbering CM-set values from
    the other connection) and also needed once you realize there may be
    different defaults for the timeouts.

General context: I've recently started introducing the 'different runtime'
helper quite a bit more; and across connections (snappea!), which created more
and more doubts as to it actually working as advertised.

Thoughts on "using" being required. I used to think "you can reason about a
global timeout value, and the current transaction makes clear what you're
actually doing", but as per the notes above that doesn't really work.

Thoughts on reproducing:
A few thoughts/notes on reproducing problems with race conditions. Basic note:
that's always hairy. So in the end I settled on a solution that's hopefully
easy to reason about, even if it's verbose.

When I started work on this commit, I focussed on thread-safety; "proving the
problem" consisted of F5/^R on a web page with 2 context managers with different
timeouts, hoping to show that the stack unrolling didn't work properly.
However, during those "tests" I noticed quite a few resets-to-5s (from the
connection defaults), which prompted fix [2] from above.
2025-04-22 22:08:53 +02:00

133 lines
6.5 KiB
Python

from datetime import datetime, timezone, timedelta
import threading
import logging
import sentry_sdk
from django.db import OperationalError
from django.db.models import Count
from sentry_sdk_extensions import capture_or_log_exception
from bugsink.transaction import immediate_atomic
from bugsink.timed_sqlite_backend.base import different_runtime_limit
from performance.context_managers import time_to_logger
from .models import Task, Stat
from .settings import get_settings
# main logging channel for snappea's foreman/worker machinery
logger = logging.getLogger("snappea.foreman")
# separate channel for performance timings (used via time_to_logger below)
performance_logger = logging.getLogger("bugsink.performance.snappea")
class Stats:
    """In-memory, per-minute aggregation of snappea task timings, flushed to the snappea DB.

    Worker threads report finished tasks via done(); counters are accumulated in a plain dict
    (self.d, keyed by task_name) under self.lock, and at most once a minute the accumulated
    bucket is written out as Stat rows (see _possibly_write / _write).
    """

    def __init__(self) -> None:
        # guards self.d and self.last_write_at; done() is called from multiple worker threads
        self.lock = threading.Lock()
        # initialize to "one minute ago" to ensure reset-at-first-check
        # last_write_at is a (year, month, day, hour, minute) tuple, i.e. timetuple()[:5]
        self.last_write_at = (datetime.now(timezone.utc) - timedelta(minutes=1)).timetuple()[:5]
        # per-task_name counters for the current one-minute bucket; see _ensure_task for the schema
        self.d = {}

    def _ensure_task(self, task_name) -> None:
        # Lazily create the counter dict for task_name. Called with self.lock held (see done()).
        if task_name not in self.d:
            self.d[task_name] = {
                # we _could_ monitor starts, but my guess is it's not that interesting, since no timings are available,
                # you don't actually gain much insight from the count of started tasks, and you're even in danger of
                # setting people on the wrong track because start/done will differ slightly over the per-minute buckets.
                # "starts": 0,
                "done": 0,
                "errors": 0,
                "wall_time": 0,
                "wait_time": 0,
                "write_time": 0,
                "max_wall_time": 0,
                "max_wait_time": 0,
                "max_write_time": 0,
            }

    def done(self, task_name, wall_time, wait_time, write_time, error) -> None:
        """Record one finished task: accumulate totals/maxima and possibly flush the minute bucket.

        We take "did it error" as a param so the call side has a single code path and doesn't
        have to duplicate the timing-taking for the error case.
        """
        try:
            with self.lock:
                # flush-check first, so the current report lands in the correct minute bucket
                self._possibly_write()
                self._ensure_task(task_name)
                self.d[task_name]["done"] += 1
                self.d[task_name]["wall_time"] += wall_time
                self.d[task_name]["wait_time"] += wait_time
                self.d[task_name]["write_time"] += write_time
                self.d[task_name]["max_wall_time"] = max(self.d[task_name]["max_wall_time"], wall_time)
                self.d[task_name]["max_wait_time"] = max(self.d[task_name]["max_wait_time"], wait_time)
                self.d[task_name]["max_write_time"] = max(self.d[task_name]["max_write_time"], write_time)
                if error:
                    self.d[task_name]["errors"] += 1
        except Exception as e:
            # Problems with Stats should _never_ bring down snappea.
            if sentry_sdk.is_initialized():
                # Only for the case where full error is captured to Dogfooded Bugsink, do we want to draw some
                # attention to this; in the other case the big error in the logs (full traceback) is clear enough.
                logger.warning("Snappea Stat Exception: %s", str(e))
            capture_or_log_exception(e, logger)

    def _possibly_write(self) -> None:
        """Flush the accumulated bucket to the DB iff a new minute has started. Lock must be held."""
        # we only write once-a-minute; this means the cost of writing stats is amortized (at least when it matters,
        # i.e. under pressure) by approx 1/(60*30); (the cost (see time_to_logger) was 15ms on my local env in
        # initial tests)
        #
        # "edge" cases, in which nothing is written:
        # * snappea-shutdown
        # * "no new minute" (only happens when there's almost no load, in which case you don't care)
        # but low overhead, robustness and a simple impl are more important than after-the-comma precision.

        # further reason to use separate "Stat" rather than "just write into Task": the latter would incur extra
        # write-pressure on the Snappea DB: because of at-most-once we delete _before_ doing any work, and the stats
        # are only known _after_ the work is done (which would then be the extra write).

        # we look at the clock ourselves, rather than pass this in, such that the looking at the clock happens only
        # once we've grabbed the lock; this ensures our times are monotonically increasing (assuming no system
        # clock funniness).
        now = datetime.now(timezone.utc)
        tup = now.timetuple()[:5]  # len("YMDHM") i.e. cut off at minute

        if tup > self.last_write_at:  # > rather than != means backwards clock drifting is silently/gracefully ignored
            # the Stat w/ timestamp x is for the one-minute bucket from that point in time forwards:
            timestamp = datetime(*(self.last_write_at), tzinfo=timezone.utc)
            self._write(timestamp)

            # re-init:
            self.last_write_at = tup
            self.d = {}

    def _write(self, timestamp) -> None:
        """Persist the current bucket as Stat rows (plus backlog counts) and prune old Stats.

        timestamp: tz-aware datetime marking the start of the one-minute bucket being written.
        """
        if get_settings().STATS_RETENTION_MINUTES == 0:
            # no stats retention; don't write anything either. this won't attempt to clean up either, which is OK in
            # my book
            return

        with time_to_logger(performance_logger, "Snappea write Stats"):
            with immediate_atomic(using="snappea"):  # explicit is better than impl.; and we combine read/write here
                # having stats is great, but I don't want to hog task-processing too long (which would happen
                # precisely when the backlog grows large)
                with different_runtime_limit(0.1, using="snappea"):
                    try:
                        task_counts = Task.objects.values("task_name").annotate(count=Count("task_name"))
                    except OperationalError as e:
                        if e.args[0] != "interrupted":
                            raise
                        task_counts = None  # explicitly "unknown"; which we want to propagate to the stats in that case

                # NOTE(review): QuerySets are lazy, so the counting query presumably only executes when the dict
                # comprehension below iterates it — i.e. possibly outside the try/runtime-limit scope above;
                # confirm against the timed_sqlite_backend whether "interrupted" can surface here instead.
                task_counts_d = {d['task_name']: d['count'] for d in task_counts} if task_counts is not None else None

                # store stats; combining the counted 'handled' (done) tasks with the 'counts' (backlog)
                stats = [
                    Stat(
                        timestamp=timestamp,
                        task_name=task_name,
                        task_count=task_counts_d.get(task_name, 0) if task_counts is not None else None,
                        **kwargs,
                    ) for task_name, kwargs in self.d.items()
                ]
                Stat.objects.bulk_create(stats)

                Stat.objects.filter(
                    timestamp__lt=timestamp - timedelta(minutes=get_settings().STATS_RETENTION_MINUTES)).delete()