Files
hatchet/examples/python/fanout/test_fanout.py
Matt Kaye f1f276f6dc Feat: Python task unit tests (#1990)
* feat: add mock run methods for tasks

* feat: docs

* feat: first pass at unit tests

* cleanup: split out tests

* feat: pass lifespan through

* fix: rm comment

* drive by: retry on 404 to help with races

* chore: changelog

* chore: ver

* feat: improve logging everywhere

* chore: changelog

* fix: rm print cruft

* feat: print statement linter

* feat: helper for getting result of a standalone

* feat: docs for mock run

* feat: add task run getter

* feat: propagate additional metadata properly

* chore: gen

* fix: date

* chore: gen

* feat: return exceptions

* chore: gen

* chore: changelog

* feat: tests + gen again

* fix: rm print cruft
2025-07-17 13:54:40 -04:00

51 lines
1.4 KiB
Python

import asyncio
from uuid import uuid4
import pytest
from examples.fanout.worker import ParentInput, parent_wf
from hatchet_sdk import Hatchet, TriggerWorkflowOptions
@pytest.mark.asyncio(loop_scope="session")
async def test_run(hatchet: Hatchet) -> None:
ref = await parent_wf.aio_run_no_wait(
ParentInput(n=2),
)
result = await ref.aio_result()
assert len(result["spawn"]["results"]) == 2
@pytest.mark.asyncio(loop_scope="session")
async def test_additional_metadata_propagation(hatchet: Hatchet) -> None:
test_run_id = uuid4().hex
ref = await parent_wf.aio_run_no_wait(
ParentInput(n=2),
options=TriggerWorkflowOptions(
additional_metadata={"test_run_id": test_run_id}
),
)
await ref.aio_result()
await asyncio.sleep(1)
runs = await hatchet.runs.aio_list(
parent_task_external_id=ref.workflow_run_id,
additional_metadata={"test_run_id": test_run_id},
)
assert runs.rows
"""Assert that the additional metadata is propagated to the child runs."""
for run in runs.rows:
assert run.additional_metadata
assert run.additional_metadata["test_run_id"] == test_run_id
assert run.children
for child in run.children:
assert child.additional_metadata
assert child.additional_metadata["test_run_id"] == test_run_id