From c8d5144ed4406ba5fcb734a2ea01f81692df0cbe Mon Sep 17 00:00:00 2001 From: matt Date: Mon, 11 Aug 2025 23:10:44 -0400 Subject: [PATCH] [Python] Feat: Dependency Injection, Improved error handling (#2067) * feat: first pass at dependency injection * feat: add DI example + tests * feat: finish up tests * feat: docs * chore: gen * chore: lint * chore: changelog + ver * fix: split up paragraphs * refactor: improve impl * chore: gen * feat: inject input + ctx into deps * chore: gen * [Python] Feat: More use of `logger.exception` (#2069) * feat: add more instances of `logger.exception` * chore: ver * chore: changelog * fix: one more error log * chore: gen * chore: gen * chore: lint * fix: improve shutdown * chore: changelog * unwind: exit handler * feat: task run error * feat: improve error serde with more context * chore: changelog * fix: changelog * chore: gen * fix: rm celpy + lark dep, casing issues * chore: changelog * fix: respect log levels over the API * fix: changelog * refactor: rename log forwarder * fix: circular import --- examples/go/bulk-operations/main.go | 4 + examples/go/streaming/consumer/main.go | 1 + examples/go/streaming/server/main.go | 1 + examples/go/streaming/shared/task.go | 1 + .../test_dependency_injection.py | 49 ++++++ .../python/dependency_injection/worker.py | 159 +++++++++++++++++ examples/python/durable/test_durable.py | 2 +- examples/python/on_failure/worker.py | 19 +- examples/python/quickstart/poetry.lock | 76 ++++---- examples/python/worker.py | 12 ++ .../src/next/lib/docs/generated/home/_meta.ts | 8 + .../python/dependency_injection/index.ts | 5 + .../test_dependency_injection.ts | 12 ++ .../python/dependency_injection/worker.ts | 21 +++ .../snips/python/durable/test_durable.ts | 2 +- .../lib/docs/generated/snips/python/index.ts | 2 + .../snips/python/on_failure/worker.ts | 10 +- .../lib/docs/generated/snips/python/worker.ts | 2 +- .../python/dependency_injection/index.ts | 5 + .../test_dependency_injection.ts | 11 ++ .../python/dependency_injection/worker.ts | 20 +++ .../snips/python/durable/test_durable.ts | 2 +- .../docs/lib/generated/snips/python/index.ts | 2 + .../snips/python/on_failure/worker.ts | 10 +- frontend/docs/pages/home/_meta.js | 3 +- .../docs/pages/home/dependency-injection.mdx | 45 +++++ frontend/docs/pages/sdks/python/runnables.mdx | 72 ++++---- sdks/python/CHANGELOG.md | 23 +++ .../test_dependency_injection.py | 49 ++++++ .../examples/dependency_injection/worker.py | 162 ++++++++++++++++++ sdks/python/examples/durable/test_durable.py | 2 +- sdks/python/examples/on_failure/worker.py | 19 +- sdks/python/examples/worker.py | 12 ++ sdks/python/hatchet_sdk/__init__.py | 3 +- sdks/python/hatchet_sdk/clients/events.py | 7 +- sdks/python/hatchet_sdk/context/context.py | 35 +++- sdks/python/hatchet_sdk/exceptions.py | 75 +++++++- sdks/python/hatchet_sdk/hatchet.py | 40 +++-- sdks/python/hatchet_sdk/rate_limit.py | 22 +-- sdks/python/hatchet_sdk/runnables/task.py | 128 ++++++++++++-- sdks/python/hatchet_sdk/runnables/workflow.py | 31 +++- sdks/python/hatchet_sdk/utils/typing.py | 27 +++ .../hatchet_sdk/worker/runner/runner.py | 46 +++-- .../worker/runner/utils/capture_logs.py | 35 ++-- sdks/python/poetry.lock | 77 +-------- sdks/python/pyproject.toml | 3 +- .../test_case.py | 2 +- 47 files changed, 1082 insertions(+), 272 deletions(-) create mode 100644 examples/python/dependency_injection/test_dependency_injection.py create mode 100644 examples/python/dependency_injection/worker.py create mode 100644 
frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/index.ts create mode 100644 frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/test_dependency_injection.ts create mode 100644 frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/worker.ts create mode 100644 frontend/docs/lib/generated/snips/python/dependency_injection/index.ts create mode 100644 frontend/docs/lib/generated/snips/python/dependency_injection/test_dependency_injection.ts create mode 100644 frontend/docs/lib/generated/snips/python/dependency_injection/worker.ts create mode 100644 frontend/docs/pages/home/dependency-injection.mdx create mode 100644 sdks/python/examples/dependency_injection/test_dependency_injection.py create mode 100644 sdks/python/examples/dependency_injection/worker.py diff --git a/examples/go/bulk-operations/main.go b/examples/go/bulk-operations/main.go index ef17f4b35..38621a5da 100644 --- a/examples/go/bulk-operations/main.go +++ b/examples/go/bulk-operations/main.go @@ -35,6 +35,7 @@ func main() { selectedWorkflow := (*workflows.Rows)[0] selectedWorkflowUUID := uuid.MustParse(selectedWorkflow.Metadata.Id) + // > List runs workflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{ WorkflowIds: &[]types.UUID{selectedWorkflowUUID}, @@ -49,6 +50,7 @@ func main() { runIds = append(runIds, uuid.MustParse(run.Metadata.Id)) } + // > Cancel by run ids _, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{ ExternalIds: &runIds, @@ -57,6 +59,7 @@ func main() { log.Fatalf("failed to cancel runs by ids: %v", err) } + // > Cancel by filters tNow := time.Now().UTC() @@ -73,5 +76,6 @@ func main() { log.Fatalf("failed to cancel runs by filters: %v", err) } + fmt.Println("cancelled all runs for workflow", selectedWorkflow.Name) } diff --git a/examples/go/streaming/consumer/main.go b/examples/go/streaming/consumer/main.go index 1fcb2887f..8ee008885 100644 --- a/examples/go/streaming/consumer/main.go +++ b/examples/go/streaming/consumer/main.go @@ -37,3 +37,4 @@ func main() { fmt.Println("\nStreaming completed!") } + diff --git a/examples/go/streaming/server/main.go b/examples/go/streaming/server/main.go index d82959c5a..c558216db 100644 --- a/examples/go/streaming/server/main.go +++ b/examples/go/streaming/server/main.go @@ -58,3 +58,4 @@ func main() { log.Println("Failed to start server:", err) } } + diff --git a/examples/go/streaming/shared/task.go b/examples/go/streaming/shared/task.go index b080638d2..552c2a180 100644 --- a/examples/go/streaming/shared/task.go +++ b/examples/go/streaming/shared/task.go @@ -50,6 +50,7 @@ func StreamTask(ctx worker.HatchetContext, input StreamTaskInput) (*StreamTaskOu }, nil } + func StreamingWorkflow(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[StreamTaskInput, StreamTaskOutput] { return factory.NewTask( create.StandaloneTask{ diff --git a/examples/python/dependency_injection/test_dependency_injection.py b/examples/python/dependency_injection/test_dependency_injection.py new file mode 100644 index 000000000..8c991d6f7 --- /dev/null +++ b/examples/python/dependency_injection/test_dependency_injection.py @@ -0,0 +1,49 @@ +import pytest + +from examples.dependency_injection.worker import ( + ASYNC_DEPENDENCY_VALUE, + SYNC_DEPENDENCY_VALUE, + Output, + async_dep, + async_task_with_dependencies, + di_workflow, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + sync_dep, + sync_task_with_dependencies, +) +from hatchet_sdk import EmptyModel +from 
hatchet_sdk.runnables.workflow import Standalone + + +@pytest.mark.parametrize( + "task", + [ + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + ], +) +@pytest.mark.asyncio(loop_scope="session") +async def test_di_standalones( + task: Standalone[EmptyModel, Output], +) -> None: + result = await task.aio_run() + + assert isinstance(result, Output) + assert result.sync_dep == SYNC_DEPENDENCY_VALUE + assert result.async_dep == ASYNC_DEPENDENCY_VALUE + + +@pytest.mark.asyncio(loop_scope="session") +async def test_di_workflows() -> None: + result = await di_workflow.aio_run() + + assert len(result) == 4 + + for output in result.values(): + parsed = Output.model_validate(output) + + assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE + assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE diff --git a/examples/python/dependency_injection/worker.py b/examples/python/dependency_injection/worker.py new file mode 100644 index 000000000..ceaf8e3cd --- /dev/null +++ b/examples/python/dependency_injection/worker.py @@ -0,0 +1,159 @@ +# > Simple + +from typing import Annotated + +from pydantic import BaseModel + +from hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet + +hatchet = Hatchet(debug=False) + +SYNC_DEPENDENCY_VALUE = "sync_dependency_value" +ASYNC_DEPENDENCY_VALUE = "async_dependency_value" + + +# > Declare dependencies +async def async_dep(input: EmptyModel, ctx: Context) -> str: + return ASYNC_DEPENDENCY_VALUE + + +def sync_dep(input: EmptyModel, ctx: Context) -> str: + return SYNC_DEPENDENCY_VALUE + + + + +class Output(BaseModel): + sync_dep: str + async_dep: str + + +# > Inject dependencies +@hatchet.task() +async def async_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + + + +@hatchet.task() +def sync_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@hatchet.durable_task() +async def durable_async_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@hatchet.durable_task() +def durable_sync_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +di_workflow = hatchet.workflow( + name="dependency-injection-workflow", +) + + +@di_workflow.task() +async def wf_async_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.task() +def wf_sync_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.durable_task() +async def 
wf_durable_async_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.durable_task() +def wf_durable_sync_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +def main() -> None: + worker = hatchet.worker( + "dependency-injection-worker", + workflows=[ + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + di_workflow, + ], + ) + worker.start() + + + +if __name__ == "__main__": + main() diff --git a/examples/python/durable/test_durable.py b/examples/python/durable/test_durable.py index e4c5c182b..1287c3e78 100644 --- a/examples/python/durable/test_durable.py +++ b/examples/python/durable/test_durable.py @@ -71,4 +71,4 @@ async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None: second_sleep_result = await first_sleep.aio_result() """We've already slept for a little bit by the time the task is cancelled""" - assert second_sleep_result["runtime"] < SLEEP_TIME + assert second_sleep_result["runtime"] <= SLEEP_TIME diff --git a/examples/python/on_failure/worker.py b/examples/python/on_failure/worker.py index ad386e4f1..d1b7edb8e 100644 --- a/examples/python/on_failure/worker.py +++ b/examples/python/on_failure/worker.py @@ -2,8 +2,9 @@ import json from datetime import timedelta from hatchet_sdk import Context, EmptyModel, Hatchet +from hatchet_sdk.exceptions import TaskRunError -hatchet = Hatchet(debug=True) +hatchet = Hatchet(debug=False) ERROR_TEXT = "step1 failed" @@ -49,14 +50,20 @@ def details_step1(input: EmptyModel, ctx: Context) -> None: # 👀 After the workflow fails, this special step will run @on_failure_wf_with_details.on_failure_task() -def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]: - error = ctx.fetch_task_run_error(details_step1) +def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]: + error = ctx.get_task_run_error(details_step1) + + if not error: + return {"status": "unexpected success"} # 👀 we can access the failure details here - print(json.dumps(error, indent=2)) + assert isinstance(error, TaskRunError) - if error and error.startswith(ERROR_TEXT): - return {"status": "success"} + if "step1 failed" in error.exc: + return { + "status": "success", + "failed_run_external_id": error.task_run_external_id, + } raise Exception("unexpected failure") diff --git a/examples/python/quickstart/poetry.lock b/examples/python/quickstart/poetry.lock index 7cbfb2201..0bb6a6dde 100644 --- a/examples/python/quickstart/poetry.lock +++ b/examples/python/quickstart/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -114,7 +114,7 @@ propcache = ">=0.2.0" yarl = ">=1.17.0,<2.0" [package.extras] -speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] [[package]] name = "aiohttp-retry" @@ -199,12 +199,12 @@ files = [ ] [package.extras] -benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -cov = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] +benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier"] -tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""] +tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] [[package]] name = "cel-python" @@ -460,14 +460,14 @@ setuptools = "*" [[package]] name = "hatchet-sdk" -version = "1.16.5" +version = "1.0.0a1" description = "" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "hatchet_sdk-1.16.5-py3-none-any.whl", hash = "sha256:3a21edf4d5f1655facaea3dcfabb7b00dafc35a0a593ff088cbe21d1eb7b4db8"}, - {file = "hatchet_sdk-1.16.5.tar.gz", hash = "sha256:c00e85a3a9b1789b85679e528deddd12a6d70ba321e16de9ec07e3fd1d373fb8"}, + {file = "hatchet_sdk-1.0.0a1-py3-none-any.whl", hash = "sha256:bfc84358c8842cecd0d95b30645109733b7292dff0db1a776ca862785ee93d7f"}, + {file = 
"hatchet_sdk-1.0.0a1.tar.gz", hash = "sha256:f0272bbaac6faed75ff727826e9f7b1ac42ae597f9b590e14d392aada9c9692f"}, ] [package.dependencies] @@ -483,11 +483,13 @@ grpcio-tools = [ {version = ">=1.64.1,<1.68.dev0 || >=1.69.dev0", markers = "python_version < \"3.13\""}, {version = ">=1.69.0", markers = "python_version >= \"3.13\""}, ] -prometheus-client = ">=0.21.1" -protobuf = ">=5.29.5,<6.0.0" +nest-asyncio = ">=1.6.0,<2.0.0" +prometheus-client = ">=0.21.1,<0.22.0" +protobuf = ">=5.29.1,<6.0.0" pydantic = ">=2.6.3,<3.0.0" pydantic-settings = ">=2.7.1,<3.0.0" python-dateutil = ">=2.9.0.post0,<3.0.0" +pyyaml = ">=6.0.1,<7.0.0" tenacity = ">=8.4.1" urllib3 = ">=1.26.20" @@ -643,6 +645,18 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""} +[[package]] +name = "nest-asyncio" +version = "1.6.0" +description = "Patch asyncio to allow nested event loops" +optional = false +python-versions = ">=3.5" +groups = ["main"] +files = [ + {file = "nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c"}, + {file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"}, +] + [[package]] name = "prometheus-client" version = "0.21.1" @@ -768,23 +782,23 @@ files = [ [[package]] name = "protobuf" -version = "5.29.5" +version = "5.29.4" description = "" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "protobuf-5.29.5-cp310-abi3-win32.whl", hash = "sha256:3f1c6468a2cfd102ff4703976138844f78ebd1fb45f49011afc5139e9e283079"}, - {file = "protobuf-5.29.5-cp310-abi3-win_amd64.whl", hash = "sha256:3f76e3a3675b4a4d867b52e4a5f5b78a2ef9565549d4037e06cf7b0942b1d3fc"}, - {file = "protobuf-5.29.5-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e38c5add5a311f2a6eb0340716ef9b039c1dfa428b28f25a7838ac329204a671"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:fa18533a299d7ab6c55a238bf8629311439995f2e7eca5caaff08663606e9015"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:63848923da3325e1bf7e9003d680ce6e14b07e55d0473253a690c3a8b8fd6e61"}, - {file = "protobuf-5.29.5-cp38-cp38-win32.whl", hash = "sha256:ef91363ad4faba7b25d844ef1ada59ff1604184c0bcd8b39b8a6bef15e1af238"}, - {file = "protobuf-5.29.5-cp38-cp38-win_amd64.whl", hash = "sha256:7318608d56b6402d2ea7704ff1e1e4597bee46d760e7e4dd42a3d45e24b87f2e"}, - {file = "protobuf-5.29.5-cp39-cp39-win32.whl", hash = "sha256:6f642dc9a61782fa72b90878af134c5afe1917c89a568cd3476d758d3c3a0736"}, - {file = "protobuf-5.29.5-cp39-cp39-win_amd64.whl", hash = "sha256:470f3af547ef17847a28e1f47200a1cbf0ba3ff57b7de50d22776607cd2ea353"}, - {file = "protobuf-5.29.5-py3-none-any.whl", hash = "sha256:6cf42630262c59b2d8de33954443d94b746c952b01434fc58a417fdbd2e84bd5"}, - {file = "protobuf-5.29.5.tar.gz", hash = "sha256:bc1463bafd4b0929216c35f437a8e28731a2b7fe3d98bb77a600efced5a15c84"}, + {file = "protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7"}, + {file = "protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d"}, + {file = "protobuf-5.29.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:307ecba1d852ec237e9ba668e087326a67564ef83e45a0189a772ede9e854dd0"}, + {file = "protobuf-5.29.4-cp38-abi3-manylinux2014_aarch64.whl", hash = 
"sha256:aec4962f9ea93c431d5714ed1be1c93f13e1a8618e70035ba2b0564d9e633f2e"}, + {file = "protobuf-5.29.4-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:d7d3f7d1d5a66ed4942d4fefb12ac4b14a29028b209d4bfb25c68ae172059922"}, + {file = "protobuf-5.29.4-cp38-cp38-win32.whl", hash = "sha256:1832f0515b62d12d8e6ffc078d7e9eb06969aa6dc13c13e1036e39d73bebc2de"}, + {file = "protobuf-5.29.4-cp38-cp38-win_amd64.whl", hash = "sha256:476cb7b14914c780605a8cf62e38c2a85f8caff2e28a6a0bad827ec7d6c85d68"}, + {file = "protobuf-5.29.4-cp39-cp39-win32.whl", hash = "sha256:fd32223020cb25a2cc100366f1dedc904e2d71d9322403224cdde5fdced0dabe"}, + {file = "protobuf-5.29.4-cp39-cp39-win_amd64.whl", hash = "sha256:678974e1e3a9b975b8bc2447fca458db5f93a2fb6b0c8db46b6675b5b5346812"}, + {file = "protobuf-5.29.4-py3-none-any.whl", hash = "sha256:3fde11b505e1597f71b875ef2fc52062b6a9740e5f7c8997ce878b6009145862"}, + {file = "protobuf-5.29.4.tar.gz", hash = "sha256:4f1dfcd7997b31ef8f53ec82781ff434a28bf71d9102ddde14d076adcfc78c99"}, ] [[package]] @@ -806,7 +820,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] +timezone = ["tzdata"] [[package]] name = "pydantic-core" @@ -1048,13 +1062,13 @@ files = [ ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] -core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"] +core = ["importlib_metadata (>=6)", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] enabler = ["pytest-enabler (>=2.2)"] -test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] -type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] 
+type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", "pytest-mypy"] [[package]] name = "six" @@ -1133,7 +1147,7 @@ files = [ ] [package.extras] -brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] @@ -1238,4 +1252,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "06949f41c097b940db1cb406e6ea99dec9c1bad09389ff5d08cd2d5b16f053a4" +content-hash = "74c12e499aa797ca5c8559af579f1212b0e4e3a77f068f9385db39d70ba304e0" diff --git a/examples/python/worker.py b/examples/python/worker.py index dd8c3bd96..7475123d4 100644 --- a/examples/python/worker.py +++ b/examples/python/worker.py @@ -15,6 +15,13 @@ from examples.concurrency_workflow_level.worker import ( from examples.conditions.worker import task_condition_workflow from examples.dag.worker import dag_workflow from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf +from examples.dependency_injection.worker import ( + async_task_with_dependencies, + di_workflow, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + sync_task_with_dependencies, +) from examples.durable.worker import durable_workflow, wait_for_sleep_twice from examples.events.worker import event_workflow from examples.fanout.worker import child_wf, parent_wf @@ -61,6 +68,7 @@ def main() -> None: sync_fanout_child, non_retryable_workflow, concurrency_workflow_level_workflow, + di_workflow, lifespan_task, simple, simple_durable, @@ -70,6 +78,10 @@ def main() -> None: webhook, return_exceptions_task, wait_for_sleep_twice, + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, ], lifespan=lifespan, ) diff --git a/frontend/app/src/next/lib/docs/generated/home/_meta.ts b/frontend/app/src/next/lib/docs/generated/home/_meta.ts index 4aadfb16d..c00a1d854 100644 --- a/frontend/app/src/next/lib/docs/generated/home/_meta.ts +++ b/frontend/app/src/next/lib/docs/generated/home/_meta.ts @@ -68,6 +68,10 @@ const meta = { title: 'Bulk Run Many', href: '/home/bulk-run', }, + webhooks: { + title: 'Webhooks', + href: '/home/webhooks', + }, '--flow-control': { title: 'Flow Control', type: 'separator', @@ -248,6 +252,10 @@ const meta = { title: 'Lifespans', href: '/home/lifespans', }, + dependency_injection: { + title: 'Dependency Injection', + href: '/home/dependency-injection', + }, blog: { title: 'Blog', type: 'page', diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/index.ts b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/index.ts new file mode 100644 index 000000000..53c740d97 --- /dev/null +++ b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/index.ts @@ -0,0 +1,5 @@ +import test_dependency_injection from './test_dependency_injection'; +import worker from './worker'; + +export { test_dependency_injection }; +export { worker }; diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/test_dependency_injection.ts b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/test_dependency_injection.ts new file mode 100644 index 000000000..08b6a8de2 --- /dev/null +++ 
b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/test_dependency_injection.ts @@ -0,0 +1,12 @@ +import { Snippet } from '@/next/lib/docs/generated/snips/types'; + +const snippet: Snippet = { + language: 'python', + content: + 'import pytest\n\nfrom examples.dependency_injection.worker import (\n ASYNC_DEPENDENCY_VALUE,\n SYNC_DEPENDENCY_VALUE,\n Output,\n async_dep,\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_dep,\n sync_task_with_dependencies,\n)\nfrom hatchet_sdk import EmptyModel\nfrom hatchet_sdk.runnables.workflow import Standalone\n\n\n@pytest.mark.parametrize(\n "task",\n [\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n ],\n)\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_di_standalones(\n task: Standalone[EmptyModel, Output],\n) -> None:\n result = await task.aio_run()\n\n assert isinstance(result, Output)\n assert result.sync_dep == SYNC_DEPENDENCY_VALUE\n assert result.async_dep == ASYNC_DEPENDENCY_VALUE\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_di_workflows() -> None:\n result = await di_workflow.aio_run()\n\n assert len(result) == 4\n\n for output in result.values():\n parsed = Output.model_validate(output)\n\n assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE\n assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE\n', + source: 'out/python/dependency_injection/test_dependency_injection.py', + blocks: {}, + highlights: {}, +}; + +export default snippet; diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/worker.ts new file mode 100644 index 000000000..0ba97428d --- /dev/null +++ b/frontend/app/src/next/lib/docs/generated/snips/python/dependency_injection/worker.ts @@ -0,0 +1,21 @@ +import { Snippet } from '@/next/lib/docs/generated/snips/types'; + +const snippet: Snippet = { + language: 'python', + content: + '# > Simple\n\nfrom typing import Annotated\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\nSYNC_DEPENDENCY_VALUE = "sync_dependency_value"\nASYNC_DEPENDENCY_VALUE = "async_dependency_value"\n\n\n# > Declare dependencies\nasync def async_dep(input: EmptyModel, ctx: Context) -> str:\n return ASYNC_DEPENDENCY_VALUE\n\n\ndef sync_dep(input: EmptyModel, ctx: Context) -> str:\n return SYNC_DEPENDENCY_VALUE\n\n\n\n\nclass Output(BaseModel):\n sync_dep: str\n async_dep: str\n\n\n# > Inject dependencies\n@hatchet.task()\nasync def async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n\n\n@hatchet.task()\ndef sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\nasync def durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n 
)\n\n\n@hatchet.durable_task()\ndef durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndi_workflow = hatchet.workflow(\n name="dependency-injection-workflow",\n)\n\n\n@di_workflow.task()\nasync def wf_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.task()\ndef wf_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\nasync def wf_durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\ndef wf_durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "dependency-injection-worker",\n workflows=[\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n di_workflow,\n ],\n )\n worker.start()\n\n\n\nif __name__ == "__main__":\n main()\n', + source: 'out/python/dependency_injection/worker.py', + blocks: { + declare_dependencies: { + start: 16, + stop: 23, + }, + inject_dependencies: { + start: 32, + stop: 44, + }, + }, + highlights: {}, +}; + +export default snippet; diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts b/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts index 8da1c1970..12bbaf42a 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = 
result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n\n wait_for_multi_sleep = result["wait_for_multi_sleep"]\n\n assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n """We\'ve already slept for a little bit by the time the task is cancelled"""\n assert second_sleep_result["runtime"] < SLEEP_TIME\n', + 'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n\n wait_for_multi_sleep = result["wait_for_multi_sleep"]\n\n assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n """We\'ve already slept for a little bit by the time the task is cancelled"""\n assert second_sleep_result["runtime"] <= SLEEP_TIME\n', source: 'out/python/durable/test_durable.py', blocks: {}, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/index.ts b/frontend/app/src/next/lib/docs/generated/snips/python/index.ts index fdc204ae7..ef700dd95 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/index.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/index.ts @@ -18,6 +18,7 @@ import * as cron from './cron'; import * as dag from './dag'; import * as dedupe from './dedupe'; import * as delayed from './delayed'; +import * as dependency_injection from './dependency_injection'; import * as durable from 
'./durable'; import * as durable_event from './durable_event'; import * as durable_sleep from './durable_sleep'; @@ -67,6 +68,7 @@ export { cron }; export { dag }; export { dedupe }; export { delayed }; +export { dependency_injection }; export { durable }; export { durable_event }; export { durable_sleep }; diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/on_failure/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/on_failure/worker.ts index 3e96977f7..4e345a921 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/on_failure/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/on_failure/worker.ts @@ -3,16 +3,16 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=True)\n\nERROR_TEXT = "step1 failed"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name="OnFailureWorkflow")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {"status": "success"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name="OnFailureWorkflowWithDetails")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n error = ctx.fetch_task_run_error(details_step1)\n\n # 👀 we can access the failure details here\n print(json.dumps(error, indent=2))\n\n if error and error.startswith(ERROR_TEXT):\n return {"status": "success"}\n\n raise Exception("unexpected failure")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "on-failure-worker",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\nfrom hatchet_sdk.exceptions import TaskRunError\n\nhatchet = Hatchet(debug=False)\n\nERROR_TEXT = "step1 failed"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name="OnFailureWorkflow")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {"status": "success"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name="OnFailureWorkflowWithDetails")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:\n error = ctx.get_task_run_error(details_step1)\n\n if not error:\n return {"status": "unexpected success"}\n\n # 👀 we can access the failure details here\n assert isinstance(error, TaskRunError)\n\n if "step1 failed" in error.exc:\n return {\n "status": "success",\n "failed_run_external_id": error.task_run_external_id,\n }\n\n raise Exception("unexpected failure")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "on-failure-worker",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/on_failure/worker.py', blocks: { onfailure_step: { - start: 11, - stop: 34, + start: 12, + stop: 35, }, onfailure_with_details: { - start: 38, - stop: 63, + start: 39, + stop: 70, }, }, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts index da0116c95..c908f51cc 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom examples.webhooks.worker import webhook\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n 
concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n webhook,\n return_exceptions_task,\n wait_for_sleep_twice,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.dependency_injection.worker import (\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_task_with_dependencies,\n)\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom examples.webhooks.worker import webhook\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n di_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n webhook,\n return_exceptions_task,\n wait_for_sleep_twice,\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n 
durable_sync_task_with_dependencies,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/worker.py', blocks: {}, highlights: {}, diff --git a/frontend/docs/lib/generated/snips/python/dependency_injection/index.ts b/frontend/docs/lib/generated/snips/python/dependency_injection/index.ts new file mode 100644 index 000000000..ddb6489f8 --- /dev/null +++ b/frontend/docs/lib/generated/snips/python/dependency_injection/index.ts @@ -0,0 +1,5 @@ +import test_dependency_injection from './test_dependency_injection'; +import worker from './worker'; + +export { test_dependency_injection } +export { worker } diff --git a/frontend/docs/lib/generated/snips/python/dependency_injection/test_dependency_injection.ts b/frontend/docs/lib/generated/snips/python/dependency_injection/test_dependency_injection.ts new file mode 100644 index 000000000..3f776037d --- /dev/null +++ b/frontend/docs/lib/generated/snips/python/dependency_injection/test_dependency_injection.ts @@ -0,0 +1,11 @@ +import { Snippet } from '@/lib/generated/snips/types'; + +const snippet: Snippet = { + "language": "python", + "content": "import pytest\n\nfrom examples.dependency_injection.worker import (\n ASYNC_DEPENDENCY_VALUE,\n SYNC_DEPENDENCY_VALUE,\n Output,\n async_dep,\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_dep,\n sync_task_with_dependencies,\n)\nfrom hatchet_sdk import EmptyModel\nfrom hatchet_sdk.runnables.workflow import Standalone\n\n\n@pytest.mark.parametrize(\n \"task\",\n [\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n ],\n)\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_di_standalones(\n task: Standalone[EmptyModel, Output],\n) -> None:\n result = await task.aio_run()\n\n assert isinstance(result, Output)\n assert result.sync_dep == SYNC_DEPENDENCY_VALUE\n assert result.async_dep == ASYNC_DEPENDENCY_VALUE\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_di_workflows() -> None:\n result = await di_workflow.aio_run()\n\n assert len(result) == 4\n\n for output in result.values():\n parsed = Output.model_validate(output)\n\n assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE\n assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE\n", + "source": "out/python/dependency_injection/test_dependency_injection.py", + "blocks": {}, + "highlights": {} +}; + +export default snippet; diff --git a/frontend/docs/lib/generated/snips/python/dependency_injection/worker.ts b/frontend/docs/lib/generated/snips/python/dependency_injection/worker.ts new file mode 100644 index 000000000..8fffaf6b7 --- /dev/null +++ b/frontend/docs/lib/generated/snips/python/dependency_injection/worker.ts @@ -0,0 +1,20 @@ +import { Snippet } from '@/lib/generated/snips/types'; + +const snippet: Snippet = { + "language": "python", + "content": "# > Simple\n\nfrom typing import Annotated\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\nSYNC_DEPENDENCY_VALUE = \"sync_dependency_value\"\nASYNC_DEPENDENCY_VALUE = \"async_dependency_value\"\n\n\n# > Declare dependencies\nasync def async_dep(input: EmptyModel, ctx: Context) -> str:\n return ASYNC_DEPENDENCY_VALUE\n\n\ndef sync_dep(input: EmptyModel, ctx: Context) -> str:\n return SYNC_DEPENDENCY_VALUE\n\n\n\n\nclass Output(BaseModel):\n sync_dep: str\n 
async_dep: str\n\n\n# > Inject dependencies\n@hatchet.task()\nasync def async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n\n\n@hatchet.task()\ndef sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\nasync def durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\ndef durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndi_workflow = hatchet.workflow(\n name=\"dependency-injection-workflow\",\n)\n\n\n@di_workflow.task()\nasync def wf_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.task()\ndef wf_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\nasync def wf_durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\ndef wf_durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"dependency-injection-worker\",\n workflows=[\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n di_workflow,\n ],\n )\n worker.start()\n\n\n\nif __name__ == \"__main__\":\n main()\n", + "source": "out/python/dependency_injection/worker.py", + "blocks": { + "declare_dependencies": { + "start": 16, + "stop": 23 + }, + "inject_dependencies": { + "start": 32, + "stop": 44 + } + }, + "highlights": {} +}; + +export default snippet; diff --git a/frontend/docs/lib/generated/snips/python/durable/test_durable.ts b/frontend/docs/lib/generated/snips/python/durable/test_durable.ts index 058ded548..5acc42cc8 100644 --- a/frontend/docs/lib/generated/snips/python/durable/test_durable.ts +++ b/frontend/docs/lib/generated/snips/python/durable/test_durable.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n 
wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n\n wait_for_multi_sleep = result[\"wait_for_multi_sleep\"]\n\n assert wait_for_multi_sleep[\"runtime\"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n \"\"\"We've already slept for a little bit by the time the task is cancelled\"\"\"\n assert second_sleep_result[\"runtime\"] < SLEEP_TIME\n", + "content": "import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n\n wait_for_multi_sleep = result[\"wait_for_multi_sleep\"]\n\n assert wait_for_multi_sleep[\"runtime\"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await 
hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n \"\"\"We've already slept for a little bit by the time the task is cancelled\"\"\"\n assert second_sleep_result[\"runtime\"] <= SLEEP_TIME\n", "source": "out/python/durable/test_durable.py", "blocks": {}, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/index.ts b/frontend/docs/lib/generated/snips/python/index.ts index 099d890e9..962dc4669 100644 --- a/frontend/docs/lib/generated/snips/python/index.ts +++ b/frontend/docs/lib/generated/snips/python/index.ts @@ -18,6 +18,7 @@ import * as cron from './cron'; import * as dag from './dag'; import * as dedupe from './dedupe'; import * as delayed from './delayed'; +import * as dependency_injection from './dependency_injection'; import * as durable from './durable'; import * as durable_event from './durable_event'; import * as durable_sleep from './durable_sleep'; @@ -67,6 +68,7 @@ export { cron }; export { dag }; export { dedupe }; export { delayed }; +export { dependency_injection }; export { durable }; export { durable_event }; export { durable_sleep }; diff --git a/frontend/docs/lib/generated/snips/python/on_failure/worker.ts b/frontend/docs/lib/generated/snips/python/on_failure/worker.ts index 6d9481a32..9e0c79fb7 100644 --- a/frontend/docs/lib/generated/snips/python/on_failure/worker.ts +++ b/frontend/docs/lib/generated/snips/python/on_failure/worker.ts @@ -2,16 +2,16 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=True)\n\nERROR_TEXT = \"step1 failed\"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name=\"OnFailureWorkflow\")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {\"status\": \"success\"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name=\"OnFailureWorkflowWithDetails\")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n error = ctx.fetch_task_run_error(details_step1)\n\n # 👀 we can access the failure details here\n print(json.dumps(error, indent=2))\n\n if error and error.startswith(ERROR_TEXT):\n return {\"status\": \"success\"}\n\n raise Exception(\"unexpected failure\")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"on-failure-worker\",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", + "content": "import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\nfrom hatchet_sdk.exceptions import TaskRunError\n\nhatchet = Hatchet(debug=False)\n\nERROR_TEXT = \"step1 failed\"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name=\"OnFailureWorkflow\")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {\"status\": \"success\"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name=\"OnFailureWorkflowWithDetails\")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:\n error = ctx.get_task_run_error(details_step1)\n\n if not error:\n return {\"status\": \"unexpected success\"}\n\n # 👀 we can access the failure details here\n assert isinstance(error, TaskRunError)\n\n if \"step1 failed\" in error.exc:\n return {\n \"status\": \"success\",\n \"failed_run_external_id\": error.task_run_external_id,\n }\n\n raise Exception(\"unexpected failure\")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"on-failure-worker\",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", "source": "out/python/on_failure/worker.py", "blocks": { "onfailure_step": { - "start": 11, - "stop": 34 + "start": 12, + "stop": 35 }, "onfailure_with_details": { - "start": 38, - "stop": 63 + "start": 39, + "stop": 70 } }, "highlights": {} diff --git a/frontend/docs/pages/home/_meta.js b/frontend/docs/pages/home/_meta.js index fff880346..1cdd53532 100644 --- a/frontend/docs/pages/home/_meta.js +++ b/frontend/docs/pages/home/_meta.js @@ -25,7 +25,7 @@ export default { "cron-runs": "Cron Trigger", "run-on-event": "Event Trigger", "bulk-run": "Bulk Run Many", - "webhooks": "Webhooks", + webhooks: "Webhooks", "--flow-control": { title: "Flow Control", type: "separator", @@ -122,6 +122,7 @@ export default { asyncio: "Asyncio", pydantic: "Pydantic", lifespans: "Lifespans", + "dependency-injection": "Dependency Injection", blog: { title: "Blog", type: "page", diff --git a/frontend/docs/pages/home/dependency-injection.mdx b/frontend/docs/pages/home/dependency-injection.mdx new file mode 100644 index 000000000..ff8e88587 --- /dev/null +++ b/frontend/docs/pages/home/dependency-injection.mdx @@ -0,0 +1,45 @@ +import snips from "@/lib/snips"; +import { Snippet } from "@/components/code"; +import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; +import UniversalTabs from "@/components/UniversalTabs"; + +# Dependency Injection + + + Dependency injection is an **experimental feature** in Hatchet, and is subject + to change. + + +Hatchet's Python SDK allows you to inject **_dependencies_** into your tasks, FastAPI style. These dependencies can be either synchronous or asynchronous functions. They are executed before the task is triggered, and their results are injected into the task as parameters. + +This behaves almost identically to [FastAPI's dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies/), and is intended to be used in the same way. Dependencies are useful for sharing logic between tasks that you'd like to avoid repeating, or would like to factor out of the task logic itself (e.g. to make testing easier). + + +Since dependencies are run before tasks are executed, having many dependencies (or any that take a long time to evaluate) can cause tasks to experience significantly delayed start times, as they must wait for all dependencies to finish evaluating. + + + +## Usage + +To add dependencies to your tasks, import `Depends` from the `hatchet_sdk`. Then: + + + +In this example, we've declared two dependencies: one synchronous and one asynchronous. 
You can do anything you like in your dependencies, such as creating database sessions, managing configuration, sharing instances of service-layer logic, and more. + +Once you've defined your dependency functions, inject them into your tasks as follows: + + + + + Important note: Your dependency functions must take two positional arguments: + the workflow input and the `Context` (the same as any other task). + + +That's it! Now, whenever your task is triggered, its dependencies will be evaluated, and the results will be injected into the task at runtime for you to use as needed. diff --git a/frontend/docs/pages/sdks/python/runnables.mdx b/frontend/docs/pages/sdks/python/runnables.mdx index 8342e5f4a..7d1ce8d32 100644 --- a/frontend/docs/pages/sdks/python/runnables.mdx +++ b/frontend/docs/pages/sdks/python/runnables.mdx @@ -42,7 +42,7 @@ Workflows support various execution patterns including: Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and can be arranged into complex dependency patterns. -### Methods +Methods: | Name | Description | | ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | @@ -118,9 +118,9 @@ Parameters: Returns: -| Type | Description | -| ------------------------------------------------------------------------------------------------- | ------------------------------------------ | -| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | +| Type | Description | +| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ | +| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | #### `durable_task` @@ -150,9 +150,9 @@ Parameters: Returns: -| Type | Description | -| -------------------------------------------------------------------------------------------------------- | ------------------------------------------ | -| `Callable[[Callable[[TWorkflowInput, DurableContext], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | +| Type | Description | +| ---------------------------------------------------------------------------------------------------------------------- | ------------------------------------------ | +| `Callable[[Callable[Concatenate[TWorkflowInput, DurableContext, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | #### `on_failure_task` @@ -173,9 +173,9 @@ Parameters: Returns: -| Type | Description | -| ------------------------------------------------------------------------------------------------- | ------------------------------------------ | -| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | +| Type | Description | +| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ | +| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. 
| #### `on_success_task` @@ -196,9 +196,9 @@ Parameters: Returns: -| Type | Description | -| ------------------------------------------------------------------------------------------------- | ------------------------------------------ | -| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | +| Type | Description | +| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ | +| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. | #### `run` @@ -526,12 +526,12 @@ Returns: Bases: `Generic[TWorkflowInput, R]` -### Methods +Methods: -| Name | Description | -| -------------- | ------------------------------------------------------------------------------- | -| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. | -| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. | +| Name | Description | +| -------------- | ------------------------------------------------------------------------------ | +| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test | +| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test | ### Functions @@ -541,13 +541,14 @@ Mimic the execution of a task. This method is intended to be used to unit test t Parameters: -| Name | Type | Description | Default | -| --------------------- | -------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -| `input` | `TWorkflowInput \| None` | The input to the task. | `None` | -| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` | -| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` | -| `retry_count` | `int` | The number of times the task has been retried. | `0` | -| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` | +| Name | Type | Description | Default | +| --------------------- | -------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | +| `input` | `TWorkflowInput \| None` | The input to the task. 
| `None` | +| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` | +| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` | +| `retry_count` | `int` | The number of times the task has been retried. | `0` | +| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` | +| `dependencies` | `dict[str, Any] \| None` | Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`. | `None` | Returns: @@ -567,13 +568,14 @@ Mimic the execution of a task. This method is intended to be used to unit test t Parameters: -| Name | Type | Description | Default | -| --------------------- | -------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -| `input` | `TWorkflowInput \| None` | The input to the task. | `None` | -| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` | -| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` | -| `retry_count` | `int` | The number of times the task has been retried. | `0` | -| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` | +| Name | Type | Description | Default | +| --------------------- | -------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | +| `input` | `TWorkflowInput \| None` | The input to the task. | `None` | +| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` | +| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. 
For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` | +| `retry_count` | `int` | The number of times the task has been retried. | `0` | +| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` | +| `dependencies` | `dict[str, Any] \| None` | Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`. | `None` | Returns: @@ -591,7 +593,7 @@ Raises: Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]` -### Methods +Methods: | Name | Description | | ---------------------- | ------------------------------------------------------------------------------------------------------------------------ | @@ -617,8 +619,8 @@ Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]` | `get_run_ref` | Get a reference to a task run by its run ID. | | `get_result` | Get the result of a task run by its run ID. | | `aio_get_result` | Get the result of a task run by its run ID. | -| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. | -| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. | +| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test | +| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test | ### Functions diff --git a/sdks/python/CHANGELOG.md b/sdks/python/CHANGELOG.md index 688196fef..0c42cd80a 100644 --- a/sdks/python/CHANGELOG.md +++ b/sdks/python/CHANGELOG.md @@ -5,6 +5,29 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.17.0] - 2025-08-12 + +### Added + +- Adds support for dependency injection in tasks via the `Depends` class. +- Deprecated `fetch_task_run_error` in favor of `get_task_run_error`, which returns a `TaskRunError` object instead of a string. This allows for better error handling and debugging. + +### Changed + +- Uses `logger.exception` in place of `logger.error` in the action runner to improve (e.g.) Sentry error reporting +- Extends the `TaskRunError` to include the `task_run_external_id`, which is useful for debugging and tracing errors in task runs. +- Fixes an issue with logging which allows log levels to be respected over the API. 
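
As a hedged illustration of the `get_task_run_error` change described above (the workflow and task names below are invented for the example and are not taken from the SDK or this patch):

```python
from hatchet_sdk import Context, EmptyModel, Hatchet
from hatchet_sdk.exceptions import TaskRunError

hatchet = Hatchet()

wf = hatchet.workflow(name="error-handling-sketch")


@wf.task()
def flaky_step(input: EmptyModel, ctx: Context) -> None:
    raise ValueError("something went wrong")


@wf.on_failure_task()
def handle_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:
    # 1.17.0: returns a structured `TaskRunError` (or None) instead of a plain string
    error = ctx.get_task_run_error(flaky_step)

    if error is None:
        return {"status": "no error recorded"}

    assert isinstance(error, TaskRunError)

    # The error now also carries the failed run's external id for tracing
    return {
        "status": "handled",
        "error_type": error.exc_type,
        "failed_run_external_id": error.task_run_external_id,
    }
```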
+ +### Removed + +- Removes the `cel-python` dependency + +## [1.16.5] - 2025-08-07 + +### Changed + +- Relaxes constraint on Prometheus dependency + ## [1.16.4] - 2025-07-28 ### Added diff --git a/sdks/python/examples/dependency_injection/test_dependency_injection.py b/sdks/python/examples/dependency_injection/test_dependency_injection.py new file mode 100644 index 000000000..8c991d6f7 --- /dev/null +++ b/sdks/python/examples/dependency_injection/test_dependency_injection.py @@ -0,0 +1,49 @@ +import pytest + +from examples.dependency_injection.worker import ( + ASYNC_DEPENDENCY_VALUE, + SYNC_DEPENDENCY_VALUE, + Output, + async_dep, + async_task_with_dependencies, + di_workflow, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + sync_dep, + sync_task_with_dependencies, +) +from hatchet_sdk import EmptyModel +from hatchet_sdk.runnables.workflow import Standalone + + +@pytest.mark.parametrize( + "task", + [ + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + ], +) +@pytest.mark.asyncio(loop_scope="session") +async def test_di_standalones( + task: Standalone[EmptyModel, Output], +) -> None: + result = await task.aio_run() + + assert isinstance(result, Output) + assert result.sync_dep == SYNC_DEPENDENCY_VALUE + assert result.async_dep == ASYNC_DEPENDENCY_VALUE + + +@pytest.mark.asyncio(loop_scope="session") +async def test_di_workflows() -> None: + result = await di_workflow.aio_run() + + assert len(result) == 4 + + for output in result.values(): + parsed = Output.model_validate(output) + + assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE + assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE diff --git a/sdks/python/examples/dependency_injection/worker.py b/sdks/python/examples/dependency_injection/worker.py new file mode 100644 index 000000000..68f14161e --- /dev/null +++ b/sdks/python/examples/dependency_injection/worker.py @@ -0,0 +1,162 @@ +# > Simple + +from typing import Annotated + +from pydantic import BaseModel + +from hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet + +hatchet = Hatchet(debug=False) + +SYNC_DEPENDENCY_VALUE = "sync_dependency_value" +ASYNC_DEPENDENCY_VALUE = "async_dependency_value" + + +# > Declare dependencies +async def async_dep(input: EmptyModel, ctx: Context) -> str: + return ASYNC_DEPENDENCY_VALUE + + +def sync_dep(input: EmptyModel, ctx: Context) -> str: + return SYNC_DEPENDENCY_VALUE + + +# !! + + +class Output(BaseModel): + sync_dep: str + async_dep: str + + +# > Inject dependencies +@hatchet.task() +async def async_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +# !! 
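# --- Editorial sketch (not part of the original example) -----------------------
# A dependency receives the same (input, ctx) pair as the task it feeds, so it
# can derive its value from the workflow input. `GreetingInput`, `greeting`, and
# `greeting_task` are hypothetical names introduced only for illustration;
# `input_validator` is the SDK's usual way of typing task input with a Pydantic
# model.
class GreetingInput(BaseModel):
    name: str


def greeting(input: GreetingInput, ctx: Context) -> str:
    # exactly two positional parameters, as required of dependency functions
    return f"Hello, {input.name}!"


@hatchet.task(input_validator=GreetingInput)
def greeting_task(
    input: GreetingInput,
    ctx: Context,
    message: Annotated[str, Depends(greeting)],
) -> dict[str, str]:
    return {"message": message}
# --------------------------------------------------------------------------------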
+ + +@hatchet.task() +def sync_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@hatchet.durable_task() +async def durable_async_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@hatchet.durable_task() +def durable_sync_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +di_workflow = hatchet.workflow( + name="dependency-injection-workflow", +) + + +@di_workflow.task() +async def wf_async_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.task() +def wf_sync_task_with_dependencies( + _i: EmptyModel, + ctx: Context, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.durable_task() +async def wf_durable_async_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +@di_workflow.durable_task() +def wf_durable_sync_task_with_dependencies( + _i: EmptyModel, + ctx: DurableContext, + async_dep: Annotated[str, Depends(async_dep)], + sync_dep: Annotated[str, Depends(sync_dep)], +) -> Output: + return Output( + sync_dep=sync_dep, + async_dep=async_dep, + ) + + +def main() -> None: + worker = hatchet.worker( + "dependency-injection-worker", + workflows=[ + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + di_workflow, + ], + ) + worker.start() + + +# !! 
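# --- Editorial sketch (not part of the original example) -----------------------
# For unit tests, this patch adds a `dependencies` argument to `mock_run` /
# `aio_mock_run`: pass the resolved values directly (not `Depends` objects), so
# no worker or dependency evaluation is needed. `example_unit_test` is a
# hypothetical helper name used only for illustration.
async def example_unit_test() -> None:
    result = await wf_async_task_with_dependencies.aio_mock_run(
        dependencies={
            "async_dep": ASYNC_DEPENDENCY_VALUE,
            "sync_dep": SYNC_DEPENDENCY_VALUE,
        }
    )

    assert result.sync_dep == SYNC_DEPENDENCY_VALUE
    assert result.async_dep == ASYNC_DEPENDENCY_VALUE
# --------------------------------------------------------------------------------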
+ +if __name__ == "__main__": + main() diff --git a/sdks/python/examples/durable/test_durable.py b/sdks/python/examples/durable/test_durable.py index e4c5c182b..1287c3e78 100644 --- a/sdks/python/examples/durable/test_durable.py +++ b/sdks/python/examples/durable/test_durable.py @@ -71,4 +71,4 @@ async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None: second_sleep_result = await first_sleep.aio_result() """We've already slept for a little bit by the time the task is cancelled""" - assert second_sleep_result["runtime"] < SLEEP_TIME + assert second_sleep_result["runtime"] <= SLEEP_TIME diff --git a/sdks/python/examples/on_failure/worker.py b/sdks/python/examples/on_failure/worker.py index b90368cc4..8726af226 100644 --- a/sdks/python/examples/on_failure/worker.py +++ b/sdks/python/examples/on_failure/worker.py @@ -2,8 +2,9 @@ import json from datetime import timedelta from hatchet_sdk import Context, EmptyModel, Hatchet +from hatchet_sdk.exceptions import TaskRunError -hatchet = Hatchet(debug=True) +hatchet = Hatchet(debug=False) ERROR_TEXT = "step1 failed" @@ -50,14 +51,20 @@ def details_step1(input: EmptyModel, ctx: Context) -> None: # 👀 After the workflow fails, this special step will run @on_failure_wf_with_details.on_failure_task() -def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]: - error = ctx.fetch_task_run_error(details_step1) +def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]: + error = ctx.get_task_run_error(details_step1) + + if not error: + return {"status": "unexpected success"} # 👀 we can access the failure details here - print(json.dumps(error, indent=2)) + assert isinstance(error, TaskRunError) - if error and error.startswith(ERROR_TEXT): - return {"status": "success"} + if "step1 failed" in error.exc: + return { + "status": "success", + "failed_run_external_id": error.task_run_external_id, + } raise Exception("unexpected failure") diff --git a/sdks/python/examples/worker.py b/sdks/python/examples/worker.py index dd8c3bd96..7475123d4 100644 --- a/sdks/python/examples/worker.py +++ b/sdks/python/examples/worker.py @@ -15,6 +15,13 @@ from examples.concurrency_workflow_level.worker import ( from examples.conditions.worker import task_condition_workflow from examples.dag.worker import dag_workflow from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf +from examples.dependency_injection.worker import ( + async_task_with_dependencies, + di_workflow, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, + sync_task_with_dependencies, +) from examples.durable.worker import durable_workflow, wait_for_sleep_twice from examples.events.worker import event_workflow from examples.fanout.worker import child_wf, parent_wf @@ -61,6 +68,7 @@ def main() -> None: sync_fanout_child, non_retryable_workflow, concurrency_workflow_level_workflow, + di_workflow, lifespan_task, simple, simple_durable, @@ -70,6 +78,10 @@ def main() -> None: webhook, return_exceptions_task, wait_for_sleep_twice, + async_task_with_dependencies, + sync_task_with_dependencies, + durable_async_task_with_dependencies, + durable_sync_task_with_dependencies, ], lifespan=lifespan, ) diff --git a/sdks/python/hatchet_sdk/__init__.py b/sdks/python/hatchet_sdk/__init__.py index a0e875679..ac9213a66 100644 --- a/sdks/python/hatchet_sdk/__init__.py +++ b/sdks/python/hatchet_sdk/__init__.py @@ -155,7 +155,7 @@ from hatchet_sdk.exceptions import ( from hatchet_sdk.features.cel import CELEvaluationResult, CELFailure, CELSuccess 
from hatchet_sdk.features.runs import BulkCancelReplayOpts, RunFilter from hatchet_sdk.hatchet import Hatchet -from hatchet_sdk.runnables.task import Task +from hatchet_sdk.runnables.task import Depends, Task from hatchet_sdk.runnables.types import ( ConcurrencyExpression, ConcurrencyLimitStrategy, @@ -198,6 +198,7 @@ __all__ = [ "CreateWorkflowVersionOpts", "DedupeViolationError", "DefaultFilter", + "Depends", "DurableContext", "EmptyModel", "Event", diff --git a/sdks/python/hatchet_sdk/clients/events.py b/sdks/python/hatchet_sdk/clients/events.py index 6ae4c24f4..e19af8517 100644 --- a/sdks/python/hatchet_sdk/clients/events.py +++ b/sdks/python/hatchet_sdk/clients/events.py @@ -28,7 +28,7 @@ from hatchet_sdk.contracts.events_pb2 import ( ) from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub from hatchet_sdk.metadata import get_metadata -from hatchet_sdk.utils.typing import JSONSerializableMapping +from hatchet_sdk.utils.typing import JSONSerializableMapping, LogLevel def proto_timestamp_now() -> timestamp_pb2.Timestamp: @@ -180,11 +180,14 @@ class EventClient(BaseRestClient): ) @tenacity_retry - def log(self, message: str, step_run_id: str) -> None: + def log( + self, message: str, step_run_id: str, level: LogLevel | None = None + ) -> None: request = PutLogRequest( stepRunId=step_run_id, createdAt=proto_timestamp_now(), message=message, + level=level.value if level else None, ) self.events_service_client.PutLog(request, metadata=get_metadata(self.token)) diff --git a/sdks/python/hatchet_sdk/context/context.py b/sdks/python/hatchet_sdk/context/context.py index 46b10251c..280c67cfd 100644 --- a/sdks/python/hatchet_sdk/context/context.py +++ b/sdks/python/hatchet_sdk/context/context.py @@ -21,10 +21,11 @@ from hatchet_sdk.conditions import ( flatten_conditions, ) from hatchet_sdk.context.worker_context import WorkerContext +from hatchet_sdk.exceptions import TaskRunError from hatchet_sdk.features.runs import RunsClient from hatchet_sdk.logger import logger from hatchet_sdk.utils.timedelta_to_expression import Duration, timedelta_to_expr -from hatchet_sdk.utils.typing import JSONSerializableMapping +from hatchet_sdk.utils.typing import JSONSerializableMapping, LogLevel from hatchet_sdk.worker.runner.utils.capture_logs import AsyncLogSender, LogRecord if TYPE_CHECKING: @@ -211,7 +212,9 @@ class Context: line = str(line) logger.info(line) - self.log_sender.publish(LogRecord(message=line, step_run_id=self.step_run_id)) + self.log_sender.publish( + LogRecord(message=line, step_run_id=self.step_run_id, level=LogLevel.INFO) + ) def release_slot(self) -> None: """ @@ -359,6 +362,27 @@ class Context: self, task: "Task[TWorkflowInput, R]", ) -> str | None: + """ + **DEPRECATED**: Use `get_task_run_error` instead. + + A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run. + + :param task: The task whose error you want to retrieve. + :return: The error message of the task run, or None if no error occurred. + """ + warn( + "`fetch_task_run_error` is deprecated. Use `get_task_run_error` instead.", + DeprecationWarning, + stacklevel=2, + ) + errors = self.data.step_run_errors + + return errors.get(task.name) + + def get_task_run_error( + self, + task: "Task[TWorkflowInput, R]", + ) -> TaskRunError | None: """ A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run. 
@@ -367,7 +391,12 @@ class Context: """ errors = self.data.step_run_errors - return errors.get(task.name) + error = errors.get(task.name) + + if not error: + return None + + return TaskRunError.deserialize(error) class DurableContext(Context): diff --git a/sdks/python/hatchet_sdk/exceptions.py b/sdks/python/hatchet_sdk/exceptions.py index 699774ee1..25532b9e6 100644 --- a/sdks/python/hatchet_sdk/exceptions.py +++ b/sdks/python/hatchet_sdk/exceptions.py @@ -1,4 +1,10 @@ +import json import traceback +from typing import cast + + +class InvalidDependencyError(Exception): + pass class NonRetryableException(Exception): # noqa: N818 @@ -9,28 +15,42 @@ class DedupeViolationError(Exception): """Raised by the Hatchet library to indicate that a workflow has already been run with this deduplication value.""" +TASK_RUN_ERROR_METADATA_KEY = "__hatchet_error_metadata__" + + class TaskRunError(Exception): def __init__( self, exc: str, exc_type: str, trace: str, + task_run_external_id: str | None, ) -> None: self.exc = exc self.exc_type = exc_type self.trace = trace + self.task_run_external_id = task_run_external_id def __str__(self) -> str: - return self.serialize() + return self.serialize(include_metadata=False) def __repr__(self) -> str: return str(self) - def serialize(self) -> str: + def serialize(self, include_metadata: bool) -> str: if not self.exc_type or not self.exc: return "" - return ( + metadata = json.dumps( + { + TASK_RUN_ERROR_METADATA_KEY: { + "task_run_external_id": self.task_run_external_id, + } + }, + indent=None, + ) + + result = ( self.exc_type.replace(": ", ":::") + ": " + self.exc.replace("\n", "\\\n") @@ -38,6 +58,40 @@ class TaskRunError(Exception): + self.trace ) + if include_metadata: + return result + "\n\n" + metadata + + return result + + @classmethod + def _extract_metadata(cls, serialized: str) -> tuple[str, dict[str, str | None]]: + metadata = serialized.split("\n")[-1] + + try: + parsed = json.loads(metadata) + + if ( + TASK_RUN_ERROR_METADATA_KEY in parsed + and "task_run_external_id" in parsed[TASK_RUN_ERROR_METADATA_KEY] + ): + serialized = serialized.replace(metadata, "").strip() + return serialized, cast( + dict[str, str | None], parsed[TASK_RUN_ERROR_METADATA_KEY] + ) + + return serialized, {} + except json.JSONDecodeError: + return serialized, {} + + @classmethod + def _unpack_serialized_error(cls, serialized: str) -> tuple[str | None, str, str]: + serialized, metadata = cls._extract_metadata(serialized) + + external_id = metadata.get("task_run_external_id", None) + header, trace = serialized.split("\n", 1) + + return external_id, header, trace + @classmethod def deserialize(cls, serialized: str) -> "TaskRunError": if not serialized: @@ -45,10 +99,16 @@ class TaskRunError(Exception): exc="", exc_type="", trace="", + task_run_external_id=None, ) + task_run_external_id = None + try: - header, trace = serialized.split("\n", 1) + task_run_external_id, header, trace = cls._unpack_serialized_error( + serialized + ) + exc_type, exc = header.split(": ", 1) except ValueError: ## If we get here, we saw an error that was not serialized how we expected, @@ -57,6 +117,7 @@ class TaskRunError(Exception): exc=serialized, exc_type="HatchetError", trace="", + task_run_external_id=task_run_external_id, ) exc_type = exc_type.replace(":::", ": ") @@ -66,16 +127,20 @@ class TaskRunError(Exception): exc=exc, exc_type=exc_type, trace=trace, + task_run_external_id=task_run_external_id, ) @classmethod - def from_exception(cls, exc: Exception) -> "TaskRunError": + def from_exception( + 
cls, exc: Exception, task_run_external_id: str | None + ) -> "TaskRunError": return cls( exc=str(exc), exc_type=type(exc).__name__, trace="".join( traceback.format_exception(type(exc), exc, exc.__traceback__) ), + task_run_external_id=task_run_external_id, ) diff --git a/sdks/python/hatchet_sdk/hatchet.py b/sdks/python/hatchet_sdk/hatchet.py index b574474c0..cca906c0c 100644 --- a/sdks/python/hatchet_sdk/hatchet.py +++ b/sdks/python/hatchet_sdk/hatchet.py @@ -3,7 +3,7 @@ import logging from collections.abc import Callable from datetime import timedelta from functools import cached_property -from typing import Any, cast, overload +from typing import Any, Concatenate, ParamSpec, cast, overload from hatchet_sdk import Context, DurableContext from hatchet_sdk.client import Client @@ -40,6 +40,8 @@ from hatchet_sdk.utils.timedelta_to_expression import Duration from hatchet_sdk.utils.typing import CoroutineLike from hatchet_sdk.worker.worker import LifespanFn, Worker +P = ParamSpec("P") + class Hatchet: """ @@ -346,7 +348,7 @@ class Hatchet: backoff_max_seconds: int | None = None, default_filters: list[DefaultFilter] | None = None, ) -> Callable[ - [Callable[[EmptyModel, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[EmptyModel, Context, P], R | CoroutineLike[R]]], Standalone[EmptyModel, R], ]: ... @@ -372,7 +374,7 @@ class Hatchet: backoff_max_seconds: int | None = None, default_filters: list[DefaultFilter] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]], Standalone[TWorkflowInput, R], ]: ... @@ -398,11 +400,11 @@ class Hatchet: default_filters: list[DefaultFilter] | None = None, ) -> ( Callable[ - [Callable[[EmptyModel, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[EmptyModel, Context, P], R | CoroutineLike[R]]], Standalone[EmptyModel, R], ] | Callable[ - [Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]], Standalone[TWorkflowInput, R], ] ): @@ -447,7 +449,9 @@ class Hatchet: """ def inner( - func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R] + ], ) -> Standalone[TWorkflowInput, R]: inferred_name = name or func.__name__ @@ -518,7 +522,7 @@ class Hatchet: backoff_max_seconds: int | None = None, default_filters: list[DefaultFilter] | None = None, ) -> Callable[ - [Callable[[EmptyModel, DurableContext], R | CoroutineLike[R]]], + [Callable[Concatenate[EmptyModel, DurableContext, P], R | CoroutineLike[R]]], Standalone[EmptyModel, R], ]: ... @@ -544,7 +548,11 @@ class Hatchet: backoff_max_seconds: int | None = None, default_filters: list[DefaultFilter] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]], + [ + Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ] + ], Standalone[TWorkflowInput, R], ]: ... 
@@ -570,11 +578,19 @@ class Hatchet: default_filters: list[DefaultFilter] | None = None, ) -> ( Callable[ - [Callable[[EmptyModel, DurableContext], R | CoroutineLike[R]]], + [ + Callable[ + Concatenate[EmptyModel, DurableContext, P], R | CoroutineLike[R] + ] + ], Standalone[EmptyModel, R], ] | Callable[ - [Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]], + [ + Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ] + ], Standalone[TWorkflowInput, R], ] ): @@ -619,7 +635,9 @@ class Hatchet: """ def inner( - func: Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ], ) -> Standalone[TWorkflowInput, R]: inferred_name = name or func.__name__ workflow = Workflow[TWorkflowInput]( diff --git a/sdks/python/hatchet_sdk/rate_limit.py b/sdks/python/hatchet_sdk/rate_limit.py index c85f8be0b..8603d5f03 100644 --- a/sdks/python/hatchet_sdk/rate_limit.py +++ b/sdks/python/hatchet_sdk/rate_limit.py @@ -1,20 +1,10 @@ from enum import Enum -from celpy import CELEvalError, Environment # type: ignore from pydantic import BaseModel, model_validator from hatchet_sdk.contracts.v1.workflows_pb2 import CreateTaskRateLimit -def validate_cel_expression(expr: str) -> bool: - env = Environment() - try: - env.compile(expr) - return True - except CELEvalError: - return False - - class RateLimitDuration(str, Enum): SECOND = "SECOND" MINUTE = "MINUTE" @@ -72,17 +62,7 @@ class RateLimit(BaseModel): if self.dynamic_key and self.static_key: raise ValueError("Cannot have both static key and dynamic key set") - if self.dynamic_key and not validate_cel_expression(self.dynamic_key): - raise ValueError(f"Invalid CEL expression: {self.dynamic_key}") - - if not isinstance(self.units, int) and not validate_cel_expression(self.units): - raise ValueError(f"Invalid CEL expression: {self.units}") - - if ( - self.limit - and not isinstance(self.limit, int) - and not validate_cel_expression(self.limit) - ): + if self.limit and not isinstance(self.limit, int): raise ValueError(f"Invalid CEL expression: {self.limit}") if self.dynamic_key and not self.limit: diff --git a/sdks/python/hatchet_sdk/runnables/task.py b/sdks/python/hatchet_sdk/runnables/task.py index f7a844c66..9735a08e2 100644 --- a/sdks/python/hatchet_sdk/runnables/task.py +++ b/sdks/python/hatchet_sdk/runnables/task.py @@ -1,5 +1,21 @@ +import asyncio from collections.abc import Callable -from typing import TYPE_CHECKING, Any, Generic, cast, get_type_hints +from inspect import Parameter, iscoroutinefunction, signature +from typing import ( + TYPE_CHECKING, + Annotated, + Any, + Concatenate, + Generic, + ParamSpec, + TypeVar, + cast, + get_args, + get_origin, + get_type_hints, +) + +from pydantic import BaseModel, ConfigDict from hatchet_sdk.conditions import ( Action, @@ -18,6 +34,7 @@ from hatchet_sdk.contracts.v1.workflows_pb2 import ( CreateTaskRateLimit, DesiredWorkerLabels, ) +from hatchet_sdk.exceptions import InvalidDependencyError from hatchet_sdk.runnables.types import ( ConcurrencyExpression, EmptyModel, @@ -25,7 +42,6 @@ from hatchet_sdk.runnables.types import ( StepType, TWorkflowInput, is_async_fn, - is_durable_sync_fn, is_sync_fn, ) from hatchet_sdk.utils.timedelta_to_expression import Duration, timedelta_to_expr @@ -41,16 +57,45 @@ from hatchet_sdk.worker.runner.utils.capture_logs import AsyncLogSender if TYPE_CHECKING: from hatchet_sdk.runnables.workflow import Workflow +T = TypeVar("T") +P = ParamSpec("P") + + 
+class Depends(Generic[T, TWorkflowInput]): + def __init__( + self, fn: Callable[[TWorkflowInput, Context], T | CoroutineLike[T]] + ) -> None: + sig = signature(fn) + params = list(sig.parameters.values()) + + if len(params) != 2: + raise InvalidDependencyError( + f"Dependency function {fn.__name__} must have exactly two parameters: input and ctx." + ) + + self.fn = fn + + +class DependencyToInject(BaseModel): + model_config = ConfigDict(arbitrary_types_allowed=True) + + name: str + value: Any + class Task(Generic[TWorkflowInput, R]): def __init__( self, _fn: ( - Callable[[TWorkflowInput, Context], R | CoroutineLike[R]] - | Callable[[TWorkflowInput, Context], AwaitableLike[R]] + Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]] + | Callable[Concatenate[TWorkflowInput, Context, P], AwaitableLike[R]] | ( - Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]] - | Callable[[TWorkflowInput, DurableContext], AwaitableLike[R]] + Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ] + | Callable[ + Concatenate[TWorkflowInput, DurableContext, P], AwaitableLike[R] + ] ) ), is_durable: bool, @@ -100,33 +145,74 @@ class Task(Generic[TWorkflowInput, R]): step_output=return_type if is_basemodel_subclass(return_type) else None, ) - def call(self, ctx: Context | DurableContext) -> R: + async def _parse_parameter( + self, + name: str, + param: Parameter, + input: TWorkflowInput, + ctx: Context | DurableContext, + ) -> DependencyToInject | None: + annotation = param.annotation + + if get_origin(annotation) is Annotated: + args = get_args(annotation) + + if len(args) < 2: + return None + + metadata = args[1:] + + for item in metadata: + if isinstance(item, Depends): + if iscoroutinefunction(item.fn): + return DependencyToInject( + name=name, value=await item.fn(input, ctx) + ) + + return DependencyToInject( + name=name, value=await asyncio.to_thread(item.fn, input, ctx) + ) + + return None + + async def _unpack_dependencies( + self, ctx: Context | DurableContext + ) -> dict[str, Any]: + sig = signature(self.fn) + input = self.workflow._get_workflow_input(ctx) + return { + parsed.name: parsed.value + for n, p in sig.parameters.items() + if (parsed := await self._parse_parameter(n, p, input, ctx)) is not None + } + + def call( + self, ctx: Context | DurableContext, dependencies: dict[str, Any] | None = None + ) -> R: if self.is_async_function: raise TypeError(f"{self.name} is not a sync function. Use `acall` instead.") workflow_input = self.workflow._get_workflow_input(ctx) + dependencies = dependencies or {} - if self.is_durable: - fn = cast(Callable[[TWorkflowInput, DurableContext], R], self.fn) - if is_durable_sync_fn(fn): - return fn(workflow_input, cast(DurableContext, ctx)) - else: - fn = cast(Callable[[TWorkflowInput, Context], R], self.fn) - if is_sync_fn(fn): - return fn(workflow_input, ctx) + if is_sync_fn(self.fn): # type: ignore + return self.fn(workflow_input, cast(Context, ctx), **dependencies) # type: ignore raise TypeError(f"{self.name} is not a sync function. Use `acall` instead.") - async def aio_call(self, ctx: Context | DurableContext) -> R: + async def aio_call( + self, ctx: Context | DurableContext, dependencies: dict[str, Any] | None = None + ) -> R: if not self.is_async_function: raise TypeError( f"{self.name} is not an async function. Use `call` instead." 
) workflow_input = self.workflow._get_workflow_input(ctx) + dependencies = dependencies or {} if is_async_fn(self.fn): # type: ignore - return await self.fn(workflow_input, cast(Context, ctx)) # type: ignore + return await self.fn(workflow_input, cast(Context, ctx), **dependencies) # type: ignore raise TypeError(f"{self.name} is not an async function. Use `call` instead.") @@ -255,6 +341,7 @@ class Task(Generic[TWorkflowInput, R]): parent_outputs: dict[str, JSONSerializableMapping] | None = None, retry_count: int = 0, lifespan: Any = None, + dependencies: dict[str, Any] | None = None, ) -> R: """ Mimic the execution of a task. This method is intended to be used to unit test @@ -266,6 +353,7 @@ class Task(Generic[TWorkflowInput, R]): :param parent_outputs: Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. :param retry_count: The number of times the task has been retried. :param lifespan: The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. + :param dependencies: Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`. :return: The output of the task. :raises TypeError: If the task is an async function and `mock_run` is called, or if the task is a sync function and `aio_mock_run` is called. @@ -280,7 +368,7 @@ class Task(Generic[TWorkflowInput, R]): input, additional_metadata, parent_outputs, retry_count, lifespan ) - return self.call(ctx) + return self.call(ctx, dependencies) async def aio_mock_run( self, @@ -289,6 +377,7 @@ class Task(Generic[TWorkflowInput, R]): parent_outputs: dict[str, JSONSerializableMapping] | None = None, retry_count: int = 0, lifespan: Any = None, + dependencies: dict[str, Any] | None = None, ) -> R: """ Mimic the execution of a task. This method is intended to be used to unit test @@ -300,6 +389,7 @@ class Task(Generic[TWorkflowInput, R]): :param parent_outputs: Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. :param retry_count: The number of times the task has been retried. :param lifespan: The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. + :param dependencies: Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`. :return: The output of the task. 
:raises TypeError: If the task is an async function and `mock_run` is called, or if the task is a sync function and `aio_mock_run` is called. @@ -318,4 +408,4 @@ class Task(Generic[TWorkflowInput, R]): lifespan, ) - return await self.aio_call(ctx) + return await self.aio_call(ctx, dependencies) diff --git a/sdks/python/hatchet_sdk/runnables/workflow.py b/sdks/python/hatchet_sdk/runnables/workflow.py index 30bf15dec..2c3d28fdf 100644 --- a/sdks/python/hatchet_sdk/runnables/workflow.py +++ b/sdks/python/hatchet_sdk/runnables/workflow.py @@ -5,8 +5,10 @@ from functools import cached_property from typing import ( TYPE_CHECKING, Any, + Concatenate, Generic, Literal, + ParamSpec, TypeVar, cast, get_type_hints, @@ -60,6 +62,7 @@ if TYPE_CHECKING: T = TypeVar("T") +P = ParamSpec("P") def fall_back_to_default(value: T, param_default: T, fallback_value: T | None) -> T: @@ -800,7 +803,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]): skip_if: list[Condition | OrGroup] | None = None, cancel_if: list[Condition | OrGroup] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]], Task[TWorkflowInput, R], ]: """ @@ -845,7 +848,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]): ) def inner( - func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R] + ], ) -> Task[TWorkflowInput, R]: task = Task( _fn=func, @@ -892,7 +897,11 @@ class Workflow(BaseWorkflow[TWorkflowInput]): skip_if: list[Condition | OrGroup] | None = None, cancel_if: list[Condition | OrGroup] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]], + [ + Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ] + ], Task[TWorkflowInput, R], ]: """ @@ -941,7 +950,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]): ) def inner( - func: Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R] + ], ) -> Task[TWorkflowInput, R]: task = Task( _fn=func, @@ -983,7 +994,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]): backoff_max_seconds: int | None = None, concurrency: list[ConcurrencyExpression] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]], Task[TWorkflowInput, R], ]: """ @@ -1009,7 +1020,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]): """ def inner( - func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R] + ], ) -> Task[TWorkflowInput, R]: task = Task( is_durable=False, @@ -1051,7 +1064,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]): backoff_max_seconds: int | None = None, concurrency: list[ConcurrencyExpression] | None = None, ) -> Callable[ - [Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]], + [Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]], Task[TWorkflowInput, R], ]: """ @@ -1077,7 +1090,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]): """ def inner( - func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]], + func: Callable[ + Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R] + ], ) -> Task[TWorkflowInput, R]: task = Task( is_durable=False, diff --git a/sdks/python/hatchet_sdk/utils/typing.py 
b/sdks/python/hatchet_sdk/utils/typing.py index 5bf50fe99..0d5dafad7 100644 --- a/sdks/python/hatchet_sdk/utils/typing.py +++ b/sdks/python/hatchet_sdk/utils/typing.py @@ -1,5 +1,6 @@ import sys from collections.abc import Awaitable, Coroutine, Generator +from enum import Enum from typing import Any, Literal, TypeAlias, TypeGuard, TypeVar from pydantic import BaseModel @@ -31,3 +32,29 @@ else: STOP_LOOP_TYPE = Literal["STOP_LOOP"] STOP_LOOP: STOP_LOOP_TYPE = "STOP_LOOP" # Sentinel object to stop the loop + + +class LogLevel(str, Enum): + DEBUG = "DEBUG" + INFO = "INFO" + WARN = "WARN" + ERROR = "ERROR" + + @classmethod + def from_levelname(cls, levelname: str) -> "LogLevel": + levelname = levelname.upper() + + if levelname == "DEBUG": + return cls.DEBUG + + if levelname == "INFO": + return cls.INFO + + if levelname in ["WARNING", "WARN"]: + return cls.WARN + + if levelname == "ERROR": + return cls.ERROR + + # fall back to INFO + return cls.INFO diff --git a/sdks/python/hatchet_sdk/worker/runner/runner.py b/sdks/python/hatchet_sdk/worker/runner/runner.py index 0dbec9b59..093300626 100644 --- a/sdks/python/hatchet_sdk/worker/runner/runner.py +++ b/sdks/python/hatchet_sdk/worker/runner/runner.py @@ -166,22 +166,22 @@ class Runner: except Exception as e: should_not_retry = isinstance(e, NonRetryableException) - exc = TaskRunError.from_exception(e) + exc = TaskRunError.from_exception(e, action.step_run_id) # This except is coming from the application itself, so we want to send that to the Hatchet instance self.event_queue.put( ActionEvent( action=action, type=STEP_EVENT_TYPE_FAILED, - payload=exc.serialize(), + payload=exc.serialize(include_metadata=True), should_not_retry=should_not_retry, ) ) - log_with_level = logger.info if should_not_retry else logger.error + log_with_level = logger.info if should_not_retry else logger.exception log_with_level( - f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}" + f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}" ) return @@ -198,18 +198,18 @@ class Runner: ) ) except IllegalTaskOutputError as e: - exc = TaskRunError.from_exception(e) + exc = TaskRunError.from_exception(e, action.step_run_id) self.event_queue.put( ActionEvent( action=action, type=STEP_EVENT_TYPE_FAILED, - payload=exc.serialize(), + payload=exc.serialize(include_metadata=True), should_not_retry=False, ) ) - logger.error( - f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}" + logger.exception( + f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}" ) return @@ -230,19 +230,19 @@ class Runner: try: output = task.result() except Exception as e: - exc = TaskRunError.from_exception(e) + exc = TaskRunError.from_exception(e, action.step_run_id) self.event_queue.put( ActionEvent( action=action, type=GROUP_KEY_EVENT_TYPE_FAILED, - payload=exc.serialize(), + payload=exc.serialize(include_metadata=True), should_not_retry=False, ) ) - logger.error( - f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}" + logger.exception( + f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}" ) return @@ -259,18 +259,18 @@ class Runner: ) ) except IllegalTaskOutputError as e: - exc = TaskRunError.from_exception(e) + exc = TaskRunError.from_exception(e, action.step_run_id) self.event_queue.put( ActionEvent( action=action, type=STEP_EVENT_TYPE_FAILED, - payload=exc.serialize(), + 
payload=exc.serialize(include_metadata=True), should_not_retry=False, ) ) - logger.error( - f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}" + logger.exception( + f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}" ) return @@ -280,12 +280,16 @@ class Runner: return inner_callback def thread_action_func( - self, ctx: Context, task: Task[TWorkflowInput, R], action: Action + self, + ctx: Context, + task: Task[TWorkflowInput, R], + action: Action, + dependencies: dict[str, Any], ) -> R: if action.step_run_id or action.get_group_key_run_id: self.threads[action.key] = current_thread() - return task.call(ctx) + return task.call(ctx, dependencies) # We wrap all actions in an async func async def async_wrapped_action_func( @@ -300,9 +304,12 @@ class Runner: ctx_action_key.set(action.key) ctx_additional_metadata.set(action.additional_metadata) + dependencies = await task._unpack_dependencies(ctx) + try: if task.is_async_function: - return await task.aio_call(ctx) + return await task.aio_call(ctx, dependencies) + pfunc = functools.partial( # we must copy the context vars to the new thread, as only asyncio natively supports # contextvars @@ -343,6 +350,7 @@ class Runner: ctx, task, action, + dependencies, ) loop = asyncio.get_event_loop() diff --git a/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py b/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py index 2b21dc82f..010837abc 100644 --- a/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py +++ b/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py @@ -16,7 +16,12 @@ from hatchet_sdk.runnables.contextvars import ( ctx_worker_id, ctx_workflow_run_id, ) -from hatchet_sdk.utils.typing import STOP_LOOP, STOP_LOOP_TYPE, JSONSerializableMapping +from hatchet_sdk.utils.typing import ( + STOP_LOOP, + STOP_LOOP_TYPE, + JSONSerializableMapping, + LogLevel, +) T = TypeVar("T") P = ParamSpec("P") @@ -67,6 +72,7 @@ def copy_context_vars( class LogRecord(BaseModel): message: str step_run_id: str + level: LogLevel class AsyncLogSender: @@ -86,6 +92,7 @@ class AsyncLogSender: self.event_client.log, message=record.message, step_run_id=record.step_run_id, + level=record.level, ) except Exception: logger.exception("failed to send log to Hatchet") @@ -97,7 +104,7 @@ class AsyncLogSender: logger.warning("log queue is full, dropping log message") -class CustomLogHandler(logging.StreamHandler): # type: ignore[type-arg] +class LogForwardingHandler(logging.StreamHandler): # type: ignore[type-arg] def __init__(self, log_sender: AsyncLogSender, stream: StringIO): super().__init__(stream) @@ -112,7 +119,13 @@ class CustomLogHandler(logging.StreamHandler): # type: ignore[type-arg] if not step_run_id: return - self.log_sender.publish(LogRecord(message=log_entry, step_run_id=step_run_id)) + self.log_sender.publish( + LogRecord( + message=log_entry, + step_run_id=step_run_id, + level=LogLevel.from_levelname(record.levelname), + ) + ) def capture_logs( @@ -121,27 +134,27 @@ def capture_logs( @functools.wraps(func) async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: log_stream = StringIO() - custom_handler = CustomLogHandler(log_sender, log_stream) - custom_handler.setLevel(logger.level) + log_forwarder = LogForwardingHandler(log_sender, log_stream) + log_forwarder.setLevel(logger.level) if logger.handlers: for handler in logger.handlers: if handler.formatter: - custom_handler.setFormatter(handler.formatter) + log_forwarder.setFormatter(handler.formatter) break 
for handler in logger.handlers: for filter_obj in handler.filters: - custom_handler.addFilter(filter_obj) + log_forwarder.addFilter(filter_obj) - if not any(h for h in logger.handlers if isinstance(h, CustomLogHandler)): - logger.addHandler(custom_handler) + if not any(h for h in logger.handlers if isinstance(h, LogForwardingHandler)): + logger.addHandler(log_forwarder) try: result = await func(*args, **kwargs) finally: - custom_handler.flush() - logger.removeHandler(custom_handler) + log_forwarder.flush() + logger.removeHandler(log_forwarder) log_stream.close() return result diff --git a/sdks/python/poetry.lock b/sdks/python/poetry.lock index c664e043b..fce519b42 100644 --- a/sdks/python/poetry.lock +++ b/sdks/python/poetry.lock @@ -319,26 +319,6 @@ files = [ [package.dependencies] beautifulsoup4 = "*" -[[package]] -name = "cel-python" -version = "0.2.0" -description = "Pure Python implementation of Google Common Expression Language" -optional = false -python-versions = "<4.0,>=3.8" -groups = ["main"] -files = [ - {file = "cel_python-0.2.0-py3-none-any.whl", hash = "sha256:478ff73def7b39d51e6982f95d937a57c2b088c491c578fe5cecdbd79f476f60"}, - {file = "cel_python-0.2.0.tar.gz", hash = "sha256:75de72a5cf223ec690b236f0cc24da267219e667bd3e7f8f4f20595fcc1c0c0f"}, -] - -[package.dependencies] -jmespath = ">=1.0.1,<2.0.0" -lark = ">=0.12.0,<0.13.0" -python-dateutil = ">=2.9.0.post0,<3.0.0" -pyyaml = ">=6.0.1,<7.0.0" -types-python-dateutil = ">=2.9.0.20240316,<3.0.0.0" -types-pyyaml = ">=6.0.12.20240311,<7.0.0.0" - [[package]] name = "certifi" version = "2025.6.15" @@ -1104,35 +1084,6 @@ files = [ {file = "jiter-0.10.0.tar.gz", hash = "sha256:07a7142c38aacc85194391108dc91b5b57093c978a9932bd86a36862759d9500"}, ] -[[package]] -name = "jmespath" -version = "1.0.1" -description = "JSON Matching Expressions" -optional = false -python-versions = ">=3.7" -groups = ["main"] -files = [ - {file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"}, - {file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"}, -] - -[[package]] -name = "lark" -version = "0.12.0" -description = "a modern parsing library" -optional = false -python-versions = "*" -groups = ["main"] -files = [ - {file = "lark-0.12.0-py2.py3-none-any.whl", hash = "sha256:ed1d891cbcf5151ead1c1d14663bf542443e579e63a76ae175b01b899bd854ca"}, - {file = "lark-0.12.0.tar.gz", hash = "sha256:7da76fcfddadabbbbfd949bbae221efd33938451d90b1fefbbc423c3cccf48ef"}, -] - -[package.extras] -atomic-cache = ["atomicwrites"] -nearley = ["js2py"] -regex = ["regex"] - [[package]] name = "markdown" version = "3.8" @@ -2378,7 +2329,7 @@ version = "6.0.2" description = "YAML parser and emitter for Python" optional = false python-versions = ">=3.8" -groups = ["main", "docs"] +groups = ["docs"] files = [ {file = "PyYAML-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a9a2848a5b7feac301353437eb7d5957887edbf81d56e903999a75a3d743086"}, {file = "PyYAML-6.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:29717114e51c84ddfba879543fb232a6ed60086602313ca38cce623c1d62cfbf"}, @@ -2708,30 +2659,6 @@ files = [ {file = "types_psutil-6.1.0.20241221.tar.gz", hash = "sha256:600f5a36bd5e0eb8887f0e3f3ff2cf154d90690ad8123c8a707bba4ab94d3185"}, ] -[[package]] -name = "types-python-dateutil" -version = "2.9.0.20250516" -description = "Typing stubs for python-dateutil" -optional = false -python-versions = ">=3.9" -groups = ["main"] -files = [ - {file = 
"types_python_dateutil-2.9.0.20250516-py3-none-any.whl", hash = "sha256:2b2b3f57f9c6a61fba26a9c0ffb9ea5681c9b83e69cd897c6b5f668d9c0cab93"}, - {file = "types_python_dateutil-2.9.0.20250516.tar.gz", hash = "sha256:13e80d6c9c47df23ad773d54b2826bd52dbbb41be87c3f339381c1700ad21ee5"}, -] - -[[package]] -name = "types-pyyaml" -version = "6.0.12.20250516" -description = "Typing stubs for PyYAML" -optional = false -python-versions = ">=3.9" -groups = ["main"] -files = [ - {file = "types_pyyaml-6.0.12.20250516-py3-none-any.whl", hash = "sha256:8478208feaeb53a34cb5d970c56a7cd76b72659442e733e268a94dc72b2d0530"}, - {file = "types_pyyaml-6.0.12.20250516.tar.gz", hash = "sha256:9f21a70216fc0fa1b216a8176db5f9e0af6eb35d2f2932acb87689d03a5bf6ba"}, -] - [[package]] name = "types-requests" version = "2.32.4.20250611" @@ -3104,4 +3031,4 @@ otel = ["opentelemetry-api", "opentelemetry-distro", "opentelemetry-exporter-otl [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "6b1e0ad6f029925d5a3c5c2b264456680376e833f6ed344f9d5da0cf8c418a31" +content-hash = "7ed68cf2a4288fc5b8817a794d847c2b56ce7398770e1d8fdd2be02461dbd1a0" diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index db85ba3ce..d67cf7a0b 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.16.5" +version = "1.17.0" description = "" authors = ["Alexander Belanger "] readme = "README.md" @@ -25,7 +25,6 @@ aiostream = "^0.5.2" aiohttp = "^3.10.5" aiohttp-retry = "^2.8.3" tenacity = ">=8.4.1" -cel-python = "^0.2.0" opentelemetry-api = { version = "^1.28.0", optional = true } opentelemetry-sdk = { version = "^1.28.0", optional = true } opentelemetry-instrumentation = { version = ">=0.49b0", optional = true } diff --git a/sdks/python/tests/correct_failure_on_timeout_with_multi_concurrency/test_case.py b/sdks/python/tests/correct_failure_on_timeout_with_multi_concurrency/test_case.py index 34565dc6c..90d48bb68 100644 --- a/sdks/python/tests/correct_failure_on_timeout_with_multi_concurrency/test_case.py +++ b/sdks/python/tests/correct_failure_on_timeout_with_multi_concurrency/test_case.py @@ -45,7 +45,7 @@ async def test_failure_on_timeout( except Exception: pass - await asyncio.sleep(3 * TIMEOUT_SECONDS) + await asyncio.sleep(4 * TIMEOUT_SECONDS) results = { r.workflow_run_id: await hatchet.runs.aio_get(r.workflow_run_id) for r in runs