mirror of
https://github.com/bugsink/bugsink.git
synced 2025-12-29 17:30:25 -06:00
Additional test scripts for gzip/deflate bombs
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
import io
|
||||
import zlib
|
||||
import os
|
||||
import uuid
|
||||
import brotli
|
||||
@@ -5,6 +7,7 @@ import brotli
|
||||
import requests
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from bugsink.streams import WBITS_PARAM_FOR_GZIP, WBITS_PARAM_FOR_DEFLATE
|
||||
|
||||
from compat.dsn import get_envelope_url, get_header_value
|
||||
|
||||
@@ -40,7 +43,7 @@ class Command(BaseCommand):
|
||||
return int(size_str)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
compress = "br"
|
||||
compress = options['compress']
|
||||
size = self._parse_size(options['size'])
|
||||
|
||||
if options['dsn'] is None:
|
||||
@@ -80,7 +83,9 @@ class Command(BaseCommand):
|
||||
print("Constructing bomb of size %d bytes..." % size)
|
||||
|
||||
brotli_compressor = brotli.Compressor()
|
||||
data_bytes = brotli_compressor.process(header)
|
||||
|
||||
output_stream = io.BytesIO()
|
||||
output_stream.write(brotli_compressor.process(header))
|
||||
|
||||
print("Constructing chunk of size %d bytes..." % construction_chunk_size)
|
||||
chunk = b'\x00' * construction_chunk_size
|
||||
@@ -88,27 +93,74 @@ class Command(BaseCommand):
|
||||
chunk_count = size // construction_chunk_size
|
||||
for i in range(chunk_count):
|
||||
print(" Adding chunk %d of %d..." % (i, chunk_count))
|
||||
data_bytes += brotli_compressor.process(chunk)
|
||||
output_stream.write(brotli_compressor.process(chunk))
|
||||
|
||||
remaining = size - (chunk_count * construction_chunk_size)
|
||||
if remaining > 0:
|
||||
print(" Adding final chunk")
|
||||
data_bytes += brotli_compressor.process(b'\x00' * remaining)
|
||||
output_stream.write(brotli_compressor.process(b'\x00' * remaining))
|
||||
|
||||
data_bytes += brotli_compressor.finish()
|
||||
output_stream.write(brotli_compressor.finish())
|
||||
data_bytes = output_stream.getvalue()
|
||||
|
||||
print("Bomb constructed, size is %d bytes." % len(data_bytes))
|
||||
return data_bytes
|
||||
|
||||
def br_bomb(self, header, size):
|
||||
if os.path.exists("/tmp/br-bomb-%d" % size):
|
||||
with open("/tmp/br-bomb-%d" % size, "rb") as f:
|
||||
filename = "/tmp/br-bomb-%d" % size
|
||||
if os.path.exists(filename):
|
||||
with open(filename, "rb") as f:
|
||||
data_bytes = f.read()
|
||||
print("Using cached brotli bomb of size %d bytes." % len(data_bytes))
|
||||
print("Using cached bomb %s of size %d bytes." % (filename, len(data_bytes)))
|
||||
return data_bytes
|
||||
|
||||
data_bytes = self.construct_br_bomb(header, size)
|
||||
with open("/tmp/br-bomb-%s" % size, "wb") as f:
|
||||
with open(filename, "wb") as f:
|
||||
f.write(data_bytes)
|
||||
|
||||
return data_bytes
|
||||
|
||||
def construct_zlib_bomb(self, header, size, wbits):
|
||||
construction_chunk_size = min(100 * MiB, size // 10)
|
||||
|
||||
print("Constructing bomb of size %d bytes..." % size)
|
||||
|
||||
zlib_compressor = zlib.compressobj(level=9, wbits=wbits)
|
||||
|
||||
output_stream = io.BytesIO()
|
||||
output_stream.write(zlib_compressor.compress(header))
|
||||
|
||||
print("Constructing chunk of size %d bytes..." % construction_chunk_size)
|
||||
chunk = b'\x00' * construction_chunk_size
|
||||
|
||||
chunk_count = size // construction_chunk_size
|
||||
for i in range(chunk_count):
|
||||
print(" Adding chunk %d of %d..." % (i, chunk_count))
|
||||
output_stream.write(zlib_compressor.compress(chunk))
|
||||
|
||||
remaining = size - (chunk_count * construction_chunk_size)
|
||||
if remaining > 0:
|
||||
print(" Adding final chunk")
|
||||
output_stream.write(zlib_compressor.compress(b'\x00' * remaining))
|
||||
|
||||
output_stream.write(zlib_compressor.flush())
|
||||
data_bytes = output_stream.getvalue()
|
||||
|
||||
print("Bomb constructed, size is %d bytes." % len(data_bytes))
|
||||
return data_bytes
|
||||
|
||||
def zlib_bomb(self, header, size, wbits):
|
||||
algo = "gzip" if wbits == WBITS_PARAM_FOR_GZIP else "deflate"
|
||||
filename = "/tmp/%s-bomb-%d" % (algo, size)
|
||||
|
||||
if os.path.exists(filename):
|
||||
with open(filename, "rb") as f:
|
||||
data_bytes = f.read()
|
||||
print("Using cached bomb %s of size %d bytes." % (filename, len(data_bytes)))
|
||||
return data_bytes
|
||||
|
||||
data_bytes = self.construct_zlib_bomb(header, size, wbits)
|
||||
with open(filename, "wb") as f:
|
||||
f.write(data_bytes)
|
||||
|
||||
return data_bytes
|
||||
@@ -138,6 +190,23 @@ class Command(BaseCommand):
|
||||
timeout=100,
|
||||
)
|
||||
|
||||
elif compress in ["gzip", "deflate"]:
|
||||
if compress == "gzip":
|
||||
headers["Content-Encoding"] = "gzip"
|
||||
wbits = WBITS_PARAM_FOR_GZIP
|
||||
elif compress == "deflate":
|
||||
headers["Content-Encoding"] = "deflate"
|
||||
wbits = WBITS_PARAM_FOR_DEFLATE
|
||||
|
||||
compressed_data = self.zlib_bomb(header_bytes, size, wbits)
|
||||
|
||||
response = requests.post(
|
||||
get_envelope_url(dsn),
|
||||
headers=headers,
|
||||
data=compressed_data,
|
||||
timeout=100,
|
||||
)
|
||||
|
||||
else:
|
||||
raise Exception("Unsupported compression method: %s" % compress)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user