diff --git a/examples/python/dag/trigger.py b/examples/python/dag/trigger.py
index 0d4296869..289aed2fe 100644
--- a/examples/python/dag/trigger.py
+++ b/examples/python/dag/trigger.py
@@ -1,3 +1,4 @@
from examples.dag.worker import dag_workflow
+# > Trigger the DAG
dag_workflow.run()
diff --git a/examples/python/dag/worker.py b/examples/python/dag/worker.py
index 8cbb0d30e..d8fbc996e 100644
--- a/examples/python/dag/worker.py
+++ b/examples/python/dag/worker.py
@@ -17,14 +17,20 @@ class RandomSum(BaseModel):
hatchet = Hatchet(debug=True)
+# > Define a DAG
dag_workflow = hatchet.workflow(name="DAGWorkflow")
+# > First task
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
return StepOutput(random_number=random.randint(1, 100))
+
+# > Task with parents
+
+
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
return StepOutput(random_number=random.randint(1, 100))
@@ -38,6 +44,8 @@ async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
return RandomSum(sum=one + two)
+
+
@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
print(
@@ -52,11 +60,13 @@ async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
}
+# > Declare a worker
def main() -> None:
worker = hatchet.worker("dag-worker", workflows=[dag_workflow])
worker.start()
+
if __name__ == "__main__":
main()
diff --git a/examples/python/events/event.py b/examples/python/events/event.py
index d831c052d..5d930c424 100644
--- a/examples/python/events/event.py
+++ b/examples/python/events/event.py
@@ -1,6 +1,27 @@
-from hatchet_sdk import Hatchet
+from hatchet_sdk import Hatchet, PushEventOptions
+from hatchet_sdk.clients.events import BulkPushEventWithMetadata
hatchet = Hatchet()
# > Event trigger
hatchet.event.push("user:create", {"should_skip": False})
+
+# > Event trigger with metadata
+hatchet.event.push(
+ "user:create",
+ {"userId": "1234", "should_skip": False},
+ options=PushEventOptions(
+ additional_metadata={"source": "api"} # Arbitrary key-value pair
+ ),
+)
+
+# > Bulk event push
+hatchet.event.bulk_push(
+ events=[
+ BulkPushEventWithMetadata(
+ key="user:create",
+ payload={"userId": str(i), "should_skip": False},
+ )
+ for i in range(10)
+ ]
+)
diff --git a/examples/python/fanout/example_child_spawn.py b/examples/python/fanout/example_child_spawn.py
new file mode 100644
index 000000000..cb9ae49ef
--- /dev/null
+++ b/examples/python/fanout/example_child_spawn.py
@@ -0,0 +1,15 @@
+# > Child spawn
+from examples.fanout.worker import ChildInput, child_wf
+
+# 👀 example: run this inside of a parent task to spawn a child
+child_wf.run(
+ ChildInput(a="b"),
+)
+
+# > Error handling
+try:
+ child_wf.run(
+ ChildInput(a="b"),
+ )
+except Exception as e:
+ print(f"Child workflow failed: {e}")
diff --git a/examples/python/fanout/trigger.py b/examples/python/fanout/trigger.py
index 1fcf763b1..aee9a8e26 100644
--- a/examples/python/fanout/trigger.py
+++ b/examples/python/fanout/trigger.py
@@ -1,6 +1,7 @@
import asyncio
+from typing import Any
-from examples.fanout.worker import ParentInput, parent_wf
+from examples.fanout.worker import ChildInput, ParentInput, child_wf, parent_wf
from hatchet_sdk import Hatchet
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
@@ -14,5 +15,18 @@ async def main() -> None:
)
+# > Bulk run children
+async def run_child_workflows(n: int) -> list[dict[str, Any]]:
+ return await child_wf.aio_run_many(
+ [
+ child_wf.create_bulk_run_item(
+ input=ChildInput(a=str(i)),
+ )
+ for i in range(n)
+ ]
+ )
+
+
+
if __name__ == "__main__":
asyncio.run(main())
diff --git a/examples/python/fastapi_blog/worker.py b/examples/python/fastapi_blog/worker.py
new file mode 100644
index 000000000..bafd33d24
--- /dev/null
+++ b/examples/python/fastapi_blog/worker.py
@@ -0,0 +1,29 @@
+# > Worker
+import asyncio
+
+from aiohttp import ClientSession
+
+from hatchet_sdk import Context, EmptyModel, Hatchet
+
+hatchet = Hatchet()
+
+
+async def fetch(session: ClientSession, url: str) -> bool:
+ async with session.get(url) as response:
+ return response.status == 200
+
+
+@hatchet.task(name="Fetch")
+async def hello_from_hatchet(input: EmptyModel, ctx: Context) -> dict[str, int]:
+ num_requests = 10
+
+ async with ClientSession() as session:
+ tasks = [
+ fetch(session, "https://docs.hatchet.run/home") for _ in range(num_requests)
+ ]
+
+ results = await asyncio.gather(*tasks)
+
+ return {"count": results.count(True)}
+
+
diff --git a/examples/python/opentelemetry_instrumentation/worker.py b/examples/python/opentelemetry_instrumentation/worker.py
index 6d3538e3c..6070a4689 100644
--- a/examples/python/opentelemetry_instrumentation/worker.py
+++ b/examples/python/opentelemetry_instrumentation/worker.py
@@ -1,12 +1,15 @@
from examples.opentelemetry_instrumentation.client import hatchet
from examples.opentelemetry_instrumentation.tracer import trace_provider
from hatchet_sdk import Context, EmptyModel
+
+# > Configure the instrumentor
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
HatchetInstrumentor(
tracer_provider=trace_provider,
).instrument()
+
otel_workflow = hatchet.workflow(
name="OTelWorkflow",
)
diff --git a/examples/python/quickstart/run.py b/examples/python/quickstart/run.py
index 58eaa54ee..53eb9c896 100644
--- a/examples/python/quickstart/run.py
+++ b/examples/python/quickstart/run.py
@@ -4,6 +4,7 @@ from .workflows.first_task import SimpleInput, first_task
async def main() -> None:
+ # > Run a Task
result = await first_task.aio_run(SimpleInput(message="Hello World!"))
print(
diff --git a/examples/python/quickstart/workflows/first_task.py b/examples/python/quickstart/workflows/first_task.py
index a4ee02511..e3f18d549 100644
--- a/examples/python/quickstart/workflows/first_task.py
+++ b/examples/python/quickstart/workflows/first_task.py
@@ -5,6 +5,7 @@ from hatchet_sdk import Context
from ..hatchet_client import hatchet
+# > Simple task
class SimpleInput(BaseModel):
message: str
@@ -19,3 +20,5 @@ def first_task(input: SimpleInput, ctx: Context) -> SimpleOutput:
print("first-task task called")
return SimpleOutput(transformed_message=input.message.lower())
+
+
diff --git a/examples/python/rate_limit/worker.py b/examples/python/rate_limit/worker.py
index 741114fd8..f4f2e2a22 100644
--- a/examples/python/rate_limit/worker.py
+++ b/examples/python/rate_limit/worker.py
@@ -47,6 +47,9 @@ def step_2(input: RateLimitInput, ctx: Context) -> None:
def main() -> None:
+ # > Create a rate limit
+ RATE_LIMIT_KEY = "test-limit"
+
hatchet.rate_limits.put(RATE_LIMIT_KEY, 2, RateLimitDuration.SECOND)
worker = hatchet.worker(
diff --git a/examples/python/setup/client.py b/examples/python/setup/client.py
new file mode 100644
index 000000000..d7f888a7f
--- /dev/null
+++ b/examples/python/setup/client.py
@@ -0,0 +1,4 @@
+# > Create a Hatchet client
+from hatchet_sdk import Hatchet
+
+hatchet = Hatchet()
diff --git a/examples/python/simple/schedule.py b/examples/python/simple/schedule.py
new file mode 100644
index 000000000..7f811e4e1
--- /dev/null
+++ b/examples/python/simple/schedule.py
@@ -0,0 +1,9 @@
+# > Schedule a Task
+from datetime import datetime
+from examples.simple.worker import simple
+
+schedule = simple.schedule(datetime(2025, 3, 14, 15, 9, 26))
+
+## 👀 do something with the id
+print(schedule.id)
+
diff --git a/examples/python/simple/trigger_with_metadata.py b/examples/python/simple/trigger_with_metadata.py
new file mode 100644
index 000000000..d35a4d517
--- /dev/null
+++ b/examples/python/simple/trigger_with_metadata.py
@@ -0,0 +1,9 @@
+from examples.simple.worker import simple
+from hatchet_sdk import TriggerWorkflowOptions
+
+# > Trigger with metadata
+simple.run(
+ options=TriggerWorkflowOptions(
+ additional_metadata={"source": "api"} # Arbitrary key-value pair
+ )
+)
diff --git a/examples/python/simple/workflow.py b/examples/python/simple/workflow.py
new file mode 100644
index 000000000..b9c1a03b1
--- /dev/null
+++ b/examples/python/simple/workflow.py
@@ -0,0 +1,6 @@
+from hatchet_sdk import Hatchet
+
+hatchet = Hatchet()
+
+# > Define a workflow
+simple = hatchet.workflow(name="example-workflow")
diff --git a/examples/python/trigger_methods/workflow.py b/examples/python/trigger_methods/workflow.py
new file mode 100644
index 000000000..fac7cac82
--- /dev/null
+++ b/examples/python/trigger_methods/workflow.py
@@ -0,0 +1,34 @@
+from hatchet_sdk import Hatchet, Context
+from pydantic import BaseModel
+
+hatchet = Hatchet()
+
+
+# > Define a task
+class HelloInput(BaseModel):
+ name: str
+
+
+class HelloOutput(BaseModel):
+ greeting: str
+
+
+@hatchet.task(input_validator=HelloInput)
+async def say_hello(input: HelloInput, ctx: Context) -> HelloOutput:
+ return HelloOutput(greeting=f"Hello, {input.name}!")
+
+
+
+
+async def main() -> None:
+ # > Sync
+ ref = say_hello.run_no_wait(input=HelloInput(name="World"))
+
+ # > Async
+ ref = await say_hello.aio_run_no_wait(input=HelloInput(name="Async World"))
+
+ # > Result Sync
+ result = ref.result()
+
+ # > Result Async
+ result = await ref.aio_result()
diff --git a/frontend/docs/pages/_setup/_existing/py.mdx b/frontend/docs/pages/_setup/_existing/py.mdx
index 8b1906dc3..42b2741d5 100644
--- a/frontend/docs/pages/_setup/_existing/py.mdx
+++ b/frontend/docs/pages/_setup/_existing/py.mdx
@@ -1,6 +1,8 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
import InstallCommand from "@/components/InstallCommand";
+import { Snippet } from "@/components/code";
+import { snippets } from "@/lib/generated/snippets";
#### Cd into your project directory
@@ -33,11 +35,7 @@ touch hatchet-client.py
Add the following code to the file:
-```python copy
-from hatchet_sdk import Hatchet
-
-hatchet = Hatchet()
-```
+
You can now import the Hatchet Client in any file that needs it.
diff --git a/frontend/docs/pages/_setup/_new/py.mdx b/frontend/docs/pages/_setup/_new/py.mdx
index 598c41f66..1f9b2693b 100644
--- a/frontend/docs/pages/_setup/_new/py.mdx
+++ b/frontend/docs/pages/_setup/_new/py.mdx
@@ -1,6 +1,8 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
import InstallCommand from "@/components/InstallCommand";
+import { Snippet } from "@/components/code";
+import { snippets } from "@/lib/generated/snippets";
#### Create a new project directory and cd into it
@@ -32,11 +34,7 @@ touch src/hatchet_client.py
Add the following code to the file:
-```python copy
-from hatchet_sdk import Hatchet
-
-hatchet = Hatchet()
-```
+
You can now import the Hatchet Client in any file that needs it.
diff --git a/frontend/docs/pages/blog/mergent-migration-guide.mdx b/frontend/docs/pages/blog/mergent-migration-guide.mdx
index 091c1de7e..61fdb7062 100644
--- a/frontend/docs/pages/blog/mergent-migration-guide.mdx
+++ b/frontend/docs/pages/blog/mergent-migration-guide.mdx
@@ -53,11 +53,7 @@ It is recommended to instantiate a shared Hatchet Client in a separate file as a
Create a new file called `hatchet-client.py` in your project root.
-```python copy
-from hatchet_sdk import Hatchet
-
-hatchet = Hatchet()
-```
+
You can now import the Hatchet Client in any file that needs it.
diff --git a/frontend/docs/pages/blog/task-queue-modern-python.mdx b/frontend/docs/pages/blog/task-queue-modern-python.mdx
index e9fd3c15c..dc6c1fc17 100644
--- a/frontend/docs/pages/blog/task-queue-modern-python.mdx
+++ b/frontend/docs/pages/blog/task-queue-modern-python.mdx
@@ -81,34 +81,7 @@ With Hatchet, all of your tasks can be defined as either sync or async functions
As a simple example, you can easily run a Hatchet task that makes 10 concurrent API calls using `async` / `await` with `asyncio.gather` and `aiohttp`, as opposed to needing to run each one in a blocking fashion as its own task. For example:
-```python
-import asyncio
-
-from aiohttp import ClientSession
-
-from hatchet_sdk import Context, EmptyModel, Hatchet
-
-hatchet = Hatchet()
-
-
-async def fetch(session: ClientSession, url: str) -> bool:
- async with session.get(url) as response:
- return response.status == 200
-
-
-@hatchet.task(name="Fetch")
-async def hello_from_hatchet(input: EmptyModel, ctx: Context) -> int:
- num_requests = 10
-
- async with ClientSession() as session:
- tasks = [
- fetch(session, "https://docs.hatchet.run/home") for _ in range(num_requests)
- ]
-
- results = await asyncio.gather(*tasks)
-
- return results.count(True)
-```
+
With Hatchet, you can perform all of these requests concurrently, in a single task, as opposed to needing to e.g. enqueue a single task per request. This is more performant on your side (as the client), and also puts less pressure on the backing queue, since it needs to handle an order of magnitude fewer requests in this case.
diff --git a/frontend/docs/pages/home/additional-metadata.mdx b/frontend/docs/pages/home/additional-metadata.mdx
index 96dfa54ba..551728732 100644
--- a/frontend/docs/pages/home/additional-metadata.mdx
+++ b/frontend/docs/pages/home/additional-metadata.mdx
@@ -1,5 +1,7 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "../../components/UniversalTabs";
+import { Snippet } from "@/components/code";
+import { snippets } from "@/lib/generated/snippets";
# Additional Metadata
@@ -17,17 +19,9 @@ You can attach additional metadata when pushing events or triggering task runs u
-```python
-hatchet.event.push(
- "user:create",
- {'userId': '1234'},
- options=PushEventOptions(
- additional_metadata={
- "source": "api" # Arbitrary key-value pair
- }
- )
-)
-```
+
+
+
```typescript
@@ -61,16 +55,11 @@ err := c.Event().Push(
-```python
-simple_task.run(
- SimpleInput(user_id=1234),
- options=TriggerTaskOptions(
- additional_metadata={
- "hello": "moon" # Arbitrary key-value pair
- }
- )
-)
-```
+
+
+
```typescript
diff --git a/frontend/docs/pages/home/child-spawning.mdx b/frontend/docs/pages/home/child-spawning.mdx
index 29418f2ae..d1032c94f 100644
--- a/frontend/docs/pages/home/child-spawning.mdx
+++ b/frontend/docs/pages/home/child-spawning.mdx
@@ -211,10 +211,7 @@ To spawn and run a child task from a parent task, use the appropriate method for
-```python
-# Inside a parent task
-child_result = child_task.run(child_input)
-```
+
@@ -242,23 +239,7 @@ As shown in the examples above, you can spawn multiple child tasks in parallel:
-```python
-# Run multiple child workflows concurrently with asyncio
-import asyncio
-
-async def run_child_workflows(n: int) -> list[dict[str, Any]]:
- return await child.aio_run_many([
- child.create_bulk_run_item(
- options=TriggerWorkflowOptions(
- input=ChildInput(n=i),
- )
- )
- for i in range(n)
- ])
-
-# In your parent task
-child_results = await run_child_workflows(input.n)
-```
+
@@ -317,14 +298,7 @@ When working with child workflows, it's important to properly handle errors. Her
-```python
-try:
- child_result = child.run(ChildInput(a="foobar"))
-except Exception as e:
- # Handle error from child workflow
- print(f"Child workflow failed: {e}")
- # Decide how to proceed - retry, skip, or fail the parent
-```
+
diff --git a/frontend/docs/pages/home/compute/gpu.mdx b/frontend/docs/pages/home/compute/gpu.mdx
index a52776542..463337386 100644
--- a/frontend/docs/pages/home/compute/gpu.mdx
+++ b/frontend/docs/pages/home/compute/gpu.mdx
@@ -72,29 +72,6 @@ FROM ubuntu:22.04
COPY --from=builder /usr/local/cuda-12.2 /usr/local/cuda-12.2
```
-## Usage in Workflows
-
-```python
-from hatchet_sdk import Hatchet, Context
-
-hatchet = Hatchet()
-
-@hatchet.workflow()
-class GPUWorkflow:
- @hatchet.step(
- compute=Compute(
- gpu_kind="a100-80gb",
- gpus=1,
- memory_mb=163840,
- num_replicas=1,
- regions=["ams"]
- )
- )
- def train_model(self, context: Context):
- # GPU-accelerated code here
- pass
-```
-
## Memory and Resource Allocation
### Available Memory per GPU Type
@@ -106,34 +83,6 @@ class GPUWorkflow:
When configuring memory_mb, ensure it's sufficient for both system memory and GPU operations.
-## Region-Specific Configurations
-
-### A100-80GB Example
-
-```python
-# Multi-region A100-80GB configuration
-compute = Compute(
- gpu_kind="a100-80gb",
- gpus=1,
- memory_mb=163840,
- num_replicas=3,
- regions=["ams", "sjc", "syd"] # Replicas will be randomly distributed
-)
-```
-
-### A10 Example
-
-```python
-# Chicago-based A10 configuration
-compute = Compute(
- gpu_kind="a10",
- gpus=1,
- memory_mb=49152,
- num_replicas=2,
- regions=["ord"]
-)
-```
-
## Best Practices
1. **GPU Selection**
diff --git a/frontend/docs/pages/home/dags.mdx b/frontend/docs/pages/home/dags.mdx
index bfd7123e3..4a46d2444 100644
--- a/frontend/docs/pages/home/dags.mdx
+++ b/frontend/docs/pages/home/dags.mdx
@@ -16,13 +16,7 @@ The returned object is an instance of the `Workflow` class, which is the primary
-```python
-from hatchet_sdk import Context, EmptyModel, Hatchet
-
-hatchet = Hatchet(debug=True)
-
-simple = hatchet.workflow(name="SimpleWorkflow")
-```
+
@@ -70,11 +64,7 @@ The `task` method takes a name and a function that defines the task's behavior.
In Python, the `task` method is a decorator, which is used like this to wrap a function:
-```python
-@simple.task()
-def task_1(input: EmptyModel, ctx: Context) -> None:
- print("executed task_1")
-```
+
The function takes two arguments: `input`, which is a Pydantic model, and `ctx`, which is the Hatchet `Context` object. We'll discuss both of these more later.
@@ -119,18 +109,7 @@ The power of Hatchet's workflow design comes from connecting tasks into a DAG st
-```python
-@simple.task()
-def first_task(input: EmptyModel, ctx: Context) -> dict:
- return {"result": "Hello World"}
-
-@simple.task(parents=[first_task])
-def second_task(input: EmptyModel, ctx: Context) -> dict:
- # Access output from parent task
- first_result = ctx.task_output(first_task)
- print(f"First task said: {first_result['result']}")
- return {"final_result": "Completed"}
-```
+
@@ -172,10 +151,9 @@ As shown in the examples above, tasks can access outputs from their parent tasks
-```python
-# Inside a task with parent dependencies
-parent_output = ctx.task_output(parent_task_name)
-```
+
+
+
```typescript
@@ -202,13 +180,7 @@ You can run workflows directly or enqueue them for asynchronous execution. All t
-```python
-# Run workflow and wait for the result
-result = simple.run(input_data)
-
-# Enqueue workflow to be executed asynchronously
-run_id = simple.run_no_wait(input_data)
-```
+
diff --git a/frontend/docs/pages/home/opentelemetry.mdx b/frontend/docs/pages/home/opentelemetry.mdx
index 6cb88d835..314a22d93 100644
--- a/frontend/docs/pages/home/opentelemetry.mdx
+++ b/frontend/docs/pages/home/opentelemetry.mdx
@@ -18,12 +18,12 @@ Hatchet's SDK provides an instrumentor that auto-instruments Hatchet code if you
First, install the `otel` extra with (e.g.) `pip install hatchet-sdk[otel]`. Then, import the instrumentor:
-```python
-from path.to.your.trace.provider import trace_provider
-from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
-
-HatchetInstrumentor(tracer_provider=trace_provider).instrument()
-```
+
You bring your own trace provider and plug it into the `HatchetInstrumentor`, call `instrument`, and that's it!
diff --git a/frontend/docs/pages/home/rate-limits.mdx b/frontend/docs/pages/home/rate-limits.mdx
index e952ac073..575658586 100644
--- a/frontend/docs/pages/home/rate-limits.mdx
+++ b/frontend/docs/pages/home/rate-limits.mdx
@@ -87,11 +87,7 @@ Define the static rate limits that can be consumed by any step run across all wo
-```python
-RATE_LIMIT_KEY = "test-limit"
-
-hatchet.rate_limits.put(RATE_LIMIT_KEY, 10, RateLimitDuration.MINUTE)
-```
+
diff --git a/frontend/docs/pages/home/run-no-wait.mdx b/frontend/docs/pages/home/run-no-wait.mdx
index 78c919fca..b8d527f5d 100644
--- a/frontend/docs/pages/home/run-no-wait.mdx
+++ b/frontend/docs/pages/home/run-no-wait.mdx
@@ -16,19 +16,18 @@ Some example use cases for fire-and-forget style tasks might be:
+
+ If we have the following workflow:
+
+
+
You can use your `Workflow` object to run a task and "forget" it by calling the `run_no_wait` method. This method enqueue a task run and return a `WorkflowRunRef`, a reference to that run, without waiting for the result.
-```python
-from src.workflows import my_workflow, MyWorkflowInputModel
-
-ref = my_workflow.run_no_wait(MyWorkflowInputModel(foo="bar"))
-```
+
You can also `await` the result of `aio_run_no_wait`:
-```python
-ref = await my_task.aio_run_no_wait(input=MyTaskInputModel(foo="bar"))
-```
+
Note that the type of `input` here is a Pydantic model that matches the input schema of your task.
@@ -54,15 +53,11 @@ Often it is useful to subscribe to the results of a task at a later time. The `r
Use `ref.result()` to block until the result is available:
-```python
-result = ref.result()
-```
+
or await `aio_result`:
-```python
-result = await ref.aio_result()
-```
+
diff --git a/frontend/docs/pages/home/running-your-task.mdx b/frontend/docs/pages/home/running-your-task.mdx
index 8accdeb44..c4d6c6e56 100644
--- a/frontend/docs/pages/home/running-your-task.mdx
+++ b/frontend/docs/pages/home/running-your-task.mdx
@@ -10,13 +10,7 @@ With your task defined, you can import it wherever you need to use it and invoke
-```python
-from .task import simple
-
-simple.run(
- input=SimpleInput(Message="Hello, World!"),
-)
-```
+
diff --git a/frontend/docs/pages/home/scheduled-runs.mdx b/frontend/docs/pages/home/scheduled-runs.mdx
index 512b7e77b..d26b2206b 100644
--- a/frontend/docs/pages/home/scheduled-runs.mdx
+++ b/frontend/docs/pages/home/scheduled-runs.mdx
@@ -35,14 +35,7 @@ Here's an example of creating a scheduled run to trigger a task tomorrow at noon
-```python
-schedule = simple.schedule([datetime(2025, 3, 14, 15, 9, 26)])
-
-## do something with the id
-
-print(schedule.id)
-
-```
+
diff --git a/frontend/docs/pages/home/workers.mdx b/frontend/docs/pages/home/workers.mdx
index 0f93f8c43..e5af8a52d 100644
--- a/frontend/docs/pages/home/workers.mdx
+++ b/frontend/docs/pages/home/workers.mdx
@@ -19,14 +19,7 @@ Declare a worker by calling the `worker` method on the Hatchet client. The `work
- ```python
- def main() -> None:
- worker = hatchet.worker("test-worker", workflows=[simple])
- worker.start()
-
- if __name__ == "__main__":
- main()
- ```
+
If you are using Windows, attempting to run a worker will result in an error:
diff --git a/frontend/docs/pages/home/your-first-task.mdx b/frontend/docs/pages/home/your-first-task.mdx
index 0e70f5e41..b1e5daf24 100644
--- a/frontend/docs/pages/home/your-first-task.mdx
+++ b/frontend/docs/pages/home/your-first-task.mdx
@@ -18,22 +18,7 @@ The returned object is an instance of the `Task` class, which is the primary int
-```python
-from hatchet_sdk import Context, EmptyModel, Hatchet
-from pydantic import BaseModel
-
-hatchet = Hatchet(debug=True)
-
-class SimpleInput(BaseModel):
- message: str
-
-@hatchet.task(name="SimpleTask", input_validator=SimpleInput)
-def simple(input: SimpleInput, ctx: Context) -> dict[str, str]:
- return {
- "transformed_message": input.message.lower(),
- }
-```
-
+
@@ -56,9 +41,7 @@ With your task defined, you can import it wherever you need to use it and invoke
-```python
-simple.run(SimpleInput(message="HeLlO WoRlD"))
-```
+
diff --git a/frontend/docs/pages/self-hosting/improving-performance.mdx b/frontend/docs/pages/self-hosting/improving-performance.mdx
index 866617f88..a4881a810 100644
--- a/frontend/docs/pages/self-hosting/improving-performance.mdx
+++ b/frontend/docs/pages/self-hosting/improving-performance.mdx
@@ -26,31 +26,7 @@ There are two main ways to initiate workflows, by sending events to Hatchet and
-```python
-from hatchet_sdk import Hatchet
-
-hatchet = Hatchet()
-
-events: List[BulkPushEventWithMetadata] = [
- {
- "key": "user:create",
- "payload": {"message": "This is event 1"},
- "additional_metadata": {"source": "test", "user_id": "user123"},
- },
- {
- "key": "user:create",
- "payload": {"message": "This is event 2"},
- "additional_metadata": {"source": "test", "user_id": "user456"},
- },
- {
- "key": "user:create",
- "payload": {"message": "This is event 3"},
- "additional_metadata": {"source": "test", "user_id": "user789"},
- },
-]
-
-result = hatchet.client.event.bulk_push(events)
-```
+
diff --git a/sdks/python/examples/dag/trigger.py b/sdks/python/examples/dag/trigger.py
index 0d4296869..89bbdd835 100644
--- a/sdks/python/examples/dag/trigger.py
+++ b/sdks/python/examples/dag/trigger.py
@@ -1,3 +1,5 @@
from examples.dag.worker import dag_workflow
+# > Trigger the DAG
dag_workflow.run()
+# !!
diff --git a/sdks/python/examples/dag/worker.py b/sdks/python/examples/dag/worker.py
index 8cbb0d30e..5d8f56d35 100644
--- a/sdks/python/examples/dag/worker.py
+++ b/sdks/python/examples/dag/worker.py
@@ -17,14 +17,22 @@ class RandomSum(BaseModel):
hatchet = Hatchet(debug=True)
+# > Define a DAG
dag_workflow = hatchet.workflow(name="DAGWorkflow")
+# !!
+# > First task
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
def step1(input: EmptyModel, ctx: Context) -> StepOutput:
return StepOutput(random_number=random.randint(1, 100))
+# !!
+
+# > Task with parents
+
+
@dag_workflow.task(execution_timeout=timedelta(seconds=5))
async def step2(input: EmptyModel, ctx: Context) -> StepOutput:
return StepOutput(random_number=random.randint(1, 100))
@@ -38,6 +46,9 @@ async def step3(input: EmptyModel, ctx: Context) -> RandomSum:
return RandomSum(sum=one + two)
+# !!
+
+
@dag_workflow.task(parents=[step1, step3])
async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
print(
@@ -52,11 +63,14 @@ async def step4(input: EmptyModel, ctx: Context) -> dict[str, str]:
}
+# > Declare a worker
def main() -> None:
worker = hatchet.worker("dag-worker", workflows=[dag_workflow])
worker.start()
+# !!
+
if __name__ == "__main__":
main()
diff --git a/sdks/python/examples/events/event.py b/sdks/python/examples/events/event.py
index 35244ae3c..60c50db69 100644
--- a/sdks/python/examples/events/event.py
+++ b/sdks/python/examples/events/event.py
@@ -1,7 +1,30 @@
-from hatchet_sdk import Hatchet
+from hatchet_sdk import Hatchet, PushEventOptions
+from hatchet_sdk.clients.events import BulkPushEventWithMetadata
hatchet = Hatchet()
# > Event trigger
hatchet.event.push("user:create", {"should_skip": False})
# !!
+
+# > Event trigger with metadata
+hatchet.event.push(
+ "user:create",
+ {"userId": "1234", "should_skip": False},
+ options=PushEventOptions(
+ additional_metadata={"source": "api"} # Arbitrary key-value pair
+ ),
+)
+# !!
+
+# > Bulk event push
+hatchet.event.bulk_push(
+ events=[
+ BulkPushEventWithMetadata(
+ key="user:create",
+ payload={"userId": str(i), "should_skip": False},
+ )
+ for i in range(10)
+ ]
+)
+# !!
diff --git a/sdks/python/examples/fanout/example_child_spawn.py b/sdks/python/examples/fanout/example_child_spawn.py
new file mode 100644
index 000000000..b7c5cb2a4
--- /dev/null
+++ b/sdks/python/examples/fanout/example_child_spawn.py
@@ -0,0 +1,17 @@
+# > Child spawn
+from examples.fanout.worker import ChildInput, child_wf
+
+# 👀 example: run this inside of a parent task to spawn a child
+child_wf.run(
+ ChildInput(a="b"),
+)
+# !!
+
+# > Error handling
+try:
+ child_wf.run(
+ ChildInput(a="b"),
+ )
+except Exception as e:
+ print(f"Child workflow failed: {e}")
+# !!
diff --git a/sdks/python/examples/fanout/trigger.py b/sdks/python/examples/fanout/trigger.py
index 1fcf763b1..65f3c101e 100644
--- a/sdks/python/examples/fanout/trigger.py
+++ b/sdks/python/examples/fanout/trigger.py
@@ -1,6 +1,7 @@
import asyncio
+from typing import Any
-from examples.fanout.worker import ParentInput, parent_wf
+from examples.fanout.worker import ChildInput, ParentInput, child_wf, parent_wf
from hatchet_sdk import Hatchet
from hatchet_sdk.clients.admin import TriggerWorkflowOptions
@@ -14,5 +15,19 @@ async def main() -> None:
)
+# > Bulk run children
+async def run_child_workflows(n: int) -> list[dict[str, Any]]:
+ return await child_wf.aio_run_many(
+ [
+ child_wf.create_bulk_run_item(
+ input=ChildInput(a=str(i)),
+ )
+ for i in range(n)
+ ]
+ )
+
+
+# !!
+
if __name__ == "__main__":
asyncio.run(main())
diff --git a/sdks/python/examples/fastapi_blog/worker.py b/sdks/python/examples/fastapi_blog/worker.py
new file mode 100644
index 000000000..672598102
--- /dev/null
+++ b/sdks/python/examples/fastapi_blog/worker.py
@@ -0,0 +1,30 @@
+# > Worker
+import asyncio
+
+from aiohttp import ClientSession
+
+from hatchet_sdk import Context, EmptyModel, Hatchet
+
+hatchet = Hatchet()
+
+
+async def fetch(session: ClientSession, url: str) -> bool:
+ async with session.get(url) as response:
+ return response.status == 200
+
+
+@hatchet.task(name="Fetch")
+async def hello_from_hatchet(input: EmptyModel, ctx: Context) -> dict[str, int]:
+ num_requests = 10
+
+ async with ClientSession() as session:
+ tasks = [
+ fetch(session, "https://docs.hatchet.run/home") for _ in range(num_requests)
+ ]
+
+ results = await asyncio.gather(*tasks)
+
+ return {"count": results.count(True)}
+
+
+# !!
diff --git a/sdks/python/examples/opentelemetry_instrumentation/worker.py b/sdks/python/examples/opentelemetry_instrumentation/worker.py
index 6d3538e3c..3e27d4cbe 100644
--- a/sdks/python/examples/opentelemetry_instrumentation/worker.py
+++ b/sdks/python/examples/opentelemetry_instrumentation/worker.py
@@ -1,12 +1,16 @@
from examples.opentelemetry_instrumentation.client import hatchet
from examples.opentelemetry_instrumentation.tracer import trace_provider
from hatchet_sdk import Context, EmptyModel
+
+# > Configure the instrumentor
from hatchet_sdk.opentelemetry.instrumentor import HatchetInstrumentor
HatchetInstrumentor(
tracer_provider=trace_provider,
).instrument()
+# !!
+
otel_workflow = hatchet.workflow(
name="OTelWorkflow",
)
diff --git a/sdks/python/examples/quickstart/run.py b/sdks/python/examples/quickstart/run.py
index 58eaa54ee..c7c144222 100644
--- a/sdks/python/examples/quickstart/run.py
+++ b/sdks/python/examples/quickstart/run.py
@@ -4,7 +4,9 @@ from .workflows.first_task import SimpleInput, first_task
async def main() -> None:
+ # > Run a Task
result = await first_task.aio_run(SimpleInput(message="Hello World!"))
+ # !!
print(
"Finished running task, and got the transformed message! The transformed message is:",
diff --git a/sdks/python/examples/quickstart/workflows/first_task.py b/sdks/python/examples/quickstart/workflows/first_task.py
index a4ee02511..08a965397 100644
--- a/sdks/python/examples/quickstart/workflows/first_task.py
+++ b/sdks/python/examples/quickstart/workflows/first_task.py
@@ -5,6 +5,7 @@ from hatchet_sdk import Context
from ..hatchet_client import hatchet
+# > Simple task
class SimpleInput(BaseModel):
message: str
@@ -19,3 +20,6 @@ def first_task(input: SimpleInput, ctx: Context) -> SimpleOutput:
print("first-task task called")
return SimpleOutput(transformed_message=input.message.lower())
+
+
+# !!
diff --git a/sdks/python/examples/rate_limit/worker.py b/sdks/python/examples/rate_limit/worker.py
index 929b27920..db0c8f41a 100644
--- a/sdks/python/examples/rate_limit/worker.py
+++ b/sdks/python/examples/rate_limit/worker.py
@@ -50,7 +50,11 @@ def step_2(input: RateLimitInput, ctx: Context) -> None:
def main() -> None:
+ # > Create a rate limit
+ RATE_LIMIT_KEY = "test-limit"
+
hatchet.rate_limits.put(RATE_LIMIT_KEY, 2, RateLimitDuration.SECOND)
+ # !!
worker = hatchet.worker(
"rate-limit-worker", slots=10, workflows=[rate_limit_workflow]
diff --git a/sdks/python/examples/setup/client.py b/sdks/python/examples/setup/client.py
new file mode 100644
index 000000000..16a2ec9be
--- /dev/null
+++ b/sdks/python/examples/setup/client.py
@@ -0,0 +1,5 @@
+# > Create a Hatchet client
+from hatchet_sdk import Hatchet
+
+hatchet = Hatchet()
+# !!
diff --git a/sdks/python/examples/simple/schedule.py b/sdks/python/examples/simple/schedule.py
new file mode 100644
index 000000000..84085dcae
--- /dev/null
+++ b/sdks/python/examples/simple/schedule.py
@@ -0,0 +1,11 @@
+# > Schedule a Task
+from datetime import datetime
+
+from examples.simple.worker import simple
+
+schedule = simple.schedule(datetime(2025, 3, 14, 15, 9, 26))
+
+## 👀 do something with the id
+print(schedule.id)
+
+# !!
diff --git a/sdks/python/examples/simple/trigger_with_metadata.py b/sdks/python/examples/simple/trigger_with_metadata.py
new file mode 100644
index 000000000..8b270d6ee
--- /dev/null
+++ b/sdks/python/examples/simple/trigger_with_metadata.py
@@ -0,0 +1,10 @@
+from examples.simple.worker import simple
+from hatchet_sdk import TriggerWorkflowOptions
+
+# > Trigger with metadata
+simple.run(
+ options=TriggerWorkflowOptions(
+ additional_metadata={"source": "api"} # Arbitrary key-value pair
+ )
+)
+# !!
diff --git a/sdks/python/examples/simple/workflow.py b/sdks/python/examples/simple/workflow.py
new file mode 100644
index 000000000..e5c775541
--- /dev/null
+++ b/sdks/python/examples/simple/workflow.py
@@ -0,0 +1,7 @@
+from hatchet_sdk import Hatchet
+
+hatchet = Hatchet()
+
+# > Define a workflow
+simple = hatchet.workflow(name="example-workflow")
+# !!
diff --git a/sdks/python/examples/trigger_methods/workflow.py b/sdks/python/examples/trigger_methods/workflow.py
new file mode 100644
index 000000000..090215510
--- /dev/null
+++ b/sdks/python/examples/trigger_methods/workflow.py
@@ -0,0 +1,40 @@
+from pydantic import BaseModel
+
+from hatchet_sdk import Context, Hatchet
+
+hatchet = Hatchet()
+
+
+# > Define a task
+class HelloInput(BaseModel):
+ name: str
+
+
+class HelloOutput(BaseModel):
+ greeting: str
+
+
+@hatchet.task(input_validator=HelloInput)
+async def say_hello(input: HelloInput, ctx: Context) -> HelloOutput:
+ return HelloOutput(greeting=f"Hello, {input.name}!")
+
+
+# !!
+
+
+async def main() -> None:
+ # > Sync
+ ref = say_hello.run_no_wait(input=HelloInput(name="World"))
+ # !!
+
+ # > Async
+ ref = await say_hello.aio_run_no_wait(input=HelloInput(name="Async World"))
+ # !!
+
+ # > Result Sync
+ result = ref.result()
+ # !!
+
+ # > Result Async
+ result = await ref.aio_result()
+ # !!