mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-01-04 15:52:25 -06:00
21 lines
633 B
Python
21 lines
633 B
Python
import pytest
|
|
|
|
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
|
|
from hatchet_sdk import Hatchet
|
|
|
|
|
|
# loop_scope must be "module" or wider so the tests below share one event loop
|
|
@pytest.mark.asyncio(loop_scope="session")
async def test_execution_timeout(hatchet: Hatchet) -> None:
    """Check that awaiting a run of ``timeout_wf`` raises a timeout error.

    The workflow is started without waiting, then its result is awaited
    inside ``pytest.raises``; the raised exception's message must match
    one of the two alternatives in the pattern below.

    NOTE(review): ``hatchet`` is not used in the body; presumably it is a
    pytest fixture requested for its setup -- confirm against conftest.
    """
    # Either wording is accepted as evidence of a timeout.
    timeout_pattern = "(Task exceeded timeout|TIMED_OUT)"

    workflow_run = timeout_wf.run_no_wait()

    with pytest.raises(Exception, match=timeout_pattern):
        await workflow_run.aio_result()
|
|
|
|
|
|
@pytest.mark.asyncio(loop_scope="session")
async def test_run_refresh_timeout(hatchet: Hatchet) -> None:
    """Check that ``refresh_timeout_wf`` runs to completion successfully.

    The workflow is awaited via ``aio_run`` and the ``"refresh_task"`` entry
    of its result must carry a ``"status"`` equal to ``"success"``.

    NOTE(review): ``hatchet`` is not used in the body; presumably it is a
    pytest fixture requested for its setup -- confirm against conftest.
    """
    workflow_output = await refresh_timeout_wf.aio_run()
    refresh_task_output = workflow_output["refresh_task"]

    assert refresh_task_output["status"] == "success"
|