From d807ea2c50a359ecc0ac0af6b0ea0ad8b5b13804 Mon Sep 17 00:00:00 2001 From: Klaas van Schelven Date: Wed, 5 Nov 2025 11:10:14 +0100 Subject: [PATCH] Minidump: via envelope interface See #82 --- bugsink/app_settings.py | 1 + bugsink/streams.py | 4 +- ingest/filestore.py | 8 ++- ingest/tasks.py | 8 ++- ingest/views.py | 152 +++++++++++++++++++++++++--------------- 5 files changed, 113 insertions(+), 60 deletions(-) diff --git a/bugsink/app_settings.py b/bugsink/app_settings.py index 3ba51aa..1d59347 100644 --- a/bugsink/app_settings.py +++ b/bugsink/app_settings.py @@ -49,6 +49,7 @@ DEFAULTS = { # MAX* below mirror the (current) values for the Sentry Relay "MAX_EVENT_SIZE": _MEBIBYTE, + "MAX_ATTACHMENT_SIZE": 100 * _MEBIBYTE, "MAX_EVENT_COMPRESSED_SIZE": 200 * _KIBIBYTE, # Note: this only applies to the deprecated "store" endpoint. "MAX_ENVELOPE_SIZE": 100 * _MEBIBYTE, "MAX_ENVELOPE_COMPRESSED_SIZE": 20 * _MEBIBYTE, diff --git a/bugsink/streams.py b/bugsink/streams.py index e3c6120..cb9d5b8 100644 --- a/bugsink/streams.py +++ b/bugsink/streams.py @@ -116,7 +116,7 @@ class MaxDataReader: self.bytes_read = 0 self.stream = stream - if isinstance(max_length, str): # reusing this is a bit of a hack, but leads to readable code at usage + if isinstance(max_length, str): # support for settings-name max_length makes both the code and errors better self.max_length = get_settings()[max_length] self.reason = "%s: %s" % (max_length, self.max_length) else: @@ -145,7 +145,7 @@ class MaxDataWriter: self.bytes_written = 0 self.stream = stream - if isinstance(max_length, str): # reusing this is a bit of a hack, but leads to readable code at usage + if isinstance(max_length, str): # support for settings-name max_length makes both the code and errors better self.max_length = get_settings()[max_length] self.reason = "%s: %s" % (max_length, self.max_length) else: diff --git a/ingest/filestore.py b/ingest/filestore.py index b8737cf..82a0321 100644 --- a/ingest/filestore.py +++ b/ingest/filestore.py @@ -4,7 +4,7 @@ from django.utils._os import safe_join from bugsink.app_settings import get_settings -def get_filename_for_event_id(event_id): +def get_filename_for_event_id(event_id, filetype="event"): # The idea of having some levels of directories here (to avoid too many files in a single dir) is not yet # implemented. However, counterpoint: when doing stress tests, it was quite hard to get a serious backlog going # (snappea was very well able to play catch-up). So this might not be necessary. 
@@ -15,4 +15,8 @@ def get_filename_for_event_id(event_id): # without needing to inspect all call-sites) event_id_normalized = uuid.UUID(event_id).hex - return safe_join(get_settings().INGEST_STORE_BASE_DIR, event_id_normalized) + basename = event_id_normalized + if filetype == "minidump": + basename += ".dmp" + + return safe_join(get_settings().INGEST_STORE_BASE_DIR, basename) diff --git a/ingest/tasks.py b/ingest/tasks.py index 362de72..522ee3d 100644 --- a/ingest/tasks.py +++ b/ingest/tasks.py @@ -18,8 +18,14 @@ def digest(event_id, event_metadata): with open(get_filename_for_event_id(event_id), "rb") as f: event_data = json.loads(f.read().decode("utf-8")) + if event_metadata.get("has_minidump"): + with open(get_filename_for_event_id(event_id, filetype="minidump"), "rb") as f: + minidump_bytes = f.read() + else: + minidump_bytes = None + try: - BaseIngestAPIView.digest_event(event_metadata, event_data) + BaseIngestAPIView.digest_event(event_metadata, event_data, minidump_bytes=minidump_bytes) except ValidationError as e: logger.warning("ValidationError in digest_event", exc_info=e) finally: diff --git a/ingest/views.py b/ingest/views.py index 91c7b3a..1a08819 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -1,3 +1,4 @@ +from collections import defaultdict import uuid import hashlib import os @@ -150,6 +151,34 @@ class BaseIngestAPIView(View): sentry_key = cls.get_sentry_key_for_request(request) return cls.get_project(project_pk, sentry_key) + @classmethod + def process_minidump(cls, ingested_at, minidump_bytes, project, request): + # This is for the "pure" minidump case, i.e. no associated event data. TSTTCPW: convert the minidump data to an + # event and then proceed as usual. + + performance_logger.info("ingested minidump with %s bytes", len(minidump_bytes)) + + event_id = uuid.uuid4().hex + event_data = { + "event_id": event_id, + "platform": "native", + "extra": {}, + "errors": [], + } + + merge_minidump_event(event_data, minidump_bytes) + + # write the event data to disk: + filename = get_filename_for_event_id(event_data["event_id"]) + b108_makedirs(os.path.dirname(filename)) + with open(filename, 'w') as f: + json.dump(event_data, f) + + event_metadata = cls.get_event_meta(event_data["event_id"], ingested_at, request, project) + digest.delay(event_data["event_id"], event_metadata) + + return event_id + @classmethod def get_event_meta(cls, event_id, ingested_at, request, project): # Meta means: not part of the event data. Basically: information that is available at the time of ingestion, and @@ -224,7 +253,7 @@ class BaseIngestAPIView(View): @classmethod @immediate_atomic() - def digest_event(cls, event_metadata, event_data, digested_at=None): + def digest_event(cls, event_metadata, event_data, digested_at=None, minidump_bytes=None): # ingested_at is passed from the point-of-ingestion; digested_at is determined here. Because this happens inside # `immediate_atomic`, we know digestions are serialized, and assuming non-decreasing server clocks, not decrea- # sing. (no so for ingestion times: clock-watching happens outside the snappe transaction, and threading in the @@ -254,6 +283,10 @@ class BaseIngestAPIView(View): if get_settings().VALIDATE_ON_DIGEST in ["warn", "strict"]: cls.validate_event_data(event_data, get_settings().VALIDATE_ON_DIGEST) + if minidump_bytes is not None: + # we merge after validation: validation is about what's provided _externally_, not our own merging. 
+ merge_minidump_event(event_data, minidump_bytes) + # I resisted the temptation to put `get_denormalized_fields_for_data` in an if-statement: you basically "always" # need this info... except when duplicate event-ids are sent. But the latter is the exception, and putting this # in an if-statement would require more rework (and possibly extra queries) than it's worth. @@ -601,43 +634,78 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): return HttpResponse(status=HTTP_429_TOO_MANY_REQUESTS) def factory(item_headers): - if item_headers.get("type") == "event": - # envelope_headers["event_id"] is required when type=event per the spec (and takes precedence over the - # payload's event_id), so we can rely on it having been set. - if "event_id" not in envelope_headers: - raise ParseError("event_id not found in envelope headers") + type_ = item_headers.get("type") - try: - # validate that the event_id is a valid UUID as per the spec (validate at the edge) - uuid.UUID(envelope_headers["event_id"]) - except ValueError: - raise ParseError("event_id in envelope headers is not a valid UUID") + if ((type_ not in ["event", "attachment"]) or + (item_headers.get("type") == "attachment" and + item_headers.get("attachment_type") != "event.minidump")): - filename = get_filename_for_event_id(envelope_headers["event_id"]) - b108_makedirs(os.path.dirname(filename)) - return MaxDataWriter("MAX_EVENT_SIZE", open(filename, 'wb')) + # non-event/minidumps can be discarded; (we don't check for individual size limits, because these differ + # per item type, we have the envelope limit to protect us, and we incur almost no cost (NullWriter)) + return NullWriter() - # everything else can be discarded; (we don't check for individual size limits, because these differ - # per item type, we have the envelope limit to protect us, and we incur almost no cost (NullWriter) anyway. - return NullWriter() + # envelope_headers["event_id"] is required when type in ["event", "attachment"] per the spec (and takes + # precedence over the payload's event_id), so we can rely on it having been set. + if "event_id" not in envelope_headers: + raise ParseError("event_id not found in envelope headers") - for item_headers, event_output_stream in parser.get_items(factory): - if item_headers.get("type") != "event": - logger.info("skipping non-event item: %s", item_headers.get("type")) + try: + # validate that the event_id is a valid UUID as per the spec (validate at the edge) + uuid.UUID(envelope_headers["event_id"]) + except ValueError: + raise ParseError("event_id in envelope headers is not a valid UUID") - if item_headers.get("type") == "transaction": - # From the spec of type=event: This Item is mutually exclusive with `"transaction"` Items. - # i.e. when we see a transaction, a regular event will not be present and we can stop. 
- logger.info("discarding the rest of the envelope") - break + filetype = "event" if type_ == "event" else "minidump" + filename = get_filename_for_event_id(envelope_headers["event_id"], filetype=filetype) + b108_makedirs(os.path.dirname(filename)) + size_conf = "MAX_EVENT_SIZE" if type_ == "event" else "MAX_ATTACHMENT_SIZE" + return MaxDataWriter(size_conf, open(filename, 'wb')) + + # We ingest the whole envelope first and organize by type; this enables "digest once" across envelope-parts + items_by_type = defaultdict(list) + for item_headers, output_stream in parser.get_items(factory): + type_ = item_headers.get("type") + if type_ not in ["event", "attachment"]: + logger.info("skipping non-supported envelope item: %s", item_headers.get("type")) continue - performance_logger.info("ingested event with %s bytes", event_output_stream.bytes_written) - event_metadata = self.get_event_meta(envelope_headers["event_id"], ingested_at, request, project) + if type_ == "attachment" and item_headers.get("attachment_type") != "event.minidump": + logger.info("skipping non-supported attachment type: %s", item_headers.get("attachment_type")) + continue + + performance_logger.info("ingested %s with %s bytes", type_, output_stream.bytes_written) + items_by_type[type_].append(output_stream) + + event_count = len(items_by_type.get("event", [])) + minidump_count = len(items_by_type.get("attachment", [])) + + if event_count > 1 or minidump_count > 1: + # TODO: we do 2 passes (one for storing, one for calling the right task), and we check certain conditions + # only on the second pass; this means that we may not clean up after ourselves yet. + # TODO we don't do any minidump files cleanup yet in any of the cases. + + logger.info( + "can only deal with one event/minidump per envelope but found %s/%s, ignoring this envelope.", + event_count, minidump_count) + return HttpResponse() + + event_metadata = self.get_event_meta(envelope_headers["event_id"], ingested_at, request, project) + + if event_count == 1: + if minidump_count == 1: + event_metadata["has_minidump"] = True digest.delay(envelope_headers["event_id"], event_metadata) - break # From the spec of type=event: This Item may occur at most once per Envelope. once seen: done + else: + # as it stands, we implement the minidump->event path for the minidump-only case on-ingest; we could push + # this to a task too if needed or for reasons of symmetry. + with open(get_filename_for_event_id(envelope_headers["event_id"], filetype="minidump"), 'rb') as f: + minidump_bytes = f.read() + + # TODO: error handling + # NOTE "The file should start with the MDMP magic bytes." is not checked yet. + self.process_minidump(ingested_at, minidump_bytes, project, request) return HttpResponse() @@ -663,18 +731,6 @@ class MinidumpAPIView(BaseIngestAPIView): # A Base "Ingest" APIView in the sense that it reuses some key building blocks (auth). # I'm not 100% sure whether "philosophically" the minidump endpoint is also "ingesting"; we'll see. - @classmethod - def _ingest(cls, ingested_at, event_data, project, request): - # TSTTCPW: convert the minidump data to an event and then proceed as usual. 
- filename = get_filename_for_event_id(event_data["event_id"]) - b108_makedirs(os.path.dirname(filename)) - with open(filename, 'w') as f: - json.dump(event_data, f) - - # performance_logger.info("ingested event with %s bytes", event_output_stream.bytes_written) TODO for minidump - event_metadata = cls.get_event_meta(event_data["event_id"], ingested_at, request, project) - digest.delay(event_data["event_id"], event_metadata) - def post(self, request, project_pk=None): # not reusing the CORS stuff here; minidump-from-browser doesn't make sense. @@ -682,26 +738,12 @@ class MinidumpAPIView(BaseIngestAPIView): project = self.get_project_for_request(project_pk, request) try: - # in this flow, we don't get an event_id from the client, so we just generate one here. - event_id = uuid.uuid4().hex - minidump_bytes = request.FILES["upload_file_minidump"].read() - data = { - "event_id": event_id, - "platform": "native", - "extra": {}, - "errors": [], - } - - merge_minidump_event(data, minidump_bytes) - - self._ingest(ingested_at, data, project, request) + event_id = self.process_minidump(ingested_at, minidump_bytes, project, request) return JsonResponse({"id": event_id}) - except Exception as e: - raise return JsonResponse({"detail": str(e)}, status=HTTP_400_BAD_REQUEST)
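Note (illustration, not part of the patch): a minimal client-side sketch of the new envelope path, i.e. an envelope carrying a regular event plus an "event.minidump" attachment, which is the combination the rewritten factory() and the per-type dispatch accept. build_envelope() is a hypothetical helper, and the endpoint URL / auth header in the trailing comment are assumptions based on the standard Sentry ingestion protocol rather than anything shown in this diff:

    import json
    import uuid

    def build_envelope(event, minidump_bytes):
        # An envelope is one line of envelope headers, then per item a JSON item-header line
        # followed by the raw payload of `length` bytes; this mirrors what the envelope view parses.
        event_payload = json.dumps(event).encode("utf-8")
        return b"\n".join([
            json.dumps({"event_id": event["event_id"]}).encode("utf-8"),
            json.dumps({"type": "event", "length": len(event_payload)}).encode("utf-8"),
            event_payload,
            json.dumps({
                "type": "attachment",
                "attachment_type": "event.minidump",  # the only attachment_type the factory keeps
                "filename": "crash.dmp",
                "length": len(minidump_bytes),
            }).encode("utf-8"),
            minidump_bytes,
        ]) + b"\n"

    minidump_bytes = b"MDMP" + b"\x00" * 16  # placeholder; real dumps start with the MDMP magic bytes
    envelope = build_envelope(
        {"event_id": uuid.uuid4().hex, "platform": "native", "extra": {}, "errors": []},
        minidump_bytes,
    )
    # POST with requests (URL and auth header are assumptions, not shown in this diff):
    # requests.post(f"{BASE_URL}/api/{PROJECT_ID}/envelope/",
    #               headers={"X-Sentry-Auth": f"Sentry sentry_key={KEY}, sentry_version=7"},
    #               data=envelope)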
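The minidump-only variant (an attachment item without an event item) exercises the process_minidump() path instead: a fresh event is generated server-side and merge_minidump_event() fills it from the dump. Same assumptions as above, also just a sketch:

    import json
    import uuid

    minidump_bytes = b"MDMP" + b"\x00" * 16  # placeholder payload

    envelope = b"\n".join([
        # the envelope-level event_id is still required: during ingest it determines where the
        # .dmp file is stored, even though digestion generates a fresh event_id in process_minidump
        json.dumps({"event_id": uuid.uuid4().hex}).encode("utf-8"),
        json.dumps({
            "type": "attachment",
            "attachment_type": "event.minidump",
            "filename": "crash.dmp",
            "length": len(minidump_bytes),
        }).encode("utf-8"),
        minidump_bytes,
    ]) + b"\n"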