hatchet/sdks/python/examples/timeout/test_timeout.py
Matt Kaye 77f81476bd [Docs, Python] Expand Cancellation + Conditional Workflow Docs, Fix cancellation in Python (#1471)
* feat: expand conditional docs

* feat: initial cancellation work + fixing some broken links

* feat: docs on cancellation

* python: fix cancellation

* python: cruft

* chore: version

* feat: python example

* fix: TS cancellation examples

* fix: lint

* feat: go example

* feat: half-baked ts conditional logic workflow

* feat: add ts example conditional workflow

* feat: go example

* feat: go example

* fix: cancellation test

* fix: thanks, copilot!

* Update frontend/docs/pages/home/conditional-workflows.mdx

Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>

* fix: lint

* chore: lint

* fix: longer sleep

---------

Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>
2025-04-02 19:51:51 -04:00


import pytest

from examples.timeout.worker import refresh_timeout_wf, timeout_wf
from hatchet_sdk import Hatchet


@pytest.mark.asyncio()
async def test_execution_timeout(hatchet: Hatchet) -> None:
    run = timeout_wf.run_no_wait()

    with pytest.raises(Exception, match="(Task exceeded timeout|TIMED_OUT)"):
        await run.aio_result()


@pytest.mark.asyncio()
async def test_run_refresh_timeout(hatchet: Hatchet) -> None:
    result = await refresh_timeout_wf.aio_run()

    assert result["refresh_task"]["status"] == "success"
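
The `match` argument in `test_execution_timeout` is a regular expression that pytest applies with `re.search` to the string form of the raised exception, so either alternative may appear anywhere in the message. A minimal stdlib-only sketch of that check follows; the helper `matches_timeout` and the sample messages are hypothetical and are not taken from Hatchet.

```python
import re

# Pattern copied from test_execution_timeout above.
TIMEOUT_PATTERN = "(Task exceeded timeout|TIMED_OUT)"


def matches_timeout(message: str) -> bool:
    """Hypothetical helper: True if `message` would satisfy the `match` pattern."""
    # re.search scans the whole string, so the pattern need not start the message.
    return re.search(TIMEOUT_PATTERN, message) is not None


# Made-up messages, for illustration only.
assert matches_timeout("Task exceeded timeout of 2s")
assert matches_timeout("run finished with status TIMED_OUT")
assert not matches_timeout("worker connection refused")
```

Because the test also catches the broad `Exception` type, the pattern is the only thing that separates a timeout from any other failure in the run.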