DEFAULT_CHUNK_SIZE = 8 * 1024

# wbits selects the compression wrapper (see the zlib module docs):
#   16 + MAX_WBITS -> expect/emit a gzip header and trailer
#   -MAX_WBITS     -> raw deflate stream, no header
WBITS_PARAM_FOR_GZIP = 16 + zlib.MAX_WBITS  # zlib.MAX_WBITS == 15
WBITS_PARAM_FOR_DEFLATE = -zlib.MAX_WBITS


def zlib_generator(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    """Yield decompressed chunks of *input_stream*.

    Reads the stream at most *chunk_size* bytes at a time. Individual yielded
    chunks may be b"" (the decompressor can buffer input before producing
    output), so an empty chunk does NOT mean end-of-stream -- exhaustion of
    the generator does.
    """
    z = zlib.decompressobj(wbits=wbits)

    while True:
        compressed_chunk = input_stream.read(chunk_size)
        if not compressed_chunk:
            break

        yield z.decompress(compressed_chunk)

    yield z.flush()


class ZLibReader:
    """Minimal read-only file-like view on a zlib/gzip-compressed stream."""

    def __init__(self, input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
        # chunk_size is forwarded to the underlying generator; the default is
        # the module-wide DEFAULT_CHUNK_SIZE, so existing callers are
        # unaffected.
        self.generator = zlib_generator(input_stream, wbits, chunk_size)
        self.unread = b""

    def read(self, size=None):
        """Return up to *size* decompressed bytes; everything when size is None.

        Returns b"" once the stream is exhausted.
        """
        if size is None:
            for chunk in self.generator:
                self.unread += chunk

            result = self.unread
            self.unread = b""
            return result

        while size > len(self.unread):
            try:
                # BUGFIX: do not treat an empty chunk as end-of-stream; zlib
                # may legally produce b"" for a chunk of input while more
                # data follows (e.g. while consuming the gzip header). Only
                # StopIteration marks the real end.
                self.unread += next(self.generator)
            except StopIteration:
                break

        self.unread, result = self.unread[size:], self.unread[:size]
        return result


def content_encoding_reader(request):
    """Return a file-like object for the decoded body of a Django request.

    Picks a decompressing reader based on the Content-Encoding header; for
    identity (or unrecognized) encodings the request itself is returned,
    since it already supports .read().
    """
    encoding = request.META.get("HTTP_CONTENT_ENCODING", "").lower()

    if encoding == "gzip":
        return ZLibReader(request, WBITS_PARAM_FOR_GZIP)

    if encoding == "deflate":
        return ZLibReader(request, WBITS_PARAM_FOR_DEFLATE)

    if encoding == "br":
        raise NotImplementedError("Brotli not supported (yet)")

    return request


def compress_with_zlib(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
    """Compress all of *input_stream* and return the result as bytes.

    Mostly useful for testing (compress-decompress cycles).
    """
    output_stream = io.BytesIO()
    z = zlib.compressobj(wbits=wbits)

    while True:
        uncompressed_chunk = input_stream.read(chunk_size)
        if not uncompressed_chunk:
            break

        output_stream.write(z.compress(uncompressed_chunk))

    output_stream.write(z.flush())
    return output_stream.getvalue()
test_compress_decompress_gzip(self): myself_times_ten = open(__file__, 'rb').read() * 10 plain_stream = io.BytesIO(myself_times_ten) - compressed_stream = io.BytesIO() - result_stream = io.BytesIO() - compress_with_zlib(plain_stream, compressed_stream, WBITS_PARAM_FOR_GZIP) - compressed_stream.seek(0) - decompress_with_zlib(compressed_stream, result_stream, WBITS_PARAM_FOR_GZIP) - self.assertEquals(myself_times_ten, result_stream.getvalue()) + compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_GZIP)) + + result = b"" + reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_GZIP) + + while True: + chunk = reader.read(3) + result += chunk + if chunk == b"": + break + + self.assertEquals(myself_times_ten, result) def test_compress_decompress_deflate(self): myself_times_ten = open(__file__, 'rb').read() * 10 - plain_stream = io.BytesIO(open(__file__, 'rb').read() * 10) - compressed_stream = io.BytesIO() - result_stream = io.BytesIO() + plain_stream = io.BytesIO(myself_times_ten) - compress_with_zlib(plain_stream, compressed_stream, WBITS_PARAM_FOR_DEFLATE) - compressed_stream.seek(0) - decompress_with_zlib(compressed_stream, result_stream, WBITS_PARAM_FOR_DEFLATE) - self.assertEquals(myself_times_ten, result_stream.getvalue()) + compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_DEFLATE)) + + result = b"" + reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_DEFLATE) + + while True: + chunk = reader.read(3) + result += chunk + if chunk == b"": + break + + self.assertEquals(myself_times_ten, result) + + def test_compress_decompress_read_none(self): + myself_times_ten = open(__file__, 'rb').read() * 10 + plain_stream = io.BytesIO(myself_times_ten) + + compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_DEFLATE)) + + result = b"" + reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_DEFLATE) + + result = reader.read(None) + self.assertEquals(myself_times_ten, result) def 
test_max_data_reader(self): stream = io.BytesIO(b"hello" * 100) diff --git a/ingest/management/commands/send_json.py b/ingest/management/commands/send_json.py index 4c3d3dc..6e98b65 100644 --- a/ingest/management/commands/send_json.py +++ b/ingest/management/commands/send_json.py @@ -141,13 +141,12 @@ class Command(BaseCommand): headers["Content-Encoding"] = "deflate" wbits = WBITS_PARAM_FOR_DEFLATE - compressed_data = io.BytesIO() - compress_with_zlib(io.BytesIO(data_bytes), compressed_data, wbits) + compressed_data = compress_with_zlib(io.BytesIO(data_bytes), wbits) response = requests.post( get_envelope_url(dsn) if use_envelope else get_store_url(dsn), headers=headers, - data=compressed_data.getvalue(), + data=compressed_data, ) else: diff --git a/ingest/views.py b/ingest/views.py index 98152bf..a1c0526 100644 --- a/ingest/views.py +++ b/ingest/views.py @@ -23,6 +23,7 @@ from bugsink.registry import get_pc_registry from bugsink.period_counter import PeriodCounter from bugsink.transaction import immediate_atomic, delay_on_commit from bugsink.exceptions import ViolatedExpectation +from bugsink.streams import content_encoding_reader from events.models import Event from releases.models import create_release_if_needed @@ -247,7 +248,7 @@ class IngestEventAPIView(BaseIngestAPIView): def post(self, request, project_pk=None): project = self.get_project(request, project_pk) - request_data = json.loads(request.read()) + request_data = json.loads(content_encoding_reader(request).read()) try: self.process_event(request_data, project, request) @@ -262,7 +263,7 @@ class IngestEnvelopeAPIView(BaseIngestAPIView): def post(self, request, project_pk=None): project = self.get_project(request, project_pk) - parser = StreamingEnvelopeParser(request) + parser = StreamingEnvelopeParser(content_encoding_reader(request)) # TODO: use the envelope_header's DSN if it is available (exact order-of-operations will depend on load-shedding # mechanisms) diff --git 
a/sentry/middleware/__init__.py b/sentry/middleware/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/sentry/middleware/proxy.py b/sentry/middleware/proxy.py deleted file mode 100644 index 35b315c..0000000 --- a/sentry/middleware/proxy.py +++ /dev/null @@ -1,107 +0,0 @@ -# VENDORED from glitchtip, af9a700a8706f20771b005804d8c92ca95c8b072 -# Changes: -# * flake8 -# * max size manually changed -# -# Before 1.0 I want to probably not do this as a middleware at all, but ingest everything without unzipping and then in -# some async process do the actual unzipping (which is potentially costly) - -import io -import zlib - -from django.core.exceptions import RequestDataTooBig - - -Z_CHUNK = 1024 * 8 - - -class ZDecoder(io.RawIOBase): - """ - Base class for HTTP content decoders based on zlib - See: https://github.com/eBay/wextracto/blob/9c789b1c98d95a1e87dbedfd1541a8688d128f5c/wex/http_decoder.py - """ - - def __init__(self, fp, z=None): - self.fp = fp - self.z = z - self.flushed = None - self.counter = 0 - - def readable(self): - return True - - def readinto(self, buf): - - n = 0 - max_length = len(buf) - # DOS mitigation - block unzipped payloads larger than max allowed size - self.counter += 1 - if self.counter * max_length > 1_000_000_000_000: # TODO I replaced this - raise RequestDataTooBig() - - while max_length > 0: - if self.flushed is None: - chunk = self.fp.read(Z_CHUNK) - compressed = self.z.unconsumed_tail + chunk - decompressed = self.z.decompress(compressed, max_length) - - if not chunk: - self.flushed = self.z.flush() - else: - if not self.flushed: - return n - - decompressed = self.flushed[:max_length] - self.flushed = self.flushed[max_length:] - - buf[n:n + len(decompressed)] = decompressed - n += len(decompressed) - max_length = len(buf) - n - - return n - - -class DeflateDecoder(ZDecoder): - """ - Decoding for "content-encoding: deflate" - """ - def __init__(self, fp): - ZDecoder.__init__(self, fp, 
zlib.decompressobj(-zlib.MAX_WBITS)) - - -class GzipDecoder(ZDecoder): - """ - Decoding for "content-encoding: gzip" - """ - - def __init__(self, fp): - ZDecoder.__init__(self, fp, zlib.decompressobj(16 + zlib.MAX_WBITS)) - - -class DecompressBodyMiddleware(object): - def __init__(self, get_response): - self.get_response = get_response - - def __call__(self, request): - decode = False - encoding = request.META.get("HTTP_CONTENT_ENCODING", "").lower() - - if encoding == "gzip": - request._stream = GzipDecoder(request._stream) - decode = True - - if encoding == "deflate": - request._stream = DeflateDecoder(request._stream) - decode = True - - if decode: - # Since we don't know the original content length ahead of time, we - # need to set the content length reasonably high so read generally - # succeeds. This seems to be the only easy way for Django 1.6. - request.META["CONTENT_LENGTH"] = "4294967295" # 0xffffffff - - # The original content encoding is no longer valid, so we have to - # remove the header. Otherwise, LazyData will attempt to re-decode - # the body. - del request.META["HTTP_CONTENT_ENCODING"] - return self.get_response(request)