Files
bugsink/snappea/utils.py
2025-01-29 09:04:19 +01:00

42 lines
1.2 KiB
Python

from contextlib import contextmanager
import importlib
from .settings import get_settings
def add_task_kwargs():
"""Hook for extending Task kwargs"""
if not hasattr(add_task_kwargs, "func"):
# the configured function is cached on add_task_kwargs itself
hook = get_settings().HOOK_ADD_TASK_KWARGS
module_name, function_name = hook.rsplit('.', 1)
module = importlib.import_module(module_name)
add_task_kwargs.func = getattr(module, function_name)
return add_task_kwargs.func()
def run_task_context(task_args, task_kwargs):
"""Hook for running a task in a context; the task's args and kwargs are passed for optional pre-processing"""
if not hasattr(add_task_kwargs, "func"):
# the configured function is cached on run_task_context itself
hook = get_settings().HOOK_RUN_TASK_CONTEXT
module_name, function_name = hook.rsplit('.', 1)
module = importlib.import_module(module_name)
run_task_context.func = getattr(module, function_name)
return run_task_context.func(task_args, task_kwargs)
def dont_add_anything():
# no-op impl of add_task_kwargs
return {}
@contextmanager
def no_context(task_args, task_kwargs):
# no-op impl of run_task_context
yield