[Python] Feat: Dataclass Support (#2476)

* fix: prevent lifespan error from hanging worker

* fix: handle cleanup

* feat: dataclass outputs

* feat: dataclasses

* feat: incremental dataclass work

* feat: dataclass tests

* fix: lint

* fix: register wf

* fix: ugh

* chore: changelog

* fix: validation issue

* fix: none check

* fix: lint

* fix: error type
This commit is contained in:
matt
2025-11-01 00:27:28 +01:00
committed by GitHub
parent 7603b5ef39
commit 4518ff771c
15 changed files with 311 additions and 50 deletions
@@ -0,0 +1,44 @@
import pytest
from examples.dataclasses.worker import Input, Output, say_hello
@pytest.mark.asyncio(loop_scope="session")
async def test_dataclass_usage() -> None:
input = Input(name="Hatchet")
x1 = say_hello.run(input)
x2 = await say_hello.aio_run(input)
x3 = say_hello.run_many([say_hello.create_bulk_run_item(input)])[0]
x4 = (await say_hello.aio_run_many([say_hello.create_bulk_run_item(input)]))[0]
x5 = say_hello.run_no_wait(input).result()
x6 = (await say_hello.aio_run_no_wait(input)).result()
x7 = [
x.result()
for x in say_hello.run_many_no_wait([say_hello.create_bulk_run_item(input)])
][0]
x8 = [
x.result()
for x in await say_hello.aio_run_many_no_wait(
[say_hello.create_bulk_run_item(input)]
)
][0]
x9 = await say_hello.run_no_wait(input).aio_result()
x10 = await (await say_hello.aio_run_no_wait(input)).aio_result()
x11 = [
await x.aio_result()
for x in say_hello.run_many_no_wait([say_hello.create_bulk_run_item(input)])
][0]
x12 = [
await x.aio_result()
for x in await say_hello.aio_run_many_no_wait(
[say_hello.create_bulk_run_item(input)]
)
][0]
assert all(
x == Output(message="Hello, Hatchet!")
for x in [x1, x2, x3, x4, x5, x6, x7, x8, x9, x10, x11, x12]
)
@@ -0,0 +1,3 @@
from examples.dataclasses.worker import Input, say_hello
say_hello.run(input=Input(name="Hatchet"))
@@ -0,0 +1,31 @@
from dataclasses import dataclass
from typing import Literal
from hatchet_sdk import Context, EmptyModel, Hatchet
@dataclass
class Input:
name: str
@dataclass
class Output:
message: str
hatchet = Hatchet(debug=True)
@hatchet.task(input_validator=Input)
def say_hello(input: Input, ctx: Context) -> Output:
return Output(message=f"Hello, {input.name}!")
def main() -> None:
worker = hatchet.worker("test-worker", workflows=[say_hello])
worker.start()
if __name__ == "__main__":
main()
+2
View File
@@ -21,6 +21,7 @@ from examples.concurrency_workflow_level.worker import (
)
from examples.conditions.worker import task_condition_workflow
from examples.dag.worker import dag_workflow
from examples.dataclasses.worker import say_hello
from examples.dedupe.worker import dedupe_child_wf, dedupe_parent_wf
from examples.dependency_injection.worker import (
async_task_with_dependencies,
@@ -92,6 +93,7 @@ def main() -> None:
sync_task_with_dependencies,
durable_async_task_with_dependencies,
durable_sync_task_with_dependencies,
say_hello,
],
lifespan=lifespan,
)