Files
hatchet/examples/python/streaming/test_streaming.py
Matt Kaye 3442c11106 Feat: Top-level stream consumer in the SDKs (#1917)
* feat: add stream sub on top level

* feat: clean up examples

* chore: gen

* feat: move stream onto the runs client in ts

* fix: examples

* chore: gen

* fix: circular import issues

* chore: lint

* feat: first pass at Next app

* fix: pull next out to top level

* fix: all the things

* fix: get it all wired up

* fix: imports

* fix: lint rule

* fix: just use js

* fix: revert tsconfig changes

* fix: check in pages

* fix: hangup event in streaming impl

* chore: gen

* chore: generate again, remove lots of nextjs stuff

* fix: one more ignore

* fix: gen

* fix: ignore

* fix: ugh

* fix: simplify a bunch

* fix: lint

* fix: rm gen cruft

* fix: changelog

* feat: implement list with pagination

* feat: add some tests

* feat: add warnings

* fix: update workflow / task methods

* chore: version

* feat: retries
2025-07-03 18:49:16 -04:00

42 lines
1.0 KiB
Python

from subprocess import Popen
from typing import Any
import pytest
from examples.streaming.worker import chunks, stream_task
from hatchet_sdk import Hatchet
from hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType
@pytest.mark.parametrize(
    "on_demand_worker",
    [
        (
            ["poetry", "run", "python", "examples/streaming/worker.py", "--slots", "1"],
            8008,
        )
    ],
    indirect=True,
)
@pytest.mark.parametrize("execution_number", range(5))  # run test multiple times
@pytest.mark.asyncio(loop_scope="session")
async def test_streaming_ordering_and_completeness(
    execution_number: int,
    hatchet: Hatchet,
    on_demand_worker: Popen[Any],
) -> None:
    """Check that streamed chunks arrive in order and that none are missing.

    A run of ``stream_task`` is started, its stream is consumed through
    ``hatchet.runs.subscribe_to_stream``, and every received item is compared
    against the entry of ``chunks`` at the same position. Once the stream is
    exhausted, both the number of items and their concatenation must match
    ``chunks``.

    ``execution_number`` and ``on_demand_worker`` are not read in the body;
    they are supplied by the parametrization / fixtures declared above.
    """
    run_ref = await stream_task.aio_run_no_wait()

    # Everything received so far; its length doubles as the index of the
    # chunk we expect to see next.
    received: list[str] = []

    async for piece in hatchet.runs.subscribe_to_stream(run_ref.workflow_run_id):
        position = len(received)
        # Ordering: the n-th streamed item must equal the n-th expected chunk.
        assert chunks[position] == piece
        received.append(piece)

    # Completeness: nothing dropped from the stream, and the reassembled
    # text is identical to the expected text.
    assert len(received) == len(chunks)
    assert "".join(received) == "".join(chunks)

    # NOTE(review): presumably waits for the run itself to finish so it does
    # not outlive the test — confirm against the SDK.
    await run_ref.aio_result()