Files
hatchet/sdks/python/examples/stubs/stub_trigger.py
matt cf0aa21623 [Python]: Enable force kill on sigterm for hot reloading, stub workflow + task (#2197)
* feat: exit forcefully

* fix: handler, ver

* chore: lint

* feat: stubs client

* fix: more overloads

* chore: version

* chore: changelog

* feat: example of stubs

* feat: comment

* fix: add dedenting

* fix: snippet for running external tasks
2025-08-26 15:22:46 -04:00

43 lines
782 B
Python

import asyncio
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet
# > Define models
# Input model for the task; used below as the stub's `input_validator`
# and as the type of the payload passed when the task is run.
class TaskInput(BaseModel):
    # Integer field carried in the task input.
    user_id: int
# Output model for the task; used below as the stub's `output_validator`,
# so the awaited result exposes a typed `ok` attribute.
class TaskOutput(BaseModel):
    # Boolean field read from the task result.
    ok: bool
# !!
async def main() -> None:
    """Create a stub for "externally-triggered-task", run it once, and print `ok`.

    The stub is built from the task name plus the `TaskInput` / `TaskOutput`
    models, then awaited with a sample input. The task implementation itself
    is not part of this file (presumably it is registered by a worker
    elsewhere -- confirm against the implementation).
    """
    hatchet = Hatchet()

    # > Create a stub task
    task_stub = hatchet.stubs.task(
        # make sure the name and schemas exactly match the implementation
        name="externally-triggered-task",
        input_validator=TaskInput,
        output_validator=TaskOutput,
    )
    # !!

    # > Trigger the task
    # input type checks properly
    task_input = TaskInput(user_id=1234)
    output = await task_stub.aio_run(input=task_input)

    # `output.ok` type checks properly
    print("Is successful:", output.ok)
    # !!
# Script entry point: only when executed directly, drive the async `main`
# coroutine to completion on a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())