[Python] Feat: Dependency Injection, Improved error handling (#2067)

* feat: first pass at dependency injection

* feat: add DI example + tests

* feat: finish up tests

* feat: docs

* chore: gen

* chore: lint

* chore: changelog + ver

* fix: split up paragraphs

* refactor: improve impl

* chore: gen

* feat: inject input + ctx into deps

* chore: gen

* [Python] Feat: More use of `logger.exception` (#2069)

* feat: add more instances of `logger.exception`

* chore: ver

* chore: changelog

* fix: one more error log

* chore: gen

* chore: gen

* chore: lint

* fix: improve shutdown

* chore: changelog

* unwind: exit handler

* feat: task run error

* feat: improve error serde with more context

* chore: changelog

* fix: changelog

* chore: gen

* fix: rm celpy + lark dep, casing issues

* chore: changelog

* fix: respect log levels over the API

* fix: changelog

* refactor: rename log forwarder

* fix: circular import
This commit is contained in:
matt
2025-08-11 23:10:44 -04:00
committed by GitHub
parent 8a0e88ac48
commit c8d5144ed4
47 changed files with 1082 additions and 272 deletions
+4
View File
@@ -35,6 +35,7 @@ func main() {
selectedWorkflow := (*workflows.Rows)[0]
selectedWorkflowUUID := uuid.MustParse(selectedWorkflow.Metadata.Id)
// > List runs
workflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{
WorkflowIds: &[]types.UUID{selectedWorkflowUUID},
@@ -49,6 +50,7 @@ func main() {
runIds = append(runIds, uuid.MustParse(run.Metadata.Id))
}
// > Cancel by run ids
_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{
ExternalIds: &runIds,
@@ -57,6 +59,7 @@ func main() {
log.Fatalf("failed to cancel runs by ids: %v", err)
}
// > Cancel by filters
tNow := time.Now().UTC()
@@ -73,5 +76,6 @@ func main() {
log.Fatalf("failed to cancel runs by filters: %v", err)
}
fmt.Println("cancelled all runs for workflow", selectedWorkflow.Name)
}
+1
View File
@@ -37,3 +37,4 @@ func main() {
fmt.Println("\nStreaming completed!")
}
+1
View File
@@ -58,3 +58,4 @@ func main() {
log.Println("Failed to start server:", err)
}
}
+1
View File
@@ -50,6 +50,7 @@ func StreamTask(ctx worker.HatchetContext, input StreamTaskInput) (*StreamTaskOu
}, nil
}
func StreamingWorkflow(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[StreamTaskInput, StreamTaskOutput] {
return factory.NewTask(
create.StandaloneTask{
@@ -0,0 +1,49 @@
import pytest
from examples.dependency_injection.worker import (
ASYNC_DEPENDENCY_VALUE,
SYNC_DEPENDENCY_VALUE,
Output,
async_dep,
async_task_with_dependencies,
di_workflow,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
sync_dep,
sync_task_with_dependencies,
)
from hatchet_sdk import EmptyModel
from hatchet_sdk.runnables.workflow import Standalone
@pytest.mark.parametrize(
"task",
[
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
],
)
@pytest.mark.asyncio(loop_scope="session")
async def test_di_standalones(
task: Standalone[EmptyModel, Output],
) -> None:
result = await task.aio_run()
assert isinstance(result, Output)
assert result.sync_dep == SYNC_DEPENDENCY_VALUE
assert result.async_dep == ASYNC_DEPENDENCY_VALUE
@pytest.mark.asyncio(loop_scope="session")
async def test_di_workflows() -> None:
result = await di_workflow.aio_run()
assert len(result) == 4
for output in result.values():
parsed = Output.model_validate(output)
assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE
assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE
@@ -0,0 +1,159 @@
# > Simple
from typing import Annotated
from pydantic import BaseModel
from hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet
hatchet = Hatchet(debug=False)
SYNC_DEPENDENCY_VALUE = "sync_dependency_value"
ASYNC_DEPENDENCY_VALUE = "async_dependency_value"
# > Declare dependencies
async def async_dep(input: EmptyModel, ctx: Context) -> str:
return ASYNC_DEPENDENCY_VALUE
def sync_dep(input: EmptyModel, ctx: Context) -> str:
return SYNC_DEPENDENCY_VALUE
class Output(BaseModel):
sync_dep: str
async_dep: str
# > Inject dependencies
@hatchet.task()
async def async_task_with_dependencies(
_i: EmptyModel,
ctx: Context,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@hatchet.task()
def sync_task_with_dependencies(
_i: EmptyModel,
ctx: Context,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@hatchet.durable_task()
async def durable_async_task_with_dependencies(
_i: EmptyModel,
ctx: DurableContext,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@hatchet.durable_task()
def durable_sync_task_with_dependencies(
_i: EmptyModel,
ctx: DurableContext,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
di_workflow = hatchet.workflow(
name="dependency-injection-workflow",
)
@di_workflow.task()
async def wf_async_task_with_dependencies(
_i: EmptyModel,
ctx: Context,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@di_workflow.task()
def wf_sync_task_with_dependencies(
_i: EmptyModel,
ctx: Context,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@di_workflow.durable_task()
async def wf_durable_async_task_with_dependencies(
_i: EmptyModel,
ctx: DurableContext,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
@di_workflow.durable_task()
def wf_durable_sync_task_with_dependencies(
_i: EmptyModel,
ctx: DurableContext,
async_dep: Annotated[str, Depends(async_dep)],
sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
return Output(
sync_dep=sync_dep,
async_dep=async_dep,
)
def main() -> None:
worker = hatchet.worker(
"dependency-injection-worker",
workflows=[
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
di_workflow,
],
)
worker.start()
if __name__ == "__main__":
main()
+1 -1
View File
@@ -71,4 +71,4 @@ async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:
second_sleep_result = await first_sleep.aio_result()
"""We've already slept for a little bit by the time the task is cancelled"""
assert second_sleep_result["runtime"] < SLEEP_TIME
assert second_sleep_result["runtime"] <= SLEEP_TIME
+13 -6
View File
@@ -2,8 +2,9 @@ import json
from datetime import timedelta
from hatchet_sdk import Context, EmptyModel, Hatchet
from hatchet_sdk.exceptions import TaskRunError
hatchet = Hatchet(debug=True)
hatchet = Hatchet(debug=False)
ERROR_TEXT = "step1 failed"
@@ -49,14 +50,20 @@ def details_step1(input: EmptyModel, ctx: Context) -> None:
# 👀 After the workflow fails, this special step will run
@on_failure_wf_with_details.on_failure_task()
def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:
error = ctx.fetch_task_run_error(details_step1)
def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:
error = ctx.get_task_run_error(details_step1)
if not error:
return {"status": "unexpected success"}
# 👀 we can access the failure details here
print(json.dumps(error, indent=2))
assert isinstance(error, TaskRunError)
if error and error.startswith(ERROR_TEXT):
return {"status": "success"}
if "step1 failed" in error.exc:
return {
"status": "success",
"failed_run_external_id": error.task_run_external_id,
}
raise Exception("unexpected failure")
+45 -31
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -114,7 +114,7 @@ propcache = ">=0.2.0"
yarl = ">=1.17.0,<2.0"
[package.extras]
speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""]
speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"]
[[package]]
name = "aiohttp-retry"
@@ -199,12 +199,12 @@ files = [
]
[package.extras]
benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"]
cov = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"]
dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"]
benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier"]
tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"]
tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""]
tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"]
[[package]]
name = "cel-python"
@@ -460,14 +460,14 @@ setuptools = "*"
[[package]]
name = "hatchet-sdk"
version = "1.16.5"
version = "1.0.0a1"
description = ""
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "hatchet_sdk-1.16.5-py3-none-any.whl", hash = "sha256:3a21edf4d5f1655facaea3dcfabb7b00dafc35a0a593ff088cbe21d1eb7b4db8"},
{file = "hatchet_sdk-1.16.5.tar.gz", hash = "sha256:c00e85a3a9b1789b85679e528deddd12a6d70ba321e16de9ec07e3fd1d373fb8"},
{file = "hatchet_sdk-1.0.0a1-py3-none-any.whl", hash = "sha256:bfc84358c8842cecd0d95b30645109733b7292dff0db1a776ca862785ee93d7f"},
{file = "hatchet_sdk-1.0.0a1.tar.gz", hash = "sha256:f0272bbaac6faed75ff727826e9f7b1ac42ae597f9b590e14d392aada9c9692f"},
]
[package.dependencies]
@@ -483,11 +483,13 @@ grpcio-tools = [
{version = ">=1.64.1,<1.68.dev0 || >=1.69.dev0", markers = "python_version < \"3.13\""},
{version = ">=1.69.0", markers = "python_version >= \"3.13\""},
]
prometheus-client = ">=0.21.1"
protobuf = ">=5.29.5,<6.0.0"
nest-asyncio = ">=1.6.0,<2.0.0"
prometheus-client = ">=0.21.1,<0.22.0"
protobuf = ">=5.29.1,<6.0.0"
pydantic = ">=2.6.3,<3.0.0"
pydantic-settings = ">=2.7.1,<3.0.0"
python-dateutil = ">=2.9.0.post0,<3.0.0"
pyyaml = ">=6.0.1,<7.0.0"
tenacity = ">=8.4.1"
urllib3 = ">=1.26.20"
@@ -643,6 +645,18 @@ files = [
[package.dependencies]
typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""}
[[package]]
name = "nest-asyncio"
version = "1.6.0"
description = "Patch asyncio to allow nested event loops"
optional = false
python-versions = ">=3.5"
groups = ["main"]
files = [
{file = "nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c"},
{file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"},
]
[[package]]
name = "prometheus-client"
version = "0.21.1"
@@ -768,23 +782,23 @@ files = [
[[package]]
name = "protobuf"
version = "5.29.5"
version = "5.29.4"
description = ""
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "protobuf-5.29.5-cp310-abi3-win32.whl", hash = "sha256:3f1c6468a2cfd102ff4703976138844f78ebd1fb45f49011afc5139e9e283079"},
{file = "protobuf-5.29.5-cp310-abi3-win_amd64.whl", hash = "sha256:3f76e3a3675b4a4d867b52e4a5f5b78a2ef9565549d4037e06cf7b0942b1d3fc"},
{file = "protobuf-5.29.5-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e38c5add5a311f2a6eb0340716ef9b039c1dfa428b28f25a7838ac329204a671"},
{file = "protobuf-5.29.5-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:fa18533a299d7ab6c55a238bf8629311439995f2e7eca5caaff08663606e9015"},
{file = "protobuf-5.29.5-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:63848923da3325e1bf7e9003d680ce6e14b07e55d0473253a690c3a8b8fd6e61"},
{file = "protobuf-5.29.5-cp38-cp38-win32.whl", hash = "sha256:ef91363ad4faba7b25d844ef1ada59ff1604184c0bcd8b39b8a6bef15e1af238"},
{file = "protobuf-5.29.5-cp38-cp38-win_amd64.whl", hash = "sha256:7318608d56b6402d2ea7704ff1e1e4597bee46d760e7e4dd42a3d45e24b87f2e"},
{file = "protobuf-5.29.5-cp39-cp39-win32.whl", hash = "sha256:6f642dc9a61782fa72b90878af134c5afe1917c89a568cd3476d758d3c3a0736"},
{file = "protobuf-5.29.5-cp39-cp39-win_amd64.whl", hash = "sha256:470f3af547ef17847a28e1f47200a1cbf0ba3ff57b7de50d22776607cd2ea353"},
{file = "protobuf-5.29.5-py3-none-any.whl", hash = "sha256:6cf42630262c59b2d8de33954443d94b746c952b01434fc58a417fdbd2e84bd5"},
{file = "protobuf-5.29.5.tar.gz", hash = "sha256:bc1463bafd4b0929216c35f437a8e28731a2b7fe3d98bb77a600efced5a15c84"},
{file = "protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7"},
{file = "protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d"},
{file = "protobuf-5.29.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:307ecba1d852ec237e9ba668e087326a67564ef83e45a0189a772ede9e854dd0"},
{file = "protobuf-5.29.4-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:aec4962f9ea93c431d5714ed1be1c93f13e1a8618e70035ba2b0564d9e633f2e"},
{file = "protobuf-5.29.4-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:d7d3f7d1d5a66ed4942d4fefb12ac4b14a29028b209d4bfb25c68ae172059922"},
{file = "protobuf-5.29.4-cp38-cp38-win32.whl", hash = "sha256:1832f0515b62d12d8e6ffc078d7e9eb06969aa6dc13c13e1036e39d73bebc2de"},
{file = "protobuf-5.29.4-cp38-cp38-win_amd64.whl", hash = "sha256:476cb7b14914c780605a8cf62e38c2a85f8caff2e28a6a0bad827ec7d6c85d68"},
{file = "protobuf-5.29.4-cp39-cp39-win32.whl", hash = "sha256:fd32223020cb25a2cc100366f1dedc904e2d71d9322403224cdde5fdced0dabe"},
{file = "protobuf-5.29.4-cp39-cp39-win_amd64.whl", hash = "sha256:678974e1e3a9b975b8bc2447fca458db5f93a2fb6b0c8db46b6675b5b5346812"},
{file = "protobuf-5.29.4-py3-none-any.whl", hash = "sha256:3fde11b505e1597f71b875ef2fc52062b6a9740e5f7c8997ce878b6009145862"},
{file = "protobuf-5.29.4.tar.gz", hash = "sha256:4f1dfcd7997b31ef8f53ec82781ff434a28bf71d9102ddde14d076adcfc78c99"},
]
[[package]]
@@ -806,7 +820,7 @@ typing-extensions = ">=4.12.2"
[package.extras]
email = ["email-validator (>=2.0.0)"]
timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""]
timezone = ["tzdata"]
[[package]]
name = "pydantic-core"
@@ -1048,13 +1062,13 @@ files = [
]
[package.extras]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""]
core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"]
check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"]
core = ["importlib_metadata (>=6)", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"]
cover = ["pytest-cov"]
doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"]
enabler = ["pytest-enabler (>=2.2)"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"]
test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"]
type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", "pytest-mypy"]
[[package]]
name = "six"
@@ -1133,7 +1147,7 @@ files = [
]
[package.extras]
brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""]
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"]
h2 = ["h2 (>=4,<5)"]
socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"]
zstd = ["zstandard (>=0.18.0)"]
@@ -1238,4 +1252,4 @@ propcache = ">=0.2.0"
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "06949f41c097b940db1cb406e6ea99dec9c1bad09389ff5d08cd2d5b16f053a4"
content-hash = "74c12e499aa797ca5c8559af579f1212b0e4e3a77f068f9385db39d70ba304e0"
+12
View File
@@ -15,6 +15,13 @@ from examples.concurrency_workflow_level.worker import (
from examples.conditions.worker import task_condition_workflow
from examples.dag.worker import dag_workflow
from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf
from examples.dependency_injection.worker import (
async_task_with_dependencies,
di_workflow,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
sync_task_with_dependencies,
)
from examples.durable.worker import durable_workflow, wait_for_sleep_twice
from examples.events.worker import event_workflow
from examples.fanout.worker import child_wf, parent_wf
@@ -61,6 +68,7 @@ def main() -> None:
sync_fanout_child,
non_retryable_workflow,
concurrency_workflow_level_workflow,
di_workflow,
lifespan_task,
simple,
simple_durable,
@@ -70,6 +78,10 @@ def main() -> None:
webhook,
return_exceptions_task,
wait_for_sleep_twice,
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
],
lifespan=lifespan,
)
@@ -68,6 +68,10 @@ const meta = {
title: 'Bulk Run Many',
href: '/home/bulk-run',
},
webhooks: {
title: 'Webhooks',
href: '/home/webhooks',
},
'--flow-control': {
title: 'Flow Control',
type: 'separator',
@@ -248,6 +252,10 @@ const meta = {
title: 'Lifespans',
href: '/home/lifespans',
},
dependency_injection: {
title: 'Dependency Injection',
href: '/home/dependency-injection',
},
blog: {
title: 'Blog',
type: 'page',
@@ -0,0 +1,5 @@
import test_dependency_injection from './test_dependency_injection';
import worker from './worker';
export { test_dependency_injection };
export { worker };
@@ -0,0 +1,12 @@
import { Snippet } from '@/next/lib/docs/generated/snips/types';
const snippet: Snippet = {
language: 'python',
content:
'import pytest\n\nfrom examples.dependency_injection.worker import (\n ASYNC_DEPENDENCY_VALUE,\n SYNC_DEPENDENCY_VALUE,\n Output,\n async_dep,\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_dep,\n sync_task_with_dependencies,\n)\nfrom hatchet_sdk import EmptyModel\nfrom hatchet_sdk.runnables.workflow import Standalone\n\n\n@pytest.mark.parametrize(\n "task",\n [\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n ],\n)\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_di_standalones(\n task: Standalone[EmptyModel, Output],\n) -> None:\n result = await task.aio_run()\n\n assert isinstance(result, Output)\n assert result.sync_dep == SYNC_DEPENDENCY_VALUE\n assert result.async_dep == ASYNC_DEPENDENCY_VALUE\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_di_workflows() -> None:\n result = await di_workflow.aio_run()\n\n assert len(result) == 4\n\n for output in result.values():\n parsed = Output.model_validate(output)\n\n assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE\n assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE\n',
source: 'out/python/dependency_injection/test_dependency_injection.py',
blocks: {},
highlights: {},
};
export default snippet;
@@ -0,0 +1,21 @@
import { Snippet } from '@/next/lib/docs/generated/snips/types';
const snippet: Snippet = {
language: 'python',
content:
'# > Simple\n\nfrom typing import Annotated\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\nSYNC_DEPENDENCY_VALUE = "sync_dependency_value"\nASYNC_DEPENDENCY_VALUE = "async_dependency_value"\n\n\n# > Declare dependencies\nasync def async_dep(input: EmptyModel, ctx: Context) -> str:\n return ASYNC_DEPENDENCY_VALUE\n\n\ndef sync_dep(input: EmptyModel, ctx: Context) -> str:\n return SYNC_DEPENDENCY_VALUE\n\n\n\n\nclass Output(BaseModel):\n sync_dep: str\n async_dep: str\n\n\n# > Inject dependencies\n@hatchet.task()\nasync def async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n\n\n@hatchet.task()\ndef sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\nasync def durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\ndef durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndi_workflow = hatchet.workflow(\n name="dependency-injection-workflow",\n)\n\n\n@di_workflow.task()\nasync def wf_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, 
Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.task()\ndef wf_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\nasync def wf_durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\ndef wf_durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "dependency-injection-worker",\n workflows=[\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n di_workflow,\n ],\n )\n worker.start()\n\n\n\nif __name__ == "__main__":\n main()\n',
source: 'out/python/dependency_injection/worker.py',
blocks: {
declare_dependencies: {
start: 16,
stop: 23,
},
inject_dependencies: {
start: 32,
stop: 44,
},
},
highlights: {},
};
export default snippet;
@@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types';
const snippet: Snippet = {
language: 'python',
content:
'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n\n wait_for_multi_sleep = result["wait_for_multi_sleep"]\n\n assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n """We\'ve already slept for a little bit by the time the task is cancelled"""\n assert second_sleep_result["runtime"] < SLEEP_TIME\n',
'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n\n wait_for_multi_sleep = result["wait_for_multi_sleep"]\n\n assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n """We\'ve already slept for a little bit by the time the task is cancelled"""\n assert second_sleep_result["runtime"] <= SLEEP_TIME\n',
source: 'out/python/durable/test_durable.py',
blocks: {},
highlights: {},
@@ -18,6 +18,7 @@ import * as cron from './cron';
import * as dag from './dag';
import * as dedupe from './dedupe';
import * as delayed from './delayed';
import * as dependency_injection from './dependency_injection';
import * as durable from './durable';
import * as durable_event from './durable_event';
import * as durable_sleep from './durable_sleep';
@@ -67,6 +68,7 @@ export { cron };
export { dag };
export { dedupe };
export { delayed };
export { dependency_injection };
export { durable };
export { durable_event };
export { durable_sleep };
@@ -3,16 +3,16 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types';
const snippet: Snippet = {
language: 'python',
content:
'import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=True)\n\nERROR_TEXT = "step1 failed"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name="OnFailureWorkflow")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {"status": "success"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name="OnFailureWorkflowWithDetails")\n\n\n# ... defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n error = ctx.fetch_task_run_error(details_step1)\n\n # 👀 we can access the failure details here\n print(json.dumps(error, indent=2))\n\n if error and error.startswith(ERROR_TEXT):\n return {"status": "success"}\n\n raise Exception("unexpected failure")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "on-failure-worker",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n',
'import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\nfrom hatchet_sdk.exceptions import TaskRunError\n\nhatchet = Hatchet(debug=False)\n\nERROR_TEXT = "step1 failed"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name="OnFailureWorkflow")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {"status": "success"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name="OnFailureWorkflowWithDetails")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:\n error = ctx.get_task_run_error(details_step1)\n\n if not error:\n return {"status": "unexpected success"}\n\n # 👀 we can access the failure details here\n assert isinstance(error, TaskRunError)\n\n if "step1 failed" in error.exc:\n return {\n "status": "success",\n "failed_run_external_id": error.task_run_external_id,\n }\n\n raise Exception("unexpected failure")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "on-failure-worker",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n',
source: 'out/python/on_failure/worker.py',
blocks: {
onfailure_step: {
start: 11,
stop: 34,
start: 12,
stop: 35,
},
onfailure_with_details: {
start: 38,
stop: 63,
start: 39,
stop: 70,
},
},
highlights: {},
@@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types';
const snippet: Snippet = {
language: 'python',
content:
'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom examples.webhooks.worker import webhook\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n 
dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n webhook,\n return_exceptions_task,\n wait_for_sleep_twice,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n',
'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.dependency_injection.worker import (\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_task_with_dependencies,\n)\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom examples.webhooks.worker import webhook\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n 
affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n di_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n webhook,\n return_exceptions_task,\n wait_for_sleep_twice,\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n',
source: 'out/python/worker.py',
blocks: {},
highlights: {},
@@ -0,0 +1,5 @@
import test_dependency_injection from './test_dependency_injection';
import worker from './worker';
export { test_dependency_injection }
export { worker }
@@ -0,0 +1,11 @@
import { Snippet } from '@/lib/generated/snips/types';
const snippet: Snippet = {
"language": "python",
"content": "import pytest\n\nfrom examples.dependency_injection.worker import (\n ASYNC_DEPENDENCY_VALUE,\n SYNC_DEPENDENCY_VALUE,\n Output,\n async_dep,\n async_task_with_dependencies,\n di_workflow,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n sync_dep,\n sync_task_with_dependencies,\n)\nfrom hatchet_sdk import EmptyModel\nfrom hatchet_sdk.runnables.workflow import Standalone\n\n\n@pytest.mark.parametrize(\n \"task\",\n [\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n ],\n)\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_di_standalones(\n task: Standalone[EmptyModel, Output],\n) -> None:\n result = await task.aio_run()\n\n assert isinstance(result, Output)\n assert result.sync_dep == SYNC_DEPENDENCY_VALUE\n assert result.async_dep == ASYNC_DEPENDENCY_VALUE\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_di_workflows() -> None:\n result = await di_workflow.aio_run()\n\n assert len(result) == 4\n\n for output in result.values():\n parsed = Output.model_validate(output)\n\n assert parsed.sync_dep == SYNC_DEPENDENCY_VALUE\n assert parsed.async_dep == ASYNC_DEPENDENCY_VALUE\n",
"source": "out/python/dependency_injection/test_dependency_injection.py",
"blocks": {},
"highlights": {}
};
export default snippet;
@@ -0,0 +1,20 @@
import { Snippet } from '@/lib/generated/snips/types';
const snippet: Snippet = {
"language": "python",
"content": "# > Simple\n\nfrom typing import Annotated\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\nSYNC_DEPENDENCY_VALUE = \"sync_dependency_value\"\nASYNC_DEPENDENCY_VALUE = \"async_dependency_value\"\n\n\n# > Declare dependencies\nasync def async_dep(input: EmptyModel, ctx: Context) -> str:\n return ASYNC_DEPENDENCY_VALUE\n\n\ndef sync_dep(input: EmptyModel, ctx: Context) -> str:\n return SYNC_DEPENDENCY_VALUE\n\n\n\n\nclass Output(BaseModel):\n sync_dep: str\n async_dep: str\n\n\n# > Inject dependencies\n@hatchet.task()\nasync def async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n\n\n@hatchet.task()\ndef sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\nasync def durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@hatchet.durable_task()\ndef durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndi_workflow = hatchet.workflow(\n name=\"dependency-injection-workflow\",\n)\n\n\n@di_workflow.task()\nasync def wf_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, 
Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.task()\ndef wf_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: Context,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\nasync def wf_durable_async_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\n@di_workflow.durable_task()\ndef wf_durable_sync_task_with_dependencies(\n _i: EmptyModel,\n ctx: DurableContext,\n async_dep: Annotated[str, Depends(async_dep)],\n sync_dep: Annotated[str, Depends(sync_dep)],\n) -> Output:\n return Output(\n sync_dep=sync_dep,\n async_dep=async_dep,\n )\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"dependency-injection-worker\",\n workflows=[\n async_task_with_dependencies,\n sync_task_with_dependencies,\n durable_async_task_with_dependencies,\n durable_sync_task_with_dependencies,\n di_workflow,\n ],\n )\n worker.start()\n\n\n\nif __name__ == \"__main__\":\n main()\n",
"source": "out/python/dependency_injection/worker.py",
"blocks": {
"declare_dependencies": {
"start": 16,
"stop": 23
},
"inject_dependencies": {
"start": 32,
"stop": 44
}
},
"highlights": {}
};
export default snippet;
@@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types';
const snippet: Snippet = {
"language": "python",
"content": "import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n\n wait_for_multi_sleep = result[\"wait_for_multi_sleep\"]\n\n assert wait_for_multi_sleep[\"runtime\"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n \"\"\"We've already slept for a little bit by the time the task is cancelled\"\"\"\n assert second_sleep_result[\"runtime\"] < 
SLEEP_TIME\n",
"content": "import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n\n wait_for_multi_sleep = result[\"wait_for_multi_sleep\"]\n\n assert wait_for_multi_sleep[\"runtime\"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n \"\"\"We've already slept for a little bit by the time the task is cancelled\"\"\"\n assert second_sleep_result[\"runtime\"] <= 
SLEEP_TIME\n",
"source": "out/python/durable/test_durable.py",
"blocks": {},
"highlights": {}
@@ -18,6 +18,7 @@ import * as cron from './cron';
import * as dag from './dag';
import * as dedupe from './dedupe';
import * as delayed from './delayed';
import * as dependency_injection from './dependency_injection';
import * as durable from './durable';
import * as durable_event from './durable_event';
import * as durable_sleep from './durable_sleep';
@@ -67,6 +68,7 @@ export { cron };
export { dag };
export { dedupe };
export { delayed };
export { dependency_injection };
export { durable };
export { durable_event };
export { durable_sleep };
@@ -2,16 +2,16 @@ import { Snippet } from '@/lib/generated/snips/types';
const snippet: Snippet = {
"language": "python",
"content": "import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=True)\n\nERROR_TEXT = \"step1 failed\"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name=\"OnFailureWorkflow\")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {\"status\": \"success\"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name=\"OnFailureWorkflowWithDetails\")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n error = ctx.fetch_task_run_error(details_step1)\n\n # 👀 we can access the failure details here\n print(json.dumps(error, indent=2))\n\n if error and error.startswith(ERROR_TEXT):\n return {\"status\": \"success\"}\n\n raise Exception(\"unexpected failure\")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"on-failure-worker\",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n",
"content": "import json\nfrom datetime import timedelta\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\nfrom hatchet_sdk.exceptions import TaskRunError\n\nhatchet = Hatchet(debug=False)\n\nERROR_TEXT = \"step1 failed\"\n\n# > OnFailure Step\n# This workflow will fail because the step will throw an error\n# we define an onFailure step to handle this case\n\non_failure_wf = hatchet.workflow(name=\"OnFailureWorkflow\")\n\n\n@on_failure_wf.task(execution_timeout=timedelta(seconds=1))\ndef step1(input: EmptyModel, ctx: Context) -> None:\n # 👀 this step will always raise an exception\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf.on_failure_task()\ndef on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:\n # 👀 we can do things like perform cleanup logic\n # or notify a user here\n\n # 👀 Fetch the errors from upstream step runs from the context\n print(ctx.task_run_errors)\n\n return {\"status\": \"success\"}\n\n\n\n\n# > OnFailure With Details\n# We can access the failure details in the onFailure step\n# via the context method\n\non_failure_wf_with_details = hatchet.workflow(name=\"OnFailureWorkflowWithDetails\")\n\n\n# ... 
defined as above\n@on_failure_wf_with_details.task(execution_timeout=timedelta(seconds=1))\ndef details_step1(input: EmptyModel, ctx: Context) -> None:\n raise Exception(ERROR_TEXT)\n\n\n# 👀 After the workflow fails, this special step will run\n@on_failure_wf_with_details.on_failure_task()\ndef details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:\n error = ctx.get_task_run_error(details_step1)\n\n if not error:\n return {\"status\": \"unexpected success\"}\n\n # 👀 we can access the failure details here\n assert isinstance(error, TaskRunError)\n\n if \"step1 failed\" in error.exc:\n return {\n \"status\": \"success\",\n \"failed_run_external_id\": error.task_run_external_id,\n }\n\n raise Exception(\"unexpected failure\")\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"on-failure-worker\",\n slots=4,\n workflows=[on_failure_wf, on_failure_wf_with_details],\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n",
"source": "out/python/on_failure/worker.py",
"blocks": {
"onfailure_step": {
"start": 11,
"stop": 34
"start": 12,
"stop": 35
},
"onfailure_with_details": {
"start": 38,
"stop": 63
"start": 39,
"stop": 70
}
},
"highlights": {}
+2 -1
View File
@@ -25,7 +25,7 @@ export default {
"cron-runs": "Cron Trigger",
"run-on-event": "Event Trigger",
"bulk-run": "Bulk Run Many",
"webhooks": "Webhooks",
webhooks: "Webhooks",
"--flow-control": {
title: "Flow Control",
type: "separator",
@@ -122,6 +122,7 @@ export default {
asyncio: "Asyncio",
pydantic: "Pydantic",
lifespans: "Lifespans",
"dependency-injection": "Dependency Injection",
blog: {
title: "Blog",
type: "page",
@@ -0,0 +1,45 @@
import snips from "@/lib/snips";
import { Snippet } from "@/components/code";
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
# Dependency Injection
<Callout type="error" emoji="🚨">
Dependency injection is an **experimental feature** in Hatchet, and is subject
to change.
</Callout>
Hatchet's Python SDK allows you to inject **_dependencies_** into your tasks, FastAPI style. These dependencies can be either synchronous or asynchronous functions. They are executed before the task is triggered, and their results are injected into the task as parameters.
This behaves almost identically to [FastAPI's dependency injection](https://fastapi.tiangolo.com/tutorial/dependencies/), and is intended to be used in the same way. Dependencies are useful for sharing logic between tasks that you'd like to avoid repeating, or would like to factor out of the task logic itself (e.g. to make testing easier).
<Callout type="warning" emoji="⚠️">
Since dependencies are run before tasks are executed, having many dependencies (or any that take a long time to evaluate) can cause tasks to experience significantly delayed start times, as they must wait for all dependencies to finish evaluating.
</Callout>
## Usage
To add dependencies to your tasks, import `Depends` from the `hatchet_sdk`. Then:
<Snippet
src={snips.python.dependency_injection.worker}
block="declare_dependencies"
/>
In this example, we've declared two dependencies: one synchronous and one asynchronous. You can do anything you like in your dependencies, such as creating database sessions, managing configuration, sharing instances of service-layer logic, and more.
Once you've defined your dependency functions, inject them into your tasks as follows:
<Snippet
src={snips.python.dependency_injection.worker}
block="inject_dependencies"
/>
<Callout type="warning" emoji="⚠️">
Important note: Your dependency functions must take two positional arguments:
the workflow input and the `Context` (the same as any other task).
</Callout>
That's it! Now, whenever your task is triggered, its dependencies will be evaluated, and the results will be injected into the task at runtime for you to use as needed.
+37 -35
View File
@@ -42,7 +42,7 @@ Workflows support various execution patterns including:
Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and can be arranged into complex dependency patterns.
### Methods
Methods:
| Name | Description |
| ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
@@ -118,9 +118,9 @@ Parameters:
Returns:
| Type | Description |
| ------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
| Type | Description |
| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
#### `durable_task`
@@ -150,9 +150,9 @@ Parameters:
Returns:
| Type | Description |
| -------------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[[TWorkflowInput, DurableContext], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
| Type | Description |
| ---------------------------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[Concatenate[TWorkflowInput, DurableContext, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
#### `on_failure_task`
@@ -173,9 +173,9 @@ Parameters:
Returns:
| Type | Description |
| ------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
| Type | Description |
| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
#### `on_success_task`
@@ -196,9 +196,9 @@ Parameters:
Returns:
| Type | Description |
| ------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[[TWorkflowInput, Context], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
| Type | Description |
| --------------------------------------------------------------------------------------------------------------- | ------------------------------------------ |
| `Callable[[Callable[Concatenate[TWorkflowInput, Context, P], R \| CoroutineLike[R]]], Task[TWorkflowInput, R]]` | A decorator which creates a `Task` object. |
#### `run`
@@ -526,12 +526,12 @@ Returns:
Bases: `Generic[TWorkflowInput, R]`
### Methods
Methods:
| Name | Description |
| -------------- | ------------------------------------------------------------------------------- |
| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. |
| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. |
| Name | Description |
| -------------- | ------------------------------------------------------------------------------ |
| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test |
| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test |
### Functions
@@ -541,13 +541,14 @@ Mimic the execution of a task. This method is intended to be used to unit test t
Parameters:
| Name | Type | Description | Default |
| --------------------- | -------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `input` | `TWorkflowInput \| None` | The input to the task. | `None` |
| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` |
| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` |
| `retry_count` | `int` | The number of times the task has been retried. | `0` |
| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` |
| Name | Type | Description | Default |
| --------------------- | -------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `input` | `TWorkflowInput \| None` | The input to the task. | `None` |
| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` |
| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` |
| `retry_count` | `int` | The number of times the task has been retried. | `0` |
| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` |
| `dependencies` | `dict[str, Any] \| None` | Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `mock_run`. | `None` |
Returns:
@@ -567,13 +568,14 @@ Mimic the execution of a task. This method is intended to be used to unit test t
Parameters:
| Name | Type | Description | Default |
| --------------------- | -------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `input` | `TWorkflowInput \| None` | The input to the task. | `None` |
| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` |
| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` |
| `retry_count` | `int` | The number of times the task has been retried. | `0` |
| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` |
| Name | Type | Description | Default |
| --------------------- | -------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- |
| `input` | `TWorkflowInput \| None` | The input to the task. | `None` |
| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to attach to the task. | `None` |
| `parent_outputs` | `dict[str, JSONSerializableMapping] \| None` | Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`. | `None` |
| `retry_count` | `int` | The number of times the task has been retried. | `0` |
| `lifespan` | `Any` | The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task. | `None` |
| `dependencies` | `dict[str, Any] \| None` | Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`. | `None` |
Returns:
@@ -591,7 +593,7 @@ Raises:
Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]`
### Methods
Methods:
| Name | Description |
| ---------------------- | ------------------------------------------------------------------------------------------------------------------------ |
@@ -617,8 +619,8 @@ Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]`
| `get_run_ref` | Get a reference to a task run by its run ID. |
| `get_result` | Get the result of a task run by its run ID. |
| `aio_get_result` | Get the result of a task run by its run ID. |
| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. |
| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test. |
| `mock_run` | Mimic the execution of a task. This method is intended to be used to unit test |
| `aio_mock_run` | Mimic the execution of a task. This method is intended to be used to unit test |
### Functions
+23
View File
@@ -5,6 +5,29 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.17.0] - 2025-08-12
### Added
- Adds support for dependency injection in tasks via the `Depends` class.
- Adds `get_task_run_error`, which returns a `TaskRunError` object instead of a string and allows for better error handling and debugging; `fetch_task_run_error` is deprecated in its favor.
### Changed
- Uses `logger.exception` in place of `logger.error` in the action runner to improve error reporting in tools such as Sentry
- Extends the `TaskRunError` to include the `task_run_external_id`, which is useful for debugging and tracing errors in task runs.
- Fixes an issue with logging which allows log levels to be respected over the API.
### Removed
- Removes the `cel-python` dependency
## [1.16.5] - 2025-08-07
### Changed
- Relaxes constraint on Prometheus dependency
## [1.16.4] - 2025-07-28
### Added
@@ -0,0 +1,49 @@
import pytest
from examples.dependency_injection.worker import (
ASYNC_DEPENDENCY_VALUE,
SYNC_DEPENDENCY_VALUE,
Output,
async_dep,
async_task_with_dependencies,
di_workflow,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
sync_dep,
sync_task_with_dependencies,
)
from hatchet_sdk import EmptyModel
from hatchet_sdk.runnables.workflow import Standalone
@pytest.mark.parametrize(
    "task",
    [
        async_task_with_dependencies,
        sync_task_with_dependencies,
        durable_async_task_with_dependencies,
        durable_sync_task_with_dependencies,
    ],
)
@pytest.mark.asyncio(loop_scope="session")
async def test_di_standalones(
    task: Standalone[EmptyModel, Output],
) -> None:
    """Each standalone task (sync/async, durable or not) must return an `Output`
    carrying the values resolved from both injected dependencies."""
    result = await task.aio_run()

    assert isinstance(result, Output)
    # Both dependency providers should have been resolved and injected.
    assert result.sync_dep == SYNC_DEPENDENCY_VALUE
    assert result.async_dep == ASYNC_DEPENDENCY_VALUE
@pytest.mark.asyncio(loop_scope="session")
async def test_di_workflows() -> None:
    """Running the DAG workflow should produce four task outputs, each of which
    contains the values resolved from both injected dependencies."""
    result = await di_workflow.aio_run()

    # One output per task registered on `di_workflow`.
    assert len(result) == 4

    for raw_output in result.values():
        validated = Output.model_validate(raw_output)
        assert validated.sync_dep == SYNC_DEPENDENCY_VALUE
        assert validated.async_dep == ASYNC_DEPENDENCY_VALUE
@@ -0,0 +1,162 @@
# > Simple
from typing import Annotated
from pydantic import BaseModel
from hatchet_sdk import Context, Depends, DurableContext, EmptyModel, Hatchet
# Shared Hatchet client used to declare every task/workflow in this example.
hatchet = Hatchet(debug=False)

# Sentinel values returned by the dependency providers below; the example's
# tests assert that these exact strings are injected into each task.
SYNC_DEPENDENCY_VALUE = "sync_dependency_value"
ASYNC_DEPENDENCY_VALUE = "async_dependency_value"
# > Declare dependencies
async def async_dep(input: EmptyModel, ctx: Context) -> str:
    """Asynchronous dependency provider: resolves to the async sentinel value."""
    return ASYNC_DEPENDENCY_VALUE
def sync_dep(input: EmptyModel, ctx: Context) -> str:
    """Synchronous dependency provider: resolves to the sync sentinel value."""
    return SYNC_DEPENDENCY_VALUE
# !!
class Output(BaseModel):
    """Result payload shared by every task in this example; it echoes back the
    values that were injected for the two declared dependencies."""

    # Value resolved from the synchronous dependency provider.
    sync_dep: str
    # Value resolved from the asynchronous dependency provider.
    async_dep: str
# > Inject dependencies
@hatchet.task()
async def async_task_with_dependencies(
    _i: EmptyModel,
    ctx: Context,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Standalone async task showing dependency injection via `Depends`.

    Both providers are resolved by the SDK before the task body runs; the task
    simply echoes the injected values back so callers/tests can verify them.
    """
    return Output(async_dep=async_dep, sync_dep=sync_dep)
# !!
@hatchet.task()
def sync_task_with_dependencies(
    _i: EmptyModel,
    ctx: Context,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Standalone sync task showing dependency injection via `Depends`.

    Note that a sync task can still consume an async dependency provider: the
    SDK resolves the providers before invoking the task body.
    """
    return Output(async_dep=async_dep, sync_dep=sync_dep)
@hatchet.durable_task()
async def durable_async_task_with_dependencies(
    _i: EmptyModel,
    ctx: DurableContext,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Durable async task variant: dependency injection works the same way as
    for non-durable tasks, only the context type changes to `DurableContext`."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
@hatchet.durable_task()
def durable_sync_task_with_dependencies(
    _i: EmptyModel,
    ctx: DurableContext,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Durable sync task variant: same `Depends`-based injection, with a
    `DurableContext` instead of a plain `Context`."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
# DAG-style workflow that groups the four DI task variants registered below,
# mirroring the standalone examples above but attached to a single workflow.
di_workflow = hatchet.workflow(
name="dependency-injection-workflow",
)
@di_workflow.task()
async def wf_async_task_with_dependencies(
    _i: EmptyModel,
    ctx: Context,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Workflow-attached async task: `Depends` injection behaves identically to
    the standalone `async_task_with_dependencies` example."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
@di_workflow.task()
def wf_sync_task_with_dependencies(
    _i: EmptyModel,
    ctx: Context,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Workflow-attached sync task: `Depends` injection behaves identically to
    the standalone `sync_task_with_dependencies` example."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
@di_workflow.durable_task()
async def wf_durable_async_task_with_dependencies(
    _i: EmptyModel,
    ctx: DurableContext,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Workflow-attached durable async task: same injection pattern with a
    `DurableContext`."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
@di_workflow.durable_task()
def wf_durable_sync_task_with_dependencies(
    _i: EmptyModel,
    ctx: DurableContext,
    async_dep: Annotated[str, Depends(async_dep)],
    sync_dep: Annotated[str, Depends(sync_dep)],
) -> Output:
    """Workflow-attached durable sync task: same injection pattern with a
    `DurableContext`."""
    return Output(async_dep=async_dep, sync_dep=sync_dep)
def main() -> None:
    """Register every DI example (standalone tasks + the DAG workflow) on a
    worker and start it. Blocks until the worker is shut down."""
    worker = hatchet.worker(
        "dependency-injection-worker",
        workflows=[
            async_task_with_dependencies,
            sync_task_with_dependencies,
            durable_async_task_with_dependencies,
            durable_sync_task_with_dependencies,
            di_workflow,
        ],
    )

    worker.start()


# !!


if __name__ == "__main__":
    main()
+1 -1
View File
@@ -71,4 +71,4 @@ async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:
second_sleep_result = await first_sleep.aio_result()
"""We've already slept for a little bit by the time the task is cancelled"""
assert second_sleep_result["runtime"] < SLEEP_TIME
assert second_sleep_result["runtime"] <= SLEEP_TIME
+13 -6
View File
@@ -2,8 +2,9 @@ import json
from datetime import timedelta
from hatchet_sdk import Context, EmptyModel, Hatchet
from hatchet_sdk.exceptions import TaskRunError
hatchet = Hatchet(debug=True)
hatchet = Hatchet(debug=False)
ERROR_TEXT = "step1 failed"
@@ -50,14 +51,20 @@ def details_step1(input: EmptyModel, ctx: Context) -> None:
# 👀 After the workflow fails, this special step will run
@on_failure_wf_with_details.on_failure_task()
def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str]:
error = ctx.fetch_task_run_error(details_step1)
def details_on_failure(input: EmptyModel, ctx: Context) -> dict[str, str | None]:
error = ctx.get_task_run_error(details_step1)
if not error:
return {"status": "unexpected success"}
# 👀 we can access the failure details here
print(json.dumps(error, indent=2))
assert isinstance(error, TaskRunError)
if error and error.startswith(ERROR_TEXT):
return {"status": "success"}
if "step1 failed" in error.exc:
return {
"status": "success",
"failed_run_external_id": error.task_run_external_id,
}
raise Exception("unexpected failure")
+12
View File
@@ -15,6 +15,13 @@ from examples.concurrency_workflow_level.worker import (
from examples.conditions.worker import task_condition_workflow
from examples.dag.worker import dag_workflow
from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf
from examples.dependency_injection.worker import (
async_task_with_dependencies,
di_workflow,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
sync_task_with_dependencies,
)
from examples.durable.worker import durable_workflow, wait_for_sleep_twice
from examples.events.worker import event_workflow
from examples.fanout.worker import child_wf, parent_wf
@@ -61,6 +68,7 @@ def main() -> None:
sync_fanout_child,
non_retryable_workflow,
concurrency_workflow_level_workflow,
di_workflow,
lifespan_task,
simple,
simple_durable,
@@ -70,6 +78,10 @@ def main() -> None:
webhook,
return_exceptions_task,
wait_for_sleep_twice,
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
],
lifespan=lifespan,
)
+2 -1
View File
@@ -155,7 +155,7 @@ from hatchet_sdk.exceptions import (
from hatchet_sdk.features.cel import CELEvaluationResult, CELFailure, CELSuccess
from hatchet_sdk.features.runs import BulkCancelReplayOpts, RunFilter
from hatchet_sdk.hatchet import Hatchet
from hatchet_sdk.runnables.task import Task
from hatchet_sdk.runnables.task import Depends, Task
from hatchet_sdk.runnables.types import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
@@ -198,6 +198,7 @@ __all__ = [
"CreateWorkflowVersionOpts",
"DedupeViolationError",
"DefaultFilter",
"Depends",
"DurableContext",
"EmptyModel",
"Event",
+5 -2
View File
@@ -28,7 +28,7 @@ from hatchet_sdk.contracts.events_pb2 import (
)
from hatchet_sdk.contracts.events_pb2_grpc import EventsServiceStub
from hatchet_sdk.metadata import get_metadata
from hatchet_sdk.utils.typing import JSONSerializableMapping
from hatchet_sdk.utils.typing import JSONSerializableMapping, LogLevel
def proto_timestamp_now() -> timestamp_pb2.Timestamp:
@@ -180,11 +180,14 @@ class EventClient(BaseRestClient):
)
@tenacity_retry
def log(self, message: str, step_run_id: str) -> None:
def log(
self, message: str, step_run_id: str, level: LogLevel | None = None
) -> None:
request = PutLogRequest(
stepRunId=step_run_id,
createdAt=proto_timestamp_now(),
message=message,
level=level.value if level else None,
)
self.events_service_client.PutLog(request, metadata=get_metadata(self.token))
+32 -3
View File
@@ -21,10 +21,11 @@ from hatchet_sdk.conditions import (
flatten_conditions,
)
from hatchet_sdk.context.worker_context import WorkerContext
from hatchet_sdk.exceptions import TaskRunError
from hatchet_sdk.features.runs import RunsClient
from hatchet_sdk.logger import logger
from hatchet_sdk.utils.timedelta_to_expression import Duration, timedelta_to_expr
from hatchet_sdk.utils.typing import JSONSerializableMapping
from hatchet_sdk.utils.typing import JSONSerializableMapping, LogLevel
from hatchet_sdk.worker.runner.utils.capture_logs import AsyncLogSender, LogRecord
if TYPE_CHECKING:
@@ -211,7 +212,9 @@ class Context:
line = str(line)
logger.info(line)
self.log_sender.publish(LogRecord(message=line, step_run_id=self.step_run_id))
self.log_sender.publish(
LogRecord(message=line, step_run_id=self.step_run_id, level=LogLevel.INFO)
)
def release_slot(self) -> None:
"""
@@ -359,6 +362,27 @@ class Context:
self,
task: "Task[TWorkflowInput, R]",
) -> str | None:
"""
**DEPRECATED**: Use `get_task_run_error` instead.
A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run.
:param task: The task whose error you want to retrieve.
:return: The error message of the task run, or None if no error occurred.
"""
warn(
"`fetch_task_run_error` is deprecated. Use `get_task_run_error` instead.",
DeprecationWarning,
stacklevel=2,
)
errors = self.data.step_run_errors
return errors.get(task.name)
def get_task_run_error(
self,
task: "Task[TWorkflowInput, R]",
) -> TaskRunError | None:
"""
A helper intended to be used in an on-failure step to retrieve the error that occurred in a specific upstream task run.
@@ -367,7 +391,12 @@ class Context:
"""
errors = self.data.step_run_errors
return errors.get(task.name)
error = errors.get(task.name)
if not error:
return None
return TaskRunError.deserialize(error)
class DurableContext(Context):
+70 -5
View File
@@ -1,4 +1,10 @@
import json
import traceback
from typing import cast
class InvalidDependencyError(Exception):
pass
class NonRetryableException(Exception): # noqa: N818
@@ -9,28 +15,42 @@ class DedupeViolationError(Exception):
"""Raised by the Hatchet library to indicate that a workflow has already been run with this deduplication value."""
TASK_RUN_ERROR_METADATA_KEY = "__hatchet_error_metadata__"
class TaskRunError(Exception):
def __init__(
self,
exc: str,
exc_type: str,
trace: str,
task_run_external_id: str | None,
) -> None:
self.exc = exc
self.exc_type = exc_type
self.trace = trace
self.task_run_external_id = task_run_external_id
def __str__(self) -> str:
return self.serialize()
return self.serialize(include_metadata=False)
def __repr__(self) -> str:
return str(self)
def serialize(self) -> str:
def serialize(self, include_metadata: bool) -> str:
if not self.exc_type or not self.exc:
return ""
return (
metadata = json.dumps(
{
TASK_RUN_ERROR_METADATA_KEY: {
"task_run_external_id": self.task_run_external_id,
}
},
indent=None,
)
result = (
self.exc_type.replace(": ", ":::")
+ ": "
+ self.exc.replace("\n", "\\\n")
@@ -38,6 +58,40 @@ class TaskRunError(Exception):
+ self.trace
)
if include_metadata:
return result + "\n\n" + metadata
return result
@classmethod
def _extract_metadata(cls, serialized: str) -> tuple[str, dict[str, str | None]]:
metadata = serialized.split("\n")[-1]
try:
parsed = json.loads(metadata)
if (
TASK_RUN_ERROR_METADATA_KEY in parsed
and "task_run_external_id" in parsed[TASK_RUN_ERROR_METADATA_KEY]
):
serialized = serialized.replace(metadata, "").strip()
return serialized, cast(
dict[str, str | None], parsed[TASK_RUN_ERROR_METADATA_KEY]
)
return serialized, {}
except json.JSONDecodeError:
return serialized, {}
@classmethod
def _unpack_serialized_error(cls, serialized: str) -> tuple[str | None, str, str]:
serialized, metadata = cls._extract_metadata(serialized)
external_id = metadata.get("task_run_external_id", None)
header, trace = serialized.split("\n", 1)
return external_id, header, trace
@classmethod
def deserialize(cls, serialized: str) -> "TaskRunError":
if not serialized:
@@ -45,10 +99,16 @@ class TaskRunError(Exception):
exc="",
exc_type="",
trace="",
task_run_external_id=None,
)
task_run_external_id = None
try:
header, trace = serialized.split("\n", 1)
task_run_external_id, header, trace = cls._unpack_serialized_error(
serialized
)
exc_type, exc = header.split(": ", 1)
except ValueError:
## If we get here, we saw an error that was not serialized how we expected,
@@ -57,6 +117,7 @@ class TaskRunError(Exception):
exc=serialized,
exc_type="HatchetError",
trace="",
task_run_external_id=task_run_external_id,
)
exc_type = exc_type.replace(":::", ": ")
@@ -66,16 +127,20 @@ class TaskRunError(Exception):
exc=exc,
exc_type=exc_type,
trace=trace,
task_run_external_id=task_run_external_id,
)
@classmethod
def from_exception(cls, exc: Exception) -> "TaskRunError":
def from_exception(
cls, exc: Exception, task_run_external_id: str | None
) -> "TaskRunError":
return cls(
exc=str(exc),
exc_type=type(exc).__name__,
trace="".join(
traceback.format_exception(type(exc), exc, exc.__traceback__)
),
task_run_external_id=task_run_external_id,
)
+29 -11
View File
@@ -3,7 +3,7 @@ import logging
from collections.abc import Callable
from datetime import timedelta
from functools import cached_property
from typing import Any, cast, overload
from typing import Any, Concatenate, ParamSpec, cast, overload
from hatchet_sdk import Context, DurableContext
from hatchet_sdk.client import Client
@@ -40,6 +40,8 @@ from hatchet_sdk.utils.timedelta_to_expression import Duration
from hatchet_sdk.utils.typing import CoroutineLike
from hatchet_sdk.worker.worker import LifespanFn, Worker
P = ParamSpec("P")
class Hatchet:
"""
@@ -346,7 +348,7 @@ class Hatchet:
backoff_max_seconds: int | None = None,
default_filters: list[DefaultFilter] | None = None,
) -> Callable[
[Callable[[EmptyModel, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[EmptyModel, Context, P], R | CoroutineLike[R]]],
Standalone[EmptyModel, R],
]: ...
@@ -372,7 +374,7 @@ class Hatchet:
backoff_max_seconds: int | None = None,
default_filters: list[DefaultFilter] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]],
Standalone[TWorkflowInput, R],
]: ...
@@ -398,11 +400,11 @@ class Hatchet:
default_filters: list[DefaultFilter] | None = None,
) -> (
Callable[
[Callable[[EmptyModel, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[EmptyModel, Context, P], R | CoroutineLike[R]]],
Standalone[EmptyModel, R],
]
| Callable[
[Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]],
Standalone[TWorkflowInput, R],
]
):
@@ -447,7 +449,9 @@ class Hatchet:
"""
def inner(
func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]
],
) -> Standalone[TWorkflowInput, R]:
inferred_name = name or func.__name__
@@ -518,7 +522,7 @@ class Hatchet:
backoff_max_seconds: int | None = None,
default_filters: list[DefaultFilter] | None = None,
) -> Callable[
[Callable[[EmptyModel, DurableContext], R | CoroutineLike[R]]],
[Callable[Concatenate[EmptyModel, DurableContext, P], R | CoroutineLike[R]]],
Standalone[EmptyModel, R],
]: ...
@@ -544,7 +548,11 @@ class Hatchet:
backoff_max_seconds: int | None = None,
default_filters: list[DefaultFilter] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]],
[
Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
]
],
Standalone[TWorkflowInput, R],
]: ...
@@ -570,11 +578,19 @@ class Hatchet:
default_filters: list[DefaultFilter] | None = None,
) -> (
Callable[
[Callable[[EmptyModel, DurableContext], R | CoroutineLike[R]]],
[
Callable[
Concatenate[EmptyModel, DurableContext, P], R | CoroutineLike[R]
]
],
Standalone[EmptyModel, R],
]
| Callable[
[Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]],
[
Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
]
],
Standalone[TWorkflowInput, R],
]
):
@@ -619,7 +635,9 @@ class Hatchet:
"""
def inner(
func: Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
],
) -> Standalone[TWorkflowInput, R]:
inferred_name = name or func.__name__
workflow = Workflow[TWorkflowInput](
+1 -21
View File
@@ -1,20 +1,10 @@
from enum import Enum
from celpy import CELEvalError, Environment # type: ignore
from pydantic import BaseModel, model_validator
from hatchet_sdk.contracts.v1.workflows_pb2 import CreateTaskRateLimit
def validate_cel_expression(expr: str) -> bool:
env = Environment()
try:
env.compile(expr)
return True
except CELEvalError:
return False
class RateLimitDuration(str, Enum):
SECOND = "SECOND"
MINUTE = "MINUTE"
@@ -72,17 +62,7 @@ class RateLimit(BaseModel):
if self.dynamic_key and self.static_key:
raise ValueError("Cannot have both static key and dynamic key set")
if self.dynamic_key and not validate_cel_expression(self.dynamic_key):
raise ValueError(f"Invalid CEL expression: {self.dynamic_key}")
if not isinstance(self.units, int) and not validate_cel_expression(self.units):
raise ValueError(f"Invalid CEL expression: {self.units}")
if (
self.limit
and not isinstance(self.limit, int)
and not validate_cel_expression(self.limit)
):
if self.limit and not isinstance(self.limit, int):
raise ValueError(f"Invalid CEL expression: {self.limit}")
if self.dynamic_key and not self.limit:
+109 -19
View File
@@ -1,5 +1,21 @@
import asyncio
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Generic, cast, get_type_hints
from inspect import Parameter, iscoroutinefunction, signature
from typing import (
TYPE_CHECKING,
Annotated,
Any,
Concatenate,
Generic,
ParamSpec,
TypeVar,
cast,
get_args,
get_origin,
get_type_hints,
)
from pydantic import BaseModel, ConfigDict
from hatchet_sdk.conditions import (
Action,
@@ -18,6 +34,7 @@ from hatchet_sdk.contracts.v1.workflows_pb2 import (
CreateTaskRateLimit,
DesiredWorkerLabels,
)
from hatchet_sdk.exceptions import InvalidDependencyError
from hatchet_sdk.runnables.types import (
ConcurrencyExpression,
EmptyModel,
@@ -25,7 +42,6 @@ from hatchet_sdk.runnables.types import (
StepType,
TWorkflowInput,
is_async_fn,
is_durable_sync_fn,
is_sync_fn,
)
from hatchet_sdk.utils.timedelta_to_expression import Duration, timedelta_to_expr
@@ -41,16 +57,45 @@ from hatchet_sdk.worker.runner.utils.capture_logs import AsyncLogSender
if TYPE_CHECKING:
from hatchet_sdk.runnables.workflow import Workflow
T = TypeVar("T")
P = ParamSpec("P")
class Depends(Generic[T, TWorkflowInput]):
def __init__(
self, fn: Callable[[TWorkflowInput, Context], T | CoroutineLike[T]]
) -> None:
sig = signature(fn)
params = list(sig.parameters.values())
if len(params) != 2:
raise InvalidDependencyError(
f"Dependency function {fn.__name__} must have exactly two parameters: input and ctx."
)
self.fn = fn
class DependencyToInject(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)
name: str
value: Any
class Task(Generic[TWorkflowInput, R]):
def __init__(
self,
_fn: (
Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]
| Callable[[TWorkflowInput, Context], AwaitableLike[R]]
Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]
| Callable[Concatenate[TWorkflowInput, Context, P], AwaitableLike[R]]
| (
Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]
| Callable[[TWorkflowInput, DurableContext], AwaitableLike[R]]
Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
]
| Callable[
Concatenate[TWorkflowInput, DurableContext, P], AwaitableLike[R]
]
)
),
is_durable: bool,
@@ -100,33 +145,74 @@ class Task(Generic[TWorkflowInput, R]):
step_output=return_type if is_basemodel_subclass(return_type) else None,
)
def call(self, ctx: Context | DurableContext) -> R:
async def _parse_parameter(
self,
name: str,
param: Parameter,
input: TWorkflowInput,
ctx: Context | DurableContext,
) -> DependencyToInject | None:
annotation = param.annotation
if get_origin(annotation) is Annotated:
args = get_args(annotation)
if len(args) < 2:
return None
metadata = args[1:]
for item in metadata:
if isinstance(item, Depends):
if iscoroutinefunction(item.fn):
return DependencyToInject(
name=name, value=await item.fn(input, ctx)
)
return DependencyToInject(
name=name, value=await asyncio.to_thread(item.fn, input, ctx)
)
return None
async def _unpack_dependencies(
self, ctx: Context | DurableContext
) -> dict[str, Any]:
sig = signature(self.fn)
input = self.workflow._get_workflow_input(ctx)
return {
parsed.name: parsed.value
for n, p in sig.parameters.items()
if (parsed := await self._parse_parameter(n, p, input, ctx)) is not None
}
def call(
self, ctx: Context | DurableContext, dependencies: dict[str, Any] | None = None
) -> R:
if self.is_async_function:
raise TypeError(f"{self.name} is not a sync function. Use `acall` instead.")
workflow_input = self.workflow._get_workflow_input(ctx)
dependencies = dependencies or {}
if self.is_durable:
fn = cast(Callable[[TWorkflowInput, DurableContext], R], self.fn)
if is_durable_sync_fn(fn):
return fn(workflow_input, cast(DurableContext, ctx))
else:
fn = cast(Callable[[TWorkflowInput, Context], R], self.fn)
if is_sync_fn(fn):
return fn(workflow_input, ctx)
if is_sync_fn(self.fn): # type: ignore
return self.fn(workflow_input, cast(Context, ctx), **dependencies) # type: ignore
raise TypeError(f"{self.name} is not a sync function. Use `acall` instead.")
async def aio_call(self, ctx: Context | DurableContext) -> R:
async def aio_call(
self, ctx: Context | DurableContext, dependencies: dict[str, Any] | None = None
) -> R:
if not self.is_async_function:
raise TypeError(
f"{self.name} is not an async function. Use `call` instead."
)
workflow_input = self.workflow._get_workflow_input(ctx)
dependencies = dependencies or {}
if is_async_fn(self.fn): # type: ignore
return await self.fn(workflow_input, cast(Context, ctx)) # type: ignore
return await self.fn(workflow_input, cast(Context, ctx), **dependencies) # type: ignore
raise TypeError(f"{self.name} is not an async function. Use `call` instead.")
@@ -255,6 +341,7 @@ class Task(Generic[TWorkflowInput, R]):
parent_outputs: dict[str, JSONSerializableMapping] | None = None,
retry_count: int = 0,
lifespan: Any = None,
dependencies: dict[str, Any] | None = None,
) -> R:
"""
Mimic the execution of a task. This method is intended to be used to unit test
@@ -266,6 +353,7 @@ class Task(Generic[TWorkflowInput, R]):
:param parent_outputs: Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`.
:param retry_count: The number of times the task has been retried.
:param lifespan: The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task.
:param dependencies: Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `mock_run`.
:return: The output of the task.
:raises TypeError: If the task is an async function and `mock_run` is called, or if the task is a sync function and `aio_mock_run` is called.
@@ -280,7 +368,7 @@ class Task(Generic[TWorkflowInput, R]):
input, additional_metadata, parent_outputs, retry_count, lifespan
)
return self.call(ctx)
return self.call(ctx, dependencies)
async def aio_mock_run(
self,
@@ -289,6 +377,7 @@ class Task(Generic[TWorkflowInput, R]):
parent_outputs: dict[str, JSONSerializableMapping] | None = None,
retry_count: int = 0,
lifespan: Any = None,
dependencies: dict[str, Any] | None = None,
) -> R:
"""
Mimic the execution of a task. This method is intended to be used to unit test
@@ -300,6 +389,7 @@ class Task(Generic[TWorkflowInput, R]):
:param parent_outputs: Outputs from parent tasks, if any. This is useful for mimicking DAG functionality. For instance, if you have a task `step_2` that has a `parent` which is `step_1`, you can pass `parent_outputs={"step_1": {"result": "Hello, world!"}}` to `step_2.mock_run()` to be able to access `ctx.task_output(step_1)` in `step_2`.
:param retry_count: The number of times the task has been retried.
:param lifespan: The lifespan to be used in the task, which is useful if one was set on the worker. This will allow you to access `ctx.lifespan` inside of your task.
:param dependencies: Dependencies to be injected into the task. This is useful for tasks that have dependencies defined using `Depends`. **IMPORTANT**: You must pass the dependencies _directly_, **not** the `Depends` objects themselves. For example, if you have a task that has a dependency `config: Annotated[str, Depends(get_config)]`, you should pass `dependencies={"config": "config_value"}` to `aio_mock_run`.
:return: The output of the task.
:raises TypeError: If the task is an async function and `mock_run` is called, or if the task is a sync function and `aio_mock_run` is called.
@@ -318,4 +408,4 @@ class Task(Generic[TWorkflowInput, R]):
lifespan,
)
return await self.aio_call(ctx)
return await self.aio_call(ctx, dependencies)
+23 -8
View File
@@ -5,8 +5,10 @@ from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Concatenate,
Generic,
Literal,
ParamSpec,
TypeVar,
cast,
get_type_hints,
@@ -60,6 +62,7 @@ if TYPE_CHECKING:
T = TypeVar("T")
P = ParamSpec("P")
def fall_back_to_default(value: T, param_default: T, fallback_value: T | None) -> T:
@@ -800,7 +803,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
skip_if: list[Condition | OrGroup] | None = None,
cancel_if: list[Condition | OrGroup] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]],
Task[TWorkflowInput, R],
]:
"""
@@ -845,7 +848,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
)
def inner(
func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]
],
) -> Task[TWorkflowInput, R]:
task = Task(
_fn=func,
@@ -892,7 +897,11 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
skip_if: list[Condition | OrGroup] | None = None,
cancel_if: list[Condition | OrGroup] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]]],
[
Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
]
],
Task[TWorkflowInput, R],
]:
"""
@@ -941,7 +950,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
)
def inner(
func: Callable[[TWorkflowInput, DurableContext], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, DurableContext, P], R | CoroutineLike[R]
],
) -> Task[TWorkflowInput, R]:
task = Task(
_fn=func,
@@ -983,7 +994,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
backoff_max_seconds: int | None = None,
concurrency: list[ConcurrencyExpression] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]],
Task[TWorkflowInput, R],
]:
"""
@@ -1009,7 +1020,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
"""
def inner(
func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]
],
) -> Task[TWorkflowInput, R]:
task = Task(
is_durable=False,
@@ -1051,7 +1064,7 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
backoff_max_seconds: int | None = None,
concurrency: list[ConcurrencyExpression] | None = None,
) -> Callable[
[Callable[[TWorkflowInput, Context], R | CoroutineLike[R]]],
[Callable[Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]]],
Task[TWorkflowInput, R],
]:
"""
@@ -1077,7 +1090,9 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
"""
def inner(
func: Callable[[TWorkflowInput, Context], R | CoroutineLike[R]],
func: Callable[
Concatenate[TWorkflowInput, Context, P], R | CoroutineLike[R]
],
) -> Task[TWorkflowInput, R]:
task = Task(
is_durable=False,
+27
View File
@@ -1,5 +1,6 @@
import sys
from collections.abc import Awaitable, Coroutine, Generator
from enum import Enum
from typing import Any, Literal, TypeAlias, TypeGuard, TypeVar
from pydantic import BaseModel
@@ -31,3 +32,29 @@ else:
STOP_LOOP_TYPE = Literal["STOP_LOOP"]
STOP_LOOP: STOP_LOOP_TYPE = "STOP_LOOP" # Sentinel object to stop the loop
class LogLevel(str, Enum):
DEBUG = "DEBUG"
INFO = "INFO"
WARN = "WARN"
ERROR = "ERROR"
@classmethod
def from_levelname(cls, levelname: str) -> "LogLevel":
levelname = levelname.upper()
if levelname == "DEBUG":
return cls.DEBUG
if levelname == "INFO":
return cls.INFO
if levelname in ["WARNING", "WARN"]:
return cls.WARN
if levelname == "ERROR":
return cls.ERROR
# fall back to INFO
return cls.INFO
+27 -19
View File
@@ -166,22 +166,22 @@ class Runner:
except Exception as e:
should_not_retry = isinstance(e, NonRetryableException)
exc = TaskRunError.from_exception(e)
exc = TaskRunError.from_exception(e, action.step_run_id)
# This except is coming from the application itself, so we want to send that to the Hatchet instance
self.event_queue.put(
ActionEvent(
action=action,
type=STEP_EVENT_TYPE_FAILED,
payload=exc.serialize(),
payload=exc.serialize(include_metadata=True),
should_not_retry=should_not_retry,
)
)
log_with_level = logger.info if should_not_retry else logger.error
log_with_level = logger.info if should_not_retry else logger.exception
log_with_level(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}"
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}"
)
return
@@ -198,18 +198,18 @@ class Runner:
)
)
except IllegalTaskOutputError as e:
exc = TaskRunError.from_exception(e)
exc = TaskRunError.from_exception(e, action.step_run_id)
self.event_queue.put(
ActionEvent(
action=action,
type=STEP_EVENT_TYPE_FAILED,
payload=exc.serialize(),
payload=exc.serialize(include_metadata=True),
should_not_retry=False,
)
)
logger.error(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}"
logger.exception(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}"
)
return
@@ -230,19 +230,19 @@ class Runner:
try:
output = task.result()
except Exception as e:
exc = TaskRunError.from_exception(e)
exc = TaskRunError.from_exception(e, action.step_run_id)
self.event_queue.put(
ActionEvent(
action=action,
type=GROUP_KEY_EVENT_TYPE_FAILED,
payload=exc.serialize(),
payload=exc.serialize(include_metadata=True),
should_not_retry=False,
)
)
logger.error(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}"
logger.exception(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}"
)
return
@@ -259,18 +259,18 @@ class Runner:
)
)
except IllegalTaskOutputError as e:
exc = TaskRunError.from_exception(e)
exc = TaskRunError.from_exception(e, action.step_run_id)
self.event_queue.put(
ActionEvent(
action=action,
type=STEP_EVENT_TYPE_FAILED,
payload=exc.serialize(),
payload=exc.serialize(include_metadata=True),
should_not_retry=False,
)
)
logger.error(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize()}"
logger.exception(
f"failed step run: {action.action_id}/{action.step_run_id}\n{exc.serialize(include_metadata=False)}"
)
return
@@ -280,12 +280,16 @@ class Runner:
return inner_callback
def thread_action_func(
self, ctx: Context, task: Task[TWorkflowInput, R], action: Action
self,
ctx: Context,
task: Task[TWorkflowInput, R],
action: Action,
dependencies: dict[str, Any],
) -> R:
if action.step_run_id or action.get_group_key_run_id:
self.threads[action.key] = current_thread()
return task.call(ctx)
return task.call(ctx, dependencies)
# We wrap all actions in an async func
async def async_wrapped_action_func(
@@ -300,9 +304,12 @@ class Runner:
ctx_action_key.set(action.key)
ctx_additional_metadata.set(action.additional_metadata)
dependencies = await task._unpack_dependencies(ctx)
try:
if task.is_async_function:
return await task.aio_call(ctx)
return await task.aio_call(ctx, dependencies)
pfunc = functools.partial(
# we must copy the context vars to the new thread, as only asyncio natively supports
# contextvars
@@ -343,6 +350,7 @@ class Runner:
ctx,
task,
action,
dependencies,
)
loop = asyncio.get_event_loop()
@@ -16,7 +16,12 @@ from hatchet_sdk.runnables.contextvars import (
ctx_worker_id,
ctx_workflow_run_id,
)
from hatchet_sdk.utils.typing import STOP_LOOP, STOP_LOOP_TYPE, JSONSerializableMapping
from hatchet_sdk.utils.typing import (
STOP_LOOP,
STOP_LOOP_TYPE,
JSONSerializableMapping,
LogLevel,
)
T = TypeVar("T")
P = ParamSpec("P")
@@ -67,6 +72,7 @@ def copy_context_vars(
class LogRecord(BaseModel):
message: str
step_run_id: str
level: LogLevel
class AsyncLogSender:
@@ -86,6 +92,7 @@ class AsyncLogSender:
self.event_client.log,
message=record.message,
step_run_id=record.step_run_id,
level=record.level,
)
except Exception:
logger.exception("failed to send log to Hatchet")
@@ -97,7 +104,7 @@ class AsyncLogSender:
logger.warning("log queue is full, dropping log message")
class CustomLogHandler(logging.StreamHandler): # type: ignore[type-arg]
class LogForwardingHandler(logging.StreamHandler): # type: ignore[type-arg]
def __init__(self, log_sender: AsyncLogSender, stream: StringIO):
super().__init__(stream)
@@ -112,7 +119,13 @@ class CustomLogHandler(logging.StreamHandler): # type: ignore[type-arg]
if not step_run_id:
return
self.log_sender.publish(LogRecord(message=log_entry, step_run_id=step_run_id))
self.log_sender.publish(
LogRecord(
message=log_entry,
step_run_id=step_run_id,
level=LogLevel.from_levelname(record.levelname),
)
)
def capture_logs(
@@ -121,27 +134,27 @@ def capture_logs(
@functools.wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
log_stream = StringIO()
custom_handler = CustomLogHandler(log_sender, log_stream)
custom_handler.setLevel(logger.level)
log_forwarder = LogForwardingHandler(log_sender, log_stream)
log_forwarder.setLevel(logger.level)
if logger.handlers:
for handler in logger.handlers:
if handler.formatter:
custom_handler.setFormatter(handler.formatter)
log_forwarder.setFormatter(handler.formatter)
break
for handler in logger.handlers:
for filter_obj in handler.filters:
custom_handler.addFilter(filter_obj)
log_forwarder.addFilter(filter_obj)
if not any(h for h in logger.handlers if isinstance(h, CustomLogHandler)):
logger.addHandler(custom_handler)
if not any(h for h in logger.handlers if isinstance(h, LogForwardingHandler)):
logger.addHandler(log_forwarder)
try:
result = await func(*args, **kwargs)
finally:
custom_handler.flush()
logger.removeHandler(custom_handler)
log_forwarder.flush()
logger.removeHandler(log_forwarder)
log_stream.close()
return result
+2 -75
View File
@@ -319,26 +319,6 @@ files = [
[package.dependencies]
beautifulsoup4 = "*"
[[package]]
name = "cel-python"
version = "0.2.0"
description = "Pure Python implementation of Google Common Expression Language"
optional = false
python-versions = "<4.0,>=3.8"
groups = ["main"]
files = [
{file = "cel_python-0.2.0-py3-none-any.whl", hash = "sha256:478ff73def7b39d51e6982f95d937a57c2b088c491c578fe5cecdbd79f476f60"},
{file = "cel_python-0.2.0.tar.gz", hash = "sha256:75de72a5cf223ec690b236f0cc24da267219e667bd3e7f8f4f20595fcc1c0c0f"},
]
[package.dependencies]
jmespath = ">=1.0.1,<2.0.0"
lark = ">=0.12.0,<0.13.0"
python-dateutil = ">=2.9.0.post0,<3.0.0"
pyyaml = ">=6.0.1,<7.0.0"
types-python-dateutil = ">=2.9.0.20240316,<3.0.0.0"
types-pyyaml = ">=6.0.12.20240311,<7.0.0.0"
[[package]]
name = "certifi"
version = "2025.6.15"
@@ -1104,35 +1084,6 @@ files = [
{file = "jiter-0.10.0.tar.gz", hash = "sha256:07a7142c38aacc85194391108dc91b5b57093c978a9932bd86a36862759d9500"},
]
[[package]]
name = "jmespath"
version = "1.0.1"
description = "JSON Matching Expressions"
optional = false
python-versions = ">=3.7"
groups = ["main"]
files = [
{file = "jmespath-1.0.1-py3-none-any.whl", hash = "sha256:02e2e4cc71b5bcab88332eebf907519190dd9e6e82107fa7f83b1003a6252980"},
{file = "jmespath-1.0.1.tar.gz", hash = "sha256:90261b206d6defd58fdd5e85f478bf633a2901798906be2ad389150c5c60edbe"},
]
[[package]]
name = "lark"
version = "0.12.0"
description = "a modern parsing library"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "lark-0.12.0-py2.py3-none-any.whl", hash = "sha256:ed1d891cbcf5151ead1c1d14663bf542443e579e63a76ae175b01b899bd854ca"},
{file = "lark-0.12.0.tar.gz", hash = "sha256:7da76fcfddadabbbbfd949bbae221efd33938451d90b1fefbbc423c3cccf48ef"},
]
[package.extras]
atomic-cache = ["atomicwrites"]
nearley = ["js2py"]
regex = ["regex"]
[[package]]
name = "markdown"
version = "3.8"
@@ -2378,7 +2329,7 @@ version = "6.0.2"
description = "YAML parser and emitter for Python"
optional = false
python-versions = ">=3.8"
groups = ["main", "docs"]
groups = ["docs"]
files = [
{file = "PyYAML-6.0.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:0a9a2848a5b7feac301353437eb7d5957887edbf81d56e903999a75a3d743086"},
{file = "PyYAML-6.0.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:29717114e51c84ddfba879543fb232a6ed60086602313ca38cce623c1d62cfbf"},
@@ -2708,30 +2659,6 @@ files = [
{file = "types_psutil-6.1.0.20241221.tar.gz", hash = "sha256:600f5a36bd5e0eb8887f0e3f3ff2cf154d90690ad8123c8a707bba4ab94d3185"},
]
[[package]]
name = "types-python-dateutil"
version = "2.9.0.20250516"
description = "Typing stubs for python-dateutil"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_python_dateutil-2.9.0.20250516-py3-none-any.whl", hash = "sha256:2b2b3f57f9c6a61fba26a9c0ffb9ea5681c9b83e69cd897c6b5f668d9c0cab93"},
{file = "types_python_dateutil-2.9.0.20250516.tar.gz", hash = "sha256:13e80d6c9c47df23ad773d54b2826bd52dbbb41be87c3f339381c1700ad21ee5"},
]
[[package]]
name = "types-pyyaml"
version = "6.0.12.20250516"
description = "Typing stubs for PyYAML"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "types_pyyaml-6.0.12.20250516-py3-none-any.whl", hash = "sha256:8478208feaeb53a34cb5d970c56a7cd76b72659442e733e268a94dc72b2d0530"},
{file = "types_pyyaml-6.0.12.20250516.tar.gz", hash = "sha256:9f21a70216fc0fa1b216a8176db5f9e0af6eb35d2f2932acb87689d03a5bf6ba"},
]
[[package]]
name = "types-requests"
version = "2.32.4.20250611"
@@ -3104,4 +3031,4 @@ otel = ["opentelemetry-api", "opentelemetry-distro", "opentelemetry-exporter-otl
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "6b1e0ad6f029925d5a3c5c2b264456680376e833f6ed344f9d5da0cf8c418a31"
content-hash = "7ed68cf2a4288fc5b8817a794d847c2b56ce7398770e1d8fdd2be02461dbd1a0"
+1 -2
View File
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "1.16.5"
version = "1.17.0"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"
@@ -25,7 +25,6 @@ aiostream = "^0.5.2"
aiohttp = "^3.10.5"
aiohttp-retry = "^2.8.3"
tenacity = ">=8.4.1"
cel-python = "^0.2.0"
opentelemetry-api = { version = "^1.28.0", optional = true }
opentelemetry-sdk = { version = "^1.28.0", optional = true }
opentelemetry-instrumentation = { version = ">=0.49b0", optional = true }
@@ -45,7 +45,7 @@ async def test_failure_on_timeout(
except Exception:
pass
await asyncio.sleep(3 * TIMEOUT_SECONDS)
await asyncio.sleep(4 * TIMEOUT_SECONDS)
results = {
r.workflow_run_id: await hatchet.runs.aio_get(r.workflow_run_id) for r in runs