From 93365f4c8d6e01870fcadf8c6d7301d36928e479 Mon Sep 17 00:00:00 2001
From: Klaas van Schelven
Date: Mon, 15 Jul 2024 14:10:20 +0200
Subject: [PATCH] Period-counting using SQL instead of custom-made (PoC)

The direct cause for this was the following observation: there was no
mechanism in place to safeguard counted events across evictions, i.e. the
following sequence of events was not accounted for:

* ingest/digest a bunch of events (PCs correctly updated)
* eviction (PCs still correct)
* server/snappea restart (PCs reloaded, but rebuilt from only the events
  that survived eviction: no longer correct).

I thought about various approaches to fix this (e.g. snapshotting), but in
the end such approaches added even more complexity to the PC mechanism. I
decided to first check how non-performant the SQL route would be, and this
PoC seems to say: just go SQL.

There's also a small semantic change (probably in the direction of what
you'd expect): the periods are no longer 'calendar' periods, but windows
relative to the current moment.
---
 ...09_event_events_even_issue_i_90497b_idx.py | 17 ++++++
 events/models.py                              |  1 +
 ingest/event_counter.py                       | 54 +++++++++++++++++++
 ingest/views.py                               | 38 +++++--------
 issues/models.py                              | 24 +++++----
 5 files changed, 99 insertions(+), 35 deletions(-)
 create mode 100644 events/migrations/0009_event_events_even_issue_i_90497b_idx.py
 create mode 100644 ingest/event_counter.py

diff --git a/events/migrations/0009_event_events_even_issue_i_90497b_idx.py b/events/migrations/0009_event_events_even_issue_i_90497b_idx.py
new file mode 100644
index 0000000..932ef95
--- /dev/null
+++ b/events/migrations/0009_event_events_even_issue_i_90497b_idx.py
@@ -0,0 +1,17 @@
+# Generated by Django 4.2.13 on 2024-07-15 12:13
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('events', '0008_remove_event_events_even_project_abe572_idx_and_more'),
+    ]
+
+    operations = [
+        migrations.AddIndex(
+            model_name='event',
+            index=models.Index(fields=['issue', 'server_side_timestamp'], name='events_even_issue_i_90497b_idx'),
+        ),
+    ]
diff --git a/events/models.py b/events/models.py
index b15c49c..d84d541 100644
--- a/events/models.py
+++ b/events/models.py
@@ -161,6 +161,7 @@ class Event(models.Model):
         ]
         indexes = [
             models.Index(fields=["project", "never_evict", "server_side_timestamp", "irrelevance_for_retention"]),
+            models.Index(fields=["issue", "server_side_timestamp"]),
         ]
 
     def get_absolute_url(self):
diff --git a/ingest/event_counter.py b/ingest/event_counter.py
new file mode 100644
index 0000000..803221a
--- /dev/null
+++ b/ingest/event_counter.py
@@ -0,0 +1,54 @@
+from datetime import timezone, datetime
+
+from django.db.models import Min
+
+
+def _filter_for_periods(qs, period_name, nr_of_periods, now):
+    from issues.models import sub_periods_from_datetime  # I'll move this soon
+
+    if period_name == "total":
+        return qs
+
+    return qs.filter(server_side_timestamp__gte=sub_periods_from_datetime(now, nr_of_periods, period_name))
+
+
+def check_for_thresholds(qs, now, thresholds):
+    from issues.models import add_periods_to_datetime  # I'll move this soon
+    # thresholds :: [(period_name, nr_of_periods, gte_threshold, metadata), ...]
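+    # Hypothetical example: ("minute", 5, 101, None) reads as "the count of
+    # events in the trailing 5 minutes is at least 101"; the window is
+    # relative to `now`, not aligned to calendar boundaries.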
+
+    # we only allow UTC, and we generally use Django model fields, which are UTC, so this should be good:
+    assert now.tzinfo == timezone.utc
+
+    states_with_metadata = []
+
+    for (period_name, nr_of_periods, gte_threshold, metadata) in thresholds:
+        count = _filter_for_periods(qs, period_name, nr_of_periods, now).count()
+        state = count >= gte_threshold
+
+        if state:
+
+            if period_name == "total":
+                # when counting the 'total', there will never be a time when the condition becomes false. We
+                # just pick an arbitrarily large date; we'll deal with it by the end of the myria-annum.
+                # unlikely to actually end up in the DB (because it would imply the use of a quota for total).
+                below_threshold_from = datetime(9999, 12, 31, 23, 59, tzinfo=timezone.utc)
+
+            else:
+                # `below_threshold_from` is the first moment at which the condition can become false again:
+                # take the oldest server_side_timestamp still inside the window and add the period to it.
+
+                below_threshold_from = add_periods_to_datetime(
+                    _filter_for_periods(qs, period_name, nr_of_periods, now).aggregate(
+                        agg=Min('server_side_timestamp'))['agg'],
+                    nr_of_periods, period_name)
+
+        else:
+            below_threshold_from = None
+
+        states_with_metadata.append((state, below_threshold_from, metadata))
+
+    # we return tuples of (state, below_threshold_from, metadata), where metadata is something arbitrary that can be
+    # passed in (it allows us to tie back to "what caused this to be true/false?").
+    # TODO: I think that in practice the metadata is always implied by the thresholds, i.e. instead of
+    # passing-through we could just return the thresholds that were met.
+    return states_with_metadata
diff --git a/ingest/views.py b/ingest/views.py
index 3635aef..440a9a8 100644
--- a/ingest/views.py
+++ b/ingest/views.py
@@ -21,8 +21,6 @@ from issues.models import Issue, IssueStateManager, Grouping, TurningPoint, Turn
 from issues.utils import get_type_and_value_for_data, get_issue_grouper_for_data, get_denormalized_fields_for_data
 from issues.regressions import issue_is_regression
 
-from bugsink.registry import get_pc_registry
-from bugsink.period_counter import PeriodCounter
 from bugsink.transaction import immediate_atomic, delay_on_commit
 from bugsink.exceptions import ViolatedExpectation
 from bugsink.streams import content_encoding_reader, MaxDataReader, MaxDataWriter, NullWriter, MaxLengthExceeded
@@ -37,6 +35,7 @@ from compat.timestamp import format_timestamp, parse_timestamp
 from .parsers import StreamingEnvelopeParser, ParseError
 from .filestore import get_filename_for_event_id
 from .tasks import digest
+from .event_counter import check_for_thresholds
 
 HTTP_429_TOO_MANY_REQUESTS = 429
 
@@ -289,43 +288,30 @@ class BaseIngestAPIView(View):
         # Note on the division of work between ingest/digest: on ingest we just look at a (more or less) boolean "do you
         # accept anything" (and if not: when will you?). Here we do the actual work.
 
-        pc_registry = get_pc_registry()
-        # Projects may be created from the UI, which may run in a separate process, we create the PC here on demand.
-        if project.id not in pc_registry.by_project:
-            pc_registry.by_project[project.id] = PeriodCounter()
-        project_pc = pc_registry.by_project[project.id]
+        thresholds = [
+            # +1, because the PC tests inclusively, but we want to trigger only on-exceed.
+            ("minute", 5, get_settings().MAX_EVENTS_PER_PROJECT_PER_5_MINUTES + 1, None),
+            ("minute", 60, get_settings().MAX_EVENTS_PER_PROJECT_PER_HOUR + 1, None),
+        ]
 
-        thresholds_by_purpose = {
-            "quota": [
-                # +1, because the PC tests inclusively, but we want to trigger only on-exceed.
- ("minute", 5, get_settings().MAX_EVENTS_PER_PROJECT_PER_5_MINUTES + 1, None), - ("minute", 60, get_settings().MAX_EVENTS_PER_PROJECT_PER_HOUR + 1, None), - ], - } - states_by_purpose = project_pc.inc(timestamp, thresholds=thresholds_by_purpose) + states = check_for_thresholds(Event.objects.filter(project=project), timestamp, thresholds) - until = max([below_from for (state, below_from, _) in states_by_purpose["quota"] if state], default=None) + until = max([below_from for (state, below_from, _) in states if state], default=None) _save_if_needed(project, "quota_exceeded_until", until) @classmethod def count_issue_periods_and_act_on_it(cls, issue, event, timestamp): - pc_registry = get_pc_registry() - if issue.id not in pc_registry.by_issue: - pc_registry.by_issue[issue.id] = PeriodCounter() - issue_pc = pc_registry.by_issue[issue.id] - # We just have "unmute" as a purpose here, not "quota". I thought I'd have per-issue quota earlier (which would # ensure some kind of fairness within a project) but: # * that doesn't quite work, because to determine the issue, you'd have to incur almost all of the digest cost. # * quota are expected to be set "high enough" anyway, i.e. only as a last line of defense against run-away # clients # * "even if" you'd get this to work there'd be scenarios where it's useless, e.g. misbehaving groupers. - thresholds_by_purpose = { - "unmute": IssueStateManager.get_unmute_thresholds(issue), - } - states_by_purpose = issue_pc.inc(timestamp, thresholds=thresholds_by_purpose) + thresholds = IssueStateManager.get_unmute_thresholds(issue) - for (state, until, vbc_dict) in states_by_purpose["unmute"]: + states = check_for_thresholds(Event.objects.filter(issue=issue), timestamp, thresholds) + + for (state, until, vbc_dict) in states: if not state: continue diff --git a/issues/models.py b/issues/models.py index f8d6ceb..032ac38 100644 --- a/issues/models.py +++ b/issues/models.py @@ -131,16 +131,22 @@ class Grouping(models.Model): return self.grouping_key +DATEUTIL_KWARGS_MAP = { + "year": "years", + "month": "months", + "week": "weeks", + "day": "days", + "hour": "hours", + "minute": "minutes", +} + + def add_periods_to_datetime(dt, nr_of_periods, period_name): - dateutil_kwargs_map = { - "year": "years", - "month": "months", - "week": "weeks", - "day": "days", - "hour": "hours", - "minute": "minutes", - } - return dt + relativedelta(**{dateutil_kwargs_map[period_name]: nr_of_periods}) + return dt + relativedelta(**{DATEUTIL_KWARGS_MAP[period_name]: nr_of_periods}) + + +def sub_periods_from_datetime(dt, nr_of_periods, period_name): + return dt - relativedelta(**{DATEUTIL_KWARGS_MAP[period_name]: nr_of_periods}) def format_unmute_reason(unmute_metadata):
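
Note for reviewers: below is a minimal sketch of how the new helper is meant
to be exercised (e.g. from `./manage.py shell`). The threshold values and the
metadata strings are made up for illustration; only `check_for_thresholds`
and the models are from this patch.

    from datetime import datetime, timezone

    from events.models import Event
    from issues.models import Issue
    from ingest.event_counter import check_for_thresholds

    now = datetime.now(timezone.utc)  # must be UTC; the helper asserts this
    issue = Issue.objects.first()  # any issue with some events

    # [(period_name, nr_of_periods, gte_threshold, metadata), ...]
    thresholds = [
        ("minute", 5, 3, "burst"),     # at least 3 events in the trailing 5 minutes
        ("total", None, 100, "ever"),  # at least 100 events overall; period args unused
    ]

    for state, below_threshold_from, metadata in check_for_thresholds(
            Event.objects.filter(issue=issue), now, thresholds):
        # state: whether the threshold is currently met; below_threshold_from:
        # the earliest moment the condition can stop holding (None if not met)
        print(state, below_threshold_from, metadata)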