Files
hatchet/examples/python/cancellation/worker.py
Gabe Ruttner 8e80faf2d6 Fe overhaul docs (#1640)
* api changes

* doc changes

* move docs

* generated

* generate

* pkg

* backmerge main

* revert to main

* revert main

* race?

* remove go tests
2025-04-30 14:10:09 -07:00

50 lines
1.1 KiB
Python

import asyncio
import time
from hatchet_sdk import Context, EmptyModel, Hatchet
# Hatchet client shared by this example; constructed with debug enabled.
hatchet = Hatchet(debug=True)
# Workflow registered under the name "CancelWorkflow"; the task functions in
# this file attach themselves to it through its `.task()` decorator.
cancellation_workflow = hatchet.workflow(name="CancelWorkflow")
# > Self-cancelling task
@cancellation_workflow.task()
async def self_cancel(input: EmptyModel, ctx: Context) -> dict[str, str]:
    """Async task that requests its own cancellation part-way through.

    Sleeps for 2 seconds, awaits ``ctx.aio_cancel()``, then sleeps for a
    further 10 seconds before returning. The returned payload is a sentinel:
    it is only produced if execution continues past the second sleep.

    Args:
        input: Task input; not read by this task.
        ctx: Task context on which the cancellation is requested.

    Returns:
        A dict with a single ``"error"`` entry stating that the task should
        have been cancelled.
    """
    warm_up_seconds = 2
    post_cancel_seconds = 10

    await asyncio.sleep(warm_up_seconds)

    ## Cancel the task
    await ctx.aio_cancel()

    # Another await after the cancel request; presumably this gives the
    # cancellation time to take effect before the sentinel is returned.
    await asyncio.sleep(post_cancel_seconds)

    failure = {"error": "Task should have been cancelled"}
    return failure
# > Checking exit flag
@cancellation_workflow.task()
def check_flag(input: EmptyModel, ctx: Context) -> dict[str, str]:
    """Sync task that polls ``ctx.exit_flag`` once per second, up to 3 times.

    Each iteration blocks for one second and then inspects the flag. As soon
    as the flag is truthy the task prints a message and raises.

    Args:
        input: Task input; not read by this task.
        ctx: Task context whose ``exit_flag`` attribute is polled.

    Returns:
        A dict with a single ``"error"`` entry stating that the task should
        have been cancelled; only reached if the flag was never set during
        the three polls.

    Raises:
        ValueError: If ``ctx.exit_flag`` is truthy on any poll.
    """
    cancelled_message = "Task has been cancelled"
    poll_count = 3

    for _ in range(poll_count):
        time.sleep(1)

        # Note: Checking the status of the exit flag is mostly useful for cancelling
        # sync tasks without needing to forcibly kill the thread they're running on.
        if not ctx.exit_flag:
            continue

        print(cancelled_message)
        raise ValueError(cancelled_message)

    return {"error": "Task should have been cancelled"}
def main() -> None:
    """Create the "cancellation-worker" worker for the workflow and start it."""
    hatchet.worker(
        "cancellation-worker",
        workflows=[cancellation_workflow],
    ).start()
# Run the worker only when this file is executed directly as a script.
if __name__ == "__main__":
    main()