Files
WYGIWYH/app/apps/rules/tasks.py

775 lines
32 KiB
Python

import decimal
import logging
import traceback
from copy import deepcopy
from datetime import datetime, date
from decimal import Decimal
from itertools import chain
from pprint import pformat
from random import randint, random
from typing import Literal
from cachalot.api import cachalot_disabled
from dateutil.relativedelta import relativedelta
from django.contrib.auth import get_user_model
from django.forms import model_to_dict
from procrastinate.contrib.django import app
from simpleeval import EvalWithCompoundTypes
from apps.accounts.models import Account
from apps.common.middleware.thread_local import write_current_user, delete_current_user
from apps.rules.models import (
TransactionRule,
TransactionRuleAction,
)
from apps.transactions.models import (
Transaction,
TransactionCategory,
TransactionTag,
TransactionEntity,
)
from apps.rules.utils import transactions
logger = logging.getLogger(__name__)
class DryRunResults:
def __init__(self, dry_run: bool):
self.results = []
self.dry_run = dry_run
def header(self, header: str, action):
if not self.dry_run:
return
result = {"type": "header", "header_type": header, "action": action}
self.results.append(result)
def triggering_transaction(self, instance):
if not self.dry_run:
return
if isinstance(instance, Transaction):
instance = instance.deepcopy()
elif isinstance(instance, dict):
instance = deepcopy(instance)
result = {
"type": "triggering_transaction",
"transaction": instance,
}
self.results.append(result)
def edit_transaction(
self, instance, action, old_value, new_value, field, tags, entities
):
if not self.dry_run:
return
if isinstance(instance, Transaction):
instance = instance.deepcopy()
elif isinstance(instance, dict):
instance = deepcopy(instance)
result = {
"type": "edit_transaction",
"transaction": instance,
"action": action,
"old_value": old_value,
"new_value": new_value,
"field": field,
"tags": tags,
"entities": entities,
}
self.results.append(result)
def update_or_create_transaction(
self,
updated: bool,
action,
query,
tags,
entities,
start_instance=None,
end_instance=None,
):
if not self.dry_run:
return
if isinstance(end_instance, Transaction):
end_instance = end_instance.deepcopy()
elif isinstance(end_instance, dict):
end_instance = deepcopy(end_instance)
result = {
"type": "update_or_create_transaction",
"start_transaction": start_instance,
"end_transaction": end_instance,
"updated": updated,
"action": action,
"query": query,
"tags": tags,
"entities": entities,
}
self.results.append(result)
def error(self, error, level: Literal["error", "warning", "info"] = "error"):
if not self.dry_run:
return
result = {
"type": "error",
"error": error,
"traceback": traceback.format_exc(),
"level": level,
}
self.results.append(result)
@app.task(name="check_for_transaction_rules")
def check_for_transaction_rules(
instance_id=None,
transaction_data=None,
old_data=None,
user_id=None,
signal=None,
is_hard_deleted=False,
dry_run=False,
rule_id=None,
):
def _log(message: str, level="info"):
if dry_run:
if logs is not None:
logs.append(message)
if level == "error":
logs.append(traceback.format_exc())
else:
if level == "info":
logger.info(message)
elif level == "error":
logger.error(message, exc_info=True)
def _clear_names(prefix: str):
for k in list(simple.names.keys()):
if k.startswith(prefix):
del simple.names[k]
def _get_names(transaction: Transaction | dict, prefix: str = ""):
if isinstance(transaction, Transaction):
return {
"is_on_create": True if signal == "transaction_created" else False,
"is_on_delete": True if signal == "transaction_deleted" else False,
"is_on_update": True if signal == "transaction_updated" else False,
f"{prefix}id": transaction.id,
f"{prefix}account_name": (
transaction.account.name if transaction.id else None
),
f"{prefix}account_id": (
transaction.account.id if transaction.id else None
),
f"{prefix}account_group_name": (
transaction.account.group.name
if transaction.id and transaction.account.group
else None
),
f"{prefix}account_group_id": (
transaction.account.group.id
if transaction.id and transaction.account.group
else None
),
f"{prefix}is_asset_account": (
transaction.account.is_asset if transaction.id else None
),
f"{prefix}is_archived_account": (
transaction.account.is_archived if transaction.id else None
),
f"{prefix}category_name": (
transaction.category.name if transaction.category else None
),
f"{prefix}category_id": (
transaction.category.id if transaction.category else None
),
f"{prefix}tag_names": (
[x.name for x in transaction.tags.all()] if transaction.id else []
),
f"{prefix}tag_ids": (
[x.id for x in transaction.tags.all()] if transaction.id else []
),
f"{prefix}entities_names": (
[x.name for x in transaction.entities.all()]
if transaction.id
else []
),
f"{prefix}entities_ids": (
[x.id for x in transaction.entities.all()] if transaction.id else []
),
f"{prefix}is_expense": transaction.type == Transaction.Type.EXPENSE,
f"{prefix}is_income": transaction.type == Transaction.Type.INCOME,
f"{prefix}is_paid": transaction.is_paid,
f"{prefix}description": transaction.description,
f"{prefix}amount": transaction.amount or 0,
f"{prefix}notes": transaction.notes,
f"{prefix}date": transaction.date,
f"{prefix}reference_date": transaction.reference_date,
f"{prefix}internal_note": transaction.internal_note,
f"{prefix}internal_id": transaction.internal_id,
f"{prefix}is_deleted": transaction.deleted,
f"{prefix}is_muted": transaction.mute,
f"{prefix}is_recurring": transaction.recurring_transaction is not None,
f"{prefix}is_installment": transaction.installment_plan is not None,
f"{prefix}installment_number": (
transaction.installment_id if transaction.installment_plan else None
),
f"{prefix}installment_total": (
transaction.installment_plan.number_of_installments
if transaction.installment_plan
else None
),
}
else:
return {
"is_on_create": True if signal == "transaction_created" else False,
"is_on_delete": True if signal == "transaction_deleted" else False,
"is_on_update": True if signal == "transaction_updated" else False,
f"{prefix}id": transaction.get("id"),
f"{prefix}account_name": transaction.get("account", (None, None))[1],
f"{prefix}account_id": transaction.get("account", (None, None))[0],
f"{prefix}account_group_name": transaction.get(
"account_group", (None, None)
)[1],
f"{prefix}account_group_id": transaction.get(
"account_group", (None, None)
)[0],
f"{prefix}is_asset_account": transaction.get("is_asset"),
f"{prefix}is_archived_account": transaction.get("is_archived"),
f"{prefix}category_name": transaction.get("category", (None, None))[1],
f"{prefix}category_id": transaction.get("category", (None, None))[0],
f"{prefix}tag_names": [x[1] for x in transaction.get("tags", [])],
f"{prefix}tag_ids": [x[0] for x in transaction.get("tags", [])],
f"{prefix}entities_names": [
x[1] for x in transaction.get("entities", [])
],
f"{prefix}entities_ids": [
x[0] for x in transaction.get("entities", [])
],
f"{prefix}is_expense": transaction.get("type")
== Transaction.Type.EXPENSE,
f"{prefix}is_income": transaction.get("type")
== Transaction.Type.INCOME,
f"{prefix}is_paid": transaction.get("is_paid"),
f"{prefix}description": transaction.get("description", ""),
f"{prefix}amount": Decimal(transaction.get("amount")),
f"{prefix}notes": transaction.get("notes", ""),
f"{prefix}date": datetime.fromisoformat(transaction.get("date")),
f"{prefix}reference_date": datetime.fromisoformat(
transaction.get("reference_date")
),
f"{prefix}internal_note": transaction.get("internal_note", ""),
f"{prefix}internal_id": transaction.get("internal_id", ""),
f"{prefix}is_deleted": transaction.get("deleted", True),
f"{prefix}is_muted": transaction.get("mute", False),
f"{prefix}is_recurring": transaction.get(
"recurring_transaction", False
),
f"{prefix}is_installment": transaction.get("installment", False),
f"{prefix}installment_number": transaction.get("installment_id"),
f"{prefix}installment_total": transaction.get("installment_total"),
}
def _process_update_or_create_transaction_action(processed_action):
"""Helper to process a single linked transaction action"""
dry_run_results.header("update_or_create_transaction", action=processed_action)
# Build search query using the helper method
search_query = processed_action.build_search_query(simple)
_log(f"Searching transactions using: {search_query}")
starting_instance = None
# Find latest matching transaction or create new
if search_query:
searched_transactions = Transaction.objects.filter(search_query).order_by(
"-date", "-id"
)
if searched_transactions.exists():
transaction = searched_transactions.first()
existing = True
if dry_run:
starting_instance = transaction.deepcopy()
_log("Found at least one matching transaction, using latest:")
_log("{}".format(pformat(model_to_dict(transaction))))
else:
transaction = Transaction()
existing = False
_log(
"No matching transaction found, creating a new transaction",
)
else:
transaction = Transaction()
existing = False
_log(
"No matching transaction found, creating a new transaction",
)
simple.names.update(_get_names(transaction, prefix="my_"))
if processed_action.filter:
value = simple.eval(processed_action.filter)
if not value:
dry_run_results.error(
error="Filter did not match. Execution of this action has stopped.",
)
_log("Filter did not match. Execution of this action has stopped.")
return # Short-circuit execution if filter evaluates to false
# Set fields if provided
if processed_action.set_account:
value = simple.eval(processed_action.set_account)
if isinstance(value, int):
transaction.account = Account.objects.get(id=value)
else:
transaction.account = Account.objects.get(name=value)
if processed_action.set_type:
transaction.type = simple.eval(processed_action.set_type)
if processed_action.set_is_paid:
transaction.is_paid = simple.eval(processed_action.set_is_paid)
if processed_action.set_mute:
transaction.is_paid = simple.eval(processed_action.set_mute)
if processed_action.set_date:
transaction.date = simple.eval(processed_action.set_date)
if processed_action.set_reference_date:
transaction.reference_date = simple.eval(
processed_action.set_reference_date
)
if processed_action.set_amount:
transaction.amount = simple.eval(processed_action.set_amount)
if processed_action.set_description:
transaction.description = simple.eval(processed_action.set_description)
if processed_action.set_internal_note:
transaction.internal_note = simple.eval(processed_action.set_internal_note)
if processed_action.set_internal_id:
transaction.internal_id = simple.eval(processed_action.set_internal_id)
if processed_action.set_notes:
transaction.notes = simple.eval(processed_action.set_notes)
if processed_action.set_category:
value = simple.eval(processed_action.set_category)
if isinstance(value, int):
transaction.category = TransactionCategory.objects.get(id=value)
else:
transaction.category = TransactionCategory.objects.get(name=value)
if not transaction.id:
_log("Transaction will be created as:")
else:
_log("Trasanction will be updated as:")
_log(
"{}".format(
pformat(model_to_dict(transaction, exclude=["tags", "entities"])),
)
)
transaction.save()
# Handle M2M fields after save
tags = []
if processed_action.set_tags:
tags = simple.eval(processed_action.set_tags)
_log(f" And tags will be set as: {tags}")
transaction.tags.clear()
if isinstance(tags, (list, tuple)):
for tag in tags:
if isinstance(tag, int):
transaction.tags.add(TransactionTag.objects.get(id=tag))
else:
transaction.tags.add(TransactionTag.objects.get(name=tag))
elif isinstance(tags, (int, str)):
if isinstance(tags, int):
transaction.tags.add(TransactionTag.objects.get(id=tags))
else:
transaction.tags.add(TransactionTag.objects.get(name=tags))
entities = []
if processed_action.set_entities:
entities = simple.eval(processed_action.set_entities)
_log(f" And entities will be set as: {entities}")
transaction.entities.clear()
if isinstance(entities, (list, tuple)):
for entity in entities:
if isinstance(entity, int):
transaction.entities.add(
TransactionEntity.objects.get(id=entity)
)
else:
transaction.entities.add(
TransactionEntity.objects.get(name=entity)
)
elif isinstance(entities, (int, str)):
if isinstance(entities, int):
transaction.entities.add(TransactionEntity.objects.get(id=entities))
else:
transaction.entities.add(
TransactionEntity.objects.get(name=entities)
)
dry_run_results.update_or_create_transaction(
start_instance=starting_instance,
end_instance=transaction,
updated=existing,
action=processed_action,
query=search_query,
entities=entities,
tags=tags,
)
# transaction.full_clean()
def _process_edit_transaction_action(transaction, processed_action) -> Transaction:
dry_run_results.header("edit_transaction", action=processed_action)
field = processed_action.field
original_value = getattr(transaction, field)
new_value = simple.eval(processed_action.value)
tags = []
entities = []
_log(
f"Changing field '{field}' from '{original_value}' to '{new_value}'",
)
if field == TransactionRuleAction.Field.account:
if isinstance(new_value, int):
account = Account.objects.get(id=new_value)
transaction.account = account
elif isinstance(new_value, str):
account = Account.objects.filter(name=new_value).first()
transaction.account = account
elif field == TransactionRuleAction.Field.category:
if isinstance(new_value, int):
category = TransactionCategory.objects.get(id=new_value)
transaction.category = category
elif isinstance(new_value, str):
category = TransactionCategory.objects.get(name=new_value)
transaction.category = category
elif field == TransactionRuleAction.Field.tags:
transaction.tags.clear()
if isinstance(new_value, list):
for tag_value in new_value:
if isinstance(tag_value, int):
tag = TransactionTag.objects.get(id=tag_value)
transaction.tags.add(tag)
tags.append(tag)
elif isinstance(tag_value, str):
tag = TransactionTag.objects.get(name=tag_value)
transaction.tags.add(tag)
tags.append(tag)
elif isinstance(new_value, (int, str)):
if isinstance(new_value, int):
tag = TransactionTag.objects.get(id=new_value)
else:
tag = TransactionTag.objects.get(name=new_value)
transaction.tags.add(tag)
tags.append(tag)
elif field == TransactionRuleAction.Field.entities:
transaction.entities.clear()
if isinstance(new_value, list):
for entity_value in new_value:
if isinstance(entity_value, int):
entity = TransactionEntity.objects.get(id=entity_value)
transaction.entities.add(entity)
entities.append(entity)
elif isinstance(entity_value, str):
entity = TransactionEntity.objects.get(name=entity_value)
transaction.entities.add(entity)
entities.append(entity)
elif isinstance(new_value, (int, str)):
if isinstance(new_value, int):
entity = TransactionEntity.objects.get(id=new_value)
else:
entity = TransactionEntity.objects.get(name=new_value)
transaction.entities.add(entity)
entities.append(entity)
else:
setattr(
transaction,
field,
new_value,
)
dry_run_results.edit_transaction(
instance=transaction,
action=processed_action,
old_value=original_value,
new_value=new_value,
field=field,
tags=tags,
entities=entities,
)
transaction.full_clean()
return transaction
user = get_user_model().objects.get(id=user_id)
if not dry_run:
write_current_user(user)
logs = [] if dry_run else None
dry_run_results = DryRunResults(dry_run=dry_run)
if dry_run and not rule_id:
raise Exception("Cannot dry run without a rule id")
try:
with cachalot_disabled():
# For deleted transactions
if signal == "transaction_deleted" and transaction_data:
# Create a transaction-like object from the serialized data
if is_hard_deleted:
instance = transaction_data
else:
instance = Transaction.deleted_objects.get(id=instance_id)
else:
# Regular transaction processing for creates and updates
instance = Transaction.objects.get(id=instance_id)
dry_run_results.triggering_transaction(instance)
functions = {
"relativedelta": relativedelta,
"str": str,
"int": int,
"float": float,
"abs": abs,
"randint": randint,
"random": random,
"decimal": decimal.Decimal,
"datetime": datetime,
"date": date,
"transactions": transactions.TransactionsGetter,
}
_log("Starting rule execution...")
_log("Available functions: {}".format(functions.keys()))
names = _get_names(instance)
simple = EvalWithCompoundTypes(names=names, functions=functions)
if signal == "transaction_updated" and old_data:
simple.names.update(_get_names(old_data, "old_"))
# Select rules based on the signal type
if dry_run and rule_id:
rules = TransactionRule.objects.filter(id=rule_id)
elif signal == "transaction_created":
rules = TransactionRule.objects.filter(
active=True, on_create=True
).order_by("order", "id")
elif signal == "transaction_updated":
rules = TransactionRule.objects.filter(
active=True, on_update=True
).order_by("order", "id")
elif signal == "transaction_deleted":
rules = TransactionRule.objects.filter(
active=True, on_delete=True
).order_by("order", "id")
else:
rules = TransactionRule.objects.filter(active=True).order_by(
"order", "id"
)
_log("Testing {} rule(s)...".format(len(rules)))
# Process the rules as before
for rule in rules:
_log("Testing rule: {}".format(rule.name))
if simple.eval(rule.trigger):
_log("Initial trigger matched!")
# For deleted transactions, we want to limit what actions can be performed
if signal == "transaction_deleted":
_log(
"Event is of type 'delete'. Only processing Update or Create actions..."
)
# Process only create/update actions, not edit actions
for action in rule.update_or_create_transaction_actions.all():
try:
_log(
"Processing action with id {} and order {}...".format(
action.id, action.order
)
)
_process_update_or_create_transaction_action(
processed_action=action,
)
except Exception as e:
dry_run_results.error(
"Error raised: '{}'. Check the logs tab for more "
"information".format(e)
)
_log(
f"Error processing update or create transaction action {action.id} on deletion",
level="error",
)
else:
# Normal processing for non-deleted transactions
edit_actions = list(rule.transaction_actions.all())
update_or_create_actions = list(
rule.update_or_create_transaction_actions.all()
)
# Check if any action has a non-zero order
has_custom_order = any(
a.order > 0 for a in edit_actions
) or any(a.order > 0 for a in update_or_create_actions)
if has_custom_order:
_log(
"One or more actions have a custom order, actions will be processed ordered by "
"order and creation date..."
)
# Combine and sort actions by order
all_actions = sorted(
chain(edit_actions, update_or_create_actions),
key=lambda a: (a.order, a.id),
)
for action in all_actions:
try:
if isinstance(action, TransactionRuleAction):
_log(
"Processing 'edit_transaction' action with id {} and order {}...".format(
action.id, action.order
)
)
instance = _process_edit_transaction_action(
transaction=instance,
processed_action=action,
)
if rule.sequenced:
# Update names for next actions
simple.names.update(_get_names(instance))
else:
_log(
"Processing 'update_or_create_transaction' action with id {} and order {}...".format(
action.id, action.order
)
)
_process_update_or_create_transaction_action(
processed_action=action,
)
_clear_names("my_")
except Exception as e:
dry_run_results.error(
"Error raised: '{}'. Check the logs tab for more "
"information".format(e)
)
_log(
f"Error processing action {action.id}",
level="error",
)
# Save at the end
if signal != "transaction_deleted":
instance.save()
else:
_log(
"No actions have a custom order, actions will be processed ordered by creation "
"date, with Edit actions running first, then Update or Create actions..."
)
# Original behavior
for action in edit_actions:
_log(
"Processing 'edit_transaction' action with id {}...".format(
action.id
)
)
try:
instance = _process_edit_transaction_action(
transaction=instance,
processed_action=action,
)
if rule.sequenced:
# Update names for next actions
simple.names.update(_get_names(instance))
except Exception as e:
dry_run_results.error(
"Error raised: '{}'. Check the logs tab for more "
"information".format(e)
)
_log(
f"Error processing edit transaction action {action.id}",
level="error",
)
if rule.sequenced:
# Update names for next actions
simple.names.update(_get_names(instance))
if signal != "transaction_deleted":
instance.save()
for action in update_or_create_actions:
_log(
"Processing 'update_or_create_transaction' action with id {}...".format(
action.id
)
)
try:
_process_update_or_create_transaction_action(
processed_action=action,
)
_clear_names("my_")
except Exception as e:
dry_run_results.error(
"Error raised: '{}'. Check the logs tab for more "
"information".format(e)
)
_log(
f"Error processing update or create transaction action {action.id}",
level="error",
)
else:
dry_run_results.error(
error="Initial trigger didn't match, this rule will be skipped",
)
_log("Initial trigger didn't match, this rule will be skipped")
except Exception as e:
_log(
"** Error while executing 'check_for_transaction_rules' task",
level="error",
)
if not dry_run:
delete_current_user()
raise e
if not dry_run:
delete_current_user()
return logs, dry_run_results.results
return None