mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-20 00:11:13 -05:00
Feat: Top-level stream consumer in the SDKs (#1917)
* feat: add stream sub on top level * feat: clean up examples * chore: gen * feat: move stream onto the runs client in ts * fix: examples * chore: gen * fix: circular import issues * chore: lint * feat: first pass at Next app * fix: pull next out to top level * fix: all the things * fix: get it all wired up * fix: imports * fix: lint rule * fix: just use js * fix: revert tsconfig changes * fix: check in pages * fix: hangup event in streaming impl * chore: gen * chore: generate again, remove lots of nextjs stuff * fix: one more ignore * fix: gen * fix: ignore * fix: ugh * fix: simplify a bunch * fix: lint * fix: rm gen cruft * fix: changelog * feat: implement list with pagination * feat: add some tests * feat: add warnings * fix: update workflow / task methods * chore: version * feat: retries
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
|
||||
from examples.streaming.worker import stream_task
|
||||
from examples.streaming.worker import hatchet, stream_task
|
||||
from hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType
|
||||
|
||||
|
||||
@@ -8,9 +8,8 @@ async def main() -> None:
|
||||
# > Consume
|
||||
ref = await stream_task.aio_run_no_wait()
|
||||
|
||||
async for chunk in ref.stream():
|
||||
if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
|
||||
print(chunk.payload, flush=True, end="")
|
||||
async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):
|
||||
print(chunk, flush=True, end="")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -4,23 +4,20 @@ from fastapi import FastAPI
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from examples.streaming.worker import stream_task
|
||||
from hatchet_sdk import RunEventListener, StepRunEventType
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
# > FastAPI Proxy
|
||||
hatchet = Hatchet()
|
||||
app = FastAPI()
|
||||
|
||||
|
||||
async def generate_stream(stream: RunEventListener) -> AsyncGenerator[str, None]:
|
||||
async for chunk in stream:
|
||||
if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
|
||||
yield chunk.payload
|
||||
|
||||
|
||||
@app.get("/stream")
|
||||
async def stream() -> StreamingResponse:
|
||||
ref = await stream_task.aio_run_no_wait()
|
||||
|
||||
return StreamingResponse(generate_stream(ref.stream()), media_type="text/plain")
|
||||
return StreamingResponse(
|
||||
hatchet.runs.subscribe_to_stream(ref.workflow_run_id), media_type="text/plain"
|
||||
)
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -30,11 +30,10 @@ async def test_streaming_ordering_and_completeness(
|
||||
ix = 0
|
||||
anna_karenina = ""
|
||||
|
||||
async for chunk in ref.stream():
|
||||
if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:
|
||||
assert chunks[ix] == chunk.payload
|
||||
ix += 1
|
||||
anna_karenina += chunk.payload
|
||||
async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):
|
||||
assert chunks[ix] == chunk
|
||||
ix += 1
|
||||
anna_karenina += chunk
|
||||
|
||||
assert ix == len(chunks)
|
||||
assert anna_karenina == "".join(chunks)
|
||||
|
||||
Reference in New Issue
Block a user