diff --git a/examples/python/durable/test_durable.py b/examples/python/durable/test_durable.py index 90c0cdb25..e4c5c182b 100644 --- a/examples/python/durable/test_durable.py +++ b/examples/python/durable/test_durable.py @@ -2,7 +2,12 @@ import asyncio import pytest -from examples.durable.worker import EVENT_KEY, SLEEP_TIME, durable_workflow +from examples.durable.worker import ( + EVENT_KEY, + SLEEP_TIME, + durable_workflow, + wait_for_sleep_twice, +) from hatchet_sdk import Hatchet @@ -43,3 +48,27 @@ async def test_durable(hatchet: Hatchet) -> None: assert wait_group_1["key"] == "CREATE" assert "sleep" in wait_group_1["event_id"] assert "event" in wait_group_2["event_id"] + + wait_for_multi_sleep = result["wait_for_multi_sleep"] + + assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME + + +@pytest.mark.asyncio(loop_scope="session") +async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None: + first_sleep = await wait_for_sleep_twice.aio_run_no_wait() + + await asyncio.sleep(SLEEP_TIME / 2) + + await hatchet.runs.aio_cancel(first_sleep.workflow_run_id) + + await first_sleep.aio_result() + + await hatchet.runs.aio_replay( + first_sleep.workflow_run_id, + ) + + second_sleep_result = await first_sleep.aio_result() + + """We've already slept for a little bit by the time the task is cancelled""" + assert second_sleep_result["runtime"] < SLEEP_TIME diff --git a/examples/python/durable/worker.py b/examples/python/durable/worker.py index 5849be149..eb334b066 100644 --- a/examples/python/durable/worker.py +++ b/examples/python/durable/worker.py @@ -1,3 +1,4 @@ +import asyncio import time from datetime import timedelta from uuid import uuid4 @@ -102,14 +103,49 @@ async def wait_for_or_group_2( } +@durable_workflow.durable_task() +async def wait_for_multi_sleep( + _i: EmptyModel, ctx: DurableContext +) -> dict[str, str | int]: + start = time.time() + + for _ in range(3): + await ctx.aio_sleep_for( + timedelta(seconds=SLEEP_TIME), + ) + + return { + "runtime": int(time.time() - start), + } + + @ephemeral_workflow.task() def ephemeral_task_2(input: EmptyModel, ctx: Context) -> None: print("Running non-durable task") +@hatchet.durable_task() +async def wait_for_sleep_twice( + input: EmptyModel, ctx: DurableContext +) -> dict[str, int]: + try: + start = time.time() + + await ctx.aio_sleep_for( + timedelta(seconds=SLEEP_TIME), + ) + + return { + "runtime": int(time.time() - start), + } + except asyncio.CancelledError: + return {"runtime": -1} + + def main() -> None: worker = hatchet.worker( - "durable-worker", workflows=[durable_workflow, ephemeral_workflow] + "durable-worker", + workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice], ) worker.start() diff --git a/examples/python/events/test_event.py b/examples/python/events/test_event.py index 9abb531cb..94c3b27be 100644 --- a/examples/python/events/test_event.py +++ b/examples/python/events/test_event.py @@ -44,6 +44,7 @@ async def event_filter( test_run_id: str, expression: str | None = None, payload: dict[str, str] = {}, + scope: str | None = None, ) -> AsyncGenerator[None, None]: expression = ( expression @@ -53,7 +54,7 @@ async def event_filter( f = await hatchet.filters.aio_create( workflow_id=event_workflow.id, expression=expression, - scope=test_run_id, + scope=scope or test_run_id, payload={"test_run_id": test_run_id, **payload}, ) @@ -529,3 +530,40 @@ async def test_multiple_runs_for_multiple_scope_matches( assert len(runs) == 2 assert {r.output.get("filter_id") for r in runs} == {"1", "2"} + + 
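+# Regression check for scoped filters: with one filter registered under scope
+# "a" and another under scope "b", each event pushed to a single scope should
+# match only that scope's filter and trigger exactly one run, never one run
+# per registered filter.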
+@pytest.mark.asyncio(loop_scope="session") +async def test_multi_scope_bug(hatchet: Hatchet, test_run_id: str) -> None: + async with event_filter(hatchet, test_run_id, expression="1 == 1", scope="a"): + async with event_filter( + hatchet, + test_run_id, + expression="2 == 2", + scope="b", + ): + events = await hatchet.event.aio_bulk_push( + [ + BulkPushEventWithMetadata( + key=EVENT_KEY, + payload={ + "should_skip": False, + }, + additional_metadata={ + "should_have_runs": True, + "test_run_id": test_run_id, + }, + scope="a" if i % 2 == 0 else "b", + ) + for i in range(100) + ], + ) + + await asyncio.sleep(15) + + for event in events: + runs = await hatchet.runs.aio_list( + triggering_event_external_id=event.eventId, + additional_metadata={"test_run_id": test_run_id}, + ) + + assert len(runs.rows) == 1 diff --git a/examples/python/fanout/worker.py b/examples/python/fanout/worker.py index 197f85ee9..b96428d17 100644 --- a/examples/python/fanout/worker.py +++ b/examples/python/fanout/worker.py @@ -60,8 +60,6 @@ async def process2(input: ChildInput, ctx: Context) -> dict[str, str]: -child_wf.create_bulk_run_item() - def main() -> None: worker = hatchet.worker("fanout-worker", slots=40, workflows=[parent_wf, child_wf]) diff --git a/examples/python/fanout_sync/test_fanout_sync.py b/examples/python/fanout_sync/test_fanout_sync.py index 2171cba38..ea93d080f 100644 --- a/examples/python/fanout_sync/test_fanout_sync.py +++ b/examples/python/fanout_sync/test_fanout_sync.py @@ -34,8 +34,6 @@ async def test_additional_metadata_propagation_sync(hatchet: Hatchet) -> None: additional_metadata={"test_run_id": test_run_id}, ) - print(runs.model_dump_json(indent=2)) - assert runs.rows """Assert that the additional metadata is propagated to the child runs.""" diff --git a/examples/python/quickstart/poetry.lock b/examples/python/quickstart/poetry.lock index d69438232..0bb6a6dde 100644 --- a/examples/python/quickstart/poetry.lock +++ b/examples/python/quickstart/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -114,7 +114,7 @@ propcache = ">=0.2.0" yarl = ">=1.17.0,<2.0" [package.extras] -speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] [[package]] name = "aiohttp-retry" @@ -199,12 +199,12 @@ files = [ ] [package.extras] -benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -cov = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] +benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier"] -tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""] +tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] [[package]] name = "cel-python" @@ -460,14 +460,14 @@ setuptools = "*" [[package]] name = "hatchet-sdk" -version = "1.16.2" +version = "1.0.0a1" description = "" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "hatchet_sdk-1.16.2-py3-none-any.whl", hash = "sha256:f4e4716ec7ed0d1e8a730699b33ad59830a6b143d73151591c08a08b8fe2d803"}, - {file = "hatchet_sdk-1.16.2.tar.gz", hash = "sha256:b7d4b29979f27a7fccc3b1e2a9987eb115a7e4cbb77aa4c914fc7da4aa87ad74"}, + {file = "hatchet_sdk-1.0.0a1-py3-none-any.whl", hash = "sha256:bfc84358c8842cecd0d95b30645109733b7292dff0db1a776ca862785ee93d7f"}, + {file = 
"hatchet_sdk-1.0.0a1.tar.gz", hash = "sha256:f0272bbaac6faed75ff727826e9f7b1ac42ae597f9b590e14d392aada9c9692f"}, ] [package.dependencies] @@ -483,11 +483,13 @@ grpcio-tools = [ {version = ">=1.64.1,<1.68.dev0 || >=1.69.dev0", markers = "python_version < \"3.13\""}, {version = ">=1.69.0", markers = "python_version >= \"3.13\""}, ] +nest-asyncio = ">=1.6.0,<2.0.0" prometheus-client = ">=0.21.1,<0.22.0" -protobuf = ">=5.29.5,<6.0.0" +protobuf = ">=5.29.1,<6.0.0" pydantic = ">=2.6.3,<3.0.0" pydantic-settings = ">=2.7.1,<3.0.0" python-dateutil = ">=2.9.0.post0,<3.0.0" +pyyaml = ">=6.0.1,<7.0.0" tenacity = ">=8.4.1" urllib3 = ">=1.26.20" @@ -643,6 +645,18 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""} +[[package]] +name = "nest-asyncio" +version = "1.6.0" +description = "Patch asyncio to allow nested event loops" +optional = false +python-versions = ">=3.5" +groups = ["main"] +files = [ + {file = "nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c"}, + {file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"}, +] + [[package]] name = "prometheus-client" version = "0.21.1" @@ -768,23 +782,23 @@ files = [ [[package]] name = "protobuf" -version = "5.29.5" +version = "5.29.4" description = "" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "protobuf-5.29.5-cp310-abi3-win32.whl", hash = "sha256:3f1c6468a2cfd102ff4703976138844f78ebd1fb45f49011afc5139e9e283079"}, - {file = "protobuf-5.29.5-cp310-abi3-win_amd64.whl", hash = "sha256:3f76e3a3675b4a4d867b52e4a5f5b78a2ef9565549d4037e06cf7b0942b1d3fc"}, - {file = "protobuf-5.29.5-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e38c5add5a311f2a6eb0340716ef9b039c1dfa428b28f25a7838ac329204a671"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:fa18533a299d7ab6c55a238bf8629311439995f2e7eca5caaff08663606e9015"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:63848923da3325e1bf7e9003d680ce6e14b07e55d0473253a690c3a8b8fd6e61"}, - {file = "protobuf-5.29.5-cp38-cp38-win32.whl", hash = "sha256:ef91363ad4faba7b25d844ef1ada59ff1604184c0bcd8b39b8a6bef15e1af238"}, - {file = "protobuf-5.29.5-cp38-cp38-win_amd64.whl", hash = "sha256:7318608d56b6402d2ea7704ff1e1e4597bee46d760e7e4dd42a3d45e24b87f2e"}, - {file = "protobuf-5.29.5-cp39-cp39-win32.whl", hash = "sha256:6f642dc9a61782fa72b90878af134c5afe1917c89a568cd3476d758d3c3a0736"}, - {file = "protobuf-5.29.5-cp39-cp39-win_amd64.whl", hash = "sha256:470f3af547ef17847a28e1f47200a1cbf0ba3ff57b7de50d22776607cd2ea353"}, - {file = "protobuf-5.29.5-py3-none-any.whl", hash = "sha256:6cf42630262c59b2d8de33954443d94b746c952b01434fc58a417fdbd2e84bd5"}, - {file = "protobuf-5.29.5.tar.gz", hash = "sha256:bc1463bafd4b0929216c35f437a8e28731a2b7fe3d98bb77a600efced5a15c84"}, + {file = "protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7"}, + {file = "protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d"}, + {file = "protobuf-5.29.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:307ecba1d852ec237e9ba668e087326a67564ef83e45a0189a772ede9e854dd0"}, + {file = "protobuf-5.29.4-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:aec4962f9ea93c431d5714ed1be1c93f13e1a8618e70035ba2b0564d9e633f2e"}, + {file = 
"protobuf-5.29.4-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:d7d3f7d1d5a66ed4942d4fefb12ac4b14a29028b209d4bfb25c68ae172059922"}, + {file = "protobuf-5.29.4-cp38-cp38-win32.whl", hash = "sha256:1832f0515b62d12d8e6ffc078d7e9eb06969aa6dc13c13e1036e39d73bebc2de"}, + {file = "protobuf-5.29.4-cp38-cp38-win_amd64.whl", hash = "sha256:476cb7b14914c780605a8cf62e38c2a85f8caff2e28a6a0bad827ec7d6c85d68"}, + {file = "protobuf-5.29.4-cp39-cp39-win32.whl", hash = "sha256:fd32223020cb25a2cc100366f1dedc904e2d71d9322403224cdde5fdced0dabe"}, + {file = "protobuf-5.29.4-cp39-cp39-win_amd64.whl", hash = "sha256:678974e1e3a9b975b8bc2447fca458db5f93a2fb6b0c8db46b6675b5b5346812"}, + {file = "protobuf-5.29.4-py3-none-any.whl", hash = "sha256:3fde11b505e1597f71b875ef2fc52062b6a9740e5f7c8997ce878b6009145862"}, + {file = "protobuf-5.29.4.tar.gz", hash = "sha256:4f1dfcd7997b31ef8f53ec82781ff434a28bf71d9102ddde14d076adcfc78c99"}, ] [[package]] @@ -806,7 +820,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] +timezone = ["tzdata"] [[package]] name = "pydantic-core" @@ -1048,13 +1062,13 @@ files = [ ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] -core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"] +core = ["importlib_metadata (>=6)", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] cover = ["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] enabler = ["pytest-enabler (>=2.2)"] -test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] -type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", 
"pytest-mypy"] [[package]] name = "six" @@ -1133,7 +1147,7 @@ files = [ ] [package.extras] -brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] @@ -1238,4 +1252,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "73ae28ab4e3258eaa644f6054b1e6e5ec6135504fa36a4f44e5e712c5e1d9bfa" +content-hash = "74c12e499aa797ca5c8559af579f1212b0e4e3a77f068f9385db39d70ba304e0" diff --git a/examples/python/worker.py b/examples/python/worker.py index 26f226598..3bcdc68f6 100644 --- a/examples/python/worker.py +++ b/examples/python/worker.py @@ -15,7 +15,7 @@ from examples.concurrency_workflow_level.worker import ( from examples.conditions.worker import task_condition_workflow from examples.dag.worker import dag_workflow from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf -from examples.durable.worker import durable_workflow +from examples.durable.worker import durable_workflow, wait_for_sleep_twice from examples.events.worker import event_workflow from examples.fanout.worker import child_wf, parent_wf from examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent @@ -67,6 +67,7 @@ def main() -> None: bulk_replay_test_2, bulk_replay_test_3, return_exceptions_task, + wait_for_sleep_twice, ], lifespan=lifespan, ) diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts b/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts index 002176a0f..8da1c1970 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/durable/test_durable.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import EVENT_KEY, SLEEP_TIME, durable_workflow\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n', + 'import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = 
durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {"test": "test"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == "ACTIVE"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace("e2e-test-worker_durable")\n for w in active_workers\n )\n\n assert result["durable_task"]["status"] == "success"\n\n wait_group_1 = result["wait_for_or_group_1"]\n wait_group_2 = result["wait_for_or_group_2"]\n\n assert abs(wait_group_1["runtime"] - SLEEP_TIME) < 3\n\n assert wait_group_1["key"] == wait_group_2["key"]\n assert wait_group_1["key"] == "CREATE"\n assert "sleep" in wait_group_1["event_id"]\n assert "event" in wait_group_2["event_id"]\n\n wait_for_multi_sleep = result["wait_for_multi_sleep"]\n\n assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n """We\'ve already slept for a little bit by the time the task is cancelled"""\n assert second_sleep_result["runtime"] < SLEEP_TIME\n', source: 'out/python/durable/test_durable.py', blocks: {}, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/durable/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/durable/worker.ts index f573d3ee7..71f8095ac 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/durable/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/durable/worker.ts @@ -3,20 +3,20 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import time\nfrom datetime import timedelta\nfrom uuid import uuid4\n\nfrom hatchet_sdk import (\n Context,\n DurableContext,\n EmptyModel,\n Hatchet,\n SleepCondition,\n UserEventCondition,\n or_,\n)\n\nhatchet = Hatchet(debug=True)\n\n# > Create a durable workflow\ndurable_workflow = hatchet.workflow(name="DurableWorkflow")\n\n\nephemeral_workflow = hatchet.workflow(name="EphemeralWorkflow")\n\n\n# > Add durable task\nEVENT_KEY = "durable-example:event"\nSLEEP_TIME = 5\n\n\n@durable_workflow.task()\nasync def ephemeral_task(input: EmptyModel, ctx: Context) -> None:\n print("Running non-durable task")\n\n\n@durable_workflow.durable_task()\nasync def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:\n print("Waiting for sleep")\n await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))\n print("Sleep finished")\n\n print("Waiting for event")\n await ctx.aio_wait_for(\n "event",\n UserEventCondition(event_key=EVENT_KEY, expression="true"),\n )\n print("Event received")\n\n return {\n "status": "success",\n }\n\n\n\n\n# > Add durable tasks that wait for or groups\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_1(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n 
SleepCondition(timedelta(seconds=SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n "runtime": int(time.time() - start),\n "key": key,\n "event_id": event_id,\n }\n\n\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_2(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=6 * SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n "runtime": int(time.time() - start),\n "key": key,\n "event_id": event_id,\n }\n\n\n@ephemeral_workflow.task()\ndef ephemeral_task_2(input: EmptyModel, ctx: Context) -> None:\n print("Running non-durable task")\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "durable-worker", workflows=[durable_workflow, ephemeral_workflow]\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'import asyncio\nimport time\nfrom datetime import timedelta\nfrom uuid import uuid4\n\nfrom hatchet_sdk import (\n Context,\n DurableContext,\n EmptyModel,\n Hatchet,\n SleepCondition,\n UserEventCondition,\n or_,\n)\n\nhatchet = Hatchet(debug=True)\n\n# > Create a durable workflow\ndurable_workflow = hatchet.workflow(name="DurableWorkflow")\n\n\nephemeral_workflow = hatchet.workflow(name="EphemeralWorkflow")\n\n\n# > Add durable task\nEVENT_KEY = "durable-example:event"\nSLEEP_TIME = 5\n\n\n@durable_workflow.task()\nasync def ephemeral_task(input: EmptyModel, ctx: Context) -> None:\n print("Running non-durable task")\n\n\n@durable_workflow.durable_task()\nasync def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:\n print("Waiting for sleep")\n await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))\n print("Sleep finished")\n\n print("Waiting for event")\n await ctx.aio_wait_for(\n "event",\n UserEventCondition(event_key=EVENT_KEY, expression="true"),\n )\n print("Event received")\n\n return {\n "status": "success",\n }\n\n\n\n\n# > Add durable tasks that wait for or groups\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_1(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n "runtime": int(time.time() - start),\n "key": key,\n "event_id": event_id,\n }\n\n\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_2(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=6 * SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n "runtime": int(time.time() - start),\n "key": key,\n "event_id": event_id,\n }\n\n\n@durable_workflow.durable_task()\nasync def wait_for_multi_sleep(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n\n for _ in range(3):\n await ctx.aio_sleep_for(\n timedelta(seconds=SLEEP_TIME),\n )\n\n return {\n "runtime": int(time.time() - start),\n 
}\n\n\n@ephemeral_workflow.task()\ndef ephemeral_task_2(input: EmptyModel, ctx: Context) -> None:\n print("Running non-durable task")\n\n\n@hatchet.durable_task()\nasync def wait_for_sleep_twice(\n input: EmptyModel, ctx: DurableContext\n) -> dict[str, int]:\n try:\n start = time.time()\n\n await ctx.aio_sleep_for(\n timedelta(seconds=SLEEP_TIME),\n )\n\n return {\n "runtime": int(time.time() - start),\n }\n except asyncio.CancelledError:\n return {"runtime": -1}\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "durable-worker",\n workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice],\n )\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/durable/worker.py', blocks: { create_a_durable_workflow: { - start: 18, - stop: 18, + start: 19, + stop: 19, }, add_durable_task: { - start: 25, - stop: 51, + start: 26, + stop: 52, }, add_durable_tasks_that_wait_for_or_groups: { - start: 55, - stop: 79, + start: 56, + stop: 80, }, }, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/events/test_event.ts b/frontend/app/src/next/lib/docs/generated/snips/python/events/test_event.ts index 265f307bc..6ffa16269 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/events/test_event.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/events/test_event.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\nimport json\nfrom collections.abc import AsyncGenerator\nfrom contextlib import asynccontextmanager\nfrom datetime import datetime, timedelta, timezone\nfrom typing import cast\nfrom uuid import uuid4\n\nimport pytest\nfrom pydantic import BaseModel\n\nfrom examples.events.worker import (\n EVENT_KEY,\n SECONDARY_KEY,\n WILDCARD_KEY,\n EventWorkflowInput,\n event_workflow,\n)\nfrom hatchet_sdk.clients.events import (\n BulkPushEventOptions,\n BulkPushEventWithMetadata,\n PushEventOptions,\n)\nfrom hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus\nfrom hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary\nfrom hatchet_sdk.contracts.events_pb2 import Event\nfrom hatchet_sdk.hatchet import Hatchet\n\n\nclass ProcessedEvent(BaseModel):\n id: str\n payload: dict[str, str | bool]\n meta: dict[str, str | bool | int]\n should_have_runs: bool\n test_run_id: str\n\n def __hash__(self) -> int:\n return hash(self.model_dump_json())\n\n\n@asynccontextmanager\nasync def event_filter(\n hatchet: Hatchet,\n test_run_id: str,\n expression: str | None = None,\n payload: dict[str, str] = {},\n) -> AsyncGenerator[None, None]:\n expression = (\n expression\n or f"input.should_skip == false && payload.test_run_id == \'{test_run_id}\'"\n )\n\n f = await hatchet.filters.aio_create(\n workflow_id=event_workflow.id,\n expression=expression,\n scope=test_run_id,\n payload={"test_run_id": test_run_id, **payload},\n )\n\n try:\n yield\n finally:\n await hatchet.filters.aio_delete(f.metadata.id)\n\n\nasync def fetch_runs_for_event(\n hatchet: Hatchet, event: Event\n) -> tuple[ProcessedEvent, list[V1TaskSummary]]:\n runs = await hatchet.runs.aio_list(triggering_event_external_id=event.eventId)\n\n meta = (\n cast(dict[str, str | int | bool], json.loads(event.additionalMetadata))\n if event.additionalMetadata\n else {}\n )\n payload = (\n cast(dict[str, str | bool], json.loads(event.payload)) if event.payload else {}\n )\n\n processed_event = ProcessedEvent(\n id=event.eventId,\n payload=payload,\n 
meta=meta,\n should_have_runs=meta.get("should_have_runs", False) is True,\n test_run_id=cast(str, meta["test_run_id"]),\n )\n\n if not all([r.output for r in runs.rows]):\n return (processed_event, [])\n\n return (\n processed_event,\n runs.rows or [],\n )\n\n\nasync def wait_for_result(\n hatchet: Hatchet, events: list[Event]\n) -> dict[ProcessedEvent, list[V1TaskSummary]]:\n await asyncio.sleep(3)\n\n since = datetime.now(tz=timezone.utc) - timedelta(minutes=2)\n\n persisted = (await hatchet.event.aio_list(limit=100, since=since)).rows or []\n\n assert {e.eventId for e in events}.issubset({e.metadata.id for e in persisted})\n\n iters = 0\n while True:\n print("Waiting for event runs to complete...")\n if iters > 15:\n print("Timed out waiting for event runs to complete.")\n return {\n ProcessedEvent(\n id=event.eventId,\n payload=json.loads(event.payload) if event.payload else {},\n meta=(\n json.loads(event.additionalMetadata)\n if event.additionalMetadata\n else {}\n ),\n should_have_runs=False,\n test_run_id=cast(\n str, json.loads(event.additionalMetadata).get("test_run_id", "")\n ),\n ): []\n for event in events\n }\n\n iters += 1\n\n event_runs = await asyncio.gather(\n *[fetch_runs_for_event(hatchet, event) for event in events]\n )\n\n all_empty = all(not event_run for _, event_run in event_runs)\n\n if all_empty:\n await asyncio.sleep(1)\n continue\n\n event_id_to_runs = {event_id: runs for (event_id, runs) in event_runs}\n\n any_queued_or_running = any(\n run.status in [V1TaskStatus.QUEUED, V1TaskStatus.RUNNING]\n for runs in event_id_to_runs.values()\n for run in runs\n )\n\n if any_queued_or_running:\n await asyncio.sleep(1)\n continue\n\n break\n\n return event_id_to_runs\n\n\nasync def wait_for_result_and_assert(hatchet: Hatchet, events: list[Event]) -> None:\n event_to_runs = await wait_for_result(hatchet, events)\n\n for event, runs in event_to_runs.items():\n await assert_event_runs_processed(event, runs)\n\n\nasync def assert_event_runs_processed(\n event: ProcessedEvent,\n runs: list[V1TaskSummary],\n) -> None:\n runs = [\n run\n for run in runs\n if (run.additional_metadata or {}).get("hatchet__event_id") == event.id\n ]\n\n if event.should_have_runs:\n assert len(runs) > 0\n\n for run in runs:\n assert run.status == V1TaskStatus.COMPLETED\n assert run.output.get("test_run_id") == event.test_run_id\n else:\n assert len(runs) == 0\n\n\ndef bpi(\n index: int = 1,\n test_run_id: str = "",\n should_skip: bool = False,\n should_have_runs: bool = True,\n key: str = EVENT_KEY,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> BulkPushEventWithMetadata:\n return BulkPushEventWithMetadata(\n key=key,\n payload={\n "should_skip": should_skip,\n **payload,\n },\n additional_metadata={\n "should_have_runs": should_have_runs,\n "test_run_id": test_run_id,\n "key": index,\n },\n scope=scope,\n )\n\n\ndef cp(should_skip: bool) -> dict[str, bool]:\n return EventWorkflowInput(should_skip=should_skip).model_dump()\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_push(hatchet: Hatchet) -> None:\n e = hatchet.event.push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_async_event_push(hatchet: Hatchet) -> None:\n e = await hatchet.event.aio_push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_async_event_bulk_push(hatchet: Hatchet) -> None:\n events = [\n BulkPushEventWithMetadata(\n key="event1",\n 
payload={"message": "This is event 1", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user123"},\n ),\n BulkPushEventWithMetadata(\n key="event2",\n payload={"message": "This is event 2", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user456"},\n ),\n BulkPushEventWithMetadata(\n key="event3",\n payload={"message": "This is event 3", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user789"},\n ),\n ]\n opts = BulkPushEventOptions(namespace="bulk-test")\n\n e = await hatchet.event.aio_bulk_push(events, opts)\n\n assert len(e) == 3\n\n # Sort both lists of events by their key to ensure comparison order\n sorted_events = sorted(events, key=lambda x: x.key)\n sorted_returned_events = sorted(e, key=lambda x: x.key)\n namespace = "bulk-test"\n\n # Check that the returned events match the original events\n for original_event, returned_event in zip(\n sorted_events, sorted_returned_events, strict=False\n ):\n assert returned_event.key == namespace + original_event.key\n\n\n@pytest.fixture(scope="function")\ndef test_run_id() -> str:\n return str(uuid4())\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_engine_behavior(hatchet: Hatchet) -> None:\n test_run_id = str(uuid4())\n events = [\n bpi(\n test_run_id=test_run_id,\n ),\n bpi(\n test_run_id=test_run_id,\n key="thisisafakeeventfoobarbaz",\n should_have_runs=False,\n ),\n ]\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\ndef gen_bulk_events(test_run_id: str) -> list[BulkPushEventWithMetadata]:\n return [\n ## No scope, so it shouldn\'t have any runs\n bpi(\n index=1,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n ),\n ## No scope, so it shouldn\'t have any runs\n bpi(\n index=2,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n ),\n ## Scope is set and `should_skip` is False, so it should have runs\n bpi(\n index=3,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=True,\n scope=test_run_id,\n ),\n ## Scope is set and `should_skip` is True, so it shouldn\'t have runs\n bpi(\n index=4,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn\'t have runs\n bpi(\n index=5,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n key="thisisafakeeventfoobarbaz",\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn\'t have runs\n bpi(\n index=6,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n scope=test_run_id,\n key="thisisafakeeventfoobarbaz",\n ),\n ]\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_skipping_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id):\n events = gen_bulk_events(test_run_id)\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\nasync def bulk_to_single(hatchet: Hatchet, event: BulkPushEventWithMetadata) -> Event:\n return await hatchet.event.aio_push(\n event_key=event.key,\n payload=event.payload,\n options=PushEventOptions(\n scope=event.scope,\n additional_metadata=event.additional_metadata,\n priority=event.priority,\n ),\n )\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_skipping_filtering_no_bulk(\n hatchet: 
Hatchet, test_run_id: str\n) -> None:\n async with event_filter(hatchet, test_run_id):\n raw_events = gen_bulk_events(test_run_id)\n events = await asyncio.gather(\n *[bulk_to_single(hatchet, event) for event in raw_events]\n )\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_payload_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n "input.should_skip == false && payload.foobar == \'baz\'",\n {"foobar": "qux"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={"message": "This is event 1", "should_skip": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": False,\n "test_run_id": test_run_id,\n "key": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_payload_filtering_with_payload_match(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n "input.should_skip == false && payload.foobar == \'baz\'",\n {"foobar": "baz"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={"message": "This is event 1", "should_skip": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n "key": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n f"event_key == \'{SECONDARY_KEY}\'",\n ):\n event_1 = await hatchet.event.aio_push(\n event_key=SECONDARY_KEY,\n payload={\n "message": "Should run because filter matches",\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n },\n ),\n )\n event_2 = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n "message": "Should skip because filter does not match",\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": False,\n "test_run_id": test_run_id,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event_1, event_2])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_key_wildcards(hatchet: Hatchet, test_run_id: str) -> None:\n keys = [\n WILDCARD_KEY.replace("*", "1"),\n WILDCARD_KEY.replace("*", "2"),\n "foobar",\n EVENT_KEY,\n ]\n\n async with event_filter(\n hatchet,\n test_run_id,\n ):\n events = [\n await hatchet.event.aio_push(\n event_key=key,\n payload={\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": key != "foobar",\n "test_run_id": test_run_id,\n },\n ),\n )\n for key in keys\n ]\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_multiple_runs_for_multiple_scope_matches(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet, test_run_id, payload={"filter_id": "1"}, expression="1 == 1"\n ):\n async with event_filter(\n hatchet, test_run_id, payload={"filter_id": "2"}, expression="2 == 2"\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n "should_skip": False,\n },\n 
options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n },\n ),\n )\n\n event_to_runs = await wait_for_result(hatchet, [event])\n\n assert len(event_to_runs.keys()) == 1\n\n runs = list(event_to_runs.values())[0]\n\n assert len(runs) == 2\n\n assert {r.output.get("filter_id") for r in runs} == {"1", "2"}\n', + 'import asyncio\nimport json\nfrom collections.abc import AsyncGenerator\nfrom contextlib import asynccontextmanager\nfrom datetime import datetime, timedelta, timezone\nfrom typing import cast\nfrom uuid import uuid4\n\nimport pytest\nfrom pydantic import BaseModel\n\nfrom examples.events.worker import (\n EVENT_KEY,\n SECONDARY_KEY,\n WILDCARD_KEY,\n EventWorkflowInput,\n event_workflow,\n)\nfrom hatchet_sdk.clients.events import (\n BulkPushEventOptions,\n BulkPushEventWithMetadata,\n PushEventOptions,\n)\nfrom hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus\nfrom hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary\nfrom hatchet_sdk.contracts.events_pb2 import Event\nfrom hatchet_sdk.hatchet import Hatchet\n\n\nclass ProcessedEvent(BaseModel):\n id: str\n payload: dict[str, str | bool]\n meta: dict[str, str | bool | int]\n should_have_runs: bool\n test_run_id: str\n\n def __hash__(self) -> int:\n return hash(self.model_dump_json())\n\n\n@asynccontextmanager\nasync def event_filter(\n hatchet: Hatchet,\n test_run_id: str,\n expression: str | None = None,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> AsyncGenerator[None, None]:\n expression = (\n expression\n or f"input.should_skip == false && payload.test_run_id == \'{test_run_id}\'"\n )\n\n f = await hatchet.filters.aio_create(\n workflow_id=event_workflow.id,\n expression=expression,\n scope=scope or test_run_id,\n payload={"test_run_id": test_run_id, **payload},\n )\n\n try:\n yield\n finally:\n await hatchet.filters.aio_delete(f.metadata.id)\n\n\nasync def fetch_runs_for_event(\n hatchet: Hatchet, event: Event\n) -> tuple[ProcessedEvent, list[V1TaskSummary]]:\n runs = await hatchet.runs.aio_list(triggering_event_external_id=event.eventId)\n\n meta = (\n cast(dict[str, str | int | bool], json.loads(event.additionalMetadata))\n if event.additionalMetadata\n else {}\n )\n payload = (\n cast(dict[str, str | bool], json.loads(event.payload)) if event.payload else {}\n )\n\n processed_event = ProcessedEvent(\n id=event.eventId,\n payload=payload,\n meta=meta,\n should_have_runs=meta.get("should_have_runs", False) is True,\n test_run_id=cast(str, meta["test_run_id"]),\n )\n\n if not all([r.output for r in runs.rows]):\n return (processed_event, [])\n\n return (\n processed_event,\n runs.rows or [],\n )\n\n\nasync def wait_for_result(\n hatchet: Hatchet, events: list[Event]\n) -> dict[ProcessedEvent, list[V1TaskSummary]]:\n await asyncio.sleep(3)\n\n since = datetime.now(tz=timezone.utc) - timedelta(minutes=2)\n\n persisted = (await hatchet.event.aio_list(limit=100, since=since)).rows or []\n\n assert {e.eventId for e in events}.issubset({e.metadata.id for e in persisted})\n\n iters = 0\n while True:\n print("Waiting for event runs to complete...")\n if iters > 15:\n print("Timed out waiting for event runs to complete.")\n return {\n ProcessedEvent(\n id=event.eventId,\n payload=json.loads(event.payload) if event.payload else {},\n meta=(\n json.loads(event.additionalMetadata)\n if event.additionalMetadata\n else {}\n ),\n should_have_runs=False,\n test_run_id=cast(\n str, 
json.loads(event.additionalMetadata).get("test_run_id", "")\n ),\n ): []\n for event in events\n }\n\n iters += 1\n\n event_runs = await asyncio.gather(\n *[fetch_runs_for_event(hatchet, event) for event in events]\n )\n\n all_empty = all(not event_run for _, event_run in event_runs)\n\n if all_empty:\n await asyncio.sleep(1)\n continue\n\n event_id_to_runs = {event_id: runs for (event_id, runs) in event_runs}\n\n any_queued_or_running = any(\n run.status in [V1TaskStatus.QUEUED, V1TaskStatus.RUNNING]\n for runs in event_id_to_runs.values()\n for run in runs\n )\n\n if any_queued_or_running:\n await asyncio.sleep(1)\n continue\n\n break\n\n return event_id_to_runs\n\n\nasync def wait_for_result_and_assert(hatchet: Hatchet, events: list[Event]) -> None:\n event_to_runs = await wait_for_result(hatchet, events)\n\n for event, runs in event_to_runs.items():\n await assert_event_runs_processed(event, runs)\n\n\nasync def assert_event_runs_processed(\n event: ProcessedEvent,\n runs: list[V1TaskSummary],\n) -> None:\n runs = [\n run\n for run in runs\n if (run.additional_metadata or {}).get("hatchet__event_id") == event.id\n ]\n\n if event.should_have_runs:\n assert len(runs) > 0\n\n for run in runs:\n assert run.status == V1TaskStatus.COMPLETED\n assert run.output.get("test_run_id") == event.test_run_id\n else:\n assert len(runs) == 0\n\n\ndef bpi(\n index: int = 1,\n test_run_id: str = "",\n should_skip: bool = False,\n should_have_runs: bool = True,\n key: str = EVENT_KEY,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> BulkPushEventWithMetadata:\n return BulkPushEventWithMetadata(\n key=key,\n payload={\n "should_skip": should_skip,\n **payload,\n },\n additional_metadata={\n "should_have_runs": should_have_runs,\n "test_run_id": test_run_id,\n "key": index,\n },\n scope=scope,\n )\n\n\ndef cp(should_skip: bool) -> dict[str, bool]:\n return EventWorkflowInput(should_skip=should_skip).model_dump()\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_push(hatchet: Hatchet) -> None:\n e = hatchet.event.push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_async_event_push(hatchet: Hatchet) -> None:\n e = await hatchet.event.aio_push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_async_event_bulk_push(hatchet: Hatchet) -> None:\n events = [\n BulkPushEventWithMetadata(\n key="event1",\n payload={"message": "This is event 1", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user123"},\n ),\n BulkPushEventWithMetadata(\n key="event2",\n payload={"message": "This is event 2", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user456"},\n ),\n BulkPushEventWithMetadata(\n key="event3",\n payload={"message": "This is event 3", "should_skip": False},\n additional_metadata={"source": "test", "user_id": "user789"},\n ),\n ]\n opts = BulkPushEventOptions(namespace="bulk-test")\n\n e = await hatchet.event.aio_bulk_push(events, opts)\n\n assert len(e) == 3\n\n # Sort both lists of events by their key to ensure comparison order\n sorted_events = sorted(events, key=lambda x: x.key)\n sorted_returned_events = sorted(e, key=lambda x: x.key)\n namespace = "bulk-test"\n\n # Check that the returned events match the original events\n for original_event, returned_event in zip(\n sorted_events, sorted_returned_events, strict=False\n ):\n assert returned_event.key == namespace + 
original_event.key\n\n\n@pytest.fixture(scope="function")\ndef test_run_id() -> str:\n return str(uuid4())\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_engine_behavior(hatchet: Hatchet) -> None:\n test_run_id = str(uuid4())\n events = [\n bpi(\n test_run_id=test_run_id,\n ),\n bpi(\n test_run_id=test_run_id,\n key="thisisafakeeventfoobarbaz",\n should_have_runs=False,\n ),\n ]\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\ndef gen_bulk_events(test_run_id: str) -> list[BulkPushEventWithMetadata]:\n return [\n ## No scope, so it shouldn\'t have any runs\n bpi(\n index=1,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n ),\n ## No scope, so it shouldn\'t have any runs\n bpi(\n index=2,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n ),\n ## Scope is set and `should_skip` is False, so it should have runs\n bpi(\n index=3,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=True,\n scope=test_run_id,\n ),\n ## Scope is set and `should_skip` is True, so it shouldn\'t have runs\n bpi(\n index=4,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn\'t have runs\n bpi(\n index=5,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n key="thisisafakeeventfoobarbaz",\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn\'t have runs\n bpi(\n index=6,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n scope=test_run_id,\n key="thisisafakeeventfoobarbaz",\n ),\n ]\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_skipping_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id):\n events = gen_bulk_events(test_run_id)\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\nasync def bulk_to_single(hatchet: Hatchet, event: BulkPushEventWithMetadata) -> Event:\n return await hatchet.event.aio_push(\n event_key=event.key,\n payload=event.payload,\n options=PushEventOptions(\n scope=event.scope,\n additional_metadata=event.additional_metadata,\n priority=event.priority,\n ),\n )\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_skipping_filtering_no_bulk(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(hatchet, test_run_id):\n raw_events = gen_bulk_events(test_run_id)\n events = await asyncio.gather(\n *[bulk_to_single(hatchet, event) for event in raw_events]\n )\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_payload_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n "input.should_skip == false && payload.foobar == \'baz\'",\n {"foobar": "qux"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={"message": "This is event 1", "should_skip": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": False,\n "test_run_id": test_run_id,\n "key": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_event_payload_filtering_with_payload_match(\n hatchet: Hatchet, test_run_id: str\n) 
-> None:\n async with event_filter(\n hatchet,\n test_run_id,\n "input.should_skip == false && payload.foobar == \'baz\'",\n {"foobar": "baz"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={"message": "This is event 1", "should_skip": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n "key": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n f"event_key == \'{SECONDARY_KEY}\'",\n ):\n event_1 = await hatchet.event.aio_push(\n event_key=SECONDARY_KEY,\n payload={\n "message": "Should run because filter matches",\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n },\n ),\n )\n event_2 = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n "message": "Should skip because filter does not match",\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": False,\n "test_run_id": test_run_id,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event_1, event_2])\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_key_wildcards(hatchet: Hatchet, test_run_id: str) -> None:\n keys = [\n WILDCARD_KEY.replace("*", "1"),\n WILDCARD_KEY.replace("*", "2"),\n "foobar",\n EVENT_KEY,\n ]\n\n async with event_filter(\n hatchet,\n test_run_id,\n ):\n events = [\n await hatchet.event.aio_push(\n event_key=key,\n payload={\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": key != "foobar",\n "test_run_id": test_run_id,\n },\n ),\n )\n for key in keys\n ]\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_multiple_runs_for_multiple_scope_matches(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet, test_run_id, payload={"filter_id": "1"}, expression="1 == 1"\n ):\n async with event_filter(\n hatchet, test_run_id, payload={"filter_id": "2"}, expression="2 == 2"\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n "should_skip": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n },\n ),\n )\n\n event_to_runs = await wait_for_result(hatchet, [event])\n\n assert len(event_to_runs.keys()) == 1\n\n runs = list(event_to_runs.values())[0]\n\n assert len(runs) == 2\n\n assert {r.output.get("filter_id") for r in runs} == {"1", "2"}\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_multi_scope_bug(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id, expression="1 == 1", scope="a"):\n async with event_filter(\n hatchet,\n test_run_id,\n expression="2 == 2",\n scope="b",\n ):\n events = await hatchet.event.aio_bulk_push(\n [\n BulkPushEventWithMetadata(\n key=EVENT_KEY,\n payload={\n "should_skip": False,\n },\n additional_metadata={\n "should_have_runs": True,\n "test_run_id": test_run_id,\n },\n scope="a" if i % 2 == 0 else "b",\n )\n for i in range(100)\n ],\n )\n\n await asyncio.sleep(15)\n\n for event in events:\n runs = await 
hatchet.runs.aio_list(\n triggering_event_external_id=event.eventId,\n additional_metadata={"test_run_id": test_run_id},\n )\n\n assert len(runs.rows) == 1\n', source: 'out/python/events/test_event.py', blocks: {}, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/fanout/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/fanout/worker.ts index 60fabc068..39897b3de 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/fanout/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/fanout/worker.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'from datetime import timedelta\nfrom typing import Any\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Hatchet, TriggerWorkflowOptions\n\nhatchet = Hatchet(debug=True)\n\n\n# > FanoutParent\nclass ParentInput(BaseModel):\n n: int = 100\n\n\nclass ChildInput(BaseModel):\n a: str\n\n\nparent_wf = hatchet.workflow(name="FanoutParent", input_validator=ParentInput)\nchild_wf = hatchet.workflow(name="FanoutChild", input_validator=ChildInput)\n\n\n@parent_wf.task(execution_timeout=timedelta(minutes=5))\nasync def spawn(input: ParentInput, ctx: Context) -> dict[str, Any]:\n print("spawning child")\n\n result = await child_wf.aio_run_many(\n [\n child_wf.create_bulk_run_item(\n input=ChildInput(a=str(i)),\n options=TriggerWorkflowOptions(\n additional_metadata={"hello": "earth"}, key=f"child{i}"\n ),\n )\n for i in range(input.n)\n ],\n )\n\n print(f"results {result}")\n\n return {"results": result}\n\n\n\n\n# > FanoutChild\n@child_wf.task()\nasync def process(input: ChildInput, ctx: Context) -> dict[str, str]:\n print(f"child process {input.a}")\n return {"status": input.a}\n\n\n@child_wf.task(parents=[process])\nasync def process2(input: ChildInput, ctx: Context) -> dict[str, str]:\n process_output = ctx.task_output(process)\n a = process_output["status"]\n\n return {"status2": a + "2"}\n\n\n\nchild_wf.create_bulk_run_item()\n\n\ndef main() -> None:\n worker = hatchet.worker("fanout-worker", slots=40, workflows=[parent_wf, child_wf])\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'from datetime import timedelta\nfrom typing import Any\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Hatchet, TriggerWorkflowOptions\n\nhatchet = Hatchet(debug=True)\n\n\n# > FanoutParent\nclass ParentInput(BaseModel):\n n: int = 100\n\n\nclass ChildInput(BaseModel):\n a: str\n\n\nparent_wf = hatchet.workflow(name="FanoutParent", input_validator=ParentInput)\nchild_wf = hatchet.workflow(name="FanoutChild", input_validator=ChildInput)\n\n\n@parent_wf.task(execution_timeout=timedelta(minutes=5))\nasync def spawn(input: ParentInput, ctx: Context) -> dict[str, Any]:\n print("spawning child")\n\n result = await child_wf.aio_run_many(\n [\n child_wf.create_bulk_run_item(\n input=ChildInput(a=str(i)),\n options=TriggerWorkflowOptions(\n additional_metadata={"hello": "earth"}, key=f"child{i}"\n ),\n )\n for i in range(input.n)\n ],\n )\n\n print(f"results {result}")\n\n return {"results": result}\n\n\n\n\n# > FanoutChild\n@child_wf.task()\nasync def process(input: ChildInput, ctx: Context) -> dict[str, str]:\n print(f"child process {input.a}")\n return {"status": input.a}\n\n\n@child_wf.task(parents=[process])\nasync def process2(input: ChildInput, ctx: Context) -> dict[str, str]:\n process_output = ctx.task_output(process)\n a = process_output["status"]\n\n 
return {"status2": a + "2"}\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker("fanout-worker", slots=40, workflows=[parent_wf, child_wf])\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/fanout/worker.py', blocks: { fanoutparent: { diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/fanout_sync/test_fanout_sync.ts b/frontend/app/src/next/lib/docs/generated/snips/python/fanout_sync/test_fanout_sync.ts index 38f6cbd79..b82fd1373 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/fanout_sync/test_fanout_sync.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/fanout_sync/test_fanout_sync.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\nfrom uuid import uuid4\n\nimport pytest\n\nfrom examples.fanout_sync.worker import ParentInput, sync_fanout_parent\nfrom hatchet_sdk import Hatchet, TriggerWorkflowOptions\n\n\ndef test_run() -> None:\n N = 2\n\n result = sync_fanout_parent.run(ParentInput(n=N))\n\n assert len(result["spawn"]["results"]) == N\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_additional_metadata_propagation_sync(hatchet: Hatchet) -> None:\n test_run_id = uuid4().hex\n\n ref = await sync_fanout_parent.aio_run_no_wait(\n ParentInput(n=2),\n options=TriggerWorkflowOptions(\n additional_metadata={"test_run_id": test_run_id}\n ),\n )\n\n await ref.aio_result()\n await asyncio.sleep(1)\n\n runs = await hatchet.runs.aio_list(\n parent_task_external_id=ref.workflow_run_id,\n additional_metadata={"test_run_id": test_run_id},\n )\n\n print(runs.model_dump_json(indent=2))\n\n assert runs.rows\n\n """Assert that the additional metadata is propagated to the child runs."""\n for run in runs.rows:\n assert run.additional_metadata\n assert run.additional_metadata["test_run_id"] == test_run_id\n\n assert run.children\n for child in run.children:\n assert child.additional_metadata\n assert child.additional_metadata["test_run_id"] == test_run_id\n', + 'import asyncio\nfrom uuid import uuid4\n\nimport pytest\n\nfrom examples.fanout_sync.worker import ParentInput, sync_fanout_parent\nfrom hatchet_sdk import Hatchet, TriggerWorkflowOptions\n\n\ndef test_run() -> None:\n N = 2\n\n result = sync_fanout_parent.run(ParentInput(n=N))\n\n assert len(result["spawn"]["results"]) == N\n\n\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_additional_metadata_propagation_sync(hatchet: Hatchet) -> None:\n test_run_id = uuid4().hex\n\n ref = await sync_fanout_parent.aio_run_no_wait(\n ParentInput(n=2),\n options=TriggerWorkflowOptions(\n additional_metadata={"test_run_id": test_run_id}\n ),\n )\n\n await ref.aio_result()\n await asyncio.sleep(1)\n\n runs = await hatchet.runs.aio_list(\n parent_task_external_id=ref.workflow_run_id,\n additional_metadata={"test_run_id": test_run_id},\n )\n\n assert runs.rows\n\n """Assert that the additional metadata is propagated to the child runs."""\n for run in runs.rows:\n assert run.additional_metadata\n assert run.additional_metadata["test_run_id"] == test_run_id\n\n assert run.children\n for child in run.children:\n assert child.additional_metadata\n assert child.additional_metadata["test_run_id"] == test_run_id\n', source: 'out/python/fanout_sync/test_fanout_sync.py', blocks: {}, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts index 0b074eee1..f4241a481 
100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/worker.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n return_exceptions_task,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import 
dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n "e2e-test-worker",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n return_exceptions_task,\n wait_for_sleep_twice,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/worker.py', blocks: {}, highlights: {}, diff --git a/frontend/docs/lib/generated/snips/python/durable/test_durable.ts b/frontend/docs/lib/generated/snips/python/durable/test_durable.ts index d42abbebf..058ded548 100644 --- a/frontend/docs/lib/generated/snips/python/durable/test_durable.ts +++ b/frontend/docs/lib/generated/snips/python/durable/test_durable.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import EVENT_KEY, SLEEP_TIME, durable_workflow\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n", + "content": 
"import asyncio\n\nimport pytest\n\nfrom examples.durable.worker import (\n EVENT_KEY,\n SLEEP_TIME,\n durable_workflow,\n wait_for_sleep_twice,\n)\nfrom hatchet_sdk import Hatchet\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable(hatchet: Hatchet) -> None:\n ref = durable_workflow.run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME + 10)\n\n hatchet.event.push(EVENT_KEY, {\"test\": \"test\"})\n\n result = await ref.aio_result()\n\n workers = await hatchet.workers.aio_list()\n\n assert workers.rows\n\n active_workers = [w for w in workers.rows if w.status == \"ACTIVE\"]\n\n assert len(active_workers) == 2\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker\")\n for w in active_workers\n )\n assert any(\n w.name == hatchet.config.apply_namespace(\"e2e-test-worker_durable\")\n for w in active_workers\n )\n\n assert result[\"durable_task\"][\"status\"] == \"success\"\n\n wait_group_1 = result[\"wait_for_or_group_1\"]\n wait_group_2 = result[\"wait_for_or_group_2\"]\n\n assert abs(wait_group_1[\"runtime\"] - SLEEP_TIME) < 3\n\n assert wait_group_1[\"key\"] == wait_group_2[\"key\"]\n assert wait_group_1[\"key\"] == \"CREATE\"\n assert \"sleep\" in wait_group_1[\"event_id\"]\n assert \"event\" in wait_group_2[\"event_id\"]\n\n wait_for_multi_sleep = result[\"wait_for_multi_sleep\"]\n\n assert wait_for_multi_sleep[\"runtime\"] > 3 * SLEEP_TIME\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None:\n first_sleep = await wait_for_sleep_twice.aio_run_no_wait()\n\n await asyncio.sleep(SLEEP_TIME / 2)\n\n await hatchet.runs.aio_cancel(first_sleep.workflow_run_id)\n\n await first_sleep.aio_result()\n\n await hatchet.runs.aio_replay(\n first_sleep.workflow_run_id,\n )\n\n second_sleep_result = await first_sleep.aio_result()\n\n \"\"\"We've already slept for a little bit by the time the task is cancelled\"\"\"\n assert second_sleep_result[\"runtime\"] < SLEEP_TIME\n", "source": "out/python/durable/test_durable.py", "blocks": {}, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/durable/worker.ts b/frontend/docs/lib/generated/snips/python/durable/worker.ts index 864baa2ac..f6f694497 100644 --- a/frontend/docs/lib/generated/snips/python/durable/worker.ts +++ b/frontend/docs/lib/generated/snips/python/durable/worker.ts @@ -2,20 +2,20 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import time\nfrom datetime import timedelta\nfrom uuid import uuid4\n\nfrom hatchet_sdk import (\n Context,\n DurableContext,\n EmptyModel,\n Hatchet,\n SleepCondition,\n UserEventCondition,\n or_,\n)\n\nhatchet = Hatchet(debug=True)\n\n# > Create a durable workflow\ndurable_workflow = hatchet.workflow(name=\"DurableWorkflow\")\n\n\nephemeral_workflow = hatchet.workflow(name=\"EphemeralWorkflow\")\n\n\n# > Add durable task\nEVENT_KEY = \"durable-example:event\"\nSLEEP_TIME = 5\n\n\n@durable_workflow.task()\nasync def ephemeral_task(input: EmptyModel, ctx: Context) -> None:\n print(\"Running non-durable task\")\n\n\n@durable_workflow.durable_task()\nasync def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:\n print(\"Waiting for sleep\")\n await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))\n print(\"Sleep finished\")\n\n print(\"Waiting for event\")\n await ctx.aio_wait_for(\n \"event\",\n UserEventCondition(event_key=EVENT_KEY, expression=\"true\"),\n )\n print(\"Event received\")\n\n return {\n 
\"status\": \"success\",\n }\n\n\n\n\n# > Add durable tasks that wait for or groups\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_1(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n \"runtime\": int(time.time() - start),\n \"key\": key,\n \"event_id\": event_id,\n }\n\n\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_2(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=6 * SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n \"runtime\": int(time.time() - start),\n \"key\": key,\n \"event_id\": event_id,\n }\n\n\n@ephemeral_workflow.task()\ndef ephemeral_task_2(input: EmptyModel, ctx: Context) -> None:\n print(\"Running non-durable task\")\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"durable-worker\", workflows=[durable_workflow, ephemeral_workflow]\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", + "content": "import asyncio\nimport time\nfrom datetime import timedelta\nfrom uuid import uuid4\n\nfrom hatchet_sdk import (\n Context,\n DurableContext,\n EmptyModel,\n Hatchet,\n SleepCondition,\n UserEventCondition,\n or_,\n)\n\nhatchet = Hatchet(debug=True)\n\n# > Create a durable workflow\ndurable_workflow = hatchet.workflow(name=\"DurableWorkflow\")\n\n\nephemeral_workflow = hatchet.workflow(name=\"EphemeralWorkflow\")\n\n\n# > Add durable task\nEVENT_KEY = \"durable-example:event\"\nSLEEP_TIME = 5\n\n\n@durable_workflow.task()\nasync def ephemeral_task(input: EmptyModel, ctx: Context) -> None:\n print(\"Running non-durable task\")\n\n\n@durable_workflow.durable_task()\nasync def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:\n print(\"Waiting for sleep\")\n await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))\n print(\"Sleep finished\")\n\n print(\"Waiting for event\")\n await ctx.aio_wait_for(\n \"event\",\n UserEventCondition(event_key=EVENT_KEY, expression=\"true\"),\n )\n print(\"Event received\")\n\n return {\n \"status\": \"success\",\n }\n\n\n\n\n# > Add durable tasks that wait for or groups\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_1(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n \"runtime\": int(time.time() - start),\n \"key\": key,\n \"event_id\": event_id,\n }\n\n\n\n\n@durable_workflow.durable_task()\nasync def wait_for_or_group_2(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n wait_result = await ctx.aio_wait_for(\n uuid4().hex,\n or_(\n SleepCondition(timedelta(seconds=6 * SLEEP_TIME)),\n UserEventCondition(event_key=EVENT_KEY),\n ),\n )\n\n key = list(wait_result.keys())[0]\n event_id = list(wait_result[key].keys())[0]\n\n return {\n \"runtime\": int(time.time() - 
start),\n \"key\": key,\n \"event_id\": event_id,\n }\n\n\n@durable_workflow.durable_task()\nasync def wait_for_multi_sleep(\n _i: EmptyModel, ctx: DurableContext\n) -> dict[str, str | int]:\n start = time.time()\n\n for _ in range(3):\n await ctx.aio_sleep_for(\n timedelta(seconds=SLEEP_TIME),\n )\n\n return {\n \"runtime\": int(time.time() - start),\n }\n\n\n@ephemeral_workflow.task()\ndef ephemeral_task_2(input: EmptyModel, ctx: Context) -> None:\n print(\"Running non-durable task\")\n\n\n@hatchet.durable_task()\nasync def wait_for_sleep_twice(\n input: EmptyModel, ctx: DurableContext\n) -> dict[str, int]:\n try:\n start = time.time()\n\n await ctx.aio_sleep_for(\n timedelta(seconds=SLEEP_TIME),\n )\n\n return {\n \"runtime\": int(time.time() - start),\n }\n except asyncio.CancelledError:\n return {\"runtime\": -1}\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"durable-worker\",\n workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice],\n )\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", "source": "out/python/durable/worker.py", "blocks": { "create_a_durable_workflow": { - "start": 18, - "stop": 18 + "start": 19, + "stop": 19 }, "add_durable_task": { - "start": 25, - "stop": 51 + "start": 26, + "stop": 52 }, "add_durable_tasks_that_wait_for_or_groups": { - "start": 55, - "stop": 79 + "start": 56, + "stop": 80 } }, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/events/test_event.ts b/frontend/docs/lib/generated/snips/python/events/test_event.ts index c30528230..78c072f87 100644 --- a/frontend/docs/lib/generated/snips/python/events/test_event.ts +++ b/frontend/docs/lib/generated/snips/python/events/test_event.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\nimport json\nfrom collections.abc import AsyncGenerator\nfrom contextlib import asynccontextmanager\nfrom datetime import datetime, timedelta, timezone\nfrom typing import cast\nfrom uuid import uuid4\n\nimport pytest\nfrom pydantic import BaseModel\n\nfrom examples.events.worker import (\n EVENT_KEY,\n SECONDARY_KEY,\n WILDCARD_KEY,\n EventWorkflowInput,\n event_workflow,\n)\nfrom hatchet_sdk.clients.events import (\n BulkPushEventOptions,\n BulkPushEventWithMetadata,\n PushEventOptions,\n)\nfrom hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus\nfrom hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary\nfrom hatchet_sdk.contracts.events_pb2 import Event\nfrom hatchet_sdk.hatchet import Hatchet\n\n\nclass ProcessedEvent(BaseModel):\n id: str\n payload: dict[str, str | bool]\n meta: dict[str, str | bool | int]\n should_have_runs: bool\n test_run_id: str\n\n def __hash__(self) -> int:\n return hash(self.model_dump_json())\n\n\n@asynccontextmanager\nasync def event_filter(\n hatchet: Hatchet,\n test_run_id: str,\n expression: str | None = None,\n payload: dict[str, str] = {},\n) -> AsyncGenerator[None, None]:\n expression = (\n expression\n or f\"input.should_skip == false && payload.test_run_id == '{test_run_id}'\"\n )\n\n f = await hatchet.filters.aio_create(\n workflow_id=event_workflow.id,\n expression=expression,\n scope=test_run_id,\n payload={\"test_run_id\": test_run_id, **payload},\n )\n\n try:\n yield\n finally:\n await hatchet.filters.aio_delete(f.metadata.id)\n\n\nasync def fetch_runs_for_event(\n hatchet: Hatchet, event: Event\n) -> tuple[ProcessedEvent, list[V1TaskSummary]]:\n runs = await 
hatchet.runs.aio_list(triggering_event_external_id=event.eventId)\n\n meta = (\n cast(dict[str, str | int | bool], json.loads(event.additionalMetadata))\n if event.additionalMetadata\n else {}\n )\n payload = (\n cast(dict[str, str | bool], json.loads(event.payload)) if event.payload else {}\n )\n\n processed_event = ProcessedEvent(\n id=event.eventId,\n payload=payload,\n meta=meta,\n should_have_runs=meta.get(\"should_have_runs\", False) is True,\n test_run_id=cast(str, meta[\"test_run_id\"]),\n )\n\n if not all([r.output for r in runs.rows]):\n return (processed_event, [])\n\n return (\n processed_event,\n runs.rows or [],\n )\n\n\nasync def wait_for_result(\n hatchet: Hatchet, events: list[Event]\n) -> dict[ProcessedEvent, list[V1TaskSummary]]:\n await asyncio.sleep(3)\n\n since = datetime.now(tz=timezone.utc) - timedelta(minutes=2)\n\n persisted = (await hatchet.event.aio_list(limit=100, since=since)).rows or []\n\n assert {e.eventId for e in events}.issubset({e.metadata.id for e in persisted})\n\n iters = 0\n while True:\n print(\"Waiting for event runs to complete...\")\n if iters > 15:\n print(\"Timed out waiting for event runs to complete.\")\n return {\n ProcessedEvent(\n id=event.eventId,\n payload=json.loads(event.payload) if event.payload else {},\n meta=(\n json.loads(event.additionalMetadata)\n if event.additionalMetadata\n else {}\n ),\n should_have_runs=False,\n test_run_id=cast(\n str, json.loads(event.additionalMetadata).get(\"test_run_id\", \"\")\n ),\n ): []\n for event in events\n }\n\n iters += 1\n\n event_runs = await asyncio.gather(\n *[fetch_runs_for_event(hatchet, event) for event in events]\n )\n\n all_empty = all(not event_run for _, event_run in event_runs)\n\n if all_empty:\n await asyncio.sleep(1)\n continue\n\n event_id_to_runs = {event_id: runs for (event_id, runs) in event_runs}\n\n any_queued_or_running = any(\n run.status in [V1TaskStatus.QUEUED, V1TaskStatus.RUNNING]\n for runs in event_id_to_runs.values()\n for run in runs\n )\n\n if any_queued_or_running:\n await asyncio.sleep(1)\n continue\n\n break\n\n return event_id_to_runs\n\n\nasync def wait_for_result_and_assert(hatchet: Hatchet, events: list[Event]) -> None:\n event_to_runs = await wait_for_result(hatchet, events)\n\n for event, runs in event_to_runs.items():\n await assert_event_runs_processed(event, runs)\n\n\nasync def assert_event_runs_processed(\n event: ProcessedEvent,\n runs: list[V1TaskSummary],\n) -> None:\n runs = [\n run\n for run in runs\n if (run.additional_metadata or {}).get(\"hatchet__event_id\") == event.id\n ]\n\n if event.should_have_runs:\n assert len(runs) > 0\n\n for run in runs:\n assert run.status == V1TaskStatus.COMPLETED\n assert run.output.get(\"test_run_id\") == event.test_run_id\n else:\n assert len(runs) == 0\n\n\ndef bpi(\n index: int = 1,\n test_run_id: str = \"\",\n should_skip: bool = False,\n should_have_runs: bool = True,\n key: str = EVENT_KEY,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> BulkPushEventWithMetadata:\n return BulkPushEventWithMetadata(\n key=key,\n payload={\n \"should_skip\": should_skip,\n **payload,\n },\n additional_metadata={\n \"should_have_runs\": should_have_runs,\n \"test_run_id\": test_run_id,\n \"key\": index,\n },\n scope=scope,\n )\n\n\ndef cp(should_skip: bool) -> dict[str, bool]:\n return EventWorkflowInput(should_skip=should_skip).model_dump()\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_push(hatchet: Hatchet) -> None:\n e = hatchet.event.push(EVENT_KEY, cp(False))\n\n assert 
e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_async_event_push(hatchet: Hatchet) -> None:\n e = await hatchet.event.aio_push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_async_event_bulk_push(hatchet: Hatchet) -> None:\n events = [\n BulkPushEventWithMetadata(\n key=\"event1\",\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n additional_metadata={\"source\": \"test\", \"user_id\": \"user123\"},\n ),\n BulkPushEventWithMetadata(\n key=\"event2\",\n payload={\"message\": \"This is event 2\", \"should_skip\": False},\n additional_metadata={\"source\": \"test\", \"user_id\": \"user456\"},\n ),\n BulkPushEventWithMetadata(\n key=\"event3\",\n payload={\"message\": \"This is event 3\", \"should_skip\": False},\n additional_metadata={\"source\": \"test\", \"user_id\": \"user789\"},\n ),\n ]\n opts = BulkPushEventOptions(namespace=\"bulk-test\")\n\n e = await hatchet.event.aio_bulk_push(events, opts)\n\n assert len(e) == 3\n\n # Sort both lists of events by their key to ensure comparison order\n sorted_events = sorted(events, key=lambda x: x.key)\n sorted_returned_events = sorted(e, key=lambda x: x.key)\n namespace = \"bulk-test\"\n\n # Check that the returned events match the original events\n for original_event, returned_event in zip(\n sorted_events, sorted_returned_events, strict=False\n ):\n assert returned_event.key == namespace + original_event.key\n\n\n@pytest.fixture(scope=\"function\")\ndef test_run_id() -> str:\n return str(uuid4())\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_engine_behavior(hatchet: Hatchet) -> None:\n test_run_id = str(uuid4())\n events = [\n bpi(\n test_run_id=test_run_id,\n ),\n bpi(\n test_run_id=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n should_have_runs=False,\n ),\n ]\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\ndef gen_bulk_events(test_run_id: str) -> list[BulkPushEventWithMetadata]:\n return [\n ## No scope, so it shouldn't have any runs\n bpi(\n index=1,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n ),\n ## No scope, so it shouldn't have any runs\n bpi(\n index=2,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n ),\n ## Scope is set and `should_skip` is False, so it should have runs\n bpi(\n index=3,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=True,\n scope=test_run_id,\n ),\n ## Scope is set and `should_skip` is True, so it shouldn't have runs\n bpi(\n index=4,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn't have runs\n bpi(\n index=5,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn't have runs\n bpi(\n index=6,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n scope=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n ),\n ]\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_skipping_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id):\n events = gen_bulk_events(test_run_id)\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await 
wait_for_result_and_assert(hatchet, result)\n\n\nasync def bulk_to_single(hatchet: Hatchet, event: BulkPushEventWithMetadata) -> Event:\n return await hatchet.event.aio_push(\n event_key=event.key,\n payload=event.payload,\n options=PushEventOptions(\n scope=event.scope,\n additional_metadata=event.additional_metadata,\n priority=event.priority,\n ),\n )\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_skipping_filtering_no_bulk(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(hatchet, test_run_id):\n raw_events = gen_bulk_events(test_run_id)\n events = await asyncio.gather(\n *[bulk_to_single(hatchet, event) for event in raw_events]\n )\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_payload_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n \"input.should_skip == false && payload.foobar == 'baz'\",\n {\"foobar\": \"qux\"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": False,\n \"test_run_id\": test_run_id,\n \"key\": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_payload_filtering_with_payload_match(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n \"input.should_skip == false && payload.foobar == 'baz'\",\n {\"foobar\": \"baz\"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n \"key\": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n f\"event_key == '{SECONDARY_KEY}'\",\n ):\n event_1 = await hatchet.event.aio_push(\n event_key=SECONDARY_KEY,\n payload={\n \"message\": \"Should run because filter matches\",\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n event_2 = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n \"message\": \"Should skip because filter does not match\",\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": False,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event_1, event_2])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_key_wildcards(hatchet: Hatchet, test_run_id: str) -> None:\n keys = [\n WILDCARD_KEY.replace(\"*\", \"1\"),\n WILDCARD_KEY.replace(\"*\", \"2\"),\n \"foobar\",\n EVENT_KEY,\n ]\n\n async with event_filter(\n hatchet,\n test_run_id,\n ):\n events = [\n await hatchet.event.aio_push(\n event_key=key,\n payload={\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": key != \"foobar\",\n \"test_run_id\": test_run_id,\n },\n ),\n )\n for key in 
keys\n ]\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_multiple_runs_for_multiple_scope_matches(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet, test_run_id, payload={\"filter_id\": \"1\"}, expression=\"1 == 1\"\n ):\n async with event_filter(\n hatchet, test_run_id, payload={\"filter_id\": \"2\"}, expression=\"2 == 2\"\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n\n event_to_runs = await wait_for_result(hatchet, [event])\n\n assert len(event_to_runs.keys()) == 1\n\n runs = list(event_to_runs.values())[0]\n\n assert len(runs) == 2\n\n assert {r.output.get(\"filter_id\") for r in runs} == {\"1\", \"2\"}\n", + "content": "import asyncio\nimport json\nfrom collections.abc import AsyncGenerator\nfrom contextlib import asynccontextmanager\nfrom datetime import datetime, timedelta, timezone\nfrom typing import cast\nfrom uuid import uuid4\n\nimport pytest\nfrom pydantic import BaseModel\n\nfrom examples.events.worker import (\n EVENT_KEY,\n SECONDARY_KEY,\n WILDCARD_KEY,\n EventWorkflowInput,\n event_workflow,\n)\nfrom hatchet_sdk.clients.events import (\n BulkPushEventOptions,\n BulkPushEventWithMetadata,\n PushEventOptions,\n)\nfrom hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus\nfrom hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary\nfrom hatchet_sdk.contracts.events_pb2 import Event\nfrom hatchet_sdk.hatchet import Hatchet\n\n\nclass ProcessedEvent(BaseModel):\n id: str\n payload: dict[str, str | bool]\n meta: dict[str, str | bool | int]\n should_have_runs: bool\n test_run_id: str\n\n def __hash__(self) -> int:\n return hash(self.model_dump_json())\n\n\n@asynccontextmanager\nasync def event_filter(\n hatchet: Hatchet,\n test_run_id: str,\n expression: str | None = None,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> AsyncGenerator[None, None]:\n expression = (\n expression\n or f\"input.should_skip == false && payload.test_run_id == '{test_run_id}'\"\n )\n\n f = await hatchet.filters.aio_create(\n workflow_id=event_workflow.id,\n expression=expression,\n scope=scope or test_run_id,\n payload={\"test_run_id\": test_run_id, **payload},\n )\n\n try:\n yield\n finally:\n await hatchet.filters.aio_delete(f.metadata.id)\n\n\nasync def fetch_runs_for_event(\n hatchet: Hatchet, event: Event\n) -> tuple[ProcessedEvent, list[V1TaskSummary]]:\n runs = await hatchet.runs.aio_list(triggering_event_external_id=event.eventId)\n\n meta = (\n cast(dict[str, str | int | bool], json.loads(event.additionalMetadata))\n if event.additionalMetadata\n else {}\n )\n payload = (\n cast(dict[str, str | bool], json.loads(event.payload)) if event.payload else {}\n )\n\n processed_event = ProcessedEvent(\n id=event.eventId,\n payload=payload,\n meta=meta,\n should_have_runs=meta.get(\"should_have_runs\", False) is True,\n test_run_id=cast(str, meta[\"test_run_id\"]),\n )\n\n if not all([r.output for r in runs.rows]):\n return (processed_event, [])\n\n return (\n processed_event,\n runs.rows or [],\n )\n\n\nasync def wait_for_result(\n hatchet: Hatchet, events: list[Event]\n) -> dict[ProcessedEvent, list[V1TaskSummary]]:\n await asyncio.sleep(3)\n\n since = datetime.now(tz=timezone.utc) - timedelta(minutes=2)\n\n persisted = (await 
hatchet.event.aio_list(limit=100, since=since)).rows or []\n\n assert {e.eventId for e in events}.issubset({e.metadata.id for e in persisted})\n\n iters = 0\n while True:\n print(\"Waiting for event runs to complete...\")\n if iters > 15:\n print(\"Timed out waiting for event runs to complete.\")\n return {\n ProcessedEvent(\n id=event.eventId,\n payload=json.loads(event.payload) if event.payload else {},\n meta=(\n json.loads(event.additionalMetadata)\n if event.additionalMetadata\n else {}\n ),\n should_have_runs=False,\n test_run_id=cast(\n str, json.loads(event.additionalMetadata).get(\"test_run_id\", \"\")\n ),\n ): []\n for event in events\n }\n\n iters += 1\n\n event_runs = await asyncio.gather(\n *[fetch_runs_for_event(hatchet, event) for event in events]\n )\n\n all_empty = all(not event_run for _, event_run in event_runs)\n\n if all_empty:\n await asyncio.sleep(1)\n continue\n\n event_id_to_runs = {event_id: runs for (event_id, runs) in event_runs}\n\n any_queued_or_running = any(\n run.status in [V1TaskStatus.QUEUED, V1TaskStatus.RUNNING]\n for runs in event_id_to_runs.values()\n for run in runs\n )\n\n if any_queued_or_running:\n await asyncio.sleep(1)\n continue\n\n break\n\n return event_id_to_runs\n\n\nasync def wait_for_result_and_assert(hatchet: Hatchet, events: list[Event]) -> None:\n event_to_runs = await wait_for_result(hatchet, events)\n\n for event, runs in event_to_runs.items():\n await assert_event_runs_processed(event, runs)\n\n\nasync def assert_event_runs_processed(\n event: ProcessedEvent,\n runs: list[V1TaskSummary],\n) -> None:\n runs = [\n run\n for run in runs\n if (run.additional_metadata or {}).get(\"hatchet__event_id\") == event.id\n ]\n\n if event.should_have_runs:\n assert len(runs) > 0\n\n for run in runs:\n assert run.status == V1TaskStatus.COMPLETED\n assert run.output.get(\"test_run_id\") == event.test_run_id\n else:\n assert len(runs) == 0\n\n\ndef bpi(\n index: int = 1,\n test_run_id: str = \"\",\n should_skip: bool = False,\n should_have_runs: bool = True,\n key: str = EVENT_KEY,\n payload: dict[str, str] = {},\n scope: str | None = None,\n) -> BulkPushEventWithMetadata:\n return BulkPushEventWithMetadata(\n key=key,\n payload={\n \"should_skip\": should_skip,\n **payload,\n },\n additional_metadata={\n \"should_have_runs\": should_have_runs,\n \"test_run_id\": test_run_id,\n \"key\": index,\n },\n scope=scope,\n )\n\n\ndef cp(should_skip: bool) -> dict[str, bool]:\n return EventWorkflowInput(should_skip=should_skip).model_dump()\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_push(hatchet: Hatchet) -> None:\n e = hatchet.event.push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_async_event_push(hatchet: Hatchet) -> None:\n e = await hatchet.event.aio_push(EVENT_KEY, cp(False))\n\n assert e.eventId is not None\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_async_event_bulk_push(hatchet: Hatchet) -> None:\n events = [\n BulkPushEventWithMetadata(\n key=\"event1\",\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n additional_metadata={\"source\": \"test\", \"user_id\": \"user123\"},\n ),\n BulkPushEventWithMetadata(\n key=\"event2\",\n payload={\"message\": \"This is event 2\", \"should_skip\": False},\n additional_metadata={\"source\": \"test\", \"user_id\": \"user456\"},\n ),\n BulkPushEventWithMetadata(\n key=\"event3\",\n payload={\"message\": \"This is event 3\", \"should_skip\": False},\n 
additional_metadata={\"source\": \"test\", \"user_id\": \"user789\"},\n ),\n ]\n opts = BulkPushEventOptions(namespace=\"bulk-test\")\n\n e = await hatchet.event.aio_bulk_push(events, opts)\n\n assert len(e) == 3\n\n # Sort both lists of events by their key to ensure comparison order\n sorted_events = sorted(events, key=lambda x: x.key)\n sorted_returned_events = sorted(e, key=lambda x: x.key)\n namespace = \"bulk-test\"\n\n # Check that the returned events match the original events\n for original_event, returned_event in zip(\n sorted_events, sorted_returned_events, strict=False\n ):\n assert returned_event.key == namespace + original_event.key\n\n\n@pytest.fixture(scope=\"function\")\ndef test_run_id() -> str:\n return str(uuid4())\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_engine_behavior(hatchet: Hatchet) -> None:\n test_run_id = str(uuid4())\n events = [\n bpi(\n test_run_id=test_run_id,\n ),\n bpi(\n test_run_id=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n should_have_runs=False,\n ),\n ]\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\ndef gen_bulk_events(test_run_id: str) -> list[BulkPushEventWithMetadata]:\n return [\n ## No scope, so it shouldn't have any runs\n bpi(\n index=1,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n ),\n ## No scope, so it shouldn't have any runs\n bpi(\n index=2,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n ),\n ## Scope is set and `should_skip` is False, so it should have runs\n bpi(\n index=3,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=True,\n scope=test_run_id,\n ),\n ## Scope is set and `should_skip` is True, so it shouldn't have runs\n bpi(\n index=4,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn't have runs\n bpi(\n index=5,\n test_run_id=test_run_id,\n should_skip=True,\n should_have_runs=False,\n scope=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n ),\n ## Scope is set, `should_skip` is False, but key is different, so it shouldn't have runs\n bpi(\n index=6,\n test_run_id=test_run_id,\n should_skip=False,\n should_have_runs=False,\n scope=test_run_id,\n key=\"thisisafakeeventfoobarbaz\",\n ),\n ]\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_skipping_filtering(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id):\n events = gen_bulk_events(test_run_id)\n\n result = await hatchet.event.aio_bulk_push(events)\n\n await wait_for_result_and_assert(hatchet, result)\n\n\nasync def bulk_to_single(hatchet: Hatchet, event: BulkPushEventWithMetadata) -> Event:\n return await hatchet.event.aio_push(\n event_key=event.key,\n payload=event.payload,\n options=PushEventOptions(\n scope=event.scope,\n additional_metadata=event.additional_metadata,\n priority=event.priority,\n ),\n )\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_skipping_filtering_no_bulk(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(hatchet, test_run_id):\n raw_events = gen_bulk_events(test_run_id)\n events = await asyncio.gather(\n *[bulk_to_single(hatchet, event) for event in raw_events]\n )\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_payload_filtering(hatchet: Hatchet, 
test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n \"input.should_skip == false && payload.foobar == 'baz'\",\n {\"foobar\": \"qux\"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": False,\n \"test_run_id\": test_run_id,\n \"key\": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_event_payload_filtering_with_payload_match(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n \"input.should_skip == false && payload.foobar == 'baz'\",\n {\"foobar\": \"baz\"},\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\"message\": \"This is event 1\", \"should_skip\": False},\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n \"key\": 1,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(\n hatchet,\n test_run_id,\n f\"event_key == '{SECONDARY_KEY}'\",\n ):\n event_1 = await hatchet.event.aio_push(\n event_key=SECONDARY_KEY,\n payload={\n \"message\": \"Should run because filter matches\",\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n event_2 = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n \"message\": \"Should skip because filter does not match\",\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": False,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n\n await wait_for_result_and_assert(hatchet, [event_1, event_2])\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_key_wildcards(hatchet: Hatchet, test_run_id: str) -> None:\n keys = [\n WILDCARD_KEY.replace(\"*\", \"1\"),\n WILDCARD_KEY.replace(\"*\", \"2\"),\n \"foobar\",\n EVENT_KEY,\n ]\n\n async with event_filter(\n hatchet,\n test_run_id,\n ):\n events = [\n await hatchet.event.aio_push(\n event_key=key,\n payload={\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": key != \"foobar\",\n \"test_run_id\": test_run_id,\n },\n ),\n )\n for key in keys\n ]\n\n await wait_for_result_and_assert(hatchet, events)\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_multiple_runs_for_multiple_scope_matches(\n hatchet: Hatchet, test_run_id: str\n) -> None:\n async with event_filter(\n hatchet, test_run_id, payload={\"filter_id\": \"1\"}, expression=\"1 == 1\"\n ):\n async with event_filter(\n hatchet, test_run_id, payload={\"filter_id\": \"2\"}, expression=\"2 == 2\"\n ):\n event = await hatchet.event.aio_push(\n event_key=EVENT_KEY,\n payload={\n \"should_skip\": False,\n },\n options=PushEventOptions(\n scope=test_run_id,\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n },\n ),\n )\n\n event_to_runs = await wait_for_result(hatchet, [event])\n\n assert len(event_to_runs.keys()) == 1\n\n runs = list(event_to_runs.values())[0]\n\n assert len(runs) 
== 2\n\n assert {r.output.get(\"filter_id\") for r in runs} == {\"1\", \"2\"}\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_multi_scope_bug(hatchet: Hatchet, test_run_id: str) -> None:\n async with event_filter(hatchet, test_run_id, expression=\"1 == 1\", scope=\"a\"):\n async with event_filter(\n hatchet,\n test_run_id,\n expression=\"2 == 2\",\n scope=\"b\",\n ):\n events = await hatchet.event.aio_bulk_push(\n [\n BulkPushEventWithMetadata(\n key=EVENT_KEY,\n payload={\n \"should_skip\": False,\n },\n additional_metadata={\n \"should_have_runs\": True,\n \"test_run_id\": test_run_id,\n },\n scope=\"a\" if i % 2 == 0 else \"b\",\n )\n for i in range(100)\n ],\n )\n\n await asyncio.sleep(15)\n\n for event in events:\n runs = await hatchet.runs.aio_list(\n triggering_event_external_id=event.eventId,\n additional_metadata={\"test_run_id\": test_run_id},\n )\n\n assert len(runs.rows) == 1\n", "source": "out/python/events/test_event.py", "blocks": {}, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/fanout/worker.ts b/frontend/docs/lib/generated/snips/python/fanout/worker.ts index 54ec596ef..dbf13631f 100644 --- a/frontend/docs/lib/generated/snips/python/fanout/worker.ts +++ b/frontend/docs/lib/generated/snips/python/fanout/worker.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "from datetime import timedelta\nfrom typing import Any\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Hatchet, TriggerWorkflowOptions\n\nhatchet = Hatchet(debug=True)\n\n\n# > FanoutParent\nclass ParentInput(BaseModel):\n n: int = 100\n\n\nclass ChildInput(BaseModel):\n a: str\n\n\nparent_wf = hatchet.workflow(name=\"FanoutParent\", input_validator=ParentInput)\nchild_wf = hatchet.workflow(name=\"FanoutChild\", input_validator=ChildInput)\n\n\n@parent_wf.task(execution_timeout=timedelta(minutes=5))\nasync def spawn(input: ParentInput, ctx: Context) -> dict[str, Any]:\n print(\"spawning child\")\n\n result = await child_wf.aio_run_many(\n [\n child_wf.create_bulk_run_item(\n input=ChildInput(a=str(i)),\n options=TriggerWorkflowOptions(\n additional_metadata={\"hello\": \"earth\"}, key=f\"child{i}\"\n ),\n )\n for i in range(input.n)\n ],\n )\n\n print(f\"results {result}\")\n\n return {\"results\": result}\n\n\n\n\n# > FanoutChild\n@child_wf.task()\nasync def process(input: ChildInput, ctx: Context) -> dict[str, str]:\n print(f\"child process {input.a}\")\n return {\"status\": input.a}\n\n\n@child_wf.task(parents=[process])\nasync def process2(input: ChildInput, ctx: Context) -> dict[str, str]:\n process_output = ctx.task_output(process)\n a = process_output[\"status\"]\n\n return {\"status2\": a + \"2\"}\n\n\n\nchild_wf.create_bulk_run_item()\n\n\ndef main() -> None:\n worker = hatchet.worker(\"fanout-worker\", slots=40, workflows=[parent_wf, child_wf])\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", + "content": "from datetime import timedelta\nfrom typing import Any\n\nfrom pydantic import BaseModel\n\nfrom hatchet_sdk import Context, Hatchet, TriggerWorkflowOptions\n\nhatchet = Hatchet(debug=True)\n\n\n# > FanoutParent\nclass ParentInput(BaseModel):\n n: int = 100\n\n\nclass ChildInput(BaseModel):\n a: str\n\n\nparent_wf = hatchet.workflow(name=\"FanoutParent\", input_validator=ParentInput)\nchild_wf = hatchet.workflow(name=\"FanoutChild\", input_validator=ChildInput)\n\n\n@parent_wf.task(execution_timeout=timedelta(minutes=5))\nasync def 
spawn(input: ParentInput, ctx: Context) -> dict[str, Any]:\n print(\"spawning child\")\n\n result = await child_wf.aio_run_many(\n [\n child_wf.create_bulk_run_item(\n input=ChildInput(a=str(i)),\n options=TriggerWorkflowOptions(\n additional_metadata={\"hello\": \"earth\"}, key=f\"child{i}\"\n ),\n )\n for i in range(input.n)\n ],\n )\n\n print(f\"results {result}\")\n\n return {\"results\": result}\n\n\n\n\n# > FanoutChild\n@child_wf.task()\nasync def process(input: ChildInput, ctx: Context) -> dict[str, str]:\n print(f\"child process {input.a}\")\n return {\"status\": input.a}\n\n\n@child_wf.task(parents=[process])\nasync def process2(input: ChildInput, ctx: Context) -> dict[str, str]:\n process_output = ctx.task_output(process)\n a = process_output[\"status\"]\n\n return {\"status2\": a + \"2\"}\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\"fanout-worker\", slots=40, workflows=[parent_wf, child_wf])\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", "source": "out/python/fanout/worker.py", "blocks": { "fanoutparent": { diff --git a/frontend/docs/lib/generated/snips/python/fanout_sync/test_fanout_sync.ts b/frontend/docs/lib/generated/snips/python/fanout_sync/test_fanout_sync.ts index aa3832f60..0414fd842 100644 --- a/frontend/docs/lib/generated/snips/python/fanout_sync/test_fanout_sync.ts +++ b/frontend/docs/lib/generated/snips/python/fanout_sync/test_fanout_sync.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\nfrom uuid import uuid4\n\nimport pytest\n\nfrom examples.fanout_sync.worker import ParentInput, sync_fanout_parent\nfrom hatchet_sdk import Hatchet, TriggerWorkflowOptions\n\n\ndef test_run() -> None:\n N = 2\n\n result = sync_fanout_parent.run(ParentInput(n=N))\n\n assert len(result[\"spawn\"][\"results\"]) == N\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_additional_metadata_propagation_sync(hatchet: Hatchet) -> None:\n test_run_id = uuid4().hex\n\n ref = await sync_fanout_parent.aio_run_no_wait(\n ParentInput(n=2),\n options=TriggerWorkflowOptions(\n additional_metadata={\"test_run_id\": test_run_id}\n ),\n )\n\n await ref.aio_result()\n await asyncio.sleep(1)\n\n runs = await hatchet.runs.aio_list(\n parent_task_external_id=ref.workflow_run_id,\n additional_metadata={\"test_run_id\": test_run_id},\n )\n\n print(runs.model_dump_json(indent=2))\n\n assert runs.rows\n\n \"\"\"Assert that the additional metadata is propagated to the child runs.\"\"\"\n for run in runs.rows:\n assert run.additional_metadata\n assert run.additional_metadata[\"test_run_id\"] == test_run_id\n\n assert run.children\n for child in run.children:\n assert child.additional_metadata\n assert child.additional_metadata[\"test_run_id\"] == test_run_id\n", + "content": "import asyncio\nfrom uuid import uuid4\n\nimport pytest\n\nfrom examples.fanout_sync.worker import ParentInput, sync_fanout_parent\nfrom hatchet_sdk import Hatchet, TriggerWorkflowOptions\n\n\ndef test_run() -> None:\n N = 2\n\n result = sync_fanout_parent.run(ParentInput(n=N))\n\n assert len(result[\"spawn\"][\"results\"]) == N\n\n\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_additional_metadata_propagation_sync(hatchet: Hatchet) -> None:\n test_run_id = uuid4().hex\n\n ref = await sync_fanout_parent.aio_run_no_wait(\n ParentInput(n=2),\n options=TriggerWorkflowOptions(\n additional_metadata={\"test_run_id\": test_run_id}\n ),\n )\n\n await ref.aio_result()\n await 
asyncio.sleep(1)\n\n runs = await hatchet.runs.aio_list(\n parent_task_external_id=ref.workflow_run_id,\n additional_metadata={\"test_run_id\": test_run_id},\n )\n\n assert runs.rows\n\n \"\"\"Assert that the additional metadata is propagated to the child runs.\"\"\"\n for run in runs.rows:\n assert run.additional_metadata\n assert run.additional_metadata[\"test_run_id\"] == test_run_id\n\n assert run.children\n for child in run.children:\n assert child.additional_metadata\n assert child.additional_metadata[\"test_run_id\"] == test_run_id\n", "source": "out/python/fanout_sync/test_fanout_sync.py", "blocks": {}, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/worker.ts b/frontend/docs/lib/generated/snips/python/worker.ts index 350789b96..d8f814a36 100644 --- a/frontend/docs/lib/generated/snips/python/worker.ts +++ b/frontend/docs/lib/generated/snips/python/worker.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "from examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"e2e-test-worker\",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n return_exceptions_task,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", + "content": "from 
examples.affinity_workers.worker import affinity_worker_workflow\nfrom examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf\nfrom examples.bulk_operations.worker import (\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n)\nfrom examples.cancellation.worker import cancellation_workflow\nfrom examples.concurrency_limit.worker import concurrency_limit_workflow\nfrom examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow\nfrom examples.concurrency_multiple_keys.worker import concurrency_multiple_keys_workflow\nfrom examples.concurrency_workflow_level.worker import (\n concurrency_workflow_level_workflow,\n)\nfrom examples.conditions.worker import task_condition_workflow\nfrom examples.dag.worker import dag_workflow\nfrom examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf\nfrom examples.durable.worker import durable_workflow, wait_for_sleep_twice\nfrom examples.events.worker import event_workflow\nfrom examples.fanout.worker import child_wf, parent_wf\nfrom examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent\nfrom examples.lifespans.simple import lifespan, lifespan_task\nfrom examples.logger.workflow import logging_workflow\nfrom examples.non_retryable.worker import non_retryable_workflow\nfrom examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details\nfrom examples.return_exceptions.worker import return_exceptions_task\nfrom examples.simple.worker import simple, simple_durable\nfrom examples.timeout.worker import refresh_timeout_wf, timeout_wf\nfrom hatchet_sdk import Hatchet\n\nhatchet = Hatchet(debug=True)\n\n\ndef main() -> None:\n worker = hatchet.worker(\n \"e2e-test-worker\",\n slots=100,\n workflows=[\n affinity_worker_workflow,\n bulk_child_wf,\n bulk_parent_wf,\n concurrency_limit_workflow,\n concurrency_limit_rr_workflow,\n concurrency_multiple_keys_workflow,\n dag_workflow,\n dedupe_child_wf,\n dedupe_parent_wf,\n durable_workflow,\n child_wf,\n event_workflow,\n parent_wf,\n on_failure_wf,\n on_failure_wf_with_details,\n logging_workflow,\n timeout_wf,\n refresh_timeout_wf,\n task_condition_workflow,\n cancellation_workflow,\n sync_fanout_parent,\n sync_fanout_child,\n non_retryable_workflow,\n concurrency_workflow_level_workflow,\n lifespan_task,\n simple,\n simple_durable,\n bulk_replay_test_1,\n bulk_replay_test_2,\n bulk_replay_test_3,\n return_exceptions_task,\n wait_for_sleep_twice,\n ],\n lifespan=lifespan,\n )\n\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", "source": "out/python/worker.py", "blocks": {}, "highlights": {} diff --git a/sdks/python/CHANGELOG.md b/sdks/python/CHANGELOG.md index f34f032bf..4e9b20bb2 100644 --- a/sdks/python/CHANGELOG.md +++ b/sdks/python/CHANGELOG.md @@ -5,6 +5,17 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.16.3] - 2025-07-23 + +### Added + +- Adds support for filters and formatters in the logger that's passed to the Hatchet client. +- Adds a flag to disable log capture. + +### Changed + +- Fixes a bug in `aio_sleep_for` and the `SleepCondition` that did not allow duplicate sleeps to be awaited correctly. 
+- Stops retrying gRPC requests on 4XX failures, since retrying won't help. ## [1.16.2] - 2025-07-22 diff --git a/sdks/python/examples/durable/test_durable.py b/sdks/python/examples/durable/test_durable.py index 90c0cdb25..e4c5c182b 100644 --- a/sdks/python/examples/durable/test_durable.py +++ b/sdks/python/examples/durable/test_durable.py @@ -2,7 +2,12 @@ import asyncio import pytest -from examples.durable.worker import EVENT_KEY, SLEEP_TIME, durable_workflow +from examples.durable.worker import ( + EVENT_KEY, + SLEEP_TIME, + durable_workflow, + wait_for_sleep_twice, +) from hatchet_sdk import Hatchet @@ -43,3 +48,27 @@ async def test_durable(hatchet: Hatchet) -> None: assert wait_group_1["key"] == "CREATE" assert "sleep" in wait_group_1["event_id"] assert "event" in wait_group_2["event_id"] + + wait_for_multi_sleep = result["wait_for_multi_sleep"] + + assert wait_for_multi_sleep["runtime"] > 3 * SLEEP_TIME + + +@pytest.mark.asyncio(loop_scope="session") +async def test_durable_sleep_cancel_replay(hatchet: Hatchet) -> None: + first_sleep = await wait_for_sleep_twice.aio_run_no_wait() + + await asyncio.sleep(SLEEP_TIME / 2) + + await hatchet.runs.aio_cancel(first_sleep.workflow_run_id) + + await first_sleep.aio_result() + + await hatchet.runs.aio_replay( + first_sleep.workflow_run_id, + ) + + second_sleep_result = await first_sleep.aio_result() + + """We've already slept for a little bit by the time the task is cancelled""" + assert second_sleep_result["runtime"] < SLEEP_TIME diff --git a/sdks/python/examples/durable/worker.py b/sdks/python/examples/durable/worker.py index 1e34f7414..13fc2083a 100644 --- a/sdks/python/examples/durable/worker.py +++ b/sdks/python/examples/durable/worker.py @@ -1,3 +1,4 @@ +import asyncio import time from datetime import timedelta from uuid import uuid4 @@ -105,14 +106,49 @@ async def wait_for_or_group_2( } +@durable_workflow.durable_task() +async def wait_for_multi_sleep( + _i: EmptyModel, ctx: DurableContext +) -> dict[str, str | int]: + start = time.time() + + for _ in range(3): + await ctx.aio_sleep_for( + timedelta(seconds=SLEEP_TIME), + ) + + return { + "runtime": int(time.time() - start), + } + + @ephemeral_workflow.task() def ephemeral_task_2(input: EmptyModel, ctx: Context) -> None: print("Running non-durable task") +@hatchet.durable_task() +async def wait_for_sleep_twice( + input: EmptyModel, ctx: DurableContext +) -> dict[str, int]: + try: + start = time.time() + + await ctx.aio_sleep_for( + timedelta(seconds=SLEEP_TIME), + ) + + return { + "runtime": int(time.time() - start), + } + except asyncio.CancelledError: + return {"runtime": -1} + + def main() -> None: worker = hatchet.worker( - "durable-worker", workflows=[durable_workflow, ephemeral_workflow] + "durable-worker", + workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice], ) worker.start() diff --git a/sdks/python/examples/fanout/worker.py b/sdks/python/examples/fanout/worker.py index cd313b3eb..38547c92d 100644 --- a/sdks/python/examples/fanout/worker.py +++ b/sdks/python/examples/fanout/worker.py @@ -62,8 +62,6 @@ async def process2(input: ChildInput, ctx: Context) -> dict[str, str]: # !!
-child_wf.create_bulk_run_item() - def main() -> None: worker = hatchet.worker("fanout-worker", slots=40, workflows=[parent_wf, child_wf]) diff --git a/sdks/python/examples/worker.py b/sdks/python/examples/worker.py index 26f226598..3bcdc68f6 100644 --- a/sdks/python/examples/worker.py +++ b/sdks/python/examples/worker.py @@ -15,7 +15,7 @@ from examples.concurrency_workflow_level.worker import ( from examples.conditions.worker import task_condition_workflow from examples.dag.worker import dag_workflow from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf -from examples.durable.worker import durable_workflow +from examples.durable.worker import durable_workflow, wait_for_sleep_twice from examples.events.worker import event_workflow from examples.fanout.worker import child_wf, parent_wf from examples.fanout_sync.worker import sync_fanout_child, sync_fanout_parent @@ -67,6 +67,7 @@ def main() -> None: bulk_replay_test_2, bulk_replay_test_3, return_exceptions_task, + wait_for_sleep_twice, ], lifespan=lifespan, ) diff --git a/sdks/python/hatchet_sdk/clients/rest/tenacity_utils.py b/sdks/python/hatchet_sdk/clients/rest/tenacity_utils.py index 3098d3481..a02b1a889 100644 --- a/sdks/python/hatchet_sdk/clients/rest/tenacity_utils.py +++ b/sdks/python/hatchet_sdk/clients/rest/tenacity_utils.py @@ -33,5 +33,9 @@ def tenacity_should_retry(ex: BaseException) -> bool: return ex.code() not in [ grpc.StatusCode.UNIMPLEMENTED, grpc.StatusCode.NOT_FOUND, + grpc.StatusCode.INVALID_ARGUMENT, + grpc.StatusCode.ALREADY_EXISTS, + grpc.StatusCode.UNAUTHENTICATED, + grpc.StatusCode.PERMISSION_DENIED, ] return False diff --git a/sdks/python/hatchet_sdk/config.py b/sdks/python/hatchet_sdk/config.py index cefa65a57..05dd783d5 100644 --- a/sdks/python/hatchet_sdk/config.py +++ b/sdks/python/hatchet_sdk/config.py @@ -83,6 +83,7 @@ class ClientConfig(BaseSettings): enable_thread_pool_monitoring: bool = False terminate_worker_after_num_tasks: int | None = None + disable_log_capture: bool = False @model_validator(mode="after") def validate_token_and_tenant(self) -> "ClientConfig": diff --git a/sdks/python/hatchet_sdk/context/context.py b/sdks/python/hatchet_sdk/context/context.py index 6e62c6e45..46b10251c 100644 --- a/sdks/python/hatchet_sdk/context/context.py +++ b/sdks/python/hatchet_sdk/context/context.py @@ -371,6 +371,42 @@ class Context: class DurableContext(Context): + def __init__( + self, + action: Action, + dispatcher_client: DispatcherClient, + admin_client: AdminClient, + event_client: EventClient, + durable_event_listener: DurableEventListener | None, + worker: WorkerContext, + runs_client: RunsClient, + lifespan_context: Any | None, + log_sender: AsyncLogSender, + ): + super().__init__( + action, + dispatcher_client, + admin_client, + event_client, + durable_event_listener, + worker, + runs_client, + lifespan_context, + log_sender, + ) + + self._wait_index = 0 + + @property + def wait_index(self) -> int: + return self._wait_index + + def _increment_wait_index(self) -> int: + index = self._wait_index + self._wait_index += 1 + + return index + async def aio_wait_for( self, signal_key: str, @@ -411,6 +447,9 @@ class DurableContext(Context): For more complicated conditions, use `ctx.aio_wait_for` directly. 
""" + wait_index = self._increment_wait_index() + return await self.aio_wait_for( - f"sleep:{timedelta_to_expr(duration)}", SleepCondition(duration=duration) + f"sleep:{timedelta_to_expr(duration)}-{wait_index}", + SleepCondition(duration=duration), ) diff --git a/sdks/python/hatchet_sdk/worker/runner/run_loop_manager.py b/sdks/python/hatchet_sdk/worker/runner/run_loop_manager.py index ecec2074f..154b5f22c 100644 --- a/sdks/python/hatchet_sdk/worker/runner/run_loop_manager.py +++ b/sdks/python/hatchet_sdk/worker/runner/run_loop_manager.py @@ -60,11 +60,14 @@ class WorkerActionRunLoopManager: self.start_loop_manager_task = self.loop.create_task(self.aio_start()) async def aio_start(self, retry_count: int = 1) -> None: - await capture_logs( - self.client.log_interceptor, - self.log_sender, - self._async_start, - )() + if self.client.config.disable_log_capture: + await self._async_start() + else: + await capture_logs( + self.client.log_interceptor, + self.log_sender, + self._async_start, + )() async def _async_start(self) -> None: logger.info("starting runner...") diff --git a/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py b/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py index df249b250..2b21dc82f 100644 --- a/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py +++ b/sdks/python/hatchet_sdk/worker/runner/utils/capture_logs.py @@ -122,7 +122,17 @@ def capture_logs( async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T: log_stream = StringIO() custom_handler = CustomLogHandler(log_sender, log_stream) - custom_handler.setLevel(logging.INFO) + custom_handler.setLevel(logger.level) + + if logger.handlers: + for handler in logger.handlers: + if handler.formatter: + custom_handler.setFormatter(handler.formatter) + break + + for handler in logger.handlers: + for filter_obj in handler.filters: + custom_handler.addFilter(filter_obj) if not any(h for h in logger.handlers if isinstance(h, CustomLogHandler)): logger.addHandler(custom_handler) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 921c33e1c..5d7bbae2f 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.16.2" +version = "1.16.3" description = "" authors = ["Alexander Belanger "] readme = "README.md"