Avoid cross-transaction pollution

In IMMEDIATE mode, `project` was fetched in an (implicit) read transaction and
then passed into the immediate_atomic-wrapped code; this meant that it was
possible to miscount project.digested_event_count.
This commit is contained in:
Klaas van Schelven
2024-10-08 11:53:31 +02:00
parent 91f7338abe
commit 0e48f346d1
2 changed files with 19 additions and 8 deletions

View File

@@ -44,7 +44,6 @@ def _digest_params(event_data, project, request, now=None):
"debug_info": "",
},
"event_data": event_data,
"project": project,
"digested_at": now,
}

View File

@@ -99,7 +99,7 @@ class BaseIngestAPIView(View):
event_data_bytes = event_data_stream.getvalue()
event_data = json.loads(event_data_bytes.decode("utf-8"))
performance_logger.info("ingested event with %s bytes", len(event_data_bytes))
cls.digest_event(event_metadata, event_data, project=project)
cls.digest_event(event_metadata, event_data)
else:
# In this case the stream will be a file to which the event's content has been written.
# To ensure that the (possibly EAGER) handling of the digest has the file available, we flush it here:
@@ -176,7 +176,7 @@ class BaseIngestAPIView(View):
@classmethod
@immediate_atomic()
def digest_event(cls, event_metadata, event_data, project=None, digested_at=None):
def digest_event(cls, event_metadata, event_data, digested_at=None):
# ingested_at is passed from the point-of-ingestion; digested_at is determined here. Because this happens inside
# `immediate_atomic`, we know digestions are serialized, and assuming non-decreasing server clocks, not decrea-
# sing. (not so for ingestion times: clock-watching happens outside the snappe transaction, and threading in the
@@ -193,10 +193,7 @@ class BaseIngestAPIView(View):
ingested_at = parse_timestamp(event_metadata["ingested_at"])
digested_at = datetime.now(timezone.utc) if digested_at is None else digested_at # explicit passing: test only
if project is None:
# having project as an optional argument allows us to pass this in when we have the information available
# (in the DIGEST_IMMEDIATELY case) which saves us a query.
project = Project.objects.get(pk=event_metadata["project_id"])
project = Project.objects.get(pk=event_metadata["project_id"])
if not cls.count_project_periods_and_act_on_it(project, digested_at):
return # if over-quota: just return (any cleanup is done calling-side)
@@ -451,7 +448,7 @@ class IngestEventAPIView(BaseIngestAPIView):
event_metadata = self.get_event_meta(event_data["event_id"], ingested_at, request, project)
self.digest_event(event_metadata, event_data, project=project)
self.digest_event(event_metadata, event_data)
return HttpResponse()
@@ -469,6 +466,21 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
MaxDataReader("MAX_ENVELOPE_COMPRESSED_SIZE", request))))
envelope_headers = parser.get_envelope_headers()
# Getting the project is the only DB-touching (a read) we do before we (only in IMMEDIATE/EAGER modes) start
# reading/writing in digest_event. Notes on transactions:
#
# * we could add `durable_atomic` here for explicitness / if we ever do more than one read (for consistent
# snapshots). As it stands, not needed. (I believe this is implicit due to Django or even sqlite itself)
# * For the IMMEDIATE/EAGER cases we don't suffer from locks b/c sqlite upgrades read to write; I've tested this
# by adding sleep statements between the read/writes. I believe this is b/c of our immediate_atomic on
# digest_event. When removing that, and wrapping all of the present method in `durable_atomic`, read-write
# upgrades indeed fail.
# * digest_event gets its own `project`, so there's no cross-transaction "pollution".
# * Road not taken: pulling `immediate_atomic` up to the present level, but only for IMMEDIATE/EAGER modes (the
# only modes where this would make sense). This allows for passing of project between the 2 methods, but the
# added complexity (conditional transactions both here and in digest_event) is not worth it for modes that are
# non-production anyway.
if "dsn" in envelope_headers:
project = self.get_project(project_pk, get_sentry_key(envelope_headers["dsn"]))
else: