Files
hatchet/examples/python/timeout/test_timeout.py
Gabe Ruttner 8e80faf2d6 Fe overhaul docs (#1640)
* api changes

* doc changes

* move docs

* generated

* generate

* pkg

* backmerge main

* revert to main

* revert main

* race?

* remove go tests
2025-04-30 14:10:09 -07:00

19 lines
513 B
Python

import pytest
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
@pytest.mark.asyncio(loop_scope="session")
async def test_execution_timeout() -> None:
    """Check that awaiting the result of ``timeout_wf`` raises a timeout error.

    The workflow is started without waiting; awaiting its result must then
    raise an exception whose message matches either ``Task exceeded timeout``
    or ``TIMED_OUT``.
    """
    # Kick the run off first so the failure surfaces only when awaiting it.
    workflow_run = timeout_wf.run_no_wait()

    # Exception is deliberately broad here; the message pattern is what
    # identifies the failure as a timeout.
    with pytest.raises(Exception, match="(Task exceeded timeout|TIMED_OUT)"):
        await workflow_run.aio_result()
@pytest.mark.asyncio(loop_scope="session")
async def test_run_refresh_timeout() -> None:
    """Check that ``refresh_timeout_wf`` runs to completion successfully.

    The workflow is run and awaited; the ``refresh_task`` entry of its result
    must report a ``status`` of ``success``.
    """
    workflow_output = await refresh_timeout_wf.aio_run()

    # NOTE(review): presumably the result is keyed by task name; confirm
    # against the workflow definition in examples.timeout.worker.
    refresh_task_output = workflow_output["refresh_task"]
    assert refresh_task_output["status"] == "success"