Introduce FileEventStorage
An (optional) way to store the `event_data` (the full event as JSON)
outside the DB. This is expected to be useful for larger setups,
because it gives you:

* A more portable database (e.g. for backups); depending on event size,
  the impact on your DB size is ~50x.
* Fewer worries about hitting "physical" limits (e.g. disk size, max
  file size) for your DB.

Presumably (more testing will happen going forward) it will also:

* Speed up migrations (especially on sqlite, which does full table
  copies)
* Speed up event ingestion(?)

Further improvements in this commit:

* `delete_with_limit` was removed; this removes one tie-in to
  MySQL/Sqlite (see #21 for this bullet)
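For context on how the feature is switched on: it is purely a settings matter. The sketch below mirrors the settings-template change further down in this diff; the basepath is illustrative (any directory writable by the application works).

BUGSINK = {
    # ... other settings ...

    "EVENT_STORAGES": {
        "local_flat_files": {  # label under which this backend is registered
            "STORAGE": "events.storage.FileEventStorage",  # dotted path, resolved lazily by the registry
            "OPTIONS": {
                "basepath": "/var/lib/bugsink/filestorage",  # illustrative location for the per-event .json files
            },
            "USE_FOR_WRITE": True,  # newly digested events are written here
        },
    },
}

Leaving EVENT_STORAGES empty (the default) keeps the existing behaviour of storing the full JSON in the events table; events written before the switch keep being read from wherever their storage_backend column points.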
.gitignore (vendored, 3 lines changed)

@@ -15,6 +15,9 @@ __pycache__
 /bugsink/_version.py
 /dist/
 
+# This matches the filestorage mentioned in development.py
+/filestorage
+
 # sqlite
 /db.sqlite3
 
@@ -62,6 +62,7 @@ DEFAULTS = {
 
     # Locations of files & directories:
     "INGEST_STORE_BASE_DIR": "/tmp/bugsink/ingestion",
+    "EVENT_STORAGES": {},
 
     # Security:
     "MINIMIZE_INFORMATION_EXPOSURE": False,
 
@@ -95,6 +95,15 @@ BUGSINK = {
     "MAX_EVENTS_PER_PROJECT_PER_5_MINUTES": 1_000_000,
     "MAX_EVENTS_PER_PROJECT_PER_HOUR": 50_000_000,
 
+    "EVENT_STORAGES": {
+        "local_flat_files": {
+            "STORAGE": "events.storage.FileEventStorage",
+            "OPTIONS": {
+                "basepath": os.path.join(BASE_DIR, "filestorage"),
+            },
+            "USE_FOR_WRITE": True,
+        },
+    }
 }
 
events/migrations/0019_event_storage_backend.py (new file, 20 lines)

@@ -0,0 +1,20 @@
+# Generated by Django 4.2.19 on 2025-02-11 20:48
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("events", "0018_remove_event_has_exception_remove_event_has_logentry"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="event",
+            name="storage_backend",
+            field=models.CharField(
+                blank=True, default=None, editable=False, max_length=255, null=True
+            ),
+        ),
+    ]
@@ -12,6 +12,7 @@ from compat.timestamp import parse_timestamp
 from issues.utils import get_title_for_exception_type_and_value
 
 from .retention import get_random_irrelevance
+from .storage_registry import get_write_storage, get_storage
 
 
 class Platform(models.TextChoices):
@@ -48,6 +49,15 @@ def maybe_empty(s):
     return "" if not s else s
 
 
+def write_to_storage(event_id, parsed_data):
+    """
+    event_id means event.id, i.e. the internal one. This saves us from thinking about the security implications of
+    using an externally provided ID across storage backends.
+    """
+    with get_write_storage().open(event_id, "w") as f:
+        json.dump(parsed_data, f)
+
+
 class Event(models.Model):
     # Lines quotes with ">" are from the following to resources:
     # https://develop.sentry.dev/sdk/event-payloads/ (supposedly more human-readable)
@@ -139,6 +149,8 @@ class Event(models.Model):
     irrelevance_for_retention = models.PositiveIntegerField(blank=False, null=False)
     never_evict = models.BooleanField(blank=False, null=False, default=False)
 
+    storage_backend = models.CharField(max_length=255, blank=True, null=True, default=None, editable=False)
+
     # The following list of attributes are mentioned in the docs but are not attrs on our model (because we don't need
     # them to be [yet]):
     #
@@ -168,10 +180,20 @@
     ]
 
     def get_raw_data(self):
-        return self.data
+        if self.storage_backend is None:
+            return self.data
+
+        storage = get_storage(self.storage_backend)
+        with storage.open(self.id, "r") as f:
+            return f.read()
 
     def get_parsed_data(self):
-        return json.loads(self.data)
+        if self.storage_backend is None:
+            return json.loads(self.data)
+
+        storage = get_storage(self.storage_backend)
+        with storage.open(self.id, "r") as f:
+            return json.load(f)
 
     def get_absolute_url(self):
         return f"/issues/issue/{ self.issue_id }/event/{ self.id }/"
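As a usage note (not part of the diff itself): callers never dispatch on the storage themselves, because both accessors above do it. A hypothetical read therefore looks the same whether the payload sits in the DB or in a file backend:

# hypothetical usage; `event` is any persisted Event, old (in-DB) or new (file-backed)
event = Event.objects.get(pk=some_pk)

raw = event.get_raw_data()        # the full JSON document as a str
parsed = event.get_parsed_data()  # the same document as Python objects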
@@ -197,6 +219,8 @@ class Event(models.Model):
 
         irrelevance_for_retention = get_random_irrelevance(stored_event_count)
 
+        write_storage = get_write_storage()
+
         # A note on truncation (max_length): the fields we truncate here are directly from the SDK, so they "should have
         # been" truncated already. But we err on the side of caution: this is the kind of SDK error that we can, and
         # just want to, paper over (it's not worth dropping the event for).
@@ -208,7 +232,8 @@
                 grouping=grouping,
                 ingested_at=event_metadata["ingested_at"],
                 digested_at=digested_at,
-                data=json.dumps(parsed_data),
+                data=json.dumps(parsed_data) if write_storage is None else "",
+                storage_backend=None if write_storage is None else write_storage.name,
 
                 timestamp=parse_timestamp(parsed_data["timestamp"]),
                 platform=parsed_data["platform"][:64],
@@ -235,6 +260,9 @@
             )
             created = True
 
+            if write_storage is not None:
+                write_to_storage(event.id, parsed_data)
+
             return event, created
         except IntegrityError as e:
             if not re.match(
@@ -4,12 +4,12 @@ from django.db.models import Q, Min, Max
 from random import random
 from datetime import timezone, datetime
 
-from django.db.models.sql.compiler import SQLDeleteCompiler
-from django.db import connection
-
 from bugsink.moreiterutils import pairwise, map_N_until
 from performance.context_managers import time_and_query_count
 
+from .storage_registry import get_storage
+
 bugsink_logger = logging.getLogger("bugsink")
 performance_logger = logging.getLogger("bugsink.performance.retention")
@@ -269,20 +269,6 @@ def evict_for_irrelevance(
     return evicted
 
 
-def delete_with_limit(qs, limit):
-    # Django does not support this out of the box (i.e. it does not support LIMIT in DELETE queries). Both Sqlite and
-    # MySQL do in fact support it (whereas many other DBs do not), so we just reach down into Django's internals.
-    sql, params = SQLDeleteCompiler(qs.query, connection, 'default').as_sql()
-    limited_sql = sql + " LIMIT %s"
-    limited_params = params + (limit,)
-
-    with connection.cursor() as cursor:
-        cursor.execute(limited_sql, limited_params)
-        nr_of_deletions = cursor.rowcount
-
-    return nr_of_deletions
-
-
 def evict_for_epoch_and_irrelevance(project, max_epoch, max_irrelevance, max_event_count, include_never_evict):
     from issues.models import TurningPoint
     from .models import Event
@@ -317,5 +303,36 @@ def evict_for_epoch_and_irrelevance(project, max_epoch, max_irrelevance, max_eve
     # we need to manually ensure that no FKs to the deleted items exist:
     TurningPoint.objects.filter(triggering_event__in=qs).update(triggering_event=None)
 
-    nr_of_deletions = delete_with_limit(qs, max_event_count)
+    # We generate the list of events-to-delete (including the LIMIT) before proceeding; this allows us:
+    # A. to have a portable delete_with_limit (e.g. Django does not support that, nor does Postgres).
+    # B. to apply both deletion and cleanup_events_on_storage() on the same list.
+    #
+    # Implementation notes:
+    # 1. We force evaluation here with a `list()`; this means subsequent usages do _not_ try to "just use an inner
+    #    query". Although inner queries are attractive in the abstract, the literature suggests that performance may be
+    #    unpredictable (e.g. on MySQL). By using a list, we lift the (max 500) ids-to-match to the actual query, which
+    #    is quite ugly, but predictable and (at least on sqlite where I tested this) lightning-fast.
+    # 2. order_by: "pick something" to ensure the 2 usages of the "subquery" point to the same thing. (somewhat
+    #    superceded by [1] above; but I like to be defensive and predictable). tie-breaking on digest_order seems
+    #    consistent with the semantics of eviction.
+    pks_to_delete = list(qs.order_by("digest_order")[:max_event_count].values_list("pk", flat=True))
+
+    if len(pks_to_delete) > 0:
+        cleanup_events_on_storage(
+            Event.objects.filter(pk__in=pks_to_delete).exclude(storage_backend=None)
+            .values_list("id", "storage_backend")
+        )
+        nr_of_deletions = Event.objects.filter(pk__in=pks_to_delete).delete()[1]["events.Event"]
+    else:
+        nr_of_deletions = 0
 
     return nr_of_deletions
+
+
+def cleanup_events_on_storage(todos):
+    for event_id, storage_backend in todos:
+        try:
+            get_storage(storage_backend).delete(event_id)
+        except Exception as e:
+            # in a try/except such that we can continue with the rest of the batch
+            bugsink_logger.error("Error during cleanup of %s on %s: %s", event_id, storage_backend, e)
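The hunk above is where the commit-message bullet about `delete_with_limit` lands in practice: the dialect-specific `DELETE ... LIMIT n` is replaced by a portable select-then-delete. A standalone sketch of that pattern, with a hypothetical helper and ordering field (not part of the diff):

def delete_up_to(qs, order_field, limit):
    # hypothetical helper, for illustration only; works on any Django queryset and any supported DB
    # step 1: evaluate the LIMITed selection of primary keys up front (a plain SELECT, portable everywhere)
    pks = list(qs.order_by(order_field)[:limit].values_list("pk", flat=True))
    if not pks:
        return 0
    # step 2: delete exactly those rows; .delete() returns (total, {label: count})
    return qs.model.objects.filter(pk__in=pks).delete()[0]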
events/storage.py (new file, 58 lines)

@@ -0,0 +1,58 @@
+import contextlib
+import os.path
+
+
+class EventStorage(object):
+
+    def __init__(self, name, **options):
+        self.name = name
+
+    def save(self):
+        raise NotImplementedError()
+
+    def exists(self, event_id):
+        raise NotImplementedError()
+
+    def delete(self, event_id):
+        raise NotImplementedError()
+
+    def open(self, event_id):
+        raise NotImplementedError()
+
+    # one might imagine contexts where something like "url" is useful (e.g. S3, and pointing the end-user straight at
+    # the event file) but such a model means you'll need to think about the security implications of that URL, which is
+    # not worth it, so we only support "pass through application layer" (where the auth stuff is) models of usage.
+
+
+class FileEventStorage(EventStorage):
+
+    def __init__(self, name, basepath=None):
+        super().__init__(name)
+
+        if basepath is None:
+            raise ValueError("Basepath must be provided")
+
+        self.basepath = basepath
+
+    def _event_path(self, event_id):
+        # the dashes in uuid are preserved in the filename for readability; since their location is consistent, this is
+        # not a problem.
+        return os.path.join(self.basepath, str(event_id) + ".json")
+
+    @contextlib.contextmanager
+    def open(self, event_id, mode='r'):
+        if mode not in ('r', 'w'):
+            # EventStorage's API is generally _very_ limited (unique IDs, write-once) so we can (and should) be very
+            # strict about what we allow; we further imply "text mode" and "utf-8 encoding" given the JSON context.
+            raise ValueError("EventStorage.open() mode must be 'r' or 'w'")
+
+        # We open with utf-8 encoding explicitly to pre-empt the future of pep-0686 (it's also the only thing that makes
+        # sense in the context of JSON)
+        with open(self._event_path(event_id), mode, encoding="utf-8") as f:
+            yield f
+
+    def exists(self, event_id):
+        return os.path.exists(self._event_path(event_id))
+
+    def delete(self, event_id):
+        os.remove(self._event_path(event_id))
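To illustrate how small the EventStorage contract above is (a name, a context-manager open(event_id, mode), exists() and delete()), here is a hypothetical in-memory backend of the kind one might use in tests; it is not part of this commit:

import contextlib
import io

from events.storage import EventStorage


class InMemoryEventStorage(EventStorage):  # hypothetical, for illustration/tests only

    def __init__(self, name, **options):
        super().__init__(name, **options)
        self._blobs = {}  # event_id -> JSON string

    @contextlib.contextmanager
    def open(self, event_id, mode="r"):
        if mode == "w":
            buf = io.StringIO()
            yield buf
            self._blobs[event_id] = buf.getvalue()  # persist on clean exit
        elif mode == "r":
            yield io.StringIO(self._blobs[event_id])
        else:
            raise ValueError("EventStorage.open() mode must be 'r' or 'w'")

    def exists(self, event_id):
        return event_id in self._blobs

    def delete(self, event_id):
        del self._blobs[event_id]

Such a backend would be wired up the same way as FileEventStorage, i.e. by pointing "STORAGE" at its (hypothetical) dotted path under EVENT_STORAGES.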
events/storage_registry.py (new file, 50 lines)

@@ -0,0 +1,50 @@
+import importlib
+
+from bugsink.app_settings import get_settings
+
+
+_storages = None
+_write_storage = None
+
+
+def _ensure_storages():
+    global _storages
+    global _write_storage
+
+    if _storages is not None:
+        return
+
+    _storages = {name: _resolve(name, conf) for name, conf in get_settings().EVENT_STORAGES.items()}
+
+    matching = [name for name, conf in get_settings().EVENT_STORAGES.items() if conf.get("USE_FOR_WRITE", False)]
+
+    if len(matching) == 1:
+        _write_storage = _storages[matching[0]]
+
+    elif len(matching) > 1:
+        raise ValueError("Multiple USE_FOR_WRITE storages found.")
+
+    # else len==0 is implied by the initial value of _write_storage (None)
+
+
+def get_write_storage():
+    """
+    Gets the USE_FOR_WRITE storage. None means "in-database" (which is not shoe-horned in the EventStorage API).
+    """
+    _ensure_storages()
+    return _write_storage
+
+
+def get_storage(storage_name):
+    _ensure_storages()
+    return _storages[storage_name]
+
+
+def _resolve(name, conf):
+    storage_name = conf["STORAGE"]
+
+    module_name, class_name = storage_name.rsplit('.', 1)
+    module = importlib.import_module(module_name)
+    clazz = getattr(module, class_name)
+
+    return clazz(name, **conf.get("OPTIONS", {}))
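The registry is lazy (nothing is imported or instantiated until the first call) and driven entirely by the EVENT_STORAGES setting. A hedged sketch of how the pieces fit together at runtime, reusing the "local_flat_files" configuration example near the top (the event id is illustrative):

from events.storage_registry import get_write_storage, get_storage

storage = get_write_storage()  # the single USE_FOR_WRITE backend, or None for "store in the DB"

if storage is not None:
    # write path: this is what the digestion code does via write_to_storage(event.id, parsed_data)
    with storage.open("00000000-0000-0000-0000-000000000000", "w") as f:
        f.write('{"platform": "python"}')

    # read path: the backend is looked up again by the name stamped on the Event row (storage_backend)
    with get_storage(storage.name).open("00000000-0000-0000-0000-000000000000", "r") as f:
        print(f.read())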