From c920a9213e51b2b7552d3a52abcbb7cf81ba7cd5 Mon Sep 17 00:00:00 2001
From: abelanger5
Date: Mon, 3 Jun 2024 12:32:10 -0400
Subject: [PATCH] docs: update python sdk (#550)

* docs: update python sdk

* Apply suggestions from code review

Co-authored-by: Gabe Ruttner

---------

Co-authored-by: Gabe Ruttner
---
 .../docs/pages/sdks/python-sdk/_meta.json     |  3 +-
 .../sdks/python-sdk/creating-a-workflow.mdx   | 77 ++++++++++-----
 frontend/docs/pages/sdks/python-sdk/setup.mdx |  4 +
 .../python-sdk/spawning-workflow-runs.mdx     | 97 +++++++++++++++++++
 4 files changed, 157 insertions(+), 24 deletions(-)
 create mode 100644 frontend/docs/pages/sdks/python-sdk/spawning-workflow-runs.mdx

diff --git a/frontend/docs/pages/sdks/python-sdk/_meta.json b/frontend/docs/pages/sdks/python-sdk/_meta.json
index 0c7d2de21..1c1344db9 100644
--- a/frontend/docs/pages/sdks/python-sdk/_meta.json
+++ b/frontend/docs/pages/sdks/python-sdk/_meta.json
@@ -3,5 +3,6 @@
   "creating-a-workflow": "Creating a Workflow",
   "creating-a-worker": "Creating a Worker",
   "pushing-events": "Pushing Events",
-  "api": "API"
+  "spawning-workflow-runs": "Spawning Workflow Runs",
+  "api": "REST API"
 }
diff --git a/frontend/docs/pages/sdks/python-sdk/creating-a-workflow.mdx b/frontend/docs/pages/sdks/python-sdk/creating-a-workflow.mdx
index 001a6e018..f42bdaa12 100644
--- a/frontend/docs/pages/sdks/python-sdk/creating-a-workflow.mdx
+++ b/frontend/docs/pages/sdks/python-sdk/creating-a-workflow.mdx
@@ -61,11 +61,32 @@ class MyWorkflow:

Future steps can access this output by calling `context.step_output("<step_name>")`. In this example, a future step could access this data via `context.step_output("step1")`.

## Sync vs Async Steps

You can define async steps by using the `async` keyword on the step method. For example:

```py
import asyncio

from hatchet_sdk import Context

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    async def step1(self, context: Context):
        print("executed step1", context.workflow_input())
        await asyncio.sleep(1)
```

When steps are declared with `async`, they are executed on the main thread using the default asyncio event loop. If an asyncio event loop is not set when the worker is started with `worker.start`, one will be created. If you are using `async` steps, **make sure you are not blocking the event loop**. If you are running blocking code, declare the method with a plain `def` instead and it will be executed in a separate thread. You can also call `asyncio.run` within a `def` method if you need `async/await` there, but make sure you are not using a third-party dependency that expects a shared event loop.

As a quick rule of thumb:

- If you are using `async/await`, use `async def` and ensure you are not blocking the event loop.
- If you are running blocking code, use `def` and ensure you are not using third-party dependencies that expect a shared event loop.

## Cron Schedules

You can declare a cron schedule by passing `on_crons` to the `hatchet.workflow` decorator. For example, to trigger a workflow every 5 minutes, you can do the following (a complete sketch follows the hunk below):

```py
from hatchet_sdk import Hatchet

hatchet = Hatchet()
```

@@ -98,14 +119,14 @@ def step1(self, context):
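The hunk above shows only the changed lines of the cron example. For reference, a complete version of the every-5-minutes workflow might look like the following minimal sketch (the class and step names are illustrative, not part of the patch):

```py
from hatchet_sdk import Hatchet

hatchet = Hatchet()

# "*/5 * * * *" is standard cron syntax for "every 5 minutes"
@hatchet.workflow(on_crons=["*/5 * * * *"])
class MyCronWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1 on a cron schedule")
        return {"status": "success"}
```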
## Concurrency Limits and Fairness

By default, there are no concurrency limits for Hatchet workflows: workflow runs are executed immediately as soon as they are triggered (by an event, cron, or schedule). However, you can enforce a concurrency limit by decorating a custom function with `hatchet.concurrency`. This function returns a **concurrency group key**, a string that is used to group concurrent executions. **Note that this function should not also be registered as a `hatchet.step`.**

For example, the following workflow will only allow 5 concurrent executions for any workflow run of `ConcurrencyDemoWorkflow`, since the key is statically set to `concurrency-key`:

```py
from hatchet_sdk import ConcurrencyLimitStrategy

@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(
        max_runs=5, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN
    )
    def concurrency(self, context) -> str:
        return "concurrency-key"
```

@@ -122,7 +143,7 @@ You can use the custom concurrency function to enforce per-user concurrency limi

```py
@hatchet.workflow(on_events=["concurrency-test"])
class ConcurrencyDemoWorkflow:
    @hatchet.concurrency(
        max_runs=1, limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN
    )
    def concurrency(self, context) -> str:
        return context.workflow_input()["user_id"]
```

@@ -138,31 +159,23 @@ This same approach can be used for:

- Limiting data or document ingestion by setting an input hash or on-file key.
- Rudimentary fairness rules by limiting groups per tenant to a certain number of concurrent executions.

### Use-Case: Cancelling In-Progress Workflows

You can use the custom concurrency function to cancel in-progress workflow runs. For example, the following workflow will cancel any in-progress run for a user when a new event is triggered:

```py
@hatchet.workflow(on_events=["concurrency-test"])
class Concurrency:
    @hatchet.concurrency(
        max_runs=1, limit_strategy=ConcurrencyLimitStrategy.CANCEL_IN_PROGRESS
    )
    def concurrency(self, context) -> str:
        return context.workflow_input()["user_id"]

    @hatchet.step()
    def step1(self, context):
        print("executed step1")
```

## Logging

Hatchet comes with a built-in logging view where you can push debug logs from your workflows. To use this, call the `context.log` method. For example:

```py
@hatchet.workflow(on_events=["user:create"])
class LoggingWorkflow:
    @hatchet.step()
    def logger(self, context):
        for i in range(1000):
            context.log(f"Logging message {i}")

        return {"status": "success"}
```

Each step is currently limited to 1000 log lines.
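Because of the 1000-line cap, long-running loops can exhaust a step's log budget quickly. One way to stay under the limit is to sample log lines; a minimal sketch (the workflow, event, and step names are illustrative):

```py
@hatchet.workflow(on_events=["batch:process"])
class SampledLoggingWorkflow:
    @hatchet.step()
    def process(self, context):
        total = 100_000

        for i in range(total):
            # Log every 1000th iteration so the step stays well under
            # the 1000-line limit even for large batches.
            if i % 1000 == 0:
                context.log(f"processed {i}/{total} items")

        return {"processed": total}
```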
## Termination, Sleeps and Threads

Hatchet spawns a new thread per step, which means that there is a risk of thread leakage if your code is busy outside of the Python interpreter, for example if you call `time.sleep` within a step. Consider using an `async` step with `await asyncio.sleep` instead.

You can also determine whether to exit the thread by calling `context.done()` within a step, which returns `True` if the step has been cancelled. For example:

```py
@hatchet.step(timeout="2s")
def step1(self, context):
    while True:
        # this step will gracefully exit after 2 seconds
        if context.done():
            break
```

If you need control over cancellation, you can also use `context.cancel()` to cancel the current step, though this is not recommended.
diff --git a/frontend/docs/pages/sdks/python-sdk/setup.mdx b/frontend/docs/pages/sdks/python-sdk/setup.mdx
index 1d340d377..d2d36be84 100644
--- a/frontend/docs/pages/sdks/python-sdk/setup.mdx
+++ b/frontend/docs/pages/sdks/python-sdk/setup.mdx
@@ -22,6 +22,10 @@ Navigate to your Hatchet dashboard and navigate to your settings tab. You should

HATCHET_CLIENT_TOKEN=""
```

## Quickstart

To get started with the Hatchet SDK, check out the quickstart repo [here](https://github.com/hatchet-dev/hatchet-python-quickstart).

## Advanced

### TLS with self-signed server certificates
diff --git a/frontend/docs/pages/sdks/python-sdk/spawning-workflow-runs.mdx b/frontend/docs/pages/sdks/python-sdk/spawning-workflow-runs.mdx
new file mode 100644
index 000000000..a5edaf7d2
--- /dev/null
+++ b/frontend/docs/pages/sdks/python-sdk/spawning-workflow-runs.mdx
@@ -0,0 +1,97 @@
# Spawning Workflow Runs

There are two ways to run a workflow directly in Hatchet (without an event, cron, or schedule): `admin.run_workflow`, which triggers a standalone workflow run, and `spawn_workflow`, which creates a child workflow run from within a step.

## `admin.run_workflow`

The `admin.run_workflow` method (or `admin.aio.run_workflow` as its async counterpart) runs a workflow directly when it is not meant to be a child workflow. It returns a reference to the workflow run, which can be used to stream events from the run or to await its result. For example, assuming that you have a workflow named `ManualTriggerWorkflow`:

```py
import json

# assumes this code runs inside an async function, since results are awaited below
workflow_run = hatchet.admin.run_workflow(
    "ManualTriggerWorkflow",
    {"test": "test"},
    options={"additional_metadata": {"hello": "there"}},
)

print(f"spawned workflow run: {workflow_run.workflow_run_id}")

listener = workflow_run.stream()

async for event in listener:
    print(event.type, event.payload)

result = await workflow_run.result()

print("result: " + json.dumps(result, indent=2))
```

You can retrieve a reference to this workflow run from a different method (for example, a separate request handler) by using the `workflow_run_id` attribute of the `workflow_run` object. For example, using FastAPI to create a trigger endpoint and stream the results of the workflow run:

```py
import json

from fastapi import FastAPI

app = FastAPI()

@app.get("/trigger")
async def trigger():
    workflow_run = await hatchet.admin.aio.run_workflow(
        "ManualTriggerWorkflow",
        {"test": "test"},
        options={"additional_metadata": {"hello": "there"}},
    )

    return {"workflow_run_id": workflow_run.workflow_run_id}

@app.get("/workflow-run/{workflow_run_id}")
async def stream(workflow_run_id: str):
    workflow_run = await hatchet.admin.aio.get_workflow_run(workflow_run_id)

    listener = workflow_run.stream()

    # You can listen for all events for a workflow run
    async for event in listener:
        print(event.type, event.payload)

    # Or await results from that run directly
    result = await workflow_run.result()

    print("result: " + json.dumps(result, indent=2))
```
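A workflow run can also fail, so you may want to guard the `result()` call. A minimal sketch, reusing the `ManualTriggerWorkflow` above (the exact exception type raised by the SDK is not shown in this patch, so a broad handler is used for illustration):

```py
workflow_run = await hatchet.admin.aio.run_workflow(
    "ManualTriggerWorkflow",
    {"test": "test"},
)

try:
    result = await workflow_run.result()
    print("result:", result)
except Exception as exc:
    # A failed or cancelled run surfaces here; the concrete exception
    # class is SDK-specific, so we catch broadly in this sketch.
    print(f"workflow run {workflow_run.workflow_run_id} failed: {exc}")
```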
## `spawn_workflow`

The `spawn_workflow` method spawns a child workflow run from within a step. Like `admin.run_workflow`, it returns a reference to the workflow run, which can be used to stream events or await the result. For example:

```py
import asyncio

from hatchet_sdk import Context

@hatchet.workflow(on_events=["parent:create"])
class Parent:
    @hatchet.step(timeout="5m")
    async def spawn(self, context: Context):
        print("spawning children")

        results = []

        for i in range(100):
            # spawn_workflow returns a reference to the child run; .result()
            # returns an awaitable that resolves when the child run completes
            child = await context.aio.spawn_workflow(
                "Child", {"a": str(i)}, key=f"child{i}"
            )
            results.append(child.result())

        # await all child results concurrently
        result = await asyncio.gather(*results)
        print(f"results {result}")

        return {"results": result}


@hatchet.workflow(on_events=["child:create"], schedule_timeout="5s")
class Child:
    @hatchet.step()
    async def process(self, context: Context):
        a = context.workflow_input()["a"]
        print(f"child process {a}")
        return {"status": "success " + a}

    @hatchet.step()
    async def process2(self, context: Context):
        print("child process2")
        return {"status2": "success"}
```
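If you only need a single child run, the same pattern collapses to spawning the child and awaiting its result directly. A minimal sketch reusing the `Child` workflow above (the event name, class name, and step name are illustrative, not part of the patch):

```py
@hatchet.workflow(on_events=["parent-of-one:create"])
class ParentOfOne:
    @hatchet.step()
    async def spawn_one(self, context: Context):
        # spawn a single child run and block until it finishes
        child = await context.aio.spawn_workflow("Child", {"a": "0"}, key="child0")
        child_result = await child.result()

        return {"child_result": child_result}
```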