From 8172d59f84c618ddfe7e77b0d0e4d3861d82b56d Mon Sep 17 00:00:00 2001 From: Matt Kaye Date: Mon, 31 Mar 2025 13:47:34 -0400 Subject: [PATCH] [Python] Fix: Misc. Python Bugs (#1451) * fix: stop trying to parse payloads back from json * feat: streaming example * feat: aio output * fix: lint * feat: sync and async examples * fix: small issues * fix: one more small thing * chore: version * fix: lint * fix: add sleeps * feat: factor out run_async_from_sync * Revert "feat: factor out run_async_from_sync" This reverts commit fb37395913801b75892f6d09f300245352dbe8d3. * fix: schedule trigger namespace issue * fix: log error if action payload fails to decode but don't raise out of listener --- sdks/python/conftest.py | 4 +-- .../python/examples/streaming/async_stream.py | 19 +++++++++++ sdks/python/examples/streaming/sync_stream.py | 17 ++++++++++ sdks/python/examples/streaming/worker.py | 27 +++++++++++++++ sdks/python/hatchet_sdk/clients/admin.py | 7 +++- .../clients/dispatcher/action_listener.py | 4 ++- .../hatchet_sdk/clients/run_event_listener.py | 33 ++++--------------- sdks/python/hatchet_sdk/context/context.py | 9 +++++ sdks/python/pyproject.toml | 3 +- 9 files changed, 92 insertions(+), 31 deletions(-) create mode 100644 sdks/python/examples/streaming/async_stream.py create mode 100644 sdks/python/examples/streaming/sync_stream.py create mode 100644 sdks/python/examples/streaming/worker.py diff --git a/sdks/python/conftest.py b/sdks/python/conftest.py index 6dda8c071..30f40f534 100644 --- a/sdks/python/conftest.py +++ b/sdks/python/conftest.py @@ -74,8 +74,8 @@ def worker() -> Generator[subprocess.Popen[bytes], None, None]: update_tenant_request=UpdateTenantRequest(version=TenantVersion.V1), ) ) - except Exception as e: - print(e) + except Exception: + pass command = ["poetry", "run", "python", "examples/worker.py"] diff --git a/sdks/python/examples/streaming/async_stream.py b/sdks/python/examples/streaming/async_stream.py new file mode 100644 index 000000000..289b57c08 --- /dev/null +++ b/sdks/python/examples/streaming/async_stream.py @@ -0,0 +1,19 @@ +import asyncio + +from examples.streaming.worker import streaming_workflow + + +async def main() -> None: + ref = await streaming_workflow.aio_run_no_wait() + await asyncio.sleep(1) + + stream = ref.stream() + + async for chunk in stream: + print(chunk) + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/sdks/python/examples/streaming/sync_stream.py b/sdks/python/examples/streaming/sync_stream.py new file mode 100644 index 000000000..8566de7de --- /dev/null +++ b/sdks/python/examples/streaming/sync_stream.py @@ -0,0 +1,17 @@ +import time + +from examples.streaming.worker import streaming_workflow + + +def main() -> None: + ref = streaming_workflow.run_no_wait() + time.sleep(1) + + stream = ref.stream() + + for chunk in stream: + print(chunk) + + +if __name__ == "__main__": + main() diff --git a/sdks/python/examples/streaming/worker.py b/sdks/python/examples/streaming/worker.py new file mode 100644 index 000000000..e904d41da --- /dev/null +++ b/sdks/python/examples/streaming/worker.py @@ -0,0 +1,27 @@ +import asyncio + +from hatchet_sdk import Context, EmptyModel, Hatchet + +hatchet = Hatchet(debug=True) + +# ❓ Streaming + +streaming_workflow = hatchet.workflow(name="StreamingWorkflow") + + +@streaming_workflow.task() +async def step1(input: EmptyModel, ctx: Context) -> None: + for i in range(10): + await asyncio.sleep(1) + ctx.put_stream(f"Processing {i}") + + +def main() -> None: + worker = hatchet.worker("test-worker", workflows=[streaming_workflow]) + worker.start() + + +# ‼️ + +if __name__ == "__main__": + main() diff --git a/sdks/python/hatchet_sdk/clients/admin.py b/sdks/python/hatchet_sdk/clients/admin.py index ca2c17bf6..460c3990d 100644 --- a/sdks/python/hatchet_sdk/clients/admin.py +++ b/sdks/python/hatchet_sdk/clients/admin.py @@ -36,6 +36,7 @@ class ScheduleTriggerWorkflowOptions(BaseModel): child_index: int | None = None child_key: str | None = None namespace: str | None = None + additional_metadata: JSONSerializableMapping = Field(default_factory=dict) class TriggerWorkflowOptions(ScheduleTriggerWorkflowOptions): @@ -153,7 +154,11 @@ class AdminClient: name=name, schedules=[self._parse_schedule(schedule) for schedule in schedules], input=json.dumps(input), - **options.model_dump(), + parent_id=options.parent_id, + parent_step_run_id=options.parent_step_run_id, + child_index=options.child_index, + child_key=options.child_key, + additional_metadata=json.dumps(options.additional_metadata), ) @tenacity_retry diff --git a/sdks/python/hatchet_sdk/clients/dispatcher/action_listener.py b/sdks/python/hatchet_sdk/clients/dispatcher/action_listener.py index 19fae33ba..c285dea1b 100644 --- a/sdks/python/hatchet_sdk/clients/dispatcher/action_listener.py +++ b/sdks/python/hatchet_sdk/clients/dispatcher/action_listener.py @@ -298,7 +298,9 @@ class ActionListener: ) ) except (ValueError, json.JSONDecodeError) as e: - raise ValueError(f"Error decoding payload: {e}") + logger.error(f"Error decoding payload: {e}") + + action_payload = ActionPayload() action = Action( tenant_id=assigned_action.tenantId, diff --git a/sdks/python/hatchet_sdk/clients/run_event_listener.py b/sdks/python/hatchet_sdk/clients/run_event_listener.py index c6efc7795..c416667ba 100644 --- a/sdks/python/hatchet_sdk/clients/run_event_listener.py +++ b/sdks/python/hatchet_sdk/clients/run_event_listener.py @@ -1,5 +1,4 @@ import asyncio -import json from enum import Enum from typing import Any, AsyncGenerator, Callable, Generator, cast @@ -128,18 +127,10 @@ class RunEventListener: raise Exception( f"Unknown event type: {workflow_event.eventType}" ) - payload = None - try: - if workflow_event.eventPayload: - payload = json.loads(workflow_event.eventPayload) - except Exception: - payload = workflow_event.eventPayload - pass - - assert isinstance(payload, str) - - yield StepRunEvent(type=eventType, payload=payload) + yield StepRunEvent( + type=eventType, payload=workflow_event.eventPayload + ) elif workflow_event.resourceType == RESOURCE_TYPE_WORKFLOW_RUN: if workflow_event.eventType in step_run_event_type_mapping: workflowRunEventType = step_run_event_type_mapping[ @@ -150,17 +141,10 @@ class RunEventListener: f"Unknown event type: {workflow_event.eventType}" ) - payload = None - - try: - if workflow_event.eventPayload: - payload = json.loads(workflow_event.eventPayload) - except Exception: - pass - - assert isinstance(payload, str) - - yield StepRunEvent(type=workflowRunEventType, payload=payload) + yield StepRunEvent( + type=workflowRunEventType, + payload=workflow_event.eventPayload, + ) if workflow_event.hangup: listener = None @@ -236,9 +220,6 @@ class RunEventListenerClient: return self.stream(workflow_run_id) def stream(self, workflow_run_id: str) -> RunEventListener: - if not isinstance(workflow_run_id, str) and hasattr(workflow_run_id, "__str__"): - workflow_run_id = str(workflow_run_id) - if not self.client: aio_conn = new_conn(self.config, True) self.client = DispatcherStub(aio_conn) # type: ignore[no-untyped-call] diff --git a/sdks/python/hatchet_sdk/context/context.py b/sdks/python/hatchet_sdk/context/context.py index f9c936aaa..0d7355b5b 100644 --- a/sdks/python/hatchet_sdk/context/context.py +++ b/sdks/python/hatchet_sdk/context/context.py @@ -113,6 +113,14 @@ class Context: return parent_step_data + def aio_task_output(self, task: "Task[TWorkflowInput, R]") -> "R": + if task.is_async_function: + return self.task_output(task) + + raise ValueError( + f"Task '{task.name}' is not an async function. Use `task_output` instead." + ) + @property def was_triggered_by_event(self) -> bool: return self.data.triggered_by == "event" @@ -157,6 +165,7 @@ class Context: def handle_result(future: Future[tuple[bool, Exception | None]]) -> None: success, exception = future.result() + if not success and exception: if raise_on_error: raise exception diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 62de92e84..004b9d63a 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.0.2" +version = "1.0.3" description = "" authors = ["Alexander Belanger "] readme = "README.md" @@ -155,3 +155,4 @@ v2_simple = "examples.v2.simple.worker:main" otel = "examples.opentelemetry_instrumentation.worker:main" waits = "examples.waits.worker:main" durable = "examples.durable.worker:main" +streaming = "examples.streaming.worker:main"