Files
hatchet/examples/python/timeout/worker.py
Matt Kaye f1f276f6dc Feat: Python task unit tests (#1990)
* feat: add mock run methods for tasks

* feat: docs

* feat: first pass at unit tests

* cleanup: split out tests

* feat: pass lifespan through

* fix: rm comment

* drive by: retry on 404 to help with races

* chore: changelog

* chore: ver

* feat: improve logging everywhere

* chore: changelog

* fix: rm print cruft

* feat: print statement linter

* feat: helper for getting result of a standalone

* feat: docs for mock run

* feat: add task run getter

* feat: propagate additional metadata properly

* chore: gen

* fix: date

* chore: gen

* feat: return exceptions

* chore: gen

* chore: changelog

* feat: tests + gen again

* fix: rm print cruft
2025-07-17 13:54:40 -04:00

50 lines
1.1 KiB
Python

import time
from datetime import timedelta
from hatchet_sdk import Context, EmptyModel, Hatchet, TaskDefaults
hatchet = Hatchet(debug=True)
# > ScheduleTimeout
timeout_wf = hatchet.workflow(
name="TimeoutWorkflow",
task_defaults=TaskDefaults(execution_timeout=timedelta(minutes=2)),
)
# > ExecutionTimeout
# 👀 Specify an execution timeout on a task
@timeout_wf.task(
execution_timeout=timedelta(seconds=5), schedule_timeout=timedelta(minutes=10)
)
def timeout_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
time.sleep(30)
return {"status": "success"}
refresh_timeout_wf = hatchet.workflow(name="RefreshTimeoutWorkflow")
# > RefreshTimeout
@refresh_timeout_wf.task(execution_timeout=timedelta(seconds=4))
def refresh_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
ctx.refresh_timeout(timedelta(seconds=10))
time.sleep(5)
return {"status": "success"}
def main() -> None:
worker = hatchet.worker(
"timeout-worker", slots=4, workflows=[timeout_wf, refresh_timeout_wf]
)
worker.start()
if __name__ == "__main__":
main()