Files
hatchet/examples/python/unit_testing/workflows.py
Matt Kaye f1f276f6dc Feat: Python task unit tests (#1990)
* feat: add mock run methods for tasks

* feat: docs

* feat: first pass at unit tests

* cleanup: split out tests

* feat: pass lifespan through

* fix: rm comment

* drive by: retry on 404 to help with races

* chore: changelog

* chore: ver

* feat: improve logging everywhere

* chore: changelog

* fix: rm print cruft

* feat: print statement linter

* feat: helper for getting result of a standalone

* feat: docs for mock run

* feat: add task run getter

* feat: propagate additional metadata properly

* chore: gen

* fix: date

* chore: gen

* feat: return exceptions

* chore: gen

* chore: changelog

* feat: tests + gen again

* fix: rm print cruft
2025-07-17 13:54:40 -04:00

172 lines
4.5 KiB
Python

from typing import cast
from pydantic import BaseModel
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet
class UnitTestInput(BaseModel):
key: str
number: int
class Lifespan(BaseModel):
mock_db_url: str
class UnitTestOutput(UnitTestInput, Lifespan):
additional_metadata: dict[str, str]
retry_count: int
hatchet = Hatchet()
@hatchet.task(input_validator=UnitTestInput)
def sync_standalone(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@hatchet.task(input_validator=UnitTestInput)
async def async_standalone(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@hatchet.durable_task(input_validator=UnitTestInput)
def durable_sync_standalone(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@hatchet.durable_task(input_validator=UnitTestInput)
async def durable_async_standalone(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
simple_workflow = hatchet.workflow(
name="simple-unit-test-workflow", input_validator=UnitTestInput
)
@simple_workflow.task()
def sync_simple_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@simple_workflow.task()
async def async_simple_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@simple_workflow.durable_task()
def durable_sync_simple_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@simple_workflow.durable_task()
async def durable_async_simple_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
complex_workflow = hatchet.workflow(
name="complex-unit-test-workflow", input_validator=UnitTestInput
)
@complex_workflow.task()
async def start(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return UnitTestOutput(
key=input.key,
number=input.number,
additional_metadata=ctx.additional_metadata,
retry_count=ctx.retry_count,
mock_db_url=cast(Lifespan, ctx.lifespan).mock_db_url,
)
@complex_workflow.task(
parents=[start],
)
def sync_complex_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return ctx.task_output(start)
@complex_workflow.task(
parents=[start],
)
async def async_complex_workflow(input: UnitTestInput, ctx: Context) -> UnitTestOutput:
return ctx.task_output(start)
@complex_workflow.durable_task(
parents=[start],
)
def durable_sync_complex_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return ctx.task_output(start)
@complex_workflow.durable_task(
parents=[start],
)
async def durable_async_complex_workflow(
input: UnitTestInput, ctx: DurableContext
) -> UnitTestOutput:
return ctx.task_output(start)