Files
hatchet/examples/python/unit_testing/workflows.py
matt 7e3e3b8fc0 Feat: Non-determinism errors (#3041)
* fix: retrieve payloads in bulk

* fix: hash -> idempotency key

* feat: initial hashing work

* feat: check idempotency key if entry exists

* fix: panic

* feat: initial work on custom error for non-determinism

* fix: handle nondeterminism error properly

* feat: add error response, pub message to task controller

* chore: lint

* feat: add node id field to error proto

* chore: rm a bunch of unhelpful cancellation logs

* fix: conflict issues

* fix: rm another log

* fix: send node id properly

* fix: improve what we hash

* fix: improve error handling

* fix: python issues

* fix: don't hash or group id

* fix: rm print

* feat: add python test

* fix: add timeout

* fix: improve handling of non determinism error

* fix: propagate node id through

* fix: types, test

* fix: make serializable

* fix: no need to cancel internally anymore

* fix: hide another internal log

* fix: add link to docs

* fix: copilot

* fix: use sha256

* fix: test cleanup

* fix: add error type enum

* fix: handle exceptions on the worker

* fix: clean up a bunch of cursor imports

* fix: cursor docstring formatting

* fix: simplify idempotency key func

* fix: add back cancellation logs

* feat: tests for idempotency keys

* fix: add a couple more for priority and metadata

* chore: gen

* fix: python reconnect

* fix: noisy error

* fix: improve log

* fix: don't run durable listener if no durable tasks are registered

* fix: non-null idempotency keys
2026-02-18 11:27:02 -05:00

137 lines
3.6 KiB
Python

from typing import cast
from pydantic import BaseModel
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet
class UnitTestInput(BaseModel):
key: str
number: int
class Lifespan(BaseModel):
mock_db_url: str
class UnitTestOutput(UnitTestInput, Lifespan):
additional_metadata: dict[str, str]
retry_count: int
hatchet = Hatchet()
@hatchet.task(input_validator=UnitTestInput)
def sync_standalone(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@hatchet.task(input_validator=UnitTestInput)
async def async_standalone(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@hatchet.durable_task(input_validator=UnitTestInput)
async def durable_async_standalone(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
simple_workflow = hatchet.workflow(
name="simple-unit-test-workflow", input_validator=UnitTestInput
)
@simple_workflow.task()
def sync_simple_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@simple_workflow.task()
async def async_simple_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@simple_workflow.durable_task()
async def durable_async_simple_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
complex_workflow = hatchet.workflow(
name="complex-unit-test-workflow", input_validator=UnitTestInput
)
@complex_workflow.task()
async def start(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@complex_workflow.task(
parents=[start],
)
def sync_complex_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return ctx.task_output(start)
@complex_workflow.task(
parents=[start],
)
async def async_complex_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return ctx.task_output(start)
@complex_workflow.durable_task(
parents=[start],
)
async def durable_async_complex_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return ctx.task_output(start)