diff --git a/examples/python/streaming/async_stream.py b/examples/python/streaming/async_stream.py index 35d8526ee..ddbd56f59 100644 --- a/examples/python/streaming/async_stream.py +++ b/examples/python/streaming/async_stream.py @@ -1,6 +1,6 @@ import asyncio -from examples.streaming.worker import stream_task +from examples.streaming.worker import hatchet, stream_task from hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType @@ -8,9 +8,8 @@ async def main() -> None: # > Consume ref = await stream_task.aio_run_no_wait() - async for chunk in ref.stream(): - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - print(chunk.payload, flush=True, end="") + async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id): + print(chunk, flush=True, end="") if __name__ == "__main__": diff --git a/examples/python/streaming/fastapi_proxy.py b/examples/python/streaming/fastapi_proxy.py index 9602de546..a688fa1e1 100644 --- a/examples/python/streaming/fastapi_proxy.py +++ b/examples/python/streaming/fastapi_proxy.py @@ -4,23 +4,20 @@ from fastapi import FastAPI from fastapi.responses import StreamingResponse from examples.streaming.worker import stream_task -from hatchet_sdk import RunEventListener, StepRunEventType +from hatchet_sdk import Hatchet # > FastAPI Proxy +hatchet = Hatchet() app = FastAPI() -async def generate_stream(stream: RunEventListener) -> AsyncGenerator[str, None]: - async for chunk in stream: - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - yield chunk.payload - - @app.get("/stream") async def stream() -> StreamingResponse: ref = await stream_task.aio_run_no_wait() - return StreamingResponse(generate_stream(ref.stream()), media_type="text/plain") + return StreamingResponse( + hatchet.runs.subscribe_to_stream(ref.workflow_run_id), media_type="text/plain" + ) diff --git a/examples/python/streaming/test_streaming.py b/examples/python/streaming/test_streaming.py index 089bd46fc..777ea79b2 100644 --- a/examples/python/streaming/test_streaming.py +++ b/examples/python/streaming/test_streaming.py @@ -30,11 +30,10 @@ async def test_streaming_ordering_and_completeness( ix = 0 anna_karenina = "" - async for chunk in ref.stream(): - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - assert chunks[ix] == chunk.payload - ix += 1 - anna_karenina += chunk.payload + async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id): + assert chunks[ix] == chunk + ix += 1 + anna_karenina += chunk assert ix == len(chunks) assert anna_karenina == "".join(chunks) diff --git a/examples/typescript/streaming/nextjs-proxy.ts b/examples/typescript/streaming/nextjs-proxy.ts new file mode 100644 index 000000000..c278effd1 --- /dev/null +++ b/examples/typescript/streaming/nextjs-proxy.ts @@ -0,0 +1,24 @@ +import { Readable } from 'stream'; +import { hatchet } from '../hatchet-client'; +import { streamingTask } from './workflow'; + +// > NextJS Proxy +export async function GET() { + try { + const ref = await streamingTask.runNoWait({}); + const workflowRunId = await ref.getWorkflowRunId(); + + const stream = Readable.from(hatchet.runs.subscribeToStream(workflowRunId)); + + // @ts-ignore + return new Response(Readable.toWeb(stream), { + headers: { + 'Content-Type': 'text/plain', + 'Cache-Control': 'no-cache', + Connection: 'keep-alive', + }, + }); + } catch (error) { + return new Response('Internal Server Error', { status: 500 }); + } +} diff --git a/examples/typescript/streaming/run.ts b/examples/typescript/streaming/run.ts index 
4a94028b9..e0ceff2ed 100644 --- a/examples/typescript/streaming/run.ts +++ b/examples/typescript/streaming/run.ts @@ -1,16 +1,13 @@ -import { RunEventType } from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client'; import { streamingTask } from './workflow'; +import { hatchet } from '../hatchet-client'; async function main() { // > Consume const ref = await streamingTask.runNoWait({}); + const id = await ref.getWorkflowRunId(); - const stream = await ref.stream(); - - for await (const event of stream) { - if (event.type === RunEventType.STEP_RUN_EVENT_TYPE_STREAM) { - process.stdout.write(event.payload); - } + for await (const content of hatchet.runs.subscribeToStream(id)) { + process.stdout.write(content); } } diff --git a/examples/typescript/streaming/workflow.ts b/examples/typescript/streaming/workflow.ts index c0c749624..2b8529acc 100644 --- a/examples/typescript/streaming/workflow.ts +++ b/examples/typescript/streaming/workflow.ts @@ -1,4 +1,4 @@ -import sleep from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/util/sleep'; +import sleep from '../../../util/sleep'; import { hatchet } from '../hatchet-client'; // > Streaming @@ -17,6 +17,8 @@ function* createChunks(content: string, n: number): Generator { + await sleep(2000); + for (const chunk of createChunks(annaKarenina, 10)) { ctx.putStream(chunk); await sleep(200); diff --git a/frontend/app/.prettierignore b/frontend/app/.prettierignore index bca05c6d5..3278ae3cb 100644 --- a/frontend/app/.prettierignore +++ b/frontend/app/.prettierignore @@ -1 +1,2 @@ src/lib/api/generated/ +**/.next/** diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/async_stream.ts b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/async_stream.ts index 0234a9070..2bbb8c145 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/async_stream.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/async_stream.ts @@ -3,12 +3,12 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\nasync def main() -> None:\n # > Consume\n ref = await stream_task.aio_run_no_wait()\n\n async for chunk in ref.stream():\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n print(chunk.payload, flush=True, end="")\n\n\nif __name__ == "__main__":\n asyncio.run(main())\n', + 'import asyncio\n\nfrom examples.streaming.worker import hatchet, stream_task\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\nasync def main() -> None:\n # > Consume\n ref = await stream_task.aio_run_no_wait()\n\n async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):\n print(chunk, flush=True, end="")\n\n\nif __name__ == "__main__":\n asyncio.run(main())\n', source: 'out/python/streaming/async_stream.py', blocks: { consume: { start: 9, - stop: 13, + stop: 12, }, }, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/fastapi_proxy.ts b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/fastapi_proxy.ts index 6f38b4219..d194c31fb 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/fastapi_proxy.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/fastapi_proxy.ts @@ -3,12 +3,12 @@ import { Snippet } 
from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'from typing import AsyncGenerator\n\nfrom fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk import RunEventListener, StepRunEventType\n\n# > FastAPI Proxy\napp = FastAPI()\n\n\nasync def generate_stream(stream: RunEventListener) -> AsyncGenerator[str, None]:\n async for chunk in stream:\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n yield chunk.payload\n\n\n@app.get("/stream")\nasync def stream() -> StreamingResponse:\n ref = await stream_task.aio_run_no_wait()\n\n return StreamingResponse(generate_stream(ref.stream()), media_type="text/plain")\n\n\n\nif __name__ == "__main__":\n import uvicorn\n\n uvicorn.run(app, host="0.0.0.0", port=8000)\n', + 'from typing import AsyncGenerator\n\nfrom fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk import Hatchet\n\n# > FastAPI Proxy\nhatchet = Hatchet()\napp = FastAPI()\n\n\n@app.get("/stream")\nasync def stream() -> StreamingResponse:\n ref = await stream_task.aio_run_no_wait()\n\n return StreamingResponse(\n hatchet.runs.subscribe_to_stream(ref.workflow_run_id), media_type="text/plain"\n )\n\n\n\nif __name__ == "__main__":\n import uvicorn\n\n uvicorn.run(app, host="0.0.0.0", port=8000)\n', source: 'out/python/streaming/fastapi_proxy.py', blocks: { fastapi_proxy: { start: 10, - stop: 25, + stop: 22, }, }, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/test_streaming.ts b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/test_streaming.ts index 0b2eba4f5..705041728 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/test_streaming.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/test_streaming.ts @@ -3,7 +3,7 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'from subprocess import Popen\nfrom typing import Any\n\nimport pytest\n\nfrom examples.streaming.worker import chunks, stream_task\nfrom hatchet_sdk import Hatchet\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\n@pytest.mark.parametrize(\n "on_demand_worker",\n [\n (\n ["poetry", "run", "python", "examples/streaming/worker.py", "--slots", "1"],\n 8008,\n )\n ],\n indirect=True,\n)\n@pytest.mark.parametrize("execution_number", range(5)) # run test multiple times\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_streaming_ordering_and_completeness(\n execution_number: int,\n hatchet: Hatchet,\n on_demand_worker: Popen[Any],\n) -> None:\n ref = await stream_task.aio_run_no_wait()\n\n ix = 0\n anna_karenina = ""\n\n async for chunk in ref.stream():\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n assert chunks[ix] == chunk.payload\n ix += 1\n anna_karenina += chunk.payload\n\n assert ix == len(chunks)\n assert anna_karenina == "".join(chunks)\n\n await ref.aio_result()\n', + 'from subprocess import Popen\nfrom typing import Any\n\nimport pytest\n\nfrom examples.streaming.worker import chunks, stream_task\nfrom hatchet_sdk import Hatchet\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\n@pytest.mark.parametrize(\n "on_demand_worker",\n [\n (\n ["poetry", "run", "python", "examples/streaming/worker.py", "--slots", 
"1"],\n 8008,\n )\n ],\n indirect=True,\n)\n@pytest.mark.parametrize("execution_number", range(5)) # run test multiple times\n@pytest.mark.asyncio(loop_scope="session")\nasync def test_streaming_ordering_and_completeness(\n execution_number: int,\n hatchet: Hatchet,\n on_demand_worker: Popen[Any],\n) -> None:\n ref = await stream_task.aio_run_no_wait()\n\n ix = 0\n anna_karenina = ""\n\n async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):\n assert chunks[ix] == chunk\n ix += 1\n anna_karenina += chunk\n\n assert ix == len(chunks)\n assert anna_karenina == "".join(chunks)\n\n await ref.aio_result()\n', source: 'out/python/streaming/test_streaming.py', blocks: {}, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/index.ts b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/index.ts index 9f263b9c7..9d70edb0b 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/index.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/index.ts @@ -1,7 +1,9 @@ +import nextjs_proxy from './nextjs-proxy'; import run from './run'; import worker from './worker'; import workflow from './workflow'; +export { nextjs_proxy }; export { run }; export { worker }; export { workflow }; diff --git a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/nextjs-proxy.ts b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/nextjs-proxy.ts new file mode 100644 index 000000000..0199eb700 --- /dev/null +++ b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/nextjs-proxy.ts @@ -0,0 +1,17 @@ +import { Snippet } from '@/next/lib/docs/generated/snips/types'; + +const snippet: Snippet = { + language: 'typescript ', + content: + "import { Readable } from 'stream';\nimport { hatchet } from '../hatchet-client';\nimport { streamingTask } from './workflow';\n\n// > NextJS Proxy\nexport async function GET() {\n try {\n const ref = await streamingTask.runNoWait({});\n const workflowRunId = await ref.getWorkflowRunId();\n\n const stream = Readable.from(hatchet.runs.subscribeToStream(workflowRunId));\n\n // @ts-ignore\n return new Response(Readable.toWeb(stream), {\n headers: {\n 'Content-Type': 'text/plain',\n 'Cache-Control': 'no-cache',\n Connection: 'keep-alive',\n },\n });\n } catch (error) {\n return new Response('Internal Server Error', { status: 500 });\n }\n}\n", + source: 'out/typescript/streaming/nextjs-proxy.ts', + blocks: { + nextjs_proxy: { + start: 6, + stop: 24, + }, + }, + highlights: {}, +}; + +export default snippet; diff --git a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/run.ts b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/run.ts index 9e62ab4c5..8a6755bb2 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/run.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/run.ts @@ -3,12 +3,12 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'typescript ', content: - "import { RunEventType } from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client';\nimport { streamingTask } from './workflow';\n\nasync function main() {\n // > Consume\n const ref = await streamingTask.runNoWait({});\n\n const stream = await ref.stream();\n\n for await (const event of stream) {\n if (event.type === RunEventType.STEP_RUN_EVENT_TYPE_STREAM) {\n 
process.stdout.write(event.payload);\n }\n }\n}\n\nif (require.main === module) {\n main()\n .catch(console.error)\n .finally(() => {\n process.exit(0);\n });\n}\n", + "import { streamingTask } from './workflow';\nimport { hatchet } from '../hatchet-client';\n\nasync function main() {\n // > Consume\n const ref = await streamingTask.runNoWait({});\n const id = await ref.getWorkflowRunId();\n\n for await (const content of hatchet.runs.subscribeToStream(id)) {\n process.stdout.write(content);\n }\n}\n\nif (require.main === module) {\n main()\n .catch(console.error)\n .finally(() => {\n process.exit(0);\n });\n}\n", source: 'out/typescript/streaming/run.ts', blocks: { consume: { start: 6, - stop: 14, + stop: 11, }, }, highlights: {}, diff --git a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/workflow.ts b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/workflow.ts index 0f0569d0a..07c190314 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/workflow.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/typescript/streaming/workflow.ts @@ -3,12 +3,12 @@ import { Snippet } from '@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'typescript ', content: - "import sleep from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/util/sleep';\nimport { hatchet } from '../hatchet-client';\n\n// > Streaming\nconst annaKarenina = `\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n`;\n\nfunction* createChunks(content: string, n: number): Generator {\n for (let i = 0; i < content.length; i += n) {\n yield content.slice(i, i + n);\n }\n}\n\nexport const streamingTask = hatchet.task({\n name: 'stream-example',\n fn: async (_, ctx) => {\n for (const chunk of createChunks(annaKarenina, 10)) {\n ctx.putStream(chunk);\n await sleep(200);\n }\n },\n});\n\n", + "import sleep from '../../../util/sleep';\nimport { hatchet } from '../hatchet-client';\n\n// > Streaming\nconst annaKarenina = `\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. 
The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n`;\n\nfunction* createChunks(content: string, n: number): Generator {\n for (let i = 0; i < content.length; i += n) {\n yield content.slice(i, i + n);\n }\n}\n\nexport const streamingTask = hatchet.task({\n name: 'stream-example',\n fn: async (_, ctx) => {\n await sleep(2000);\n\n for (const chunk of createChunks(annaKarenina, 10)) {\n ctx.putStream(chunk);\n await sleep(200);\n }\n },\n});\n\n", source: 'out/typescript/streaming/workflow.ts', blocks: { streaming: { start: 5, - stop: 26, + stop: 28, }, }, highlights: {}, diff --git a/frontend/docs/lib/generated/snips/python/streaming/async_stream.ts b/frontend/docs/lib/generated/snips/python/streaming/async_stream.ts index 6e9a1019f..2c5ba4f0f 100644 --- a/frontend/docs/lib/generated/snips/python/streaming/async_stream.ts +++ b/frontend/docs/lib/generated/snips/python/streaming/async_stream.ts @@ -2,12 +2,12 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\nasync def main() -> None:\n # > Consume\n ref = await stream_task.aio_run_no_wait()\n\n async for chunk in ref.stream():\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n print(chunk.payload, flush=True, end=\"\")\n\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n", + "content": "import asyncio\n\nfrom examples.streaming.worker import hatchet, stream_task\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\nasync def main() -> None:\n # > Consume\n ref = await stream_task.aio_run_no_wait()\n\n async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):\n print(chunk, flush=True, end=\"\")\n\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n", "source": "out/python/streaming/async_stream.py", "blocks": { "consume": { "start": 9, - "stop": 13 + "stop": 12 } }, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/streaming/fastapi_proxy.ts b/frontend/docs/lib/generated/snips/python/streaming/fastapi_proxy.ts index 50883176b..3bb384190 100644 --- a/frontend/docs/lib/generated/snips/python/streaming/fastapi_proxy.ts +++ b/frontend/docs/lib/generated/snips/python/streaming/fastapi_proxy.ts @@ -2,12 +2,12 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "from typing import AsyncGenerator\n\nfrom fastapi import FastAPI\nfrom fastapi.responses import StreamingResponse\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk import RunEventListener, StepRunEventType\n\n# > FastAPI Proxy\napp = FastAPI()\n\n\nasync def generate_stream(stream: RunEventListener) -> AsyncGenerator[str, None]:\n async for chunk in stream:\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n yield chunk.payload\n\n\n@app.get(\"/stream\")\nasync def stream() -> StreamingResponse:\n ref = await stream_task.aio_run_no_wait()\n\n return StreamingResponse(generate_stream(ref.stream()), media_type=\"text/plain\")\n\n\n\nif __name__ == \"__main__\":\n import uvicorn\n\n uvicorn.run(app, host=\"0.0.0.0\", port=8000)\n", + "content": "from typing import AsyncGenerator\n\nfrom fastapi import 
FastAPI\nfrom fastapi.responses import StreamingResponse\n\nfrom examples.streaming.worker import stream_task\nfrom hatchet_sdk import Hatchet\n\n# > FastAPI Proxy\nhatchet = Hatchet()\napp = FastAPI()\n\n\n@app.get(\"/stream\")\nasync def stream() -> StreamingResponse:\n ref = await stream_task.aio_run_no_wait()\n\n return StreamingResponse(\n hatchet.runs.subscribe_to_stream(ref.workflow_run_id), media_type=\"text/plain\"\n )\n\n\n\nif __name__ == \"__main__\":\n import uvicorn\n\n uvicorn.run(app, host=\"0.0.0.0\", port=8000)\n", "source": "out/python/streaming/fastapi_proxy.py", "blocks": { "fastapi_proxy": { "start": 10, - "stop": 25 + "stop": 22 } }, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/streaming/test_streaming.ts b/frontend/docs/lib/generated/snips/python/streaming/test_streaming.ts index 0fcfca562..5360bd230 100644 --- a/frontend/docs/lib/generated/snips/python/streaming/test_streaming.ts +++ b/frontend/docs/lib/generated/snips/python/streaming/test_streaming.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "from subprocess import Popen\nfrom typing import Any\n\nimport pytest\n\nfrom examples.streaming.worker import chunks, stream_task\nfrom hatchet_sdk import Hatchet\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\n@pytest.mark.parametrize(\n \"on_demand_worker\",\n [\n (\n [\"poetry\", \"run\", \"python\", \"examples/streaming/worker.py\", \"--slots\", \"1\"],\n 8008,\n )\n ],\n indirect=True,\n)\n@pytest.mark.parametrize(\"execution_number\", range(5)) # run test multiple times\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_streaming_ordering_and_completeness(\n execution_number: int,\n hatchet: Hatchet,\n on_demand_worker: Popen[Any],\n) -> None:\n ref = await stream_task.aio_run_no_wait()\n\n ix = 0\n anna_karenina = \"\"\n\n async for chunk in ref.stream():\n if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM:\n assert chunks[ix] == chunk.payload\n ix += 1\n anna_karenina += chunk.payload\n\n assert ix == len(chunks)\n assert anna_karenina == \"\".join(chunks)\n\n await ref.aio_result()\n", + "content": "from subprocess import Popen\nfrom typing import Any\n\nimport pytest\n\nfrom examples.streaming.worker import chunks, stream_task\nfrom hatchet_sdk import Hatchet\nfrom hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType\n\n\n@pytest.mark.parametrize(\n \"on_demand_worker\",\n [\n (\n [\"poetry\", \"run\", \"python\", \"examples/streaming/worker.py\", \"--slots\", \"1\"],\n 8008,\n )\n ],\n indirect=True,\n)\n@pytest.mark.parametrize(\"execution_number\", range(5)) # run test multiple times\n@pytest.mark.asyncio(loop_scope=\"session\")\nasync def test_streaming_ordering_and_completeness(\n execution_number: int,\n hatchet: Hatchet,\n on_demand_worker: Popen[Any],\n) -> None:\n ref = await stream_task.aio_run_no_wait()\n\n ix = 0\n anna_karenina = \"\"\n\n async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id):\n assert chunks[ix] == chunk\n ix += 1\n anna_karenina += chunk\n\n assert ix == len(chunks)\n assert anna_karenina == \"\".join(chunks)\n\n await ref.aio_result()\n", "source": "out/python/streaming/test_streaming.py", "blocks": {}, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/typescript/streaming/index.ts b/frontend/docs/lib/generated/snips/typescript/streaming/index.ts index 3ba5a55ec..ae93c09d1 100644 --- 
a/frontend/docs/lib/generated/snips/typescript/streaming/index.ts +++ b/frontend/docs/lib/generated/snips/typescript/streaming/index.ts @@ -1,7 +1,9 @@ +import nextjs_proxy from './nextjs-proxy'; import run from './run'; import worker from './worker'; import workflow from './workflow'; +export { nextjs_proxy } export { run } export { worker } export { workflow } diff --git a/frontend/docs/lib/generated/snips/typescript/streaming/nextjs-proxy.ts b/frontend/docs/lib/generated/snips/typescript/streaming/nextjs-proxy.ts new file mode 100644 index 000000000..76bc7df89 --- /dev/null +++ b/frontend/docs/lib/generated/snips/typescript/streaming/nextjs-proxy.ts @@ -0,0 +1,16 @@ +import { Snippet } from '@/lib/generated/snips/types'; + +const snippet: Snippet = { + "language": "typescript ", + "content": "import { Readable } from 'stream';\nimport { hatchet } from '../hatchet-client';\nimport { streamingTask } from './workflow';\n\n// > NextJS Proxy\nexport async function GET() {\n try {\n const ref = await streamingTask.runNoWait({});\n const workflowRunId = await ref.getWorkflowRunId();\n\n const stream = Readable.from(hatchet.runs.subscribeToStream(workflowRunId));\n\n // @ts-ignore\n return new Response(Readable.toWeb(stream), {\n headers: {\n 'Content-Type': 'text/plain',\n 'Cache-Control': 'no-cache',\n Connection: 'keep-alive',\n },\n });\n } catch (error) {\n return new Response('Internal Server Error', { status: 500 });\n }\n}\n", + "source": "out/typescript/streaming/nextjs-proxy.ts", + "blocks": { + "nextjs_proxy": { + "start": 6, + "stop": 24 + } + }, + "highlights": {} +}; + +export default snippet; diff --git a/frontend/docs/lib/generated/snips/typescript/streaming/run.ts b/frontend/docs/lib/generated/snips/typescript/streaming/run.ts index b910b6610..d32c6714e 100644 --- a/frontend/docs/lib/generated/snips/typescript/streaming/run.ts +++ b/frontend/docs/lib/generated/snips/typescript/streaming/run.ts @@ -2,12 +2,12 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "typescript ", - "content": "import { RunEventType } from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client';\nimport { streamingTask } from './workflow';\n\nasync function main() {\n // > Consume\n const ref = await streamingTask.runNoWait({});\n\n const stream = await ref.stream();\n\n for await (const event of stream) {\n if (event.type === RunEventType.STEP_RUN_EVENT_TYPE_STREAM) {\n process.stdout.write(event.payload);\n }\n }\n}\n\nif (require.main === module) {\n main()\n .catch(console.error)\n .finally(() => {\n process.exit(0);\n });\n}\n", + "content": "import { streamingTask } from './workflow';\nimport { hatchet } from '../hatchet-client';\n\nasync function main() {\n // > Consume\n const ref = await streamingTask.runNoWait({});\n const id = await ref.getWorkflowRunId();\n\n for await (const content of hatchet.runs.subscribeToStream(id)) {\n process.stdout.write(content);\n }\n}\n\nif (require.main === module) {\n main()\n .catch(console.error)\n .finally(() => {\n process.exit(0);\n });\n}\n", "source": "out/typescript/streaming/run.ts", "blocks": { "consume": { "start": 6, - "stop": 14 + "stop": 11 } }, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/typescript/streaming/workflow.ts b/frontend/docs/lib/generated/snips/typescript/streaming/workflow.ts index f935101b1..d091bf23a 100644 --- a/frontend/docs/lib/generated/snips/typescript/streaming/workflow.ts +++ 
b/frontend/docs/lib/generated/snips/typescript/streaming/workflow.ts @@ -2,12 +2,12 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "typescript ", - "content": "import sleep from '@hatchet-dev/typescript-sdk-dev/typescript-sdk/util/sleep';\nimport { hatchet } from '../hatchet-client';\n\n// > Streaming\nconst annaKarenina = `\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n`;\n\nfunction* createChunks(content: string, n: number): Generator {\n for (let i = 0; i < content.length; i += n) {\n yield content.slice(i, i + n);\n }\n}\n\nexport const streamingTask = hatchet.task({\n name: 'stream-example',\n fn: async (_, ctx) => {\n for (const chunk of createChunks(annaKarenina, 10)) {\n ctx.putStream(chunk);\n await sleep(200);\n }\n },\n});\n\n", + "content": "import sleep from '../../../util/sleep';\nimport { hatchet } from '../hatchet-client';\n\n// > Streaming\nconst annaKarenina = `\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n`;\n\nfunction* createChunks(content: string, n: number): Generator {\n for (let i = 0; i < content.length; i += n) {\n yield content.slice(i, i + n);\n }\n}\n\nexport const streamingTask = hatchet.task({\n name: 'stream-example',\n fn: async (_, ctx) => {\n await sleep(2000);\n\n for (const chunk of createChunks(annaKarenina, 10)) {\n ctx.putStream(chunk);\n await sleep(200);\n }\n },\n});\n\n", "source": "out/typescript/streaming/workflow.ts", "blocks": { "streaming": { "start": 5, - "stop": 26 + "stop": 28 } }, "highlights": {} diff --git a/frontend/docs/pages/home/streaming.mdx b/frontend/docs/pages/home/streaming.mdx index 25caad4cf..fd3e432d7 100644 --- a/frontend/docs/pages/home/streaming.mdx +++ b/frontend/docs/pages/home/streaming.mdx @@ -64,8 +64,22 @@ It's common to want to stream events out of a Hatchet task and back to the front In both cases, we recommend using your application's backend as a proxy for the stream, where you would subscribe to the stream of events from Hatchet, and then stream events through to the frontend as they're received by the backend. + + + For example, with FastAPI, you'd do the following: -Then, assuming you run the server on port `8000`, running `curl -N -X GET http://localhost:8000/stream` would result in the text streaming back to your console from Hatchet through your FastAPI proxy. + + + +For example, with NextJS backend-as-frontend, you'd do the following: + + + + + + + +Then, assuming you run the server on port `8000`, running `curl -N http://localhost:8000/stream` would result in the text streaming back to your console from Hatchet through your FastAPI proxy. 
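The streaming docs above recommend proxying the Hatchet stream through your own backend and verifying it with `curl -N http://localhost:8000/stream`. For a programmatic check of the same endpoint, here is a minimal sketch; it assumes the FastAPI proxy from this diff is serving on `localhost:8000` and that the third-party `httpx` package is installed, neither of which is part of the diff itself.

```python
# Minimal sketch: consume the proxied /stream endpoint without curl.
# Assumes the FastAPI proxy above is running on localhost:8000 and that
# `httpx` is available -- both are assumptions, not part of this diff.
import asyncio

import httpx


async def consume_proxy() -> None:
    async with httpx.AsyncClient(timeout=None) as client:
        # Stream the plain-text body chunk by chunk as the proxy relays it from Hatchet.
        async with client.stream("GET", "http://localhost:8000/stream") as response:
            async for text in response.aiter_text():
                print(text, flush=True, end="")


if __name__ == "__main__":
    asyncio.run(consume_proxy())
```

The output should match what the `curl -N` invocation prints, since both read the same `text/plain` streaming response.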
diff --git a/sdks/python/CHANGELOG.md b/sdks/python/CHANGELOG.md index 7b226abe8..e8a51cccc 100644 --- a/sdks/python/CHANGELOG.md +++ b/sdks/python/CHANGELOG.md @@ -5,6 +5,19 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.14.2] - 2025-07-03 + +### Added + +- The `Runs` client now has `list_with_pagination` and `aio_list_with_pagination` methods that allow for listing workflow runs with internal pagination. The wrappers on the `Standalone` and `Workflow` classes have been updated to use these methods. +- Added retries with backoff to all of the REST API wrapper methods on the feature clients. + +## [1.14.1] - 2025-07-03 + +### Changed + +- `DurableContext.aio_wait_for` can now accept an or group, in addition to sleep and event conditions. + ## [1.14.0] - 2025-06-25 ### Added diff --git a/sdks/python/examples/streaming/async_stream.py b/sdks/python/examples/streaming/async_stream.py index 6801a237e..c86e529bb 100644 --- a/sdks/python/examples/streaming/async_stream.py +++ b/sdks/python/examples/streaming/async_stream.py @@ -1,6 +1,6 @@ import asyncio -from examples.streaming.worker import stream_task +from examples.streaming.worker import hatchet, stream_task from hatchet_sdk.clients.listeners.run_event_listener import StepRunEventType @@ -8,9 +8,8 @@ async def main() -> None: # > Consume ref = await stream_task.aio_run_no_wait() - async for chunk in ref.stream(): - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - print(chunk.payload, flush=True, end="") + async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id): + print(chunk, flush=True, end="") # !! diff --git a/sdks/python/examples/streaming/fastapi_proxy.py b/sdks/python/examples/streaming/fastapi_proxy.py index cb296a9d5..2b066cd60 100644 --- a/sdks/python/examples/streaming/fastapi_proxy.py +++ b/sdks/python/examples/streaming/fastapi_proxy.py @@ -4,23 +4,20 @@ from fastapi import FastAPI from fastapi.responses import StreamingResponse from examples.streaming.worker import stream_task -from hatchet_sdk import RunEventListener, StepRunEventType +from hatchet_sdk import Hatchet # > FastAPI Proxy +hatchet = Hatchet() app = FastAPI() -async def generate_stream(stream: RunEventListener) -> AsyncGenerator[str, None]: - async for chunk in stream: - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - yield chunk.payload - - @app.get("/stream") async def stream() -> StreamingResponse: ref = await stream_task.aio_run_no_wait() - return StreamingResponse(generate_stream(ref.stream()), media_type="text/plain") + return StreamingResponse( + hatchet.runs.subscribe_to_stream(ref.workflow_run_id), media_type="text/plain" + ) # !! 
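The 1.14.2 changelog entry above introduces `list_with_pagination` and `aio_list_with_pagination` on the `Runs` client; their implementations appear further down in this diff. A hedged usage sketch, assuming a `Hatchet` client configured via environment variables and a tenant with recent runs:

```python
# Minimal sketch of the new pagination helper described in the 1.14.2 changelog.
# The three-day window is illustrative; the helper splits it into per-day requests,
# de-duplicates the rows, and returns them sorted by created_at, newest first.
from datetime import datetime, timedelta, timezone

from hatchet_sdk import Hatchet

hatchet = Hatchet()  # reads connection settings from the environment

runs = hatchet.runs.list_with_pagination(
    since=datetime.now(tz=timezone.utc) - timedelta(days=3),
    only_tasks=True,
    limit=100,
)

for run in runs:
    print(run.metadata.id, run.created_at)
```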
diff --git a/sdks/python/examples/streaming/test_streaming.py b/sdks/python/examples/streaming/test_streaming.py index 089bd46fc..777ea79b2 100644 --- a/sdks/python/examples/streaming/test_streaming.py +++ b/sdks/python/examples/streaming/test_streaming.py @@ -30,11 +30,10 @@ async def test_streaming_ordering_and_completeness( ix = 0 anna_karenina = "" - async for chunk in ref.stream(): - if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: - assert chunks[ix] == chunk.payload - ix += 1 - anna_karenina += chunk.payload + async for chunk in hatchet.runs.subscribe_to_stream(ref.workflow_run_id): + assert chunks[ix] == chunk + ix += 1 + anna_karenina += chunk assert ix == len(chunks) assert anna_karenina == "".join(chunks) diff --git a/sdks/python/hatchet_sdk/clients/v1/api_client.py b/sdks/python/hatchet_sdk/clients/v1/api_client.py index 72f24af4c..e38e9a4cb 100644 --- a/sdks/python/hatchet_sdk/clients/v1/api_client.py +++ b/sdks/python/hatchet_sdk/clients/v1/api_client.py @@ -1,8 +1,13 @@ +from collections.abc import Callable from typing import ParamSpec, TypeVar +import tenacity + from hatchet_sdk.clients.rest.api_client import ApiClient from hatchet_sdk.clients.rest.configuration import Configuration +from hatchet_sdk.clients.rest.exceptions import ServiceException from hatchet_sdk.config import ClientConfig +from hatchet_sdk.logger import logger from hatchet_sdk.utils.typing import JSONSerializableMapping ## Type variables to use with coroutines. @@ -20,7 +25,7 @@ P = ParamSpec("P") def maybe_additional_metadata_to_kv( - additional_metadata: dict[str, str] | JSONSerializableMapping | None + additional_metadata: dict[str, str] | JSONSerializableMapping | None, ) -> list[str] | None: if not additional_metadata: return None @@ -42,3 +47,24 @@ class BaseRestClient: def client(self) -> ApiClient: return ApiClient(self.api_config) + + +def retry(func: Callable[P, R]) -> Callable[P, R]: + return tenacity.retry( + reraise=True, + wait=tenacity.wait_exponential_jitter(), + stop=tenacity.stop_after_attempt(5), + before_sleep=_alert_on_retry, + retry=tenacity.retry_if_exception(_should_retry), + )(func) + + +def _alert_on_retry(retry_state: tenacity.RetryCallState) -> None: + logger.debug( + f"Retrying {retry_state.fn}: attempt " + f"{retry_state.attempt_number} ended with: {retry_state.outcome}", + ) + + +def _should_retry(ex: BaseException) -> bool: + return isinstance(ex, ServiceException) diff --git a/sdks/python/hatchet_sdk/features/cron.py b/sdks/python/hatchet_sdk/features/cron.py index 1b7bdcfaa..add83b065 100644 --- a/sdks/python/hatchet_sdk/features/cron.py +++ b/sdks/python/hatchet_sdk/features/cron.py @@ -19,6 +19,7 @@ from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import ( from hatchet_sdk.clients.v1.api_client import ( BaseRestClient, maybe_additional_metadata_to_kv, + retry, ) from hatchet_sdk.utils.typing import JSONSerializableMapping @@ -201,6 +202,7 @@ class CronClient(BaseRestClient): cron_name=cron_name, ) + @retry def list( self, offset: int | None = None, @@ -241,6 +243,7 @@ class CronClient(BaseRestClient): cron_name=cron_name, ) + @retry def get(self, cron_id: str) -> CronWorkflows: """ Retrieve a specific workflow cron trigger by ID. 
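The `retry` helper added to `api_client.py` above applies tenacity with exponential jitter, a five-attempt cap, re-raising of the final error, and a predicate that only retries `ServiceException`. The standalone sketch below reproduces that configuration outside the SDK so the policy is easy to reason about; the `FlakyError` class and attempt counter are illustrative stand-ins, not part of the diff.

```python
# Standalone sketch of the retry policy introduced in api_client.py: exponential
# backoff with jitter, at most five attempts, reraise=True, and a retry predicate
# limited to one exception type. FlakyError stands in for ServiceException.
import tenacity


class FlakyError(Exception):
    pass


attempts = 0


@tenacity.retry(
    reraise=True,
    wait=tenacity.wait_exponential_jitter(),
    stop=tenacity.stop_after_attempt(5),
    retry=tenacity.retry_if_exception(lambda ex: isinstance(ex, FlakyError)),
)
def flaky_call() -> str:
    global attempts
    attempts += 1
    if attempts < 3:
        raise FlakyError("transient server error")
    return "ok"


print(flaky_call(), "after", attempts, "attempts")  # ok after 3 attempts
```

Because the SDK's predicate only matches `ServiceException` (typically a 5xx from the API), other exceptions propagate on the first attempt instead of being retried.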
diff --git a/sdks/python/hatchet_sdk/features/filters.py b/sdks/python/hatchet_sdk/features/filters.py index bbe3bd07f..798ab16df 100644 --- a/sdks/python/hatchet_sdk/features/filters.py +++ b/sdks/python/hatchet_sdk/features/filters.py @@ -10,7 +10,7 @@ from hatchet_sdk.clients.rest.models.v1_filter_list import V1FilterList from hatchet_sdk.clients.rest.models.v1_update_filter_request import ( V1UpdateFilterRequest, ) -from hatchet_sdk.clients.v1.api_client import BaseRestClient +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry from hatchet_sdk.utils.typing import JSONSerializableMapping @@ -41,6 +41,7 @@ class FiltersClient(BaseRestClient): """ return await asyncio.to_thread(self.list, limit, offset, workflow_ids, scopes) + @retry def list( self, limit: int | None = None, @@ -67,6 +68,7 @@ class FiltersClient(BaseRestClient): scopes=scopes, ) + @retry def get( self, filter_id: str, diff --git a/sdks/python/hatchet_sdk/features/logs.py b/sdks/python/hatchet_sdk/features/logs.py index 776c2e3a5..4c96c4670 100644 --- a/sdks/python/hatchet_sdk/features/logs.py +++ b/sdks/python/hatchet_sdk/features/logs.py @@ -3,7 +3,7 @@ import asyncio from hatchet_sdk.clients.rest.api.log_api import LogApi from hatchet_sdk.clients.rest.api_client import ApiClient from hatchet_sdk.clients.rest.models.v1_log_line_list import V1LogLineList -from hatchet_sdk.clients.v1.api_client import BaseRestClient +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry class LogsClient(BaseRestClient): @@ -14,6 +14,7 @@ class LogsClient(BaseRestClient): def _la(self, client: ApiClient) -> LogApi: return LogApi(client) + @retry def list(self, task_run_id: str) -> V1LogLineList: """ List log lines for a given task run. diff --git a/sdks/python/hatchet_sdk/features/metrics.py b/sdks/python/hatchet_sdk/features/metrics.py index f856d6db1..2bdd8f98d 100644 --- a/sdks/python/hatchet_sdk/features/metrics.py +++ b/sdks/python/hatchet_sdk/features/metrics.py @@ -12,6 +12,7 @@ from hatchet_sdk.clients.rest.models.workflow_run_status import WorkflowRunStatu from hatchet_sdk.clients.v1.api_client import ( BaseRestClient, maybe_additional_metadata_to_kv, + retry, ) from hatchet_sdk.utils.typing import JSONSerializableMapping @@ -27,6 +28,7 @@ class MetricsClient(BaseRestClient): def _ta(self, client: ApiClient) -> TenantApi: return TenantApi(client) + @retry def get_workflow_metrics( self, workflow_id: str, @@ -66,6 +68,7 @@ class MetricsClient(BaseRestClient): self.get_workflow_metrics, workflow_id, status, group_key ) + @retry def get_queue_metrics( self, workflow_ids: list[str] | None = None, @@ -105,6 +108,7 @@ class MetricsClient(BaseRestClient): self.get_queue_metrics, workflow_ids, additional_metadata ) + @retry def get_task_metrics(self) -> TenantStepRunQueueMetrics: """ Retrieve queue metrics diff --git a/sdks/python/hatchet_sdk/features/runs.py b/sdks/python/hatchet_sdk/features/runs.py index 0384d8e2d..ff4e44cca 100644 --- a/sdks/python/hatchet_sdk/features/runs.py +++ b/sdks/python/hatchet_sdk/features/runs.py @@ -1,10 +1,15 @@ import asyncio +from collections.abc import AsyncIterator from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Literal, overload +from warnings import warn from pydantic import BaseModel, model_validator -from hatchet_sdk.clients.listeners.run_event_listener import RunEventListenerClient +from hatchet_sdk.clients.listeners.run_event_listener import ( + RunEventListenerClient, + StepRunEventType, +) from 
hatchet_sdk.clients.listeners.workflow_listener import PooledWorkflowRunListener from hatchet_sdk.clients.rest.api.task_api import TaskApi from hatchet_sdk.clients.rest.api.workflow_runs_api import WorkflowRunsApi @@ -13,6 +18,7 @@ from hatchet_sdk.clients.rest.models.v1_cancel_task_request import V1CancelTaskR from hatchet_sdk.clients.rest.models.v1_replay_task_request import V1ReplayTaskRequest from hatchet_sdk.clients.rest.models.v1_task_filter import V1TaskFilter from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus +from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import ( V1TriggerWorkflowRunRequest, @@ -21,8 +27,11 @@ from hatchet_sdk.clients.rest.models.v1_workflow_run_details import V1WorkflowRu from hatchet_sdk.clients.v1.api_client import ( BaseRestClient, maybe_additional_metadata_to_kv, + retry, ) from hatchet_sdk.config import ClientConfig +from hatchet_sdk.utils.aio import gather_max_concurrency +from hatchet_sdk.utils.datetimes import partition_date_range from hatchet_sdk.utils.typing import JSONSerializableMapping if TYPE_CHECKING: @@ -110,6 +119,7 @@ class RunsClient(BaseRestClient): def _ta(self, client: ApiClient) -> TaskApi: return TaskApi(client) + @retry def get(self, workflow_run_id: str) -> V1WorkflowRunDetails: """ Get workflow run details for a given workflow run ID. @@ -129,6 +139,7 @@ class RunsClient(BaseRestClient): """ return await asyncio.to_thread(self.get, workflow_run_id) + @retry def get_status(self, workflow_run_id: str) -> V1TaskStatus: """ Get workflow run status for a given workflow run ID. @@ -148,6 +159,152 @@ class RunsClient(BaseRestClient): """ return await asyncio.to_thread(self.get_status, workflow_run_id) + @retry + def list_with_pagination( + self, + since: datetime | None = None, + only_tasks: bool = False, + offset: int | None = None, + limit: int | None = None, + statuses: list[V1TaskStatus] | None = None, + until: datetime | None = None, + additional_metadata: dict[str, str] | None = None, + workflow_ids: list[str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + triggering_event_external_id: str | None = None, + ) -> list[V1TaskSummary]: + """ + List task runs according to a set of filters, paginating through days + + :param since: The start time for filtering task runs. + :param only_tasks: Whether to only list task runs. + :param offset: The offset for pagination. + :param limit: The maximum number of task runs to return. + :param statuses: The statuses to filter task runs by. + :param until: The end time for filtering task runs. + :param additional_metadata: Additional metadata to filter task runs by. + :param workflow_ids: The workflow IDs to filter task runs by. + :param worker_id: The worker ID to filter task runs by. + :param parent_task_external_id: The parent task external ID to filter task runs by. + :param triggering_event_external_id: The event id that triggered the task run. + + :return: A list of task runs matching the specified filters. 
+ """ + + date_ranges = partition_date_range( + since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + until=until or datetime.now(tz=timezone.utc), + ) + + with self.client() as client: + responses = [ + self._wra(client).v1_workflow_run_list( + tenant=self.client_config.tenant_id, + since=s, + until=u, + only_tasks=only_tasks, + offset=offset, + limit=limit, + statuses=statuses, + additional_metadata=maybe_additional_metadata_to_kv( + additional_metadata + ), + workflow_ids=workflow_ids, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + triggering_event_external_id=triggering_event_external_id, + ) + for s, u in date_ranges + ] + + ## Hack for uniqueness + run_id_to_run = { + run.metadata.id: run for record in responses for run in record.rows + } + + return sorted( + run_id_to_run.values(), + key=lambda x: x.created_at, + reverse=True, + ) + + @retry + async def aio_list_with_pagination( + self, + since: datetime | None = None, + only_tasks: bool = False, + offset: int | None = None, + limit: int | None = None, + statuses: list[V1TaskStatus] | None = None, + until: datetime | None = None, + additional_metadata: dict[str, str] | None = None, + workflow_ids: list[str] | None = None, + worker_id: str | None = None, + parent_task_external_id: str | None = None, + triggering_event_external_id: str | None = None, + ) -> list[V1TaskSummary]: + """ + List task runs according to a set of filters, paginating through days + + :param since: The start time for filtering task runs. + :param only_tasks: Whether to only list task runs. + :param offset: The offset for pagination. + :param limit: The maximum number of task runs to return. + :param statuses: The statuses to filter task runs by. + :param until: The end time for filtering task runs. + :param additional_metadata: Additional metadata to filter task runs by. + :param workflow_ids: The workflow IDs to filter task runs by. + :param worker_id: The worker ID to filter task runs by. + :param parent_task_external_id: The parent task external ID to filter task runs by. + :param triggering_event_external_id: The event id that triggered the task run. + + :return: A list of task runs matching the specified filters. 
+ """ + + date_ranges = partition_date_range( + since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + until=until or datetime.now(tz=timezone.utc), + ) + + with self.client() as client: + coros = [ + asyncio.to_thread( + self._wra(client).v1_workflow_run_list, + tenant=self.client_config.tenant_id, + since=s, + until=u, + only_tasks=only_tasks, + offset=offset, + limit=limit, + statuses=statuses, + additional_metadata=maybe_additional_metadata_to_kv( + additional_metadata + ), + workflow_ids=workflow_ids, + worker_id=worker_id, + parent_task_external_id=parent_task_external_id, + triggering_event_external_id=triggering_event_external_id, + ) + for s, u in date_ranges + ] + + responses = await gather_max_concurrency( + *coros, + max_concurrency=3, + ) + + ## Hack for uniqueness + run_id_to_run = { + run.metadata.id: run for record in responses for run in record.rows + } + + return sorted( + run_id_to_run.values(), + key=lambda x: x.created_at, + reverse=True, + ) + async def aio_list( self, since: datetime | None = None, @@ -181,7 +338,7 @@ class RunsClient(BaseRestClient): """ return await asyncio.to_thread( self.list, - since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + since=since, only_tasks=only_tasks, offset=offset, limit=limit, @@ -194,6 +351,7 @@ class RunsClient(BaseRestClient): triggering_event_external_id=triggering_event_external_id, ) + @retry def list( self, since: datetime | None = None, @@ -225,10 +383,22 @@ class RunsClient(BaseRestClient): :return: A list of task runs matching the specified filters. """ + + since = since or datetime.now(tz=timezone.utc) - timedelta(days=1) + until = until or datetime.now(tz=timezone.utc) + + if (until - since).days >= 7: + warn( + "Listing runs with a date range longer than 7 days may result in performance issues. " + "Consider using `list_with_pagination` or `aio_list_with_pagination` instead.", + RuntimeWarning, + stacklevel=2, + ) + with self.client() as client: return self._wra(client).v1_workflow_run_list( tenant=self.client_config.tenant_id, - since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + since=since, only_tasks=only_tasks, offset=offset, limit=limit, @@ -376,6 +546,7 @@ class RunsClient(BaseRestClient): """ return await asyncio.to_thread(self.bulk_cancel, opts) + @retry def get_result(self, run_id: str) -> JSONSerializableMapping: """ Get the result of a workflow run by its external ID. 
@@ -413,3 +584,13 @@ class RunsClient(BaseRestClient): workflow_run_listener=self.workflow_run_listener, runs_client=self, ) + + async def subscribe_to_stream( + self, + workflow_run_id: str, + ) -> AsyncIterator[str]: + ref = self.get_run_ref(workflow_run_id=workflow_run_id) + + async for chunk in ref.stream(): + if chunk.type == StepRunEventType.STEP_RUN_EVENT_TYPE_STREAM: + yield chunk.payload diff --git a/sdks/python/hatchet_sdk/features/scheduled.py b/sdks/python/hatchet_sdk/features/scheduled.py index 2eddcc808..6db5d403b 100644 --- a/sdks/python/hatchet_sdk/features/scheduled.py +++ b/sdks/python/hatchet_sdk/features/scheduled.py @@ -21,6 +21,7 @@ from hatchet_sdk.clients.rest.models.workflow_run_order_by_direction import ( from hatchet_sdk.clients.v1.api_client import ( BaseRestClient, maybe_additional_metadata_to_kv, + retry, ) from hatchet_sdk.utils.typing import JSONSerializableMapping @@ -153,6 +154,7 @@ class ScheduledClient(BaseRestClient): statuses=statuses, ) + @retry def list( self, offset: int | None = None, @@ -193,6 +195,7 @@ class ScheduledClient(BaseRestClient): statuses=statuses, ) + @retry def get(self, scheduled_id: str) -> ScheduledWorkflows: """ Retrieves a specific scheduled workflow by scheduled run trigger ID. diff --git a/sdks/python/hatchet_sdk/features/tenant.py b/sdks/python/hatchet_sdk/features/tenant.py index 356a406d0..99e4d8794 100644 --- a/sdks/python/hatchet_sdk/features/tenant.py +++ b/sdks/python/hatchet_sdk/features/tenant.py @@ -3,7 +3,7 @@ import asyncio from hatchet_sdk.clients.rest.api.tenant_api import TenantApi from hatchet_sdk.clients.rest.api_client import ApiClient from hatchet_sdk.clients.rest.models.tenant import Tenant -from hatchet_sdk.clients.v1.api_client import BaseRestClient +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry class TenantClient(BaseRestClient): @@ -14,6 +14,7 @@ class TenantClient(BaseRestClient): def _ta(self, client: ApiClient) -> TenantApi: return TenantApi(client) + @retry def get(self) -> Tenant: """ Get the current tenant. diff --git a/sdks/python/hatchet_sdk/features/workers.py b/sdks/python/hatchet_sdk/features/workers.py index ee430cb48..df63ca94a 100644 --- a/sdks/python/hatchet_sdk/features/workers.py +++ b/sdks/python/hatchet_sdk/features/workers.py @@ -5,7 +5,7 @@ from hatchet_sdk.clients.rest.api_client import ApiClient from hatchet_sdk.clients.rest.models.update_worker_request import UpdateWorkerRequest from hatchet_sdk.clients.rest.models.worker import Worker from hatchet_sdk.clients.rest.models.worker_list import WorkerList -from hatchet_sdk.clients.v1.api_client import BaseRestClient +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry class WorkersClient(BaseRestClient): @@ -16,6 +16,7 @@ class WorkersClient(BaseRestClient): def _wa(self, client: ApiClient) -> WorkerApi: return WorkerApi(client) + @retry def get(self, worker_id: str) -> Worker: """ Get a worker by its ID. 
@@ -35,6 +36,7 @@ class WorkersClient(BaseRestClient): """ return await asyncio.to_thread(self.get, worker_id) + @retry def list( self, ) -> WorkerList: diff --git a/sdks/python/hatchet_sdk/features/workflows.py b/sdks/python/hatchet_sdk/features/workflows.py index 077540ca0..343500fac 100644 --- a/sdks/python/hatchet_sdk/features/workflows.py +++ b/sdks/python/hatchet_sdk/features/workflows.py @@ -6,7 +6,7 @@ from hatchet_sdk.clients.rest.api_client import ApiClient from hatchet_sdk.clients.rest.models.workflow import Workflow from hatchet_sdk.clients.rest.models.workflow_list import WorkflowList from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion -from hatchet_sdk.clients.v1.api_client import BaseRestClient +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry class WorkflowsClient(BaseRestClient): @@ -31,6 +31,7 @@ class WorkflowsClient(BaseRestClient): """ return await asyncio.to_thread(self.get, workflow_id) + @retry def get(self, workflow_id: str) -> Workflow: """ Get a workflow by its ID. @@ -41,6 +42,7 @@ class WorkflowsClient(BaseRestClient): with self.client() as client: return self._wa(client).workflow_get(workflow_id) + @retry def list( self, workflow_name: str | None = None, @@ -81,6 +83,7 @@ class WorkflowsClient(BaseRestClient): """ return await asyncio.to_thread(self.list, workflow_name, limit, offset) + @retry def get_version( self, workflow_id: str, version: str | None = None ) -> WorkflowVersion: diff --git a/sdks/python/hatchet_sdk/runnables/workflow.py b/sdks/python/hatchet_sdk/runnables/workflow.py index 7554be9b5..11f5113c3 100644 --- a/sdks/python/hatchet_sdk/runnables/workflow.py +++ b/sdks/python/hatchet_sdk/runnables/workflow.py @@ -1,6 +1,6 @@ import asyncio from collections.abc import Callable -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from functools import cached_property from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast, get_type_hints @@ -310,9 +310,9 @@ class BaseWorkflow(Generic[TWorkflowInput]): :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. """ - response = self.client.runs.list( + return self.client.runs.list_with_pagination( workflow_ids=[self.id], - since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + since=since, only_tasks=only_tasks, offset=offset, limit=limit, @@ -324,8 +324,6 @@ class BaseWorkflow(Generic[TWorkflowInput]): triggering_event_external_id=triggering_event_external_id, ) - return response.rows - async def aio_list_runs( self, since: datetime | None = None, @@ -355,9 +353,9 @@ class BaseWorkflow(Generic[TWorkflowInput]): :returns: A list of `V1TaskSummary` objects representing the runs of the workflow. 
""" - return await asyncio.to_thread( - self.list_runs, - since=since or datetime.now(tz=timezone.utc) - timedelta(days=1), + return await self.client.runs.aio_list_with_pagination( + workflow_ids=[self.id], + since=since, only_tasks=only_tasks, offset=offset, limit=limit, diff --git a/sdks/python/hatchet_sdk/utils/aio.py b/sdks/python/hatchet_sdk/utils/aio.py new file mode 100644 index 000000000..53a35c9cb --- /dev/null +++ b/sdks/python/hatchet_sdk/utils/aio.py @@ -0,0 +1,21 @@ +import asyncio +from collections.abc import Coroutine +from typing import TypeVar + +T = TypeVar("T") + + +async def gather_max_concurrency( + *tasks: Coroutine[None, None, T], + max_concurrency: int, +) -> list[T]: + sem = asyncio.Semaphore(max_concurrency) + + async def task_wrapper(task: Coroutine[None, None, T]) -> T: + async with sem: + return await task + + return await asyncio.gather( + *(task_wrapper(task) for task in tasks), + return_exceptions=False, + ) diff --git a/sdks/python/hatchet_sdk/utils/datetimes.py b/sdks/python/hatchet_sdk/utils/datetimes.py new file mode 100644 index 000000000..573d6d1ba --- /dev/null +++ b/sdks/python/hatchet_sdk/utils/datetimes.py @@ -0,0 +1,35 @@ +from datetime import datetime, timedelta, timezone +from typing import TypeVar + +T = TypeVar("T") +R = TypeVar("R") + + +def _to_utc(dt: datetime) -> datetime: + if not dt.tzinfo: + return dt.replace(tzinfo=timezone.utc) + + return dt.astimezone(timezone.utc) + + +def partition_date_range( + since: datetime, until: datetime +) -> list[tuple[datetime, datetime]]: + since = _to_utc(since) + until = _to_utc(until) + + ranges = [] + current_start = since + + while current_start < until: + next_day = (current_start + timedelta(days=1)).replace( + hour=0, minute=0, second=0, microsecond=0 + ) + + current_end = min(next_day, until) + + ranges.append((current_start, current_end)) + + current_start = next_day + + return ranges diff --git a/sdks/python/hatchet_sdk/worker/runner/runner.py b/sdks/python/hatchet_sdk/worker/runner/runner.py index e46036551..d6249bf02 100644 --- a/sdks/python/hatchet_sdk/worker/runner/runner.py +++ b/sdks/python/hatchet_sdk/worker/runner/runner.py @@ -550,7 +550,7 @@ class Runner: return "" if isinstance(output, BaseModel): - output = output.model_dump() + output = output.model_dump(mode="json") if not isinstance(output, dict): raise IllegalTaskOutputError( diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 01774cc91..4d8a1b34b 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.14.1" +version = "1.14.2" description = "" authors = ["Alexander Belanger "] readme = "README.md" diff --git a/sdks/python/tests/test_datetime_partitioning.py b/sdks/python/tests/test_datetime_partitioning.py new file mode 100644 index 000000000..38020c5ef --- /dev/null +++ b/sdks/python/tests/test_datetime_partitioning.py @@ -0,0 +1,67 @@ +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + +from hatchet_sdk.utils.datetimes import partition_date_range + + +def test_partition_date_ranges_single_day() -> None: + since = datetime(2025, 1, 1, 1, 2, 3, tzinfo=timezone.utc) + until = datetime(2025, 1, 1, 3, 14, 15, tzinfo=timezone.utc) + partitioned = partition_date_range( + since, + until, + ) + + assert len(partitioned) == 1 + assert partitioned[0] == (since, until) + + +def test_partition_date_ranges_multi_day() -> None: + since = datetime(2025, 1, 1, 1, 2, 3, tzinfo=timezone.utc) + until = 
diff --git a/sdks/python/hatchet_sdk/utils/datetimes.py b/sdks/python/hatchet_sdk/utils/datetimes.py
new file mode 100644
index 000000000..573d6d1ba
--- /dev/null
+++ b/sdks/python/hatchet_sdk/utils/datetimes.py
@@ -0,0 +1,35 @@
+from datetime import datetime, timedelta, timezone
+from typing import TypeVar
+
+T = TypeVar("T")
+R = TypeVar("R")
+
+
+def _to_utc(dt: datetime) -> datetime:
+    if not dt.tzinfo:
+        return dt.replace(tzinfo=timezone.utc)
+
+    return dt.astimezone(timezone.utc)
+
+
+def partition_date_range(
+    since: datetime, until: datetime
+) -> list[tuple[datetime, datetime]]:
+    since = _to_utc(since)
+    until = _to_utc(until)
+
+    ranges = []
+    current_start = since
+
+    while current_start < until:
+        next_day = (current_start + timedelta(days=1)).replace(
+            hour=0, minute=0, second=0, microsecond=0
+        )
+
+        current_end = min(next_day, until)
+
+        ranges.append((current_start, current_end))
+
+        current_start = next_day
+
+    return ranges
diff --git a/sdks/python/hatchet_sdk/worker/runner/runner.py b/sdks/python/hatchet_sdk/worker/runner/runner.py
index e46036551..d6249bf02 100644
--- a/sdks/python/hatchet_sdk/worker/runner/runner.py
+++ b/sdks/python/hatchet_sdk/worker/runner/runner.py
@@ -550,7 +550,7 @@ class Runner:
             return ""

         if isinstance(output, BaseModel):
-            output = output.model_dump()
+            output = output.model_dump(mode="json")

         if not isinstance(output, dict):
             raise IllegalTaskOutputError(
diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml
index 01774cc91..4d8a1b34b 100644
--- a/sdks/python/pyproject.toml
+++ b/sdks/python/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "hatchet-sdk"
-version = "1.14.1"
+version = "1.14.2"
 description = ""
 authors = ["Alexander Belanger "]
 readme = "README.md"
diff --git a/sdks/python/tests/test_datetime_partitioning.py b/sdks/python/tests/test_datetime_partitioning.py
new file mode 100644
index 000000000..38020c5ef
--- /dev/null
+++ b/sdks/python/tests/test_datetime_partitioning.py
@@ -0,0 +1,67 @@
+from datetime import datetime, timedelta, timezone
+from zoneinfo import ZoneInfo
+
+from hatchet_sdk.utils.datetimes import partition_date_range
+
+
+def test_partition_date_ranges_single_day() -> None:
+    since = datetime(2025, 1, 1, 1, 2, 3, tzinfo=timezone.utc)
+    until = datetime(2025, 1, 1, 3, 14, 15, tzinfo=timezone.utc)
+    partitioned = partition_date_range(
+        since,
+        until,
+    )
+
+    assert len(partitioned) == 1
+    assert partitioned[0] == (since, until)
+
+
+def test_partition_date_ranges_multi_day() -> None:
+    since = datetime(2025, 1, 1, 1, 2, 3, tzinfo=timezone.utc)
+    until = datetime(2025, 1, 4, 3, 14, 15, tzinfo=timezone.utc)
+    partitioned = partition_date_range(
+        since,
+        until,
+    )
+
+    assert len(partitioned) == 4
+    assert partitioned[0] == (
+        since,
+        datetime(2025, 1, 2, tzinfo=timezone.utc),
+    )
+    assert partitioned[1] == (
+        datetime(2025, 1, 2, tzinfo=timezone.utc),
+        datetime(2025, 1, 3, tzinfo=timezone.utc),
+    )
+    assert partitioned[2] == (
+        datetime(2025, 1, 3, tzinfo=timezone.utc),
+        datetime(2025, 1, 4, tzinfo=timezone.utc),
+    )
+    assert partitioned[3] == (
+        datetime(2025, 1, 4, tzinfo=timezone.utc),
+        until,
+    )
+
+
+def test_partition_date_ranges_non_utc() -> None:
+    since = datetime(2025, 1, 1, 22, 2, 3, tzinfo=ZoneInfo("America/New_York"))
+    until = datetime(2025, 1, 4, 3, 14, 15, tzinfo=timezone.utc)
+
+    partitioned = partition_date_range(
+        since,
+        until,
+    )
+
+    assert len(partitioned) == 3
+    assert partitioned[0] == (
+        since.astimezone(timezone.utc),
+        datetime(2025, 1, 3, tzinfo=timezone.utc),
+    )
+    assert partitioned[1] == (
+        datetime(2025, 1, 3, tzinfo=timezone.utc),
+        datetime(2025, 1, 4, tzinfo=timezone.utc),
+    )
+    assert partitioned[2] == (
+        datetime(2025, 1, 4, tzinfo=timezone.utc),
+        until,
+    )
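Note: the tests above pin down the behaviour of `partition_date_range`: inputs are normalised to UTC (naive datetimes are treated as already being UTC) and the requested window is split at UTC midnight boundaries. A quick illustration with made-up timestamps:

from datetime import datetime, timezone

from hatchet_sdk.utils.datetimes import partition_date_range

since = datetime(2025, 1, 1, 6, 30, tzinfo=timezone.utc)
until = datetime(2025, 1, 3, 12, 0, tzinfo=timezone.utc)

# Splits at UTC midnight, yielding three windows:
#   2025-01-01 06:30 -> 2025-01-02 00:00
#   2025-01-02 00:00 -> 2025-01-03 00:00
#   2025-01-03 00:00 -> 2025-01-03 12:00
for start, end in partition_date_range(since, until):
    print(start.isoformat(), "->", end.isoformat())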
diff --git a/sdks/typescript/package.json b/sdks/typescript/package.json
index 9e29e0d45..a439fa030 100644
--- a/sdks/typescript/package.json
+++ b/sdks/typescript/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@hatchet-dev/typescript-sdk",
-  "version": "1.9.0",
+  "version": "1.9.1",
   "description": "Background task orchestration & visibility for developers",
   "types": "dist/index.d.ts",
   "files": [
diff --git a/sdks/typescript/pnpm-lock.yaml b/sdks/typescript/pnpm-lock.yaml
index 0d1fccb82..7e479270c 100644
--- a/sdks/typescript/pnpm-lock.yaml
+++ b/sdks/typescript/pnpm-lock.yaml
@@ -62,7 +62,7 @@ importers:
         version: 6.21.0(eslint@8.57.1)(typescript@5.8.2)
       autoprefixer:
         specifier: ^10.4.21
-        version: 10.4.21(postcss@8.4.33)
+        version: 10.4.21(postcss@8.5.6)
       dotenv-cli:
         specifier: ^7.4.4
         version: 7.4.4
@@ -2376,8 +2376,8 @@ packages:
   postcss-value-parser@4.2.0:
     resolution: {integrity: sha512-1NNCs6uurfkVbeXG4S8JFT9t19m45ICnif8zWLd5oPSZ50QnwMfK+H3jv408d4jw/7Bttv5axS5IiHoLaVNHeQ==}

-  postcss@8.4.33:
-    resolution: {integrity: sha512-Kkpbhhdjw2qQs2O2DGX+8m5OVqEcbB9HRBvuYM9pgrjEFUg30A9LmXNlTAUj4S9kgtGyrMbTzVjH7E+s5Re2yg==}
+  postcss@8.5.6:
+    resolution: {integrity: sha512-3Ybi1tAuwAP9s0r1UQ2J4n5Y0G05bJkpUIO0/bI9MhwmD70S5aTWbXGBwxHrelT+XM1k6dM0pk+SwNkpTRN7Pg==}
     engines: {node: ^10 || ^12 || >=14}

   prelude-ls@1.2.1:
@@ -3889,14 +3889,14 @@ snapshots:

   atomic-sleep@1.0.0: {}

-  autoprefixer@10.4.21(postcss@8.4.33):
+  autoprefixer@10.4.21(postcss@8.5.6):
     dependencies:
       browserslist: 4.24.4
       caniuse-lite: 1.0.30001707
       fraction.js: 4.3.7
       normalize-range: 0.1.2
       picocolors: 1.1.1
-      postcss: 8.4.33
+      postcss: 8.5.6
       postcss-value-parser: 4.2.0

   available-typed-arrays@1.0.7:
@@ -5686,7 +5686,7 @@ snapshots:

   postcss-value-parser@4.2.0: {}

-  postcss@8.4.33:
+  postcss@8.5.6:
     dependencies:
       nanoid: 3.3.11
       picocolors: 1.1.1
diff --git a/sdks/typescript/src/clients/hatchet-client/hatchet-client.ts b/sdks/typescript/src/clients/hatchet-client/hatchet-client.ts
index 6eacc22f0..c2d88003e 100644
--- a/sdks/typescript/src/clients/hatchet-client/hatchet-client.ts
+++ b/sdks/typescript/src/clients/hatchet-client/hatchet-client.ts
@@ -42,7 +42,8 @@ export class LegacyHatchetClient {
     config?: Partial,
     options?: HatchetClientOptions,
     axiosOpts?: AxiosRequestConfig,
-    runs?: RunsClient
+    runs?: RunsClient,
+    listener?: RunListenerClient
   ) {
     // Initializes a new Client instance.
     // Loads config in the following order: config param > yaml file > env vars
@@ -91,12 +92,14 @@ export class LegacyHatchetClient {
       channelFactory(this.config, this.credentials),
       clientFactory
     );
-    this.listener = new RunListenerClient(
-      this.config,
-      channelFactory(this.config, this.credentials),
-      clientFactory,
-      this.api
-    );
+    this.listener =
+      listener ||
+      new RunListenerClient(
+        this.config,
+        channelFactory(this.config, this.credentials),
+        clientFactory,
+        this.api
+      );

     this.admin = new AdminClient(
       this.config,
diff --git a/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts b/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts
index fed10cb57..e5d020d4c 100644
--- a/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts
+++ b/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts
@@ -136,8 +136,11 @@ export class RunEventListener {
           });
         }
       }
+
+      this.eventEmitter.emit('complete');
     } catch (e: any) {
       if (e.code === Status.CANCELLED) {
+        this.eventEmitter.emit('complete');
         return;
       }
       if (e.code === Status.UNAVAILABLE) {
@@ -164,6 +167,13 @@ export class RunEventListener {
   }

   async *stream(): AsyncGenerator {
+    let completed = false;
+
+    this.eventEmitter.once('complete', () => {
+      completed = true;
+      this.eventEmitter.emit('event');
+    });
+
     for await (const _ of on(this.eventEmitter, 'event')) {
       while (this.q.length > 0) {
         const r = this.q.shift();
@@ -171,6 +181,10 @@ export class RunEventListener {
           yield r;
         }
       }
+
+      if (completed && this.q.length === 0) {
+        break;
+      }
     }
   }
 }
diff --git a/sdks/typescript/src/v1/client/client.ts b/sdks/typescript/src/v1/client/client.ts
index 4f0a48670..9e9ebcd6c 100644
--- a/sdks/typescript/src/v1/client/client.ts
+++ b/sdks/typescript/src/v1/client/client.ts
@@ -15,6 +15,9 @@ import { ConfigLoader } from '@hatchet/util/config-loader';
 import { DEFAULT_LOGGER } from '@hatchet/clients/hatchet-client/hatchet-logger';
 import { z } from 'zod';
 import { LogLevel } from '@hatchet-dev/typescript-sdk/clients/event/event-client';
+import { RunListenerClient } from '@hatchet-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client';
+import { addTokenMiddleware, channelFactory } from '@hatchet-dev/typescript-sdk/util/grpc-helpers';
+import { createClientFactory } from 'nice-grpc';
 import {
   CreateTaskWorkflowOpts,
   CreateWorkflow,
@@ -49,6 +52,7 @@ export class HatchetClient implements IHatchetClient {
   /** The underlying v0 client instance */
   _v0: LegacyHatchetClient;
   _api: Api;
+  _listener: RunListenerClient;

   /**
    * @deprecated v0 client will be removed in a future release, please upgrade to v1
@@ -100,7 +104,24 @@ export class HatchetClient implements IHatchetClient {
       this.tenantId = clientConfig.tenant_id;
       this._api = api(clientConfig.api_url, clientConfig.token, axiosConfig);

-      this._v0 = new LegacyHatchetClient(clientConfig, options, axiosConfig, this.runs);
+      const clientFactory = createClientFactory().use(addTokenMiddleware(this.config.token));
+      const credentials =
+        options?.credentials ?? ConfigLoader.createCredentials(this.config.tls_config);
+
+      this._listener = new RunListenerClient(
+        this.config,
+        channelFactory(this.config, credentials),
+        clientFactory,
+        this.api
+      );
+
+      this._v0 = new LegacyHatchetClient(
+        clientConfig,
+        options,
+        axiosConfig,
+        this.runs,
+        this._listener
+      );
     } catch (e) {
       if (e instanceof z.ZodError) {
         throw new Error(`Invalid client config: ${e.message}`);
@@ -521,6 +542,6 @@ export class HatchetClient implements IHatchetClient {
   }

   runRef = any>(id: string): WorkflowRunRef {
-    return new WorkflowRunRef(id, this.v0.listener, this.runs);
+    return this.runs.runRef(id);
   }
 }
diff --git a/sdks/typescript/src/v1/client/features/runs.ts b/sdks/typescript/src/v1/client/features/runs.ts
index c54bce6c5..30f3a9ce3 100644
--- a/sdks/typescript/src/v1/client/features/runs.ts
+++ b/sdks/typescript/src/v1/client/features/runs.ts
@@ -1,5 +1,9 @@
 import WorkflowRunRef from '@hatchet/util/workflow-run-ref';
 import { V1TaskStatus, V1TaskFilter } from '@hatchet/clients/rest/generated/data-contracts';
+import {
+  RunEventType,
+  RunListenerClient,
+} from '@hatchet-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client';
 import { WorkflowsClient } from './workflows';
 import { HatchetClient } from '../client';

@@ -67,11 +71,15 @@ export class RunsClient {
   api: HatchetClient['api'];
   tenantId: string;
   workflows: WorkflowsClient;
+  listener: RunListenerClient;

   constructor(client: HatchetClient) {
     this.api = client.api;
     this.tenantId = client.tenantId;
     this.workflows = client.workflows;
+
+    // eslint-disable-next-line no-underscore-dangle
+    this.listener = client._listener;
   }

   async get(run: string | WorkflowRunRef) {
@@ -156,4 +164,19 @@ export class RunsClient {
       triggering_event_external_id: opts.triggeringEventExternalId,
     };
   }
+
+  runRef = any>(id: string): WorkflowRunRef {
+    return new WorkflowRunRef(id, this.listener, this);
+  }
+
+  async *subscribeToStream(workflowRunId: string): AsyncIterableIterator {
+    const ref = this.runRef(workflowRunId);
+    const stream = await ref.stream();
+
+    for await (const event of stream) {
+      if (event.type === RunEventType.STEP_RUN_EVENT_TYPE_STREAM) {
+        yield event.payload;
+      }
+    }
+  }
 }
diff --git a/sdks/typescript/src/v1/examples/streaming/nextjs-proxy.ts b/sdks/typescript/src/v1/examples/streaming/nextjs-proxy.ts
new file mode 100644
index 000000000..d7fdfed6b
--- /dev/null
+++ b/sdks/typescript/src/v1/examples/streaming/nextjs-proxy.ts
@@ -0,0 +1,25 @@
+import { Readable } from 'stream';
+import { hatchet } from '../hatchet-client';
+import { streamingTask } from './workflow';
+
+// > NextJS Proxy
+export async function GET() {
+  try {
+    const ref = await streamingTask.runNoWait({});
+    const workflowRunId = await ref.getWorkflowRunId();
+
+    const stream = Readable.from(hatchet.runs.subscribeToStream(workflowRunId));
+
+    // @ts-ignore
+    return new Response(Readable.toWeb(stream), {
+      headers: {
+        'Content-Type': 'text/plain',
+        'Cache-Control': 'no-cache',
+        Connection: 'keep-alive',
+      },
+    });
+  } catch (error) {
+    return new Response('Internal Server Error', { status: 500 });
+  }
+}
+// !!
diff --git a/sdks/typescript/src/v1/examples/streaming/run.ts b/sdks/typescript/src/v1/examples/streaming/run.ts
index 629805d64..dc71995fc 100644
--- a/sdks/typescript/src/v1/examples/streaming/run.ts
+++ b/sdks/typescript/src/v1/examples/streaming/run.ts
@@ -1,17 +1,14 @@
 /* eslint-disable no-console */
-import { RunEventType } from '@hatchet-dev/typescript-sdk/clients/listeners/run-listener/child-listener-client';
 import { streamingTask } from './workflow';
+import { hatchet } from '../hatchet-client';

 async function main() {
   // > Consume
   const ref = await streamingTask.runNoWait({});
+  const id = await ref.getWorkflowRunId();

-  const stream = await ref.stream();
-
-  for await (const event of stream) {
-    if (event.type === RunEventType.STEP_RUN_EVENT_TYPE_STREAM) {
-      process.stdout.write(event.payload);
-    }
+  for await (const content of hatchet.runs.subscribeToStream(id)) {
+    process.stdout.write(content);
   }
   // !!
 }
diff --git a/sdks/typescript/src/v1/examples/streaming/workflow.ts b/sdks/typescript/src/v1/examples/streaming/workflow.ts
index 772b38b8f..2b8d3516e 100644
--- a/sdks/typescript/src/v1/examples/streaming/workflow.ts
+++ b/sdks/typescript/src/v1/examples/streaming/workflow.ts
@@ -1,4 +1,4 @@
-import sleep from '@hatchet-dev/typescript-sdk/util/sleep';
+import sleep from '../../../util/sleep';
 import { hatchet } from '../hatchet-client';

 // > Streaming
@@ -17,6 +17,8 @@ function* createChunks(content: string, n: number): Generator {
+    await sleep(2000);
+
     for (const chunk of createChunks(annaKarenina, 10)) {
       ctx.putStream(chunk);
       await sleep(200);