docs: update python sdk (#550)

* docs: update python sdk

* Apply suggestions from code review

Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>

---------

Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>
abelanger5
2024-06-03 12:32:10 -04:00
committed by GitHub
parent 207f1bbb08
commit c920a9213e
4 changed files with 157 additions and 24 deletions


@@ -3,5 +3,6 @@
"creating-a-workflow": "Creating a Workflow",
"creating-a-worker": "Creating a Worker",
"pushing-events": "Pushing Events",
"api": "API"
"spawning-workflow-runs": "Spawning Workflow Runs",
"api": "REST API"
}


@@ -61,11 +61,32 @@ class MyWorkflow:
Future steps can access this output by calling `context.step_output("<step>")`. In this example, a future step could access this data via `context.step_output("step1")`.
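For instance, a minimal sketch of a downstream step reading that output (the `parents` wiring is shown for illustration; match it to your own workflow):
```py
@hatchet.step(parents=["step1"])
def step2(self, context: Context):
    # read the dict that step1 returned
    step1_output = context.step_output("step1")
    print("step1 returned", step1_output)
```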
## Sync vs Async Steps
You can define async steps by using the `async` keyword on the step method. For example:
```py
import asyncio

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    async def step1(self, context: Context):
        print("executed step1", context.workflow_input())
        await asyncio.sleep(1)
```
When steps are declared with `async`, they are executed on the main thread using the default asyncio event loop. If no asyncio event loop is set when the worker is started with `worker.start`, one will be created. If you are using `async` steps, **make sure you are not blocking the event loop**. If you are running blocking code, declare the method with a plain `def` and it will be executed in a separate thread. You can also call `asyncio.run` within a `def` method if you need `async/await` there, but make sure you are not using a third-party dependency which expects a shared event loop.
As a quick rule of thumb (see the sketch after this list):
- If you are using `async/await`, use `async def` and ensure you are not blocking the event loop.
- If you are running blocking code, use `def` and ensure you are not using third-party dependencies that expect a shared event loop.
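For instance, a minimal sketch showing both rules side by side (assuming the `hatchet` client and `Context` import from the examples above; the workflow and step names are hypothetical):
```py
import asyncio
import time

@hatchet.workflow(on_events=["user:create"])
class MixedStepsWorkflow:
    # async step: runs on the worker's shared event loop, so it must not block
    @hatchet.step()
    async def poll(self, context: Context):
        await asyncio.sleep(1)  # non-blocking sleep
        return {"polled": True}

    # blocking step: a plain `def` runs in a separate thread,
    # so blocking calls here do not stall the event loop
    @hatchet.step()
    def crunch(self, context: Context):
        time.sleep(1)  # blocking call is safe in a `def` step
        return {"crunched": True}
```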
## Cron Schedules
You can declare a cron schedule by passing `on_crons` to the `hatchet.workflow` decorator. For example, to trigger a workflow every 5 minutes, you can do the following:
```py
from hatchet_sdk import Hatchet

hatchet = Hatchet()
```
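Building on that setup, a minimal sketch of a full cron-triggered workflow (the class and step names are illustrative):
```py
# "*/5 * * * *" fires every 5 minutes
@hatchet.workflow(on_crons=["*/5 * * * *"])
class MyCronWorkflow:
    @hatchet.step()
    def step1(self, context: Context):
        print("executed step1 on the cron schedule")
```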
@@ -98,14 +119,14 @@ def step1(self, context):
## Concurrency Limits and Fairness
> **Note:** this feature is currently in beta and only supports a concurrency strategy which terminates the oldest running workflow run to make room for the new one. This will be expanded in the future to support other strategies.
By default, there are no concurrency limits for Hatchet workflows: workflow runs are executed as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by decorating a custom function with `hatchet.concurrency`. This function returns a **concurrency group key**, a string used to group concurrent executions. **Note that this function should not also be used as a `hatchet.step`.** For example, the following workflow will only allow 5 concurrent executions across all runs of `ConcurrencyDemoWorkflow`, since the key is statically set to `concurrency-key`:
```py
from hatchet_sdk import ConcurrencyLimitStrategy

@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(
        max_runs=5, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN
    )
    def concurrency(self, context) -> str:
        return "concurrency-key"
```
@@ -122,7 +143,7 @@ You can use the custom concurrency function to enforce per-user concurrency limi
```py
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(
        max_runs=1, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN
    )
    def concurrency(self, context) -> str:
        return context.workflow_input()["user_id"]
```
@@ -138,31 +159,23 @@ This same approach can be used for:
- Limiting data or document ingestion by setting an input hash or on-file key (see the sketch after this list).
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.
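For instance, a minimal sketch of the input-hash variant (the event name and hashing scheme are illustrative, not part of the SDK):
```py
import hashlib
import json

@hatchet.workflow(on_events=["document:ingest"])
class IngestWorkflow:
    @hatchet.concurrency(max_runs=1)
    def concurrency(self, context) -> str:
        # identical payloads hash to the same group key, so duplicate
        # ingestions are limited to one concurrent run
        payload = json.dumps(context.workflow_input(), sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    @hatchet.step()
    def ingest(self, context):
        print("ingesting", context.workflow_input())
```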
### Use-Case: Cancelling In-Progress Workflows
You can use the custom concurrency function to cancel in-progress workflows. For example, the following workflow will cancel any in-progress workflow run for a user when a new event is triggered:
```py
@hatchet.workflow(on_events=["concurrency-test"])
class Concurrency:
    @hatchet.concurrency(
        max_runs=1, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS
    )
    def concurrency(self, context) -> str:
        return context.workflow_input()["user_id"]

    @hatchet.step()
    def step1(self, context):
        print("executed step1")
```
## Logging
Hatchet comes with a built-in logging view where you can push debug logs from your workflows. To push a log line, use the `context.log` method. For example:
@@ -182,3 +195,21 @@ class LoggingWorkflow:
```
Each step is currently limited to 1000 log lines.
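A minimal sketch of a `LoggingWorkflow` using `context.log` (the event name and step body are illustrative):
```py
@hatchet.workflow(on_events=["logging:test"])
class LoggingWorkflow:
    @hatchet.step()
    def log_step(self, context: Context):
        # each call pushes one debug line to the Hatchet log view for this run
        for i in range(3):
            context.log(f"processing item {i}")
        return {"status": "success"}
```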
## Termination, Sleeps and Threads
Hatchet spawns a new thread per step, which means there is a risk of thread leakage if your code is busy outside of the Python interpreter, for example if you call `time.sleep` within a step. Consider using an `async` method with `await asyncio.sleep` instead.
You can also determine whether to exit the thread by calling `context.done()` within a step, which returns `True` if the step has been cancelled. For example:
```py
@hatchet.step(timeout="2s")
def step1(self, context):
    while True:
        # this step will gracefully exit after 2 seconds
        if context.done():
            break
        pass
```
If you need control over cancellation, you can also use `context.cancel()` to cancel the current step, though this is not recommended.


@@ -22,6 +22,10 @@ Navigate to your Hatchet dashboard and navigate to your settings tab. You should
HATCHET_CLIENT_TOKEN="<your-api-key>"
```
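With the token exported, the client picks it up from the environment when constructed with no arguments:
```py
from hatchet_sdk import Hatchet

# reads HATCHET_CLIENT_TOKEN from the environment
hatchet = Hatchet()
```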
## Quickstart
To get started with the Hatchet SDK, check out the quickstart repo [here](https://github.com/hatchet-dev/hatchet-python-quickstart).
## Advanced
### TLS with self-signed server certificates


@@ -0,0 +1,97 @@
# Spawning Workflow Runs
There are two ways to run a workflow directly in Hatchet (without an event or schedule): using `admin.run_workflow`, or using `spawn_workflow` to create a child workflow within a step.
## `admin.run_workflow`
The `admin.run_workflow` method (or `admin.aio.run_workflow` for an async variant) runs a workflow directly which isn't meant to be a child workflow. It returns a reference to the workflow run, which can be used to stream results from the run or to await its final result. For example, assuming that you have a workflow named `ManualTriggerWorkflow`:
```py
import json

# note: this snippet uses `async for` and `await`,
# so it should run inside an async function
workflow_run = hatchet.admin.run_workflow(
    "ManualTriggerWorkflow",
    {"test": "test"},
    options={"additional_metadata": {"hello": "there"}},
)

print(f"spawned workflow run: {workflow_run.workflow_run_id}")

# stream events from the run as they arrive
listener = workflow_run.stream()

async for event in listener:
    print(event.type, event.payload)

# then await the final result
result = await workflow_run.result()

print("result: " + json.dumps(result, indent=2))
```
You can retrieve a reference to this workflow run from a different method (for example, a request handler) by using the `workflow_run_id` attribute of the `workflow_run` object. For example, using FastAPI to trigger a run and then stream its results:
```py
@app.get("/trigger")
async def trigger():
    workflow_run = await hatchet.admin.aio.run_workflow(
        "ManualTriggerWorkflow",
        {"test": "test"},
        options={"additional_metadata": {"hello": "there"}},
    )

    return {"workflow_run_id": workflow_run.workflow_run_id}

@app.get("/workflow-run/{workflow_run_id}")
async def stream(workflow_run_id: str):
    workflow_run = await hatchet.admin.aio.get_workflow_run(workflow_run_id)
    listener = workflow_run.stream()

    # You can listen for all events for a workflow run
    async for event in listener:
        print(event.type, event.payload)

    # Or await results from that run directly
    result = await workflow_run.result()
    print("result: " + json.dumps(result, indent=2))
```
## `spawn_workflow`
The `spawn_workflow` method is used to spawn a child workflow within a step. It also returns a reference to the workflow run, which can be used to stream results or await the final result. For example:
```py
import asyncio

@hatchet.workflow(on_events=["parent:create"])
class Parent:
    @hatchet.step(timeout="5m")
    async def spawn(self, context: Context):
        print("spawning child")

        results = []

        # spawn 100 child workflow runs and collect their result coroutines
        for i in range(100):
            results.append(
                (
                    await context.aio.spawn_workflow(
                        "Child", {"a": str(i)}, key=f"child{i}"
                    )
                ).result()
            )

        # wait for all child runs to complete
        result = await asyncio.gather(*results)

        print(f"results {result}")

        return {"results": result}

@hatchet.workflow(on_events=["child:create"], schedule_timeout="5s")
class Child:
    @hatchet.step()
    async def process(self, context: Context):
        a = context.workflow_input()["a"]
        print(f"child process {a}")
        return {"status": "success " + a}

    @hatchet.step()
    async def process2(self, context: Context):
        print("child process2")
        return {"status2": "success"}
```