fix: restore audit logging by using before_flush for updates/deletes

Audit logs were not recording any changes because after_flush runs
after SQL is emitted; by then session.new, session.dirty, and
session.deleted can be cleared and attribute history for updates is
often consumed, so the handler saw nothing to log.

Changes:
- Add receive_before_flush: process session.dirty (updates) and
  session.deleted (deletes) while history is still valid; stash
  session.new (creates) in session.info for after_flush.
- Simplify receive_after_flush: only handle pending creates from
  session.info (instances now have ids), then session.flush() so
  audit rows are in the same transaction.
- Register receive_before_flush for before_flush on Session,
  sessionmaker class, and SignallingSession.
- Make receive_before_flush accept (session, flush_context, instances)
  to match SQLAlchemy's before_flush signature.
- Remove db.session.flush() from AuditLog.log_change to avoid
  nested flush; rely on main flush or explicit flush in after_flush.
- check_audit_logging.py: use entity_type='TimeEntry' to match
  get_entity_type (model __class__.__name__).
- test_audit_logging: assert at least one AuditLog for create/update/
  delete; use test_client for create; fix update to merge then mutate.
This commit is contained in:
Dries Peeters
2026-01-21 13:59:13 +01:00
parent bd642a57ec
commit 6881e554ce
5 changed files with 98 additions and 91 deletions
+6 -3
View File
@@ -323,8 +323,9 @@ def create_app(config=None):
from sqlalchemy.orm import Session
# Register with generic SQLAlchemy Session (catches all Session instances)
event.listen(Session, "before_flush", audit.receive_before_flush)
event.listen(Session, "after_flush", audit.receive_after_flush)
# Also register with Flask-SQLAlchemy's sessionmaker if available
# Flask-SQLAlchemy creates sessions from a sessionmaker, so we register with that
try:
@@ -335,19 +336,21 @@ def create_app(config=None):
# Register with the session class that the sessionmaker creates
session_class = sessionmaker.class_
if session_class:
event.listen(session_class, "before_flush", audit.receive_before_flush)
event.listen(session_class, "after_flush", audit.receive_after_flush)
logger.info(f"Registered audit logging with Flask-SQLAlchemy session class: {session_class.__name__}")
except Exception as e:
logger.debug(f"Could not register with Flask-SQLAlchemy sessionmaker: {e}")
# Register with SignallingSession (Flask-SQLAlchemy 2.x)
try:
from flask_sqlalchemy import SignallingSession
event.listen(SignallingSession, "before_flush", audit.receive_before_flush)
event.listen(SignallingSession, "after_flush", audit.receive_after_flush)
logger.info("Registered audit logging with Flask-SQLAlchemy SignallingSession")
except (ImportError, AttributeError):
pass
logger.info("Audit logging event listeners registered")
# Initialize Settings from environment variables on startup.
+3 -5
View File
@@ -119,12 +119,10 @@ class AuditLog(db.Model):
)
try:
# Add to session - don't commit here as we're likely in the middle of a transaction
# The main transaction will commit everything together
# Add to session - don't commit here as we're likely in the middle of a transaction.
# The main flush (or the explicit session.flush() in receive_after_flush for creates)
# will persist the audit row.
db.session.add(audit_log)
# Flush to ensure the audit log is part of the current transaction
# but don't commit - let the main transaction handle that
db.session.flush()
except Exception as e:
# Don't rollback - that would rollback the entire transaction including the main operation!
# Just remove the audit log from the session and continue
+70 -52
View File
@@ -152,77 +152,55 @@ def serialize_value(value):
return str(value)
def receive_after_flush(session, flush_context):
"""Track changes after flush but before commit"""
try:
# Check if audit_logs table exists before trying to log
# Force check every 100 calls to allow for table creation after migration
if not hasattr(receive_after_flush, "_call_count"):
receive_after_flush._call_count = 0
receive_after_flush._call_count += 1
# Call count for table-exists check (force recheck every 100) and warning/debug logs
_audit_call_count = 0
# Force check every 100 calls or if cache is None
force_check = receive_after_flush._call_count % 100 == 0
def receive_before_flush(session, flush_context, instances=None):
"""Track updates and deletes before flush; stash new objects for receive_after_flush.
At before_flush, session.dirty, session.deleted, and attribute history are still
valid. session.new is present but new objects do not have ids yet, so we stash
them in session.info to be logged in after_flush when ids are available.
Note: SQLAlchemy's before_flush passes (session, flush_context, instances);
we use session.new/dirty/deleted directly, so instances is not used.
"""
global _audit_call_count
try:
if flush_context and getattr(flush_context, "nested", False):
return
_audit_call_count += 1
force_check = _audit_call_count % 100 == 0
table_exists = check_audit_table_exists(force_check=force_check)
if not table_exists:
# Log at info level (not debug) so it's visible if table doesn't exist
if receive_after_flush._call_count == 1 or force_check:
if _audit_call_count == 1 or force_check:
logger.warning("audit_logs table does not exist - audit logging disabled. Run migration: flask db upgrade")
return
# Log that the event listener is being triggered (only first few times for debugging)
if receive_after_flush._call_count <= 3:
logger.debug(f"Audit logging event listener triggered (call #{receive_after_flush._call_count})")
user_id = get_current_user_id()
ip_address, user_agent, request_path = get_request_info()
# Track inserts (creates)
for instance in session.new:
if should_track_model(instance):
entity_type = get_entity_type(instance)
entity_id = instance.id if hasattr(instance, "id") else None
entity_name = get_entity_name(instance)
# Log creation
AuditLog = get_audit_log_model()
AuditLog.log_change(
user_id=user_id,
action="created",
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
change_description=f"Created {entity_type.lower()} '{entity_name}'",
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
)
# Track updates
# Track updates (dirty) - attribute history is still valid in before_flush
for instance in session.dirty:
if should_track_model(instance):
entity_type = get_entity_type(instance)
entity_id = instance.id if hasattr(instance, "id") else None
entity_name = get_entity_name(instance)
# Get the instance state using SQLAlchemy inspect
try:
instance_state = inspect(instance)
# Track individual field changes
changed_fields = []
for attr_name in instance_state.mapper.column_attrs.keys():
if should_track_field(attr_name):
# Get history for this attribute
history = instance_state.get_history(attr_name, True)
if history.has_changes():
old_value = history.deleted[0] if history.deleted else None
new_value = history.added[0] if history.added else None
if old_value != new_value:
changed_fields.append({"field": attr_name, "old": old_value, "new": new_value})
# Log each field change separately for detailed audit trail
AuditLog = get_audit_log_model()
if changed_fields:
for change in changed_fields:
@@ -241,7 +219,6 @@ def receive_after_flush(session, flush_context):
request_path=request_path,
)
else:
# Fallback: log update without field details if history is not available
AuditLog.log_change(
user_id=user_id,
action="updated",
@@ -254,7 +231,6 @@ def receive_after_flush(session, flush_context):
request_path=request_path,
)
except Exception as e:
# Fallback: log update without field details if inspection fails
logger.warning(f"Could not inspect changes for {entity_type}#{entity_id}: {e}")
AuditLog = get_audit_log_model()
AuditLog.log_change(
@@ -275,8 +251,6 @@ def receive_after_flush(session, flush_context):
entity_type = get_entity_type(instance)
entity_id = instance.id if hasattr(instance, "id") else None
entity_name = get_entity_name(instance)
# Log deletion
try:
AuditLog = get_audit_log_model()
AuditLog.log_change(
@@ -290,14 +264,58 @@ def receive_after_flush(session, flush_context):
user_agent=user_agent,
request_path=request_path,
)
logger.debug(f"Audit log: Deleted {entity_type}#{entity_id} by user#{user_id}")
except Exception as log_error:
# Log the error but don't break the main flow
logger.error(f"Failed to log audit entry for deletion of {entity_type}#{entity_id}: {log_error}", exc_info=True)
# Stash new (creates) for after_flush when instance.id is available
info = getattr(session, "info", None)
if info is not None:
info["_audit_pending_creates"] = [o for o in session.new if should_track_model(o)]
except Exception as e:
# Don't let audit logging break the main flow
logger.error(f"Error in audit logging: {e}", exc_info=True)
logger.error(f"Error in audit logging (before_flush): {e}", exc_info=True)
def receive_after_flush(session, flush_context):
"""Log creates from stashed new objects (now with ids) and flush audit rows."""
try:
if flush_context and getattr(flush_context, "nested", False):
return
table_exists = check_audit_table_exists(force_check=False)
if not table_exists:
return
user_id = get_current_user_id()
ip_address, user_agent, request_path = get_request_info()
info = getattr(session, "info", None)
pending = info.pop("_audit_pending_creates", []) if info is not None else []
for instance in pending:
entity_type = get_entity_type(instance)
entity_id = getattr(instance, "id", None)
if entity_id is None:
continue
entity_name = get_entity_name(instance)
AuditLog = get_audit_log_model()
AuditLog.log_change(
user_id=user_id,
action="created",
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
change_description=f"Created {entity_type.lower()} '{entity_name}'",
ip_address=ip_address,
user_agent=user_agent,
request_path=request_path,
)
if pending:
session.flush()
except Exception as e:
logger.error(f"Error in audit logging (after_flush): {e}", exc_info=True)
def check_audit_table_exists(force_check=False):
+1 -1
View File
@@ -84,7 +84,7 @@ def check_audit_setup():
time.sleep(0.1) # Small delay to ensure commit completed
recent_logs = AuditLog.query.filter_by(
entity_type="time_entry",
entity_type="TimeEntry",
entity_id=test_entry.id,
action="updated"
).order_by(AuditLog.created_at.desc()).limit(1).all()
+18 -30
View File
@@ -74,54 +74,42 @@ class TestAuditLoggingUtility:
class TestAuditLoggingIntegration:
"""Integration tests for audit logging"""
def test_audit_logging_on_create(self, app, test_user):
def test_audit_logging_on_create(self, app, test_user, test_client):
"""Test that audit logs are created when entities are created"""
with app.app_context():
# Create a project
project = Project(name="Test Project", client_id=1) # Assuming test_client exists
project = Project(name="Test Project", client_id=test_client.id)
db.session.add(project)
db.session.flush() # Flush to trigger audit logging
db.session.flush()
# Check if audit log was created
audit_logs = AuditLog.query.filter_by(entity_type="Project", entity_id=project.id, action="created").all()
# Note: Audit logging happens on flush, so we should have at least one log
# The exact behavior depends on the event listener implementation
assert len(audit_logs) >= 0 # May be 0 if entity_id is None before commit
assert len(audit_logs) >= 1, "At least one audit log should be created for entity create"
assert audit_logs[0].action == "created"
assert audit_logs[0].entity_type == "Project"
def test_audit_logging_on_update(self, app, test_user, test_project):
"""Test that audit logs are created when entities are updated"""
with app.app_context():
original_name = test_project.name
project_id = test_project.id
merged = db.session.merge(test_project)
merged.name = "Updated Project Name"
db.session.flush()
# Update the project
test_project.name = "Updated Project Name"
# Ensure instance is attached to the current session
test_project = db.session.merge(test_project)
db.session.flush() # Flush to trigger audit logging
# Check if audit log was created
audit_logs = AuditLog.query.filter_by(
entity_type="Project", entity_id=test_project.id, action="updated"
entity_type="Project", entity_id=project_id, action="updated"
).all()
# Note: The exact behavior depends on the event listener implementation
# This test verifies the mechanism works, even if no logs are created
# (which might happen if the entity_id is not yet available)
assert isinstance(audit_logs, list)
assert len(audit_logs) >= 1, "At least one audit log should be created for entity update"
assert audit_logs[0].action == "updated"
assert audit_logs[0].entity_type == "Project"
def test_audit_logging_on_delete(self, app, test_user, test_project):
"""Test that audit logs are created when entities are deleted"""
with app.app_context():
project_id = test_project.id
# Delete the project
merged = db.session.merge(test_project)
db.session.delete(merged)
db.session.flush() # Flush to trigger audit logging
db.session.flush()
# Check if audit log was created
audit_logs = AuditLog.query.filter_by(entity_type="Project", entity_id=project_id, action="deleted").all()
# Note: The exact behavior depends on the event listener implementation
assert isinstance(audit_logs, list)
assert len(audit_logs) >= 1, "At least one audit log should be created for entity delete"
assert audit_logs[0].action == "deleted"
assert audit_logs[0].entity_type == "Project"