Files
hatchet/sdks/python/examples/run_details/worker.py
matt 73229b0e21 Feat: Run detail getter on the engine (#2725)
* feat: initial rpc

* chore: gen python

* feat: add more fields

* chore: gen again

* fix: finish cleaning up python

* feat: start wiring up api

* fix: panic

* feat: start implementing getters

* fix: improve api

* feat: expand return type a bit

* feat: more wiring

* feat: more wiring

* fix: finish wiring up input reads

* fix: admin client cleanup

* fix: ordering

* fix: add all_finished param

* feat: wire up all finished

* fix: propagate allfinished

* fix: propagate external ids

* chore: gen protos again

* fix: one more thing

* chore: rename

* chore: rename

* chore: fix typing

* fix: cleanup

* feat: add queued default

* fix: wiring

* feat: running check

* fix: query

* chore: rm print

* fix: edge case handling

* feat: python test

* feat: add `done` field

* feat: pass `done` through

* fix: test done flag

* fix: cleanup

* fix: handle cancelled

* refactor: clean up implementations of status handling

* fix: feedback

* fix: done logic

* fix: export run status

* fix: couple small bugs

* fix: handle done

* fix: properly extract input

* fix: bug with sequential dags

* refactor: improve performance of lookup query slightly

* refactor: add helpers on V1StepRunData for getting input + parsing bytes

* refactor: create listutils internal package

* refactor: status derivation

* fix: rm unused method

* fix: sqlcv1 import

* fix: error log

* fix: 404 on not found

* feat: changelog, async method
2026-01-08 12:44:01 -05:00

72 lines
1.6 KiB
Python

import asyncio
import random
import time
from pydantic import BaseModel
from hatchet_sdk import Context, Hatchet
class MockInput(BaseModel):
foo: str
class StepOutput(BaseModel):
random_number: int
class RandomSum(BaseModel):
sum: int
hatchet = Hatchet(debug=True)
run_detail_test_workflow = hatchet.workflow(
name="RunDetailTest", input_validator=MockInput
)
@run_detail_test_workflow.task()
async def step1(input: MockInput, ctx: Context) -> StepOutput:
return StepOutput(random_number=random.randint(1, 100))
@run_detail_test_workflow.task()
async def cancel_step(input: MockInput, ctx: Context) -> None:
await ctx.aio_cancel()
for _ in range(10):
await asyncio.sleep(1)
@run_detail_test_workflow.task()
async def fail_step(input: MockInput, ctx: Context) -> None:
raise Exception("Intentional Failure")
@run_detail_test_workflow.task()
async def step2(input: MockInput, ctx: Context) -> StepOutput:
await asyncio.sleep(5)
return StepOutput(random_number=random.randint(1, 100))
@run_detail_test_workflow.task(parents=[step1, step2])
async def step3(input: MockInput, ctx: Context) -> RandomSum:
one = ctx.task_output(step1).random_number
two = ctx.task_output(step2).random_number
return RandomSum(sum=one + two)
@run_detail_test_workflow.task(parents=[step1, step3])
async def step4(input: MockInput, ctx: Context) -> dict[str, str]:
print(
"executed step4",
time.strftime("%H:%M:%S", time.localtime()),
input,
ctx.task_output(step1),
ctx.task_output(step3),
)
return {
"step4": "step4",
}