Distinguish ingested_at and digested_at

This commit is contained in:
Klaas van Schelven
2024-07-18 14:45:59 +02:00
parent d23f1f0a3b
commit 3128392d9a
19 changed files with 157 additions and 61 deletions
+4 -2
View File
@@ -54,7 +54,8 @@ class EventAdmin(admin.ModelAdmin):
fields = [
'id',
'event_id',
'server_side_timestamp',
'ingested_at',
'digested_at',
'calculated_type',
'calculated_value',
'issue',
@@ -79,7 +80,8 @@ class EventAdmin(admin.ModelAdmin):
readonly_fields = [
'id',
'event_id',
'server_side_timestamp',
'ingested_at',
'digested_at',
'calculated_type',
'calculated_value',
'issue',
+2 -1
View File
@@ -28,7 +28,8 @@ def create_event(project=None, issue=None, timestamp=None, event_data=None):
return Event.objects.create(
project=project,
issue=issue,
server_side_timestamp=timestamp,
ingested_at=timestamp,
digested_at=timestamp,
timestamp=timestamp,
event_id=uuid.uuid4().hex,
has_exception=True,
@@ -0,0 +1,34 @@
# Generated by Django 4.2.13 on 2024-07-18 11:24
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('events', '0010_rename_ingest_order_event_digest_order_and_more'),
]
operations = [
migrations.RemoveIndex(
model_name='event',
name='events_even_project_adcdee_idx',
),
migrations.RemoveIndex(
model_name='event',
name='events_even_issue_i_90497b_idx',
),
migrations.RenameField(
model_name='event',
old_name='server_side_timestamp',
new_name='digested_at',
),
migrations.AddIndex(
model_name='event',
index=models.Index(fields=['project', 'never_evict', 'digested_at', 'irrelevance_for_retention'], name='events_even_project_ac6fc7_idx'),
),
migrations.AddIndex(
model_name='event',
index=models.Index(fields=['issue', 'digested_at'], name='events_even_issue_i_b18956_idx'),
),
]
@@ -0,0 +1,20 @@
# Generated by Django 4.2.13 on 2024-07-18 11:33
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
('events', '0011_remove_event_events_even_project_adcdee_idx_and_more'),
]
operations = [
migrations.AddField(
model_name='event',
name='ingested_at',
field=models.DateTimeField(default=django.utils.timezone.now),
preserve_default=False,
),
]
@@ -0,0 +1,25 @@
# Generated by Django 4.2.13 on 2024-07-18 11:34
from django.db import migrations
from django.db.models import F
def harmonize_ingested_at(apps, schema_editor):
Event = apps.get_model('events', 'Event')
Event.objects.update(ingested_at=F('digested_at'))
class Migration(migrations.Migration):
dependencies = [
('events', '0012_event_ingested_at'),
('ingest', '0001_set_sqlite_wal'),
('issues', '0006_issue_next_unmute_check'),
('projects', '0008_project_next_quota_check'),
('releases', '0001_initial'),
('teams', '0002_initial'),
]
operations = [
migrations.RunPython(harmonize_ingested_at),
]
+9 -5
View File
@@ -54,7 +54,8 @@ class Event(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, help_text="Bugsink-internal")
server_side_timestamp = models.DateTimeField(db_index=True, blank=False, null=False)
ingested_at = models.DateTimeField(blank=False, null=False)
digested_at = models.DateTimeField(db_index=True, blank=False, null=False)
# not actually expected to be null, but we want to be able to delete issues without deleting events (cleanup later)
issue = models.ForeignKey("issues.Issue", blank=False, null=True, on_delete=models.SET_NULL)
@@ -160,8 +161,8 @@ class Event(models.Model):
("issue", "digest_order"),
]
indexes = [
models.Index(fields=["project", "never_evict", "server_side_timestamp", "irrelevance_for_retention"]),
models.Index(fields=["issue", "server_side_timestamp"]),
models.Index(fields=["project", "never_evict", "digested_at", "irrelevance_for_retention"]),
models.Index(fields=["issue", "digested_at"]),
]
def get_absolute_url(self):
@@ -179,7 +180,9 @@ class Event(models.Model):
return get_title_for_exception_type_and_value(self.calculated_type, self.calculated_value)
@classmethod
def from_ingested(cls, event_metadata, digest_order, stored_event_count, issue, parsed_data, denormalized_fields):
def from_ingested(cls, event_metadata, digested_at, digest_order, stored_event_count, issue, parsed_data,
denormalized_fields):
# 'from_ingested' may be a bit of a misnomer... the full 'from_ingested' is done in 'digest_event' in the views.
# below at least puts the parsed_data in the right place, and does some of the basic object set up (FKs to other
# objects etc).
@@ -191,7 +194,8 @@ class Event(models.Model):
event_id=parsed_data["event_id"],
project_id=event_metadata["project_id"],
issue=issue,
server_side_timestamp=event_metadata["timestamp"],
ingested_at=event_metadata["ingested_at"],
digested_at=digested_at,
data=json.dumps(parsed_data),
timestamp=parse_timestamp(parsed_data["timestamp"]),
+5 -5
View File
@@ -36,12 +36,12 @@ def get_epoch_bounds(lower, upper=None):
return Q()
if lower is None:
return Q(server_side_timestamp__lt=datetime_for_epoch(upper))
return Q(digested_at__lt=datetime_for_epoch(upper))
if upper is None:
return Q(server_side_timestamp__gte=datetime_for_epoch(lower))
return Q(digested_at__gte=datetime_for_epoch(lower))
return Q(server_side_timestamp__gte=datetime_for_epoch(lower), server_side_timestamp__lt=datetime_for_epoch(upper))
return Q(digested_at__gte=datetime_for_epoch(lower), digested_at__lt=datetime_for_epoch(upper))
def nonzero_leading_bits(n):
@@ -124,7 +124,7 @@ def get_age_for_irrelevance(age_based_irrelevance):
def get_epoch_bounds_with_irrelevance(project, current_timestamp, qs_kwargs={"never_evict": False}):
from .models import Event
oldest = Event.objects.filter(project=project, **qs_kwargs).aggregate(val=Min('server_side_timestamp'))['val']
oldest = Event.objects.filter(project=project, **qs_kwargs).aggregate(val=Min('digested_at'))['val']
first_epoch = get_epoch(oldest) if oldest is not None else get_epoch(current_timestamp)
current_epoch = get_epoch(current_timestamp)
@@ -313,7 +313,7 @@ def evict_for_epoch_and_irrelevance(project, max_epoch, max_irrelevance, max_eve
qs = Event.objects.filter(project=project, irrelevance_for_retention__gt=max_irrelevance, **qs_kwargs)
if max_epoch is not None:
qs = qs.filter(server_side_timestamp__lt=datetime_for_epoch(max_epoch))
qs = qs.filter(digested_at__lt=datetime_for_epoch(max_epoch))
if include_never_evict:
# we need to manually ensure that no FKs to the deleted items exist:
+2 -2
View File
@@ -23,9 +23,9 @@ def retention_insight_values(project):
irrelevance_for_retention=irrelevance
)
if lb is not None:
qs = qs.filter(server_side_timestamp__gte=datetime_for_epoch(lb))
qs = qs.filter(digested_at__gte=datetime_for_epoch(lb))
if ub is not None:
qs = qs.filter(server_side_timestamp__lt=datetime_for_epoch(ub))
qs = qs.filter(digested_at__lt=datetime_for_epoch(ub))
howmany = qs.count()
results[irrelevance] = howmany
+2 -2
View File
@@ -9,7 +9,7 @@ def _filter_for_periods(qs, period_name, nr_of_periods, now):
if period_name == "total":
return qs
return qs.filter(server_side_timestamp__gte=sub_periods_from_datetime(now, nr_of_periods, period_name))
return qs.filter(digested_at__gte=sub_periods_from_datetime(now, nr_of_periods, period_name))
def check_for_thresholds(qs, now, thresholds, add_for_current=0):
@@ -57,7 +57,7 @@ def check_for_thresholds(qs, now, thresholds, add_for_current=0):
# back to accepting too soon. But this is self-correcting, so no need to deal with it.
below_threshold_from = add_periods_to_datetime(
_filter_for_periods(qs, period_name, nr_of_periods, now).aggregate(
agg=Min('server_side_timestamp'))['agg'] or now, # `or now` to handle funny `gte_threshold==0`
agg=Min('digested_at'))['agg'] or now, # `or now` to handle funny `gte_threshold==0`
nr_of_periods, period_name)
else:
-2
View File
@@ -13,8 +13,6 @@ logger = logging.getLogger("bugsink.ingest")
@shared_task
def digest(event_id, event_metadata):
# we'll put this in a separate place "soon" -- this is no longer view-specific
# speaking of separate places: the task "digest" now lives in the "ingest" app :-D
from .views import BaseIngestAPIView
with open(get_filename_for_event_id(event_id), "rb") as f:
+2 -3
View File
@@ -30,16 +30,15 @@ from .event_counter import check_for_thresholds
def _digest_params(event_data, project, request, now=None):
if now is None:
# because we want to count events before having created event objects (quota may block the latter) we cannot
# depend on event.timestamp; instead, we look on the clock once here, and then use that everywhere
now = datetime.datetime.now(timezone.utc)
# adapter to quickly reuse existing tests on refactored code. let's see where the code ends up before spending
# considerable time on rewriting the tests
return {
"event_metadata": {"project_id": project.id, "timestamp": format_timestamp(now), "debug_info": ""},
"event_metadata": {"project_id": project.id, "ingested_at": format_timestamp(now), "debug_info": ""},
"event_data": event_data,
"project": project,
"digested_at": now,
}
+41 -29
View File
@@ -88,12 +88,8 @@ class BaseIngestAPIView(View):
return cls.get_project(project_pk, sentry_key)
@classmethod
def process_event(cls, event_id, event_data_stream, project, request):
# because we want to count events before having created event objects (quota may block the latter) we cannot
# depend on event.timestamp; instead, we look on the clock once here, and then use that everywhere.
now = datetime.now(timezone.utc)
event_metadata = cls.get_event_meta(now, request, project)
def process_event(cls, ingested_at, event_id, event_data_stream, project, request):
event_metadata = cls.get_event_meta(ingested_at, request, project)
if get_settings().DIGEST_IMMEDIATELY:
# in this case the stream will be a BytesIO object, so we can actually call .getvalue() on it.
@@ -110,25 +106,41 @@ class BaseIngestAPIView(View):
digest.delay(event_id, event_metadata)
@classmethod
def get_event_meta(cls, now, request, project):
def get_event_meta(cls, ingested_at, request, project):
# Meta means: not part of the event data. Basically: information that is available at the time of ingestion, and
# that must be passed to digest() in a serializable form.
debug_info = request.META.get("HTTP_X_BUGSINK_DEBUGINFO", "")
return {
"project_id": project.id,
"timestamp": format_timestamp(now),
"ingested_at": format_timestamp(ingested_at),
"debug_info": debug_info,
}
@classmethod
@immediate_atomic()
def digest_event(cls, event_metadata, event_data, project=None):
def digest_event(cls, event_metadata, event_data, project=None, digested_at=None):
# ingested_at is passed from the point-of-ingestion; digested_at is determined here. Because this happens inside
# `immediate_atomic`, we know digestions are serialized, and assuming non-decreasing server clocks, not
# decreasing. (not so for ingestion times: clock-watching happens outside the snappe transaction, and threading in the
# foreman is another source of shuffling).
#
# Because of this property we use digested_at for eviction and quota, and, because quota is VBC-based and so
# is unmuting, in all unmuting-related checks. This saves us from having to precisely reason about edge-cases
# for non-increasing time, and the drawbacks are minimal, because the differences in time between ingest and
# digest are assumed to be relatively small, and as a user you don't really care (to the second) which
# timestamps trigger the quota/eviction.
#
# For other user-facing elements in the UI we prefer ingested_at though, because that's closer to the time
# something actually happened, and that's usually what you care for while debugging.
ingested_at = parse_timestamp(event_metadata["ingested_at"])
digested_at = datetime.now(timezone.utc) if digested_at is None else digested_at # explicit passing: test only
if project is None:
# having project as an optional argument allows us to pass this in when we have the information available
# (in the DIGEST_IMMEDIATELY case) which saves us a query.
project = Project.objects.get(pk=event_metadata["project_id"])
timestamp = parse_timestamp(event_metadata["timestamp"])
if not cls.count_project_periods_and_act_on_it(project, timestamp):
if not cls.count_project_periods_and_act_on_it(project, digested_at):
return # if over-quota: just return (any cleanup is done calling-side)
# I resisted the temptation to put `get_denormalized_fields_for_data` in an if-statement: you basically "always"
@@ -151,8 +163,8 @@ class BaseIngestAPIView(View):
issue = Issue.objects.create(
digest_order=issue_digest_order,
project_id=event_metadata["project_id"],
first_seen=timestamp,
last_seen=timestamp,
first_seen=ingested_at,
last_seen=ingested_at,
digested_event_count=1,
**denormalized_fields,
)
@@ -173,7 +185,7 @@ class BaseIngestAPIView(View):
issue_created = False
# update the denormalized fields
issue.last_seen = timestamp
issue.last_seen = ingested_at
issue.digested_event_count += 1
# NOTE: possibly expensive. "in theory" we can just do some bookkeeping for a denormalized value, but that may
@@ -182,19 +194,20 @@ class BaseIngestAPIView(View):
project_stored_event_count = (project.event_set.count() or 0) + 1
issue_stored_event_count = (issue.event_set.count() or 0) + 1
if should_evict(project, timestamp, project_stored_event_count):
if should_evict(project, digested_at, project_stored_event_count):
# Note: I considered pushing this into some async process, but it makes reasoning much harder, and it's
# doubtful whether it would help, because in the end there's just a single pipeline of ingested-related
# stuff todo, might as well do the work straight away. Similar thoughts about pushing this into something
# cron-like. (not exactly the same, because for cron-like time savings are possible if the cron-likeness
# causes the work to be outside of the 'rush hour' -- OTOH this also introduces a lot of complexity about
# "what is a limit anyway, if you can go either over it, or work is done before the limit is reached")
evict_for_max_events(project, timestamp, project_stored_event_count)
evict_for_max_events(project, digested_at, project_stored_event_count)
# NOTE: an event always has a single (automatically calculated) Grouping associated with it. Since we have that
# information available here, we could add it to the Event model.
event, event_created = Event.from_ingested(
event_metadata,
ingested_at,
issue.digested_event_count,
issue_stored_event_count,
issue,
@@ -221,7 +234,7 @@ class BaseIngestAPIView(View):
if issue_created:
TurningPoint.objects.create(
issue=issue, triggering_event=event, timestamp=timestamp,
issue=issue, triggering_event=event, timestamp=ingested_at,
kind=TurningPointKind.FIRST_SEEN)
event.never_evict = True
@@ -232,7 +245,7 @@ class BaseIngestAPIView(View):
# new issues cannot be regressions by definition, hence this is in the 'else' branch
if issue_is_regression(issue, event.release):
TurningPoint.objects.create(
issue=issue, triggering_event=event, timestamp=timestamp,
issue=issue, triggering_event=event, timestamp=ingested_at,
kind=TurningPointKind.REGRESSED)
event.never_evict = True
@@ -246,7 +259,7 @@ class BaseIngestAPIView(View):
# 'muted' issue is thus not treated as something to more deeply ignore than an unresolved issue (and in
# fact, conversely, it may be more loud when the for/until condition runs out). This is in fact analogous to
# "resolved" issues which are _also_ treated with more "suspicion" than their unresolved counterparts.
if issue.is_muted and issue.unmute_after is not None and timestamp > issue.unmute_after:
if issue.is_muted and issue.unmute_after is not None and digested_at > issue.unmute_after:
# note that unmuting on-ingest implies that issues that no longer occur stay muted. I'd say this is what
# you want: things that no longer happen should _not_ draw your attention, and if you've nicely moved
# some issue away from the "Open" tab it should not reappear there if a certain amount of time passes.
@@ -268,7 +281,7 @@ class BaseIngestAPIView(View):
if release.version + "\n" not in issue.events_at:
issue.events_at += release.version + "\n"
cls.count_issue_periods_and_act_on_it(issue, event, timestamp)
cls.count_issue_periods_and_act_on_it(issue, event, digested_at)
issue.save()
@@ -357,9 +370,9 @@ class BaseIngestAPIView(View):
class IngestEventAPIView(BaseIngestAPIView):
def _post(self, request, project_pk=None):
now = datetime.now(timezone.utc)
ingested_at = datetime.now(timezone.utc)
project = self.get_project_for_request(project_pk, request)
if project.quota_exceeded_until is not None and now < project.quota_exceeded_until:
if project.quota_exceeded_until is not None and ingested_at < project.quota_exceeded_until:
return HttpResponse(status=HTTP_429_TOO_MANY_REQUESTS)
# This endpoint is deprecated. Personally, I think it's the simpler (and given my goals therefore better) of the
@@ -368,9 +381,7 @@ class IngestEventAPIView(BaseIngestAPIView):
# internal methods changed quite a bit recently, and this one did not keep up. In particular: in our current set
# of interfaces we need an event_id before parsing (or after parsing but then we have double work). Easiest
# solution: just copy/paste from process_event(), and take only one branch.
now = datetime.now(timezone.utc)
event_metadata = self.get_event_meta(now, request, project)
event_metadata = self.get_event_meta(ingested_at, request, project)
# if get_settings().DIGEST_IMMEDIATELY: this is the only branch we implemented here.
event_data = json.loads(
@@ -385,7 +396,7 @@ class IngestEventAPIView(BaseIngestAPIView):
class IngestEnvelopeAPIView(BaseIngestAPIView):
def _post(self, request, project_pk=None):
now = datetime.now(timezone.utc)
ingested_at = datetime.now(timezone.utc)
# Note: wrapping the COMPRESSES_SIZE checks around request makes it so that when clients do not compress their
# requests, they are still subject to the (smaller) maximums that apply pre-uncompress. This is exactly what we
@@ -400,7 +411,8 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
else:
project = self.get_project_for_request(project_pk, request)
if project.quota_exceeded_until is not None and now < project.quota_exceeded_until:
# TODO note not a problem
if project.quota_exceeded_until is not None and ingested_at < project.quota_exceeded_until:
# Sentry has x-sentry-rate-limits, but for now 429 is just fine. Client-side this is implemented as a 60s
# backoff.
#
@@ -448,7 +460,7 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
continue
self.process_event(envelope_headers["event_id"], event_output_stream, project, request)
self.process_event(ingested_at, envelope_headers["event_id"], event_output_stream, project, request)
break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done
finally:
+3 -3
View File
@@ -35,8 +35,8 @@ class Issue(models.Model):
digest_order = models.PositiveIntegerField(blank=False, null=False)
# denormalized/cached fields:
last_seen = models.DateTimeField(blank=False, null=False) # based on event.server_side_timestamp
first_seen = models.DateTimeField(blank=False, null=False) # based on event.server_side_timestamp
last_seen = models.DateTimeField(blank=False, null=False) # based on event.ingested_at
first_seen = models.DateTimeField(blank=False, null=False) # based on event.ingested_at
digested_event_count = models.IntegerField(blank=False, null=False)
calculated_type = models.CharField(max_length=255, blank=True, null=False, default="")
calculated_value = models.CharField(max_length=255, blank=True, null=False, default="")
@@ -237,7 +237,7 @@ class IssueStateManager(object):
# path is never reached via UI-based paths (because those are by definition not event-triggered); thus
# the 2 ways of creating TurningPoints do not collide.
TurningPoint.objects.create(
issue=issue, triggering_event=triggering_event, timestamp=triggering_event.server_side_timestamp,
issue=issue, triggering_event=triggering_event, timestamp=triggering_event.ingested_at,
kind=TurningPointKind.UNMUTED, metadata=json.dumps(unmute_metadata))
triggering_event.never_evict = True # .save() will be called by the caller of this function
+1 -1
View File
@@ -118,7 +118,7 @@
</div>
<div class="flex p-4 bg-slate-200 border-b-2"><!-- bottom nav bar -->
{% if is_event_page %}<div>Event {{ event.digest_order }} of {{ issue.digested_event_count}} which occured at <span class="font-bold">{{ event.server_side_timestamp|date:"j M G:i T" }}</span></div>{% endif %}
{% if is_event_page %}<div>Event {{ event.digest_order }} of {{ issue.digested_event_count}} which occured at <span class="font-bold">{{ event.ingested_at|date:"j M G:i T" }}</span></div>{% endif %}
<div class="ml-auto pr-4 font-bold text-slate-500">
<a href="/admin/issues/issue/{{ issue.id }}/change/">Issue Admin</a>
{% if is_event_page %}
+1 -1
View File
@@ -7,7 +7,7 @@
<div class="flex">
<div class="overflow-hidden">
<div class="italic">{{ event.server_side_timestamp|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
<div class="italic">{{ event.ingested_at|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
</div>
<div class="ml-auto flex-none">
+1 -1
View File
@@ -7,7 +7,7 @@
{# this is here to fool tailwind (because we're foolish enough to put html in python) <span class="text-xs"></span> #}
<div class="flex">
<div class="overflow-hidden">
<div class="italic">{{ event.server_side_timestamp|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
<div class="italic">{{ event.ingested_at|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
</div>
<div class="ml-auto flex-none">
+2 -2
View File
@@ -9,7 +9,7 @@
{# event-nav only #}
<div class="flex">
<div class="overflow-hidden">
<div class="italic">{{ event.server_side_timestamp|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
<div class="italic">{{ event.ingested_at|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
</div>
<div class="ml-auto flex-none">
@@ -30,7 +30,7 @@
<div class="flex">
<div class="overflow-hidden">
{% if forloop.counter0 == 0 %}
<div class="italic">{{ event.server_side_timestamp|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
<div class="italic">{{ event.ingested_at|date:"j M G:i T" }} (Event {{ event.digest_order }} of {{ issue.digested_event_count }})</div>
{% endif %}
<h1 class="text-2xl font-bold {% if forloop.counter0 > 0 %}mt-4{% endif %} text-ellipsis whitespace-nowrap overflow-hidden">{{ exception.type }}</h1>
<div class="text-lg mb-4 text-ellipsis whitespace-nowrap overflow-hidden">{{ exception.value }}</div>
+2 -1
View File
@@ -361,7 +361,8 @@ def issue_event_details(request, issue, event_pk=None, digest_order=None):
("bugsink_internal_id", event.id),
("issue_id", issue.id),
("timestamp", _date_with_milis_html(event.timestamp)),
("server_side_timestamp", _date_with_milis_html(event.server_side_timestamp)),
("ingested_at", _date_with_milis_html(event.ingested_at)),
("digested_at", _date_with_milis_html(event.digested_at)),
]
if parsed_data.get("logger"):
key_info.append(("logger", parsed_data["logger"]))
+1 -1
View File
@@ -105,7 +105,7 @@ def create_release_if_needed(project, version, event):
TurningPoint.objects.bulk_create([TurningPoint(
issue=issue, kind=TurningPointKind.NEXT_MATERIALIZED, triggering_event=event,
metadata=json.dumps({"actual_release": release.version}), timestamp=event.server_side_timestamp)
metadata=json.dumps({"actual_release": release.version}), timestamp=event.ingested_at)
for issue in resolved_by_next_qs
])
event.never_evict = True # .save() will be called by the caller of this function