From 93454d6e75273a0e7c601ee65b67cdd840bca131 Mon Sep 17 00:00:00 2001 From: matt Date: Tue, 9 Sep 2025 15:37:20 -0400 Subject: [PATCH] Fix: More doc snippets (#2267) * fix: batch i * fix: batch ii * fix: batch iii * fix: batch iv * fix: batch v * fix: guide * fix: batch vi * fix: batch vii * fix: dag docs * fix: separate dag tasks --- examples/python/blocked_loop_blog/snippets.py | 40 ++++++ examples/python/fastapi_blog/trigger.py | 93 ++++++++++++++ .../typescript/affinity/affinity-workers.ts | 1 + .../typescript/child_workflows/workflow.ts | 31 +++++ examples/typescript/dag/run.ts | 1 + examples/typescript/dag/workflow.ts | 17 ++- examples/typescript/hatchet-client.ts | 2 + examples/typescript/on_event/event.ts | 31 +++++ examples/typescript/simple/replay-cancel.ts | 23 ++++ examples/typescript/simple/run.ts | 12 ++ .../typescript/simple/typed-run-methods.ts | 26 ++++ frontend/docs/pages/_setup/_existing/ts.mdx | 14 +-- frontend/docs/pages/_setup/_new/ts.mdx | 12 +- .../blog/background-tasks-fastapi-hatchet.mdx | 42 +------ .../pages/blog/mergent-migration-guide.mdx | 2 +- .../pages/blog/task-queue-modern-python.mdx | 8 +- .../docs/pages/home/additional-metadata.mdx | 32 ++--- frontend/docs/pages/home/asyncio.mdx | 40 ++---- frontend/docs/pages/home/child-spawning.mdx | 70 ++--------- frontend/docs/pages/home/dags.mdx | 70 ++--------- .../pages/home/migration-guide-typescript.mdx | 6 +- .../docs/pages/home/v1-sdk-improvements.mdx | 119 +++--------------- frontend/docs/pages/home/worker-affinity.mdx | 28 +---- .../self-hosting/improving-performance.mdx | 54 +------- .../examples/blocked_loop_blog/snippets.py | 44 +++++++ sdks/python/examples/fastapi_blog/trigger.py | 95 ++++++++++++++ .../v1/examples/affinity/affinity-workers.ts | 2 + .../v1/examples/child_workflows/workflow.ts | 33 +++++ sdks/typescript/src/v1/examples/dag/run.ts | 2 + .../src/v1/examples/dag/workflow.ts | 21 +++- .../src/v1/examples/hatchet-client.ts | 3 + .../src/v1/examples/on_event/event.ts | 33 +++++ .../src/v1/examples/simple/replay-cancel.ts | 24 ++++ sdks/typescript/src/v1/examples/simple/run.ts | 13 ++ .../v1/examples/simple/typed-run-methods.ts | 28 +++++ 35 files changed, 653 insertions(+), 419 deletions(-) create mode 100644 examples/python/blocked_loop_blog/snippets.py create mode 100644 examples/python/fastapi_blog/trigger.py create mode 100644 examples/typescript/simple/replay-cancel.ts create mode 100644 examples/typescript/simple/typed-run-methods.ts create mode 100644 sdks/python/examples/blocked_loop_blog/snippets.py create mode 100644 sdks/python/examples/fastapi_blog/trigger.py create mode 100644 sdks/typescript/src/v1/examples/simple/replay-cancel.ts create mode 100644 sdks/typescript/src/v1/examples/simple/typed-run-methods.ts diff --git a/examples/python/blocked_loop_blog/snippets.py b/examples/python/blocked_loop_blog/snippets.py new file mode 100644 index 000000000..f7d071182 --- /dev/null +++ b/examples/python/blocked_loop_blog/snippets.py @@ -0,0 +1,40 @@ +import asyncio +import time + + +# > Async-safe +async def async_safe() -> int: + await asyncio.sleep(5) + + return 42 + + + + +# > Blocking +async def blocking() -> int: + time.sleep(5) + + return 42 + + + + +# > Using to_thread +async def to_thread() -> int: + await asyncio.to_thread(time.sleep, 5) + + return 42 + + + + +# > Using run_in_executor +async def run_in_executor() -> int: + loop = asyncio.get_event_loop() + + await loop.run_in_executor(None, time.sleep, 5) + + return 42 + + diff --git a/examples/python/fastapi_blog/trigger.py b/examples/python/fastapi_blog/trigger.py new file mode 100644 index 000000000..48b7326b1 --- /dev/null +++ b/examples/python/fastapi_blog/trigger.py @@ -0,0 +1,93 @@ +from types import TracebackType + +from fastapi import BackgroundTasks, FastAPI +from pydantic import BaseModel + +from hatchet_sdk import Context, Hatchet + + +class Session: + ## simulate async db session + async def __aenter__(self) -> "Session": + return self + + async def __aexit__( + self, + type_: type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + pass + + +class User: + def __init__(self, id: int, email: str): + self.id = id + self.email = email + + +async def get_user(db: Session, user_id: int) -> User: + return User(user_id, "test@example.com") + + +async def create_user(db: Session) -> User: + return User(1, "test@example.com") + + +async def send_welcome_email(email: str) -> None: + print(f"Sending welcome email to {email}") + + +app = FastAPI() +hatchet = Hatchet() + + +# > FastAPI Background Tasks +async def send_welcome_email_task_bg(user_id: int) -> None: + async with Session() as db: + user = await get_user(db, user_id) + + await send_welcome_email(user.email) + + +@app.post("/user") +async def post__create_user__background_tasks( + background_tasks: BackgroundTasks, +) -> User: + async with Session() as db: + user = await create_user(db) + + background_tasks.add_task(send_welcome_email_task_bg, user.id) + + return user + + + + +# > Hatchet Task +class WelcomeEmailInput(BaseModel): + user_id: int + + +@hatchet.task(input_validator=WelcomeEmailInput) +async def send_welcome_email_task_hatchet( + input: WelcomeEmailInput, _ctx: Context +) -> None: + async with Session() as db: + user = await get_user(db, input.user_id) + + await send_welcome_email(user.email) + + +@app.post("/user") +async def post__create_user__hatchet() -> User: + async with Session() as db: + user = await create_user(db) + + await send_welcome_email_task_hatchet.aio_run_no_wait( + WelcomeEmailInput(user_id=user.id) + ) + + return user + + diff --git a/examples/typescript/affinity/affinity-workers.ts b/examples/typescript/affinity/affinity-workers.ts index e1860cd83..486af75a2 100644 --- a/examples/typescript/affinity/affinity-workers.ts +++ b/examples/typescript/affinity/affinity-workers.ts @@ -24,6 +24,7 @@ workflow.task({ }); +// > Task with labels const childWorkflow = hatchet.workflow({ name: 'child-affinity-workflow', description: 'test', diff --git a/examples/typescript/child_workflows/workflow.ts b/examples/typescript/child_workflows/workflow.ts index f8f2db8b2..cf10710bb 100644 --- a/examples/typescript/child_workflows/workflow.ts +++ b/examples/typescript/child_workflows/workflow.ts @@ -38,3 +38,34 @@ export const parent = hatchet.task({ }; }, }); + +// > Parent with Single Child +export const parentSingleChild = hatchet.task({ + name: 'parent-single-child', + fn: async () => { + const childRes = await child.run({ N: 1 }); + + return { + Result: childRes.Value, + }; + }, +}); + +// > Parent with Error Handling +export const withErrorHandling = hatchet.task({ + name: 'parent-error-handling', + fn: async () => { + try { + const childRes = await child.run({ N: 1 }); + + return { + Result: childRes.Value, + }; + } catch (error) { + // decide how to proceed here + return { + Result: -1, + }; + } + }, +}); diff --git a/examples/typescript/dag/run.ts b/examples/typescript/dag/run.ts index 8526d7fe0..12f909acf 100644 --- a/examples/typescript/dag/run.ts +++ b/examples/typescript/dag/run.ts @@ -1,6 +1,7 @@ import { dag } from './workflow'; async function main() { + // > Run the workflow const res = await dag.run({ Message: 'hello world', }); diff --git a/examples/typescript/dag/workflow.ts b/examples/typescript/dag/workflow.ts index 22d7b3ba2..d38a7c104 100644 --- a/examples/typescript/dag/workflow.ts +++ b/examples/typescript/dag/workflow.ts @@ -1,5 +1,6 @@ import { hatchet } from '../hatchet-client'; +// > Declaring Types type DagInput = { Message: string; }; @@ -17,6 +18,7 @@ export const dag = hatchet.workflow({ name: 'simple', }); +// > First task // Next, we declare the tasks bound to the workflow const toLower = dag.task({ name: 'to-lower', @@ -27,7 +29,7 @@ const toLower = dag.task({ }, }); -// Next, we declare the tasks bound to the workflow +// > Second task with parent dag.task({ name: 'reverse', parents: [toLower], @@ -39,3 +41,16 @@ dag.task({ }; }, }); + +// > Accessing Parent Outputs +dag.task({ + name: 'task-with-parent-output', + parents: [toLower], + fn: async (input, ctx) => { + const lower = await ctx.parentOutput(toLower); + return { + Original: input.Message, + Transformed: lower.TransformedMessage.split('').reverse().join(''), + }; + }, +}); diff --git a/examples/typescript/hatchet-client.ts b/examples/typescript/hatchet-client.ts index fa9fd077f..19d4150ce 100644 --- a/examples/typescript/hatchet-client.ts +++ b/examples/typescript/hatchet-client.ts @@ -1,3 +1,5 @@ +// > Create client import { HatchetClient } from '@hatchet-dev/typescript-sdk/v1'; export const hatchet = HatchetClient.init(); + diff --git a/examples/typescript/on_event/event.ts b/examples/typescript/on_event/event.ts index 510eee018..07d582aaf 100644 --- a/examples/typescript/on_event/event.ts +++ b/examples/typescript/on_event/event.ts @@ -8,6 +8,37 @@ async function main() { ShouldSkip: false, }); + // > Push an Event with Metadata + const withMetadata = await hatchet.events.push( + 'user:create', + { + test: 'test', + }, + { + additionalMetadata: { + source: 'api', // Arbitrary key-value pair + }, + } + ); + + // > Bulk push events + const events = [ + { + payload: { test: 'test1' }, + additionalMetadata: { user_id: 'user1', source: 'test' }, + }, + { + payload: { test: 'test2' }, + additionalMetadata: { user_id: 'user2', source: 'test' }, + }, + { + payload: { test: 'test3' }, + additionalMetadata: { user_id: 'user3', source: 'test' }, + }, + ]; + + await hatchet.events.bulkPush('user:create', events); + console.log(res.eventId); } diff --git a/examples/typescript/simple/replay-cancel.ts b/examples/typescript/simple/replay-cancel.ts new file mode 100644 index 000000000..8b2932af7 --- /dev/null +++ b/examples/typescript/simple/replay-cancel.ts @@ -0,0 +1,23 @@ +import { V1TaskStatus } from '@hatchet-dev/typescript-sdk/clients/rest/generated/data-contracts'; +import { hatchet } from '../hatchet-client'; + +async function main() { + const { runs } = hatchet; + + // > API operations + // list all failed runs + const allFailedRuns = await runs.list({ + statuses: [V1TaskStatus.FAILED], + }); + + // replay by ids + await runs.replay({ ids: allFailedRuns.rows?.map((r) => r.metadata.id) }); + + // or you can run bulk operations with filters directly + await runs.cancel({ + filters: { + since: new Date('2025-03-27'), + additionalMetadata: { user: '123' }, + }, + }); +} diff --git a/examples/typescript/simple/run.ts b/examples/typescript/simple/run.ts index 195a871f4..e0823896c 100644 --- a/examples/typescript/simple/run.ts +++ b/examples/typescript/simple/run.ts @@ -48,6 +48,18 @@ export async function extra() { }; }, }); + + // > Run with metadata + const withMetadata = simple.run( + { + Message: 'HeLlO WoRlD', + }, + { + additionalMetadata: { + source: 'api', // Arbitrary key-value pair + }, + } + ); } if (require.main === module) { diff --git a/examples/typescript/simple/typed-run-methods.ts b/examples/typescript/simple/typed-run-methods.ts new file mode 100644 index 000000000..ed080ec13 --- /dev/null +++ b/examples/typescript/simple/typed-run-methods.ts @@ -0,0 +1,26 @@ +import { simple } from './workflow'; + +async function main() { + // > Run methods + const input = { Message: 'Hello, World!' }; + + // run now + const result = await simple.run(input); + const runReference = await simple.runNoWait(input); + + // or in the future + const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000); + const scheduled = await simple.schedule(runAt, input); + const cron = await simple.cron('simple-daily', '0 0 * * *', input); +} + +async function runFlavors() { + // > Run method flavors + const input = { Message: 'Hello, World!' }; + + // Run workflow and wait for the result + const result = await simple.run(input); + + // Enqueue workflow to be executed asynchronously + const runReference = await simple.runNoWait(input); +} diff --git a/frontend/docs/pages/_setup/_existing/ts.mdx b/frontend/docs/pages/_setup/_existing/ts.mdx index 9704fa738..826bf8050 100644 --- a/frontend/docs/pages/_setup/_existing/ts.mdx +++ b/frontend/docs/pages/_setup/_existing/ts.mdx @@ -1,3 +1,5 @@ +import { snippets } from "@/lib/generated/snippets"; +import { Snippet } from "@/components/code"; import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; import UniversalTabs from "@/components/UniversalTabs"; import InstallCommand from "@/components/InstallCommand"; @@ -33,14 +35,4 @@ touch hatchet-client.ts Add the following code to the file: -```typescript copy -import { Hatchet } from "@hatchet-dev/typescript-sdk"; - -export const hatchet = Hatchet.init(); -``` - -You can now import the Hatchet Client in any file that needs it. - -```typescript copy -import { hatchet } from "./hatchet-client"; -``` + diff --git a/frontend/docs/pages/_setup/_new/ts.mdx b/frontend/docs/pages/_setup/_new/ts.mdx index b4043d4ad..34e8c26ba 100644 --- a/frontend/docs/pages/_setup/_new/ts.mdx +++ b/frontend/docs/pages/_setup/_new/ts.mdx @@ -1,3 +1,5 @@ +import { snippets } from "@/lib/generated/snippets"; +import { Snippet } from "@/components/code"; import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; import UniversalTabs from "@/components/UniversalTabs"; import InstallCommand from "@/components/InstallCommand"; @@ -32,14 +34,6 @@ touch src/hatchet-client.ts Add the following code to the file: -```typescript copy -import { Hatchet } from "@hatchet-dev/typescript-sdk"; - -export const hatchet = Hatchet.init(); -``` + You can now import the Hatchet Client in any file that needs it. - -```typescript copy -import { hatchet } from "./hatchet-client"; -``` diff --git a/frontend/docs/pages/blog/background-tasks-fastapi-hatchet.mdx b/frontend/docs/pages/blog/background-tasks-fastapi-hatchet.mdx index 3a41f52ce..0c1c54715 100644 --- a/frontend/docs/pages/blog/background-tasks-fastapi-hatchet.mdx +++ b/frontend/docs/pages/blog/background-tasks-fastapi-hatchet.mdx @@ -2,6 +2,8 @@ import DynamicLottie from "../../components/DynamicLottie"; import { LogViewer } from "../../components/LogViewer"; import * as prefetch from "./_celery_prefetch.json"; import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; +import { snippets } from "@/lib/generated/snippets"; +import { Snippet } from "@/components/code"; # **Background Tasks: From FastAPI to Hatchet** @@ -44,47 +46,11 @@ Hatchet's functionality is built to solve exactly these problems, and many more Porting your tasks from FastAPI background tasks to Hatchet is simple - all you need to do is create Hatchet tasks out of the functions you're passing to `add_task`. For instance: -```python -async def send_welcome_email_task(user_id: int) -> None: - async with Session() as db: - user = await get_user(db, user_id) - - await send_welcome_email(user.email) - -@app.post("/user") -async def create_user(background_tasks: BackgroundTasks) -> User: - async with Session() as db: - user = await create_user(db) - - background_tasks.add_task(send_welcome_email_task, user.id) - - return user -``` + Would become: -```python -class WelcomeEmailInput(BaseModel): - user_id: int - -@hatchet.task(input_validator=WelcomeEmailInput) -async def send_welcome_email_task(input: WelcomeEmailInput, _ctx: Context) -> None: - async with Session() as db: - user = await get_user(db, input.user_id) - - await send_welcome_email(user.email) - -@app.post("/user") -async def create_user() -> User: - async with Session() as db: - user = await create_user(db) - - await send_welcome_email_task.aio_run_no_wait( - WelcomeEmailInput(user_id=user.id) - ) - - return user -``` + And that's it! When you trigger the Hatchet task (in this case, in ["fire and forget"](../home/run-no-wait.mdx) style), your task will be sent through the Hatchet Engine to your worker, where it will execute, and report its result in the dashboard for you to see. Or if something goes wrong, you can be notified. diff --git a/frontend/docs/pages/blog/mergent-migration-guide.mdx b/frontend/docs/pages/blog/mergent-migration-guide.mdx index 61fdb7062..048f9aa2d 100644 --- a/frontend/docs/pages/blog/mergent-migration-guide.mdx +++ b/frontend/docs/pages/blog/mergent-migration-guide.mdx @@ -38,7 +38,7 @@ It is recommended to instantiate a shared Hatchet Client in a separate file as a Create a new file called `hatchet-client.ts` in your project root. - + You can now import the Hatchet Client in any file that needs it. diff --git a/frontend/docs/pages/blog/task-queue-modern-python.mdx b/frontend/docs/pages/blog/task-queue-modern-python.mdx index dc6c1fc17..871f6c038 100644 --- a/frontend/docs/pages/blog/task-queue-modern-python.mdx +++ b/frontend/docs/pages/blog/task-queue-modern-python.mdx @@ -69,9 +69,11 @@ Finally, Hatchet also has first-class support for cron jobs. You can either crea Or you can define them declaratively when you create your workflow: -```python -cron_workflow = hatchet.workflow(name="CronWorkflow", on_crons=["* * * * *"]) -``` + Importantly, first-class support for crons in Hatchet means there's no need for a tool like [Beat](https://docs.celeryq.dev/en/latest/userguide/periodic-tasks.html#introduction) in Celery for handling scheduling periodic tasks. diff --git a/frontend/docs/pages/home/additional-metadata.mdx b/frontend/docs/pages/home/additional-metadata.mdx index 551728732..7f6ccab3d 100644 --- a/frontend/docs/pages/home/additional-metadata.mdx +++ b/frontend/docs/pages/home/additional-metadata.mdx @@ -24,19 +24,9 @@ You can attach additional metadata when pushing events or triggering task runs u -```typescript -hatchet.event.push( - 'user:create', - { - test: 'test', - }, - { - additionalMetadata: { - source: 'api', // Arbitrary key-value pair - }, - } -); -``` + + + ```go @@ -62,18 +52,10 @@ err := c.Event().Push( -```typescript -const taskRunId = await simple.run( - { - userId: '1234', - }, - { - additionalMetadata: { - source: 'api', // Arbitrary key-value pair - }, - } -); -``` + + ```go diff --git a/frontend/docs/pages/home/asyncio.mdx b/frontend/docs/pages/home/asyncio.mdx index f97c9c7b3..3ee8e7855 100644 --- a/frontend/docs/pages/home/asyncio.mdx +++ b/frontend/docs/pages/home/asyncio.mdx @@ -1,4 +1,6 @@ import { Callout } from "nextra/components"; +import { Snippet } from "@/components/code"; +import { snippets } from "@/lib/generated/snippets"; # Working with `asyncio` @@ -13,23 +15,11 @@ However, as is the case in FastAPI, when using `asyncio` in Hatchet, you need to For example, this is async-safe: -```python -import asyncio - -async def my_task() -> int: - await asyncio.sleep(5) - - return 42 -``` + But this is not: -```python -async def my_task() -> int: - time.sleep(5) - - return 42 -``` + In the second case, your worker will not be able to process any other work that's defined as async until the five-second sleep has finished. @@ -37,25 +27,11 @@ In the second case, your worker will not be able to process any other work that' To avoid problems caused by blocking code, you can run your blocking code in an executor with `asyncio.to_thread` or, more verbosely, `loop.run_in_executor`. The two examples below are async-safe and will no longer block the event loop. -```python -import asyncio + -async def my_task() -> int: - await asyncio.to_thread(time.sleep, 5) - - return 42 -``` - -```python -import asyncio - -async def my_task() -> int: - loop = asyncio.get_event_loop() - - await loop.run_in_executor(None, time.sleep, 5) - - return 42 -``` + ### More Resources for working with `asyncio` diff --git a/frontend/docs/pages/home/child-spawning.mdx b/frontend/docs/pages/home/child-spawning.mdx index d1032c94f..06e203c32 100644 --- a/frontend/docs/pages/home/child-spawning.mdx +++ b/frontend/docs/pages/home/child-spawning.mdx @@ -32,43 +32,9 @@ And that's it! The fanout parent will run and spawn the child, and then will col -```typescript -import { hatchet } from "../hatchet-client"; - -// Child task definition -export const child = hatchet.task({ - name: "child", - fn: (input) => { - return { - Value: input.N, - }; - }, -}); - -// Parent task definition -export const parent = hatchet.task({ - name: "parent", - fn: async (input, ctx) => { - const n = input.N; - const promises = []; - - // Spawn multiple child tasks in parallel - for (let i = 0; i < n; i++) { - promises.push(child.run({ N: i })); - } - - // Wait for all child tasks to complete - const childRes = await Promise.all(promises); - - // Sum the results - const sum = childRes.reduce((acc, curr) => acc + curr.Value, 0); - - return { - Result: sum, - }; - }, -}); -``` + @@ -216,10 +182,9 @@ To spawn and run a child task from a parent task, use the appropriate method for -```typescript -// Inside a parent task -const childResult = await childTask.run(childInput); -``` + @@ -244,14 +209,9 @@ As shown in the examples above, you can spawn multiple child tasks in parallel: -```typescript -// Run multiple child tasks in parallel -const promises = []; -for (let i = 0; i < n; i++) { - promises.push(child.run({ N: i })); -} -const childResults = await Promise.all(promises); -``` + @@ -303,15 +263,9 @@ When working with child workflows, it's important to properly handle errors. Her -```typescript -try { - const childResult = await child.run({ N: i }); -} catch (error) { - // Handle error from child workflow - console.error(`Child workflow failed: ${error}`); - // Decide how to proceed - retry, skip, or fail the parent -} -``` + diff --git a/frontend/docs/pages/home/dags.mdx b/frontend/docs/pages/home/dags.mdx index 4a46d2444..f4734a776 100644 --- a/frontend/docs/pages/home/dags.mdx +++ b/frontend/docs/pages/home/dags.mdx @@ -21,24 +21,7 @@ The returned object is an instance of the `Workflow` class, which is the primary -```typescript -import { hatchet } from "../hatchet-client"; - -type DagInput = { - Message: string; -}; - -type DagOutput = { - reverse: { - Original: string; - Transformed: string; - }; -}; - -export const simple = hatchet.workflow({ - name: "simple", -}); -``` + @@ -78,16 +61,7 @@ For instance, `def task_1(foo: EmptyModel, bar: Context) -> None:` is perfectly -```typescript -const toLower = simple.task({ - name: "to-lower", - fn: (input) => { - return { - TransformedMessage: input.Message.toLowerCase(), - }; - }, -}); -``` + The `fn` argument is a function that takes the workflow's input and a context object. The context object contains information about the workflow @@ -114,28 +88,7 @@ The power of Hatchet's workflow design comes from connecting tasks into a DAG st -```typescript -const toLower = dag.task({ - name: "to-lower", - fn: (input) => { - return { - TransformedMessage: input.Message.toLowerCase(), - }; - }, -}); - -dag.task({ - name: "reverse", - parents: [toLower], - fn: async (input, ctx) => { - const lower = await ctx.parentOutput(toLower); - return { - Original: input.Message, - Transformed: lower.TransformedMessage.split("").reverse().join(""), - }; - }, -}); -``` + @@ -156,10 +109,9 @@ As shown in the examples above, tasks can access outputs from their parent tasks -```typescript -// Inside a task with parent dependencies -const parentOutput = await ctx.parentOutput(parentTaskName); -``` + + + ```go @@ -185,13 +137,9 @@ You can run workflows directly or enqueue them for asynchronous execution. All t -```typescript -// Run workflow and wait for the result -const result = await simple.run({ Message: "Hello World" }); - -// Enqueue workflow to be executed asynchronously -const runId = await simple.runNoWait({ Message: "Hello World" }); -``` + diff --git a/frontend/docs/pages/home/migration-guide-typescript.mdx b/frontend/docs/pages/home/migration-guide-typescript.mdx index 1bb4df335..c4f67ba49 100644 --- a/frontend/docs/pages/home/migration-guide-typescript.mdx +++ b/frontend/docs/pages/home/migration-guide-typescript.mdx @@ -17,9 +17,13 @@ DAGs are still defined as workflows, but they can now be declared using the `hat +And you can bind tasks to workflows as follows: + + + You can now run work for tasks and workflows by directly interacting with the returned object. - + There are a few important things to note when migrating to the new SDK: diff --git a/frontend/docs/pages/home/v1-sdk-improvements.mdx b/frontend/docs/pages/home/v1-sdk-improvements.mdx index 53b49d1ab..1420aee82 100644 --- a/frontend/docs/pages/home/v1-sdk-improvements.mdx +++ b/frontend/docs/pages/home/v1-sdk-improvements.mdx @@ -1,5 +1,7 @@ import { Callout, Card, Cards, Steps, Tabs } from "nextra/components"; import UniversalTabs from "@/components/UniversalTabs"; +import { snippets } from "@/lib/generated/snippets"; +import { Snippet } from "@/components/code"; ## SDK Improvements in V1 @@ -61,37 +63,17 @@ There are a handful of other new features that will make interfacing with the SD 2. There is now an `on_success_task` on the `Workflow` object, which works just like an on-failure task, but it runs after all upstream tasks in the workflow have _succeeded_. 3. We've exposed feature clients on the Hatchet client to make it easier to interact with and control your environment. -For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them. +For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them. First, fetch some run ids: -```python -hatchet = Hatchet() + -workflows = hatchet.workflows.list() +Then, use those ids to bulk cancel: -assert workflows.rows + -workflow = workflows.rows[0] +Or cancel directly by using filters: -workflow_runs = hatchet.runs.list(workflow_ids=[workflow.metadata.id]) - -workflow_run_ids = [workflow_run.metadata.id for workflow_run in workflow_runs.rows] - -bulk_cancel_by_ids = BulkCancelReplayOpts(ids=workflow_run_ids) - -hatchet.runs.bulk_cancel(bulk_cancel_by_ids) - -bulk_cancel_by_filters = BulkCancelReplayOpts( - filters=RunFilter( - since=datetime.today() - timedelta(days=1), - until=datetime.now(), - statuses=[V1TaskStatus.RUNNING], - workflow_ids=[workflow.metadata.id], - additional_metadata={"key": "value"}, - ) -) - -hatchet.runs.bulk_cancel(bulk_cancel_by_filters) -``` + The `hatchet` client also has clients for `workflows` (declarations), `schedules`, `crons`, `metrics` (i.e. queue depth), `events`, and `workers`. @@ -108,74 +90,27 @@ First and foremost: Many of the changes in the V1 Typescript SDK are motivated b The simplest way to declare a workflow is with `hatchet.task`. -```ts -export const simple = hatchet.task({ - name: "simple", - fn: (input: SimpleInput) => { - return { - TransformedMessage: input.Message.toLowerCase(), - }; - }, -}); -``` + This returns an object that you can use to run the task with fully inferred types! -```ts -const input = { Message: "Hello, World!" }; -// run now -const result = await simple.run(input); -const runReference = await simple.runNoWait(input); - -// or in the future -const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000); -const scheduled = await simple.schedule(runAt, input); -const cron = await simple.cron("simple-daily", "0 0 * * *", input); -``` + 2. DAGs got a similar and can be run the same way. DAGs are now a collection of tasks that are composed by calling `.task` on the `Workflow` object. You can declare your types for DAGs. Output types are checked if there is a corresponding task name as a key in the output type. -```ts -type DagInput = { - Message: string; -}; +First, declare the types: -type DagOutput = { - reverse: { - Original: string; - Transformed: string; - }; -}; + -export const dag = hatchet.workflow({ - name: "simple", -}); +Then, create the workflow: -// Next, we declare the tasks bound to the workflow -const toLower = dag.task({ - name: "to-lower", - fn: (input) => { - return { - TransformedMessage: input.Message.toLowerCase(), - }; - }, -}); + -// Next, we declare the tasks bound to the workflow -dag.task({ - name: "reverse", - parents: [toLower], - fn: async (input, ctx) => { - const lower = await ctx.parentOutput(toLower); - return { - Original: input.Message, - Transformed: lower.TransformedMessage.split("").reverse().join(""), - }; - }, -}); -``` +And bind tasks to it: + + 3. Logical organization of SDK features to make it easier to understand and use. @@ -183,25 +118,7 @@ We've exposed feature clients on the Hatchet client to make it easier to interac For example, you can write scripts to find all runs that match certain criteria, and replay or cancel them. -```ts -const hatchet = HatchetClient.init(); -const { runs } = hatchet; - -const allFailedRuns = await runs.list({ - statuses: [WorkflowRunStatus.FAILED], -}); - -// replay by ids -await runs.replay({ ids: allFailedRuns.rows?.map((r) => r.metadata.id) }); - -// or you can run bulk operations with filters directly -await runs.cancel({ - filters: { - since: new Date("2025-03-27"), - additionalMetadata: { user: "123" }, - }, -}); -``` + The `hatchet` client also has clients for `workflows` (declarations), `schedules`, `crons`, `metrics` (i.e. queue depth), `events`, and `workers`. diff --git a/frontend/docs/pages/home/worker-affinity.mdx b/frontend/docs/pages/home/worker-affinity.mdx index 4ec19a55e..232f830a2 100644 --- a/frontend/docs/pages/home/worker-affinity.mdx +++ b/frontend/docs/pages/home/worker-affinity.mdx @@ -122,33 +122,7 @@ Labels can also be set dynamically on workers using the `upsertLabels` method. T -```typescript -const affinity: Workflow = { - id: "dynamic-affinity-workflow", - description: "test", - tasks: [ - { - name: "child-task1", - worker_labels: { - model: { - value: "fancy-vision-model", - required: false, - }, - }, - run: async (ctx) => { - if (ctx.worker.labels().model !== "fancy-vision-model") { - await ctx.worker.upsertLabels({ model: undefined }); - await evictModel(); - await loadNewModel("fancy-vision-model"); - await ctx.worker.upsertLabels({ model: "fancy-vision-model" }); - } - // DO WORK - return { childStep1: "childStep1 results!" }; - }, - }, - ], -}; -``` + diff --git a/frontend/docs/pages/self-hosting/improving-performance.mdx b/frontend/docs/pages/self-hosting/improving-performance.mdx index a4881a810..34d37dd79 100644 --- a/frontend/docs/pages/self-hosting/improving-performance.mdx +++ b/frontend/docs/pages/self-hosting/improving-performance.mdx @@ -31,28 +31,7 @@ There are two main ways to initiate workflows, by sending events to Hatchet and -```typescript -import Hatchet from "@hatchet-dev/typescript-sdk"; - -const hatchet = Hatchet.init(); - -const events = [ - { - payload: { test: "test1" }, - additionalMetadata: { user_id: "user1", source: "test" }, - }, - { - payload: { test: "test2" }, - additionalMetadata: { user_id: "user2", source: "test" }, - }, - { - payload: { test: "test3" }, - additionalMetadata: { user_id: "user3", source: "test" }, - }, -]; - -hatchet.event.bulkPush("user:create", events); -``` + @@ -106,34 +85,9 @@ c.Event().BulkPush( -```typescript -const parentWorkflow: Workflow = { - id: "parent-workflow", - description: "Example workflow for spawning child workflows", - on: { - event: "fanout:create", - }, - steps: [ - { - name: "parent-spawn", - timeout: "10s", - run: async (ctx) => { - const workflowRequests = Array.from({ length: 10 }, (_, i) => ({ - workflow: "child-workflow", - input: { input: `child-input-${i}` }, - options: { additionalMetadata: { childKey: `child-${i}` } }, - })); - - const spawnedWorkflows = await ctx.spawnWorkflows( - workflowRequests, - ); - - return spawnedWorkflows; - }, - }, - ], -}; -``` + diff --git a/sdks/python/examples/blocked_loop_blog/snippets.py b/sdks/python/examples/blocked_loop_blog/snippets.py new file mode 100644 index 000000000..d0e2ef303 --- /dev/null +++ b/sdks/python/examples/blocked_loop_blog/snippets.py @@ -0,0 +1,44 @@ +import asyncio +import time + + +# > Async-safe +async def async_safe() -> int: + await asyncio.sleep(5) + + return 42 + + +# !! + + +# > Blocking +async def blocking() -> int: + time.sleep(5) + + return 42 + + +# !! + + +# > Using to_thread +async def to_thread() -> int: + await asyncio.to_thread(time.sleep, 5) + + return 42 + + +# !! + + +# > Using run_in_executor +async def run_in_executor() -> int: + loop = asyncio.get_event_loop() + + await loop.run_in_executor(None, time.sleep, 5) + + return 42 + + +# !! diff --git a/sdks/python/examples/fastapi_blog/trigger.py b/sdks/python/examples/fastapi_blog/trigger.py new file mode 100644 index 000000000..d23fab6e4 --- /dev/null +++ b/sdks/python/examples/fastapi_blog/trigger.py @@ -0,0 +1,95 @@ +from types import TracebackType + +from fastapi import BackgroundTasks, FastAPI +from pydantic import BaseModel + +from hatchet_sdk import Context, Hatchet + + +class Session: + ## simulate async db session + async def __aenter__(self) -> "Session": + return self + + async def __aexit__( + self, + type_: type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + pass + + +class User: + def __init__(self, id: int, email: str): + self.id = id + self.email = email + + +async def get_user(db: Session, user_id: int) -> User: + return User(user_id, "test@example.com") + + +async def create_user(db: Session) -> User: + return User(1, "test@example.com") + + +async def send_welcome_email(email: str) -> None: + print(f"Sending welcome email to {email}") + + +app = FastAPI() +hatchet = Hatchet() + + +# > FastAPI Background Tasks +async def send_welcome_email_task_bg(user_id: int) -> None: + async with Session() as db: + user = await get_user(db, user_id) + + await send_welcome_email(user.email) + + +@app.post("/user") +async def post__create_user__background_tasks( + background_tasks: BackgroundTasks, +) -> User: + async with Session() as db: + user = await create_user(db) + + background_tasks.add_task(send_welcome_email_task_bg, user.id) + + return user + + +# !! + + +# > Hatchet Task +class WelcomeEmailInput(BaseModel): + user_id: int + + +@hatchet.task(input_validator=WelcomeEmailInput) +async def send_welcome_email_task_hatchet( + input: WelcomeEmailInput, _ctx: Context +) -> None: + async with Session() as db: + user = await get_user(db, input.user_id) + + await send_welcome_email(user.email) + + +@app.post("/user") +async def post__create_user__hatchet() -> User: + async with Session() as db: + user = await create_user(db) + + await send_welcome_email_task_hatchet.aio_run_no_wait( + WelcomeEmailInput(user_id=user.id) + ) + + return user + + +# !! diff --git a/sdks/typescript/src/v1/examples/affinity/affinity-workers.ts b/sdks/typescript/src/v1/examples/affinity/affinity-workers.ts index d5ee17573..9513316bf 100644 --- a/sdks/typescript/src/v1/examples/affinity/affinity-workers.ts +++ b/sdks/typescript/src/v1/examples/affinity/affinity-workers.ts @@ -26,6 +26,7 @@ workflow.task({ // !! +// > Task with labels const childWorkflow = hatchet.workflow({ name: 'child-affinity-workflow', description: 'test', @@ -43,6 +44,7 @@ childWorkflow.task({ return { childStep1: 'childStep1 results!' }; }, }); +// !! childWorkflow.task({ name: 'child-step2', diff --git a/sdks/typescript/src/v1/examples/child_workflows/workflow.ts b/sdks/typescript/src/v1/examples/child_workflows/workflow.ts index cb5e93919..75af50ade 100644 --- a/sdks/typescript/src/v1/examples/child_workflows/workflow.ts +++ b/sdks/typescript/src/v1/examples/child_workflows/workflow.ts @@ -41,3 +41,36 @@ export const parent = hatchet.task({ }, }); // !! + +// > Parent with Single Child +export const parentSingleChild = hatchet.task({ + name: 'parent-single-child', + fn: async () => { + const childRes = await child.run({ N: 1 }); + + return { + Result: childRes.Value, + }; + }, +}); +// !! + +// > Parent with Error Handling +export const withErrorHandling = hatchet.task({ + name: 'parent-error-handling', + fn: async () => { + try { + const childRes = await child.run({ N: 1 }); + + return { + Result: childRes.Value, + }; + } catch (error) { + // decide how to proceed here + return { + Result: -1, + }; + } + }, +}); +// !! diff --git a/sdks/typescript/src/v1/examples/dag/run.ts b/sdks/typescript/src/v1/examples/dag/run.ts index c3ee2aa03..2eee900ba 100644 --- a/sdks/typescript/src/v1/examples/dag/run.ts +++ b/sdks/typescript/src/v1/examples/dag/run.ts @@ -1,9 +1,11 @@ import { dag } from './workflow'; async function main() { + // > Run the workflow const res = await dag.run({ Message: 'hello world', }); + // !! // eslint-disable-next-line no-console console.log(res.reverse.Transformed); diff --git a/sdks/typescript/src/v1/examples/dag/workflow.ts b/sdks/typescript/src/v1/examples/dag/workflow.ts index d46e76c54..365024e3b 100644 --- a/sdks/typescript/src/v1/examples/dag/workflow.ts +++ b/sdks/typescript/src/v1/examples/dag/workflow.ts @@ -1,5 +1,6 @@ import { hatchet } from '../hatchet-client'; +// > Declaring Types type DagInput = { Message: string; }; @@ -10,13 +11,16 @@ type DagOutput = { Transformed: string; }; }; +// !! // > Declaring a DAG Workflow // First, we declare the workflow export const dag = hatchet.workflow({ name: 'simple', }); +// !! +// > First task // Next, we declare the tasks bound to the workflow const toLower = dag.task({ name: 'to-lower', @@ -26,8 +30,9 @@ const toLower = dag.task({ }; }, }); +// !! -// Next, we declare the tasks bound to the workflow +// > Second task with parent dag.task({ name: 'reverse', parents: [toLower], @@ -40,3 +45,17 @@ dag.task({ }, }); // !! + +// > Accessing Parent Outputs +dag.task({ + name: 'task-with-parent-output', + parents: [toLower], + fn: async (input, ctx) => { + const lower = await ctx.parentOutput(toLower); + return { + Original: input.Message, + Transformed: lower.TransformedMessage.split('').reverse().join(''), + }; + }, +}); +// !! diff --git a/sdks/typescript/src/v1/examples/hatchet-client.ts b/sdks/typescript/src/v1/examples/hatchet-client.ts index 86838e17a..21eea1aa0 100644 --- a/sdks/typescript/src/v1/examples/hatchet-client.ts +++ b/sdks/typescript/src/v1/examples/hatchet-client.ts @@ -1,3 +1,6 @@ +// > Create client import { HatchetClient } from '@hatchet/v1'; export const hatchet = HatchetClient.init(); + +// !! diff --git a/sdks/typescript/src/v1/examples/on_event/event.ts b/sdks/typescript/src/v1/examples/on_event/event.ts index fa428d78e..f846e1da0 100644 --- a/sdks/typescript/src/v1/examples/on_event/event.ts +++ b/sdks/typescript/src/v1/examples/on_event/event.ts @@ -9,6 +9,39 @@ async function main() { }); // !! + // > Push an Event with Metadata + const withMetadata = await hatchet.events.push( + 'user:create', + { + test: 'test', + }, + { + additionalMetadata: { + source: 'api', // Arbitrary key-value pair + }, + } + ); + // !! + + // > Bulk push events + const events = [ + { + payload: { test: 'test1' }, + additionalMetadata: { user_id: 'user1', source: 'test' }, + }, + { + payload: { test: 'test2' }, + additionalMetadata: { user_id: 'user2', source: 'test' }, + }, + { + payload: { test: 'test3' }, + additionalMetadata: { user_id: 'user3', source: 'test' }, + }, + ]; + + await hatchet.events.bulkPush('user:create', events); + // !! + // eslint-disable-next-line no-console console.log(res.eventId); } diff --git a/sdks/typescript/src/v1/examples/simple/replay-cancel.ts b/sdks/typescript/src/v1/examples/simple/replay-cancel.ts new file mode 100644 index 000000000..1a4cc2a0d --- /dev/null +++ b/sdks/typescript/src/v1/examples/simple/replay-cancel.ts @@ -0,0 +1,24 @@ +import { V1TaskStatus } from '@hatchet-dev/typescript-sdk/clients/rest/generated/data-contracts'; +import { hatchet } from '../hatchet-client'; + +async function main() { + const { runs } = hatchet; + + // > API operations + // list all failed runs + const allFailedRuns = await runs.list({ + statuses: [V1TaskStatus.FAILED], + }); + + // replay by ids + await runs.replay({ ids: allFailedRuns.rows?.map((r) => r.metadata.id) }); + + // or you can run bulk operations with filters directly + await runs.cancel({ + filters: { + since: new Date('2025-03-27'), + additionalMetadata: { user: '123' }, + }, + }); + // !! +} diff --git a/sdks/typescript/src/v1/examples/simple/run.ts b/sdks/typescript/src/v1/examples/simple/run.ts index c4f32cebb..37196ad9a 100644 --- a/sdks/typescript/src/v1/examples/simple/run.ts +++ b/sdks/typescript/src/v1/examples/simple/run.ts @@ -52,6 +52,19 @@ export async function extra() { }, }); // !! + + // > Run with metadata + const withMetadata = simple.run( + { + Message: 'HeLlO WoRlD', + }, + { + additionalMetadata: { + source: 'api', // Arbitrary key-value pair + }, + } + ); + // !! } if (require.main === module) { diff --git a/sdks/typescript/src/v1/examples/simple/typed-run-methods.ts b/sdks/typescript/src/v1/examples/simple/typed-run-methods.ts new file mode 100644 index 000000000..44ebed3ae --- /dev/null +++ b/sdks/typescript/src/v1/examples/simple/typed-run-methods.ts @@ -0,0 +1,28 @@ +import { simple } from './workflow'; + +async function main() { + // > Run methods + const input = { Message: 'Hello, World!' }; + + // run now + const result = await simple.run(input); + const runReference = await simple.runNoWait(input); + + // or in the future + const runAt = new Date(new Date().setHours(12, 0, 0, 0) + 24 * 60 * 60 * 1000); + const scheduled = await simple.schedule(runAt, input); + const cron = await simple.cron('simple-daily', '0 0 * * *', input); + // !! +} + +async function runFlavors() { + // > Run method flavors + const input = { Message: 'Hello, World!' }; + + // Run workflow and wait for the result + const result = await simple.run(input); + + // Enqueue workflow to be executed asynchronously + const runReference = await simple.runNoWait(input); + // !! +}