mirror of
https://github.com/bugsink/bugsink.git
synced 2026-02-14 01:38:40 -06:00
Use my own impl. of conent_encoding based decompress
In the process: make it read-read rather than read-write
This commit is contained in:
@@ -71,8 +71,6 @@ MIDDLEWARE = [
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'django.middleware.clickjacking.XFrameOptionsMiddleware',
|
||||
|
||||
'sentry.middleware.proxy.DecompressBodyMiddleware',
|
||||
|
||||
'bugsink.middleware.PerformanceStatsMiddleware',
|
||||
]
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import zlib
|
||||
import io
|
||||
|
||||
|
||||
DEFAULT_CHUNK_SIZE = 8 * 1024
|
||||
@@ -15,7 +16,7 @@ WBITS_PARAM_FOR_GZIP = 16 + zlib.MAX_WBITS # zlib.MAX_WBITS == 15
|
||||
WBITS_PARAM_FOR_DEFLATE = -zlib.MAX_WBITS
|
||||
|
||||
|
||||
def decompress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
|
||||
def zlib_generator(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
|
||||
z = zlib.decompressobj(wbits=wbits)
|
||||
|
||||
while True:
|
||||
@@ -23,13 +24,58 @@ def decompress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_
|
||||
if not compressed_chunk:
|
||||
break
|
||||
|
||||
output_stream.write(z.decompress(compressed_chunk))
|
||||
yield z.decompress(compressed_chunk)
|
||||
|
||||
output_stream.write(z.flush())
|
||||
yield z.flush()
|
||||
|
||||
|
||||
def compress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
|
||||
class ZLibReader:
|
||||
|
||||
def __init__(self, input_stream, wbits):
|
||||
self.generator = zlib_generator(input_stream, wbits)
|
||||
self.unread = b""
|
||||
|
||||
def read(self, size=None):
|
||||
if size is None:
|
||||
for chunk in self.generator:
|
||||
self.unread += chunk
|
||||
|
||||
result = self.unread
|
||||
self.unread = b""
|
||||
return result
|
||||
|
||||
while size > len(self.unread):
|
||||
try:
|
||||
chunk = next(self.generator)
|
||||
if chunk == b"":
|
||||
break
|
||||
self.unread += chunk
|
||||
except StopIteration:
|
||||
break
|
||||
|
||||
self.unread, result = self.unread[size:], self.unread[:size]
|
||||
return result
|
||||
|
||||
|
||||
def content_encoding_reader(request):
|
||||
encoding = request.META.get("HTTP_CONTENT_ENCODING", "").lower()
|
||||
|
||||
if encoding == "gzip":
|
||||
return ZLibReader(request, WBITS_PARAM_FOR_GZIP)
|
||||
|
||||
if encoding == "deflate":
|
||||
return ZLibReader(request, WBITS_PARAM_FOR_DEFLATE)
|
||||
|
||||
if encoding == "br":
|
||||
raise NotImplementedError("Brotli not supported (yet)")
|
||||
|
||||
return request
|
||||
|
||||
|
||||
def compress_with_zlib(input_stream, wbits, chunk_size=DEFAULT_CHUNK_SIZE):
|
||||
# mostly useful for testing (compress-decompress cycles)
|
||||
|
||||
output_stream = io.BytesIO()
|
||||
z = zlib.compressobj(wbits=wbits)
|
||||
|
||||
while True:
|
||||
@@ -40,6 +86,7 @@ def compress_with_zlib(input_stream, output_stream, wbits, chunk_size=DEFAULT_CH
|
||||
output_stream.write(z.compress(uncompressed_chunk))
|
||||
|
||||
output_stream.write(z.flush())
|
||||
return output_stream.getvalue()
|
||||
|
||||
|
||||
class MaxDataReader:
|
||||
|
||||
@@ -14,7 +14,7 @@ from .period_counter import PeriodCounter, _prev_tup, TL_DAY, TL_MONTH, TL_YEAR
|
||||
from .volume_based_condition import VolumeBasedCondition
|
||||
from .registry import PeriodCounterRegistry
|
||||
from .streams import (
|
||||
compress_with_zlib, decompress_with_zlib, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE, MaxDataReader,
|
||||
compress_with_zlib, ZLibReader, WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE, MaxDataReader,
|
||||
MaxDataWriter)
|
||||
|
||||
|
||||
@@ -207,24 +207,48 @@ class StreamsTestCase(RegularTestCase):
|
||||
def test_compress_decompress_gzip(self):
|
||||
myself_times_ten = open(__file__, 'rb').read() * 10
|
||||
plain_stream = io.BytesIO(myself_times_ten)
|
||||
compressed_stream = io.BytesIO()
|
||||
result_stream = io.BytesIO()
|
||||
|
||||
compress_with_zlib(plain_stream, compressed_stream, WBITS_PARAM_FOR_GZIP)
|
||||
compressed_stream.seek(0)
|
||||
decompress_with_zlib(compressed_stream, result_stream, WBITS_PARAM_FOR_GZIP)
|
||||
self.assertEquals(myself_times_ten, result_stream.getvalue())
|
||||
compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_GZIP))
|
||||
|
||||
result = b""
|
||||
reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_GZIP)
|
||||
|
||||
while True:
|
||||
chunk = reader.read(3)
|
||||
result += chunk
|
||||
if chunk == b"":
|
||||
break
|
||||
|
||||
self.assertEquals(myself_times_ten, result)
|
||||
|
||||
def test_compress_decompress_deflate(self):
|
||||
myself_times_ten = open(__file__, 'rb').read() * 10
|
||||
plain_stream = io.BytesIO(open(__file__, 'rb').read() * 10)
|
||||
compressed_stream = io.BytesIO()
|
||||
result_stream = io.BytesIO()
|
||||
plain_stream = io.BytesIO(myself_times_ten)
|
||||
|
||||
compress_with_zlib(plain_stream, compressed_stream, WBITS_PARAM_FOR_DEFLATE)
|
||||
compressed_stream.seek(0)
|
||||
decompress_with_zlib(compressed_stream, result_stream, WBITS_PARAM_FOR_DEFLATE)
|
||||
self.assertEquals(myself_times_ten, result_stream.getvalue())
|
||||
compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_DEFLATE))
|
||||
|
||||
result = b""
|
||||
reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_DEFLATE)
|
||||
|
||||
while True:
|
||||
chunk = reader.read(3)
|
||||
result += chunk
|
||||
if chunk == b"":
|
||||
break
|
||||
|
||||
self.assertEquals(myself_times_ten, result)
|
||||
|
||||
def test_compress_decompress_read_none(self):
|
||||
myself_times_ten = open(__file__, 'rb').read() * 10
|
||||
plain_stream = io.BytesIO(myself_times_ten)
|
||||
|
||||
compressed_stream = io.BytesIO(compress_with_zlib(plain_stream, WBITS_PARAM_FOR_DEFLATE))
|
||||
|
||||
result = b""
|
||||
reader = ZLibReader(compressed_stream, WBITS_PARAM_FOR_DEFLATE)
|
||||
|
||||
result = reader.read(None)
|
||||
self.assertEquals(myself_times_ten, result)
|
||||
|
||||
def test_max_data_reader(self):
|
||||
stream = io.BytesIO(b"hello" * 100)
|
||||
|
||||
@@ -141,13 +141,12 @@ class Command(BaseCommand):
|
||||
headers["Content-Encoding"] = "deflate"
|
||||
wbits = WBITS_PARAM_FOR_DEFLATE
|
||||
|
||||
compressed_data = io.BytesIO()
|
||||
compress_with_zlib(io.BytesIO(data_bytes), compressed_data, wbits)
|
||||
compressed_data = compress_with_zlib(io.BytesIO(data_bytes), wbits)
|
||||
|
||||
response = requests.post(
|
||||
get_envelope_url(dsn) if use_envelope else get_store_url(dsn),
|
||||
headers=headers,
|
||||
data=compressed_data.getvalue(),
|
||||
data=compressed_data,
|
||||
)
|
||||
|
||||
else:
|
||||
|
||||
@@ -23,6 +23,7 @@ from bugsink.registry import get_pc_registry
|
||||
from bugsink.period_counter import PeriodCounter
|
||||
from bugsink.transaction import immediate_atomic, delay_on_commit
|
||||
from bugsink.exceptions import ViolatedExpectation
|
||||
from bugsink.streams import content_encoding_reader
|
||||
|
||||
from events.models import Event
|
||||
from releases.models import create_release_if_needed
|
||||
@@ -247,7 +248,7 @@ class IngestEventAPIView(BaseIngestAPIView):
|
||||
def post(self, request, project_pk=None):
|
||||
project = self.get_project(request, project_pk)
|
||||
|
||||
request_data = json.loads(request.read())
|
||||
request_data = json.loads(content_encoding_reader(request).read())
|
||||
|
||||
try:
|
||||
self.process_event(request_data, project, request)
|
||||
@@ -262,7 +263,7 @@ class IngestEnvelopeAPIView(BaseIngestAPIView):
|
||||
def post(self, request, project_pk=None):
|
||||
project = self.get_project(request, project_pk)
|
||||
|
||||
parser = StreamingEnvelopeParser(request)
|
||||
parser = StreamingEnvelopeParser(content_encoding_reader(request))
|
||||
|
||||
# TODO: use the envelope_header's DSN if it is available (exact order-of-operations will depend on load-shedding
|
||||
# mechanisms)
|
||||
|
||||
@@ -1,107 +0,0 @@
|
||||
# VENDORED from glitchtip, af9a700a8706f20771b005804d8c92ca95c8b072
|
||||
# Changes:
|
||||
# * flake8
|
||||
# * max size manually changed
|
||||
#
|
||||
# Before 1.0 I want to probably not do this as a middleware at all, but ingest everything without unzipping and then in
|
||||
# some async process do the actual unzipping (which is potentially costly)
|
||||
|
||||
import io
|
||||
import zlib
|
||||
|
||||
from django.core.exceptions import RequestDataTooBig
|
||||
|
||||
|
||||
Z_CHUNK = 1024 * 8
|
||||
|
||||
|
||||
class ZDecoder(io.RawIOBase):
|
||||
"""
|
||||
Base class for HTTP content decoders based on zlib
|
||||
See: https://github.com/eBay/wextracto/blob/9c789b1c98d95a1e87dbedfd1541a8688d128f5c/wex/http_decoder.py
|
||||
"""
|
||||
|
||||
def __init__(self, fp, z=None):
|
||||
self.fp = fp
|
||||
self.z = z
|
||||
self.flushed = None
|
||||
self.counter = 0
|
||||
|
||||
def readable(self):
|
||||
return True
|
||||
|
||||
def readinto(self, buf):
|
||||
|
||||
n = 0
|
||||
max_length = len(buf)
|
||||
# DOS mitigation - block unzipped payloads larger than max allowed size
|
||||
self.counter += 1
|
||||
if self.counter * max_length > 1_000_000_000_000: # TODO I replaced this
|
||||
raise RequestDataTooBig()
|
||||
|
||||
while max_length > 0:
|
||||
if self.flushed is None:
|
||||
chunk = self.fp.read(Z_CHUNK)
|
||||
compressed = self.z.unconsumed_tail + chunk
|
||||
decompressed = self.z.decompress(compressed, max_length)
|
||||
|
||||
if not chunk:
|
||||
self.flushed = self.z.flush()
|
||||
else:
|
||||
if not self.flushed:
|
||||
return n
|
||||
|
||||
decompressed = self.flushed[:max_length]
|
||||
self.flushed = self.flushed[max_length:]
|
||||
|
||||
buf[n:n + len(decompressed)] = decompressed
|
||||
n += len(decompressed)
|
||||
max_length = len(buf) - n
|
||||
|
||||
return n
|
||||
|
||||
|
||||
class DeflateDecoder(ZDecoder):
|
||||
"""
|
||||
Decoding for "content-encoding: deflate"
|
||||
"""
|
||||
def __init__(self, fp):
|
||||
ZDecoder.__init__(self, fp, zlib.decompressobj(-zlib.MAX_WBITS))
|
||||
|
||||
|
||||
class GzipDecoder(ZDecoder):
|
||||
"""
|
||||
Decoding for "content-encoding: gzip"
|
||||
"""
|
||||
|
||||
def __init__(self, fp):
|
||||
ZDecoder.__init__(self, fp, zlib.decompressobj(16 + zlib.MAX_WBITS))
|
||||
|
||||
|
||||
class DecompressBodyMiddleware(object):
|
||||
def __init__(self, get_response):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
decode = False
|
||||
encoding = request.META.get("HTTP_CONTENT_ENCODING", "").lower()
|
||||
|
||||
if encoding == "gzip":
|
||||
request._stream = GzipDecoder(request._stream)
|
||||
decode = True
|
||||
|
||||
if encoding == "deflate":
|
||||
request._stream = DeflateDecoder(request._stream)
|
||||
decode = True
|
||||
|
||||
if decode:
|
||||
# Since we don't know the original content length ahead of time, we
|
||||
# need to set the content length reasonably high so read generally
|
||||
# succeeds. This seems to be the only easy way for Django 1.6.
|
||||
request.META["CONTENT_LENGTH"] = "4294967295" # 0xffffffff
|
||||
|
||||
# The original content encoding is no longer valid, so we have to
|
||||
# remove the header. Otherwise, LazyData will attempt to re-decode
|
||||
# the body.
|
||||
del request.META["HTTP_CONTENT_ENCODING"]
|
||||
return self.get_response(request)
|
||||
Reference in New Issue
Block a user