mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-23 18:49:47 -05:00
[Docs, Python] Expand Cancellation + Conditional Workflow Docs, Fix cancellation in Python (#1471)
* feat: expand conditional docs * feat: initial cancellation work + fixing some broken links * feat: docs on cancellation * python: fix cancellation * python: cruft * chore: version * feat: python example * fix: TS cancellation examples * fix: lint * feat: go example * feat: half-baked ts conditional logic workflow * feat: add ts example conditional workflow * feat: go example * feat: go example * fix: cancellation test * fix: thanks, copilot! * Update frontend/docs/pages/home/conditional-workflows.mdx Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com> * fix: lint * chore: lint * fix: longer sleep --------- Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>
This commit is contained in:
@@ -18,6 +18,7 @@ type CancellationResult struct {
|
||||
|
||||
func Cancellation(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[CancellationInput, CancellationResult] {
|
||||
|
||||
// ❓ Cancelled task
|
||||
// Create a task that sleeps for 10 seconds and checks if it was cancelled
|
||||
cancellation := factory.NewTask(
|
||||
create.StandaloneTask{
|
||||
@@ -40,6 +41,7 @@ func Cancellation(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[Cancell
|
||||
},
|
||||
hatchet,
|
||||
)
|
||||
// !!
|
||||
|
||||
return cancellation
|
||||
}
|
||||
|
||||
@@ -0,0 +1,207 @@
|
||||
package v1_workflows
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/hatchet-dev/hatchet/pkg/client/create"
|
||||
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
|
||||
"github.com/hatchet-dev/hatchet/pkg/v1/factory"
|
||||
"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
|
||||
"github.com/hatchet-dev/hatchet/pkg/worker"
|
||||
"github.com/hatchet-dev/hatchet/pkg/worker/condition"
|
||||
)
|
||||
|
||||
// StepOutput represents the output of most tasks in this workflow
|
||||
type StepOutput struct {
|
||||
RandomNumber int `json:"randomNumber"`
|
||||
}
|
||||
|
||||
// RandomSum represents the output of the sum task
|
||||
type RandomSum struct {
|
||||
Sum int `json:"sum"`
|
||||
}
|
||||
|
||||
// TaskConditionWorkflowResult represents the aggregate output of all tasks
|
||||
type TaskConditionWorkflowResult struct {
|
||||
Start StepOutput `json:"start"`
|
||||
WaitForSleep StepOutput `json:"waitForSleep"`
|
||||
WaitForEvent StepOutput `json:"waitForEvent"`
|
||||
SkipOnEvent StepOutput `json:"skipOnEvent"`
|
||||
LeftBranch StepOutput `json:"leftBranch"`
|
||||
RightBranch StepOutput `json:"rightBranch"`
|
||||
Sum RandomSum `json:"sum"`
|
||||
}
|
||||
|
||||
// taskOpts is a type alias for workflow task options
|
||||
type taskOpts = create.WorkflowTask[struct{}, TaskConditionWorkflowResult]
|
||||
|
||||
func TaskConditionWorkflow(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[struct{}, TaskConditionWorkflowResult] {
|
||||
// ❓ Create a workflow
|
||||
wf := factory.NewWorkflow[struct{}, TaskConditionWorkflowResult](
|
||||
create.WorkflowCreateOpts[struct{}]{
|
||||
Name: "TaskConditionWorkflow",
|
||||
},
|
||||
hatchet,
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add base task
|
||||
start := wf.Task(
|
||||
taskOpts{
|
||||
Name: "start",
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add wait for sleep
|
||||
waitForSleep := wf.Task(
|
||||
taskOpts{
|
||||
Name: "waitForSleep",
|
||||
Parents: []create.NamedTask{start},
|
||||
WaitFor: condition.SleepCondition(time.Second * 10),
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add skip on event
|
||||
skipOnEvent := wf.Task(
|
||||
taskOpts{
|
||||
Name: "skipOnEvent",
|
||||
Parents: []create.NamedTask{start},
|
||||
WaitFor: condition.SleepCondition(time.Second * 30),
|
||||
SkipIf: condition.UserEventCondition("skip_on_event:skip", "true"),
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add branching
|
||||
leftBranch := wf.Task(
|
||||
taskOpts{
|
||||
Name: "leftBranch",
|
||||
Parents: []create.NamedTask{waitForSleep},
|
||||
SkipIf: condition.ParentCondition(waitForSleep, "output.randomNumber > 50"),
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
|
||||
rightBranch := wf.Task(
|
||||
taskOpts{
|
||||
Name: "rightBranch",
|
||||
Parents: []create.NamedTask{waitForSleep},
|
||||
SkipIf: condition.ParentCondition(waitForSleep, "output.randomNumber <= 50"),
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add wait for event
|
||||
waitForEvent := wf.Task(
|
||||
taskOpts{
|
||||
Name: "waitForEvent",
|
||||
Parents: []create.NamedTask{start},
|
||||
WaitFor: condition.Or(
|
||||
condition.SleepCondition(time.Minute),
|
||||
condition.UserEventCondition("wait_for_event:start", "true"),
|
||||
),
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
return &StepOutput{
|
||||
RandomNumber: rand.Intn(100) + 1,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
// ❓ Add sum
|
||||
wf.Task(
|
||||
taskOpts{
|
||||
Name: "sum",
|
||||
Parents: []create.NamedTask{
|
||||
start,
|
||||
waitForSleep,
|
||||
waitForEvent,
|
||||
skipOnEvent,
|
||||
leftBranch,
|
||||
rightBranch,
|
||||
},
|
||||
},
|
||||
func(ctx worker.HatchetContext, _ struct{}) (interface{}, error) {
|
||||
var startOutput StepOutput
|
||||
if err := ctx.ParentOutput(start, &startOutput); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var waitForSleepOutput StepOutput
|
||||
if err := ctx.ParentOutput(waitForSleep, &waitForSleepOutput); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var waitForEventOutput StepOutput
|
||||
ctx.ParentOutput(waitForEvent, &waitForEventOutput)
|
||||
|
||||
// Handle potentially skipped tasks
|
||||
var skipOnEventOutput StepOutput
|
||||
var four int
|
||||
|
||||
err := ctx.ParentOutput(skipOnEvent, &skipOnEventOutput)
|
||||
|
||||
if err != nil {
|
||||
four = 0
|
||||
} else {
|
||||
four = skipOnEventOutput.RandomNumber
|
||||
}
|
||||
|
||||
var leftBranchOutput StepOutput
|
||||
var five int
|
||||
|
||||
err = ctx.ParentOutput(leftBranch, leftBranchOutput)
|
||||
if err != nil {
|
||||
five = 0
|
||||
} else {
|
||||
five = leftBranchOutput.RandomNumber
|
||||
}
|
||||
|
||||
var rightBranchOutput StepOutput
|
||||
var six int
|
||||
|
||||
err = ctx.ParentOutput(rightBranch, rightBranchOutput)
|
||||
if err != nil {
|
||||
six = 0
|
||||
} else {
|
||||
six = rightBranchOutput.RandomNumber
|
||||
}
|
||||
|
||||
return &RandomSum{
|
||||
Sum: startOutput.RandomNumber + waitForEventOutput.RandomNumber +
|
||||
waitForSleepOutput.RandomNumber + four + five + six,
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
// !!
|
||||
|
||||
return wf
|
||||
}
|
||||
@@ -92,7 +92,6 @@ export default {
|
||||
},
|
||||
"cancellation": {
|
||||
"title": "Cancellation",
|
||||
"display": "hidden"
|
||||
},
|
||||
"--v1-migration-guides": {
|
||||
"title": "V1 Migration Guides",
|
||||
|
||||
@@ -1 +1,112 @@
|
||||
TODO V1 DOCS
|
||||
import { Tabs, Callout } from "nextra/components";
|
||||
import UniversalTabs from "../../components/UniversalTabs";
|
||||
import { GithubSnippet, getSnippets } from "@/components/code";
|
||||
|
||||
export const CancelPy = {
|
||||
path: "examples/cancellation/worker.py",
|
||||
};
|
||||
export const CancelTs = {
|
||||
path: "src/v1/examples/cancellations/workflow.ts",
|
||||
};
|
||||
|
||||
export const CancelGo = {
|
||||
path: "examples/v1/workflows/cancellations.go",
|
||||
};
|
||||
|
||||
export const getStaticProps = ({}) =>
|
||||
getSnippets([CancelPy, CancelTs, CancelGo]);
|
||||
|
||||
# Cancellation in Hatchet Workflows
|
||||
|
||||
Hatchet provides a mechanism for canceling workflow executions gracefully, allowing you to stop running workflows and their associated tasks when needed. Cancellation can be triggered on graceful termination of a worker or automatically through concurrency control strategies like [`CANCEL_IN_PROGRESS`](./concurrency.mdx#cancel_in_progress), which cancels currently running workflow instances to free up slots for new instances when the concurrency limit is reached.
|
||||
|
||||
When a workflow is canceled, Hatchet sends a cancellation signal to all the currently executing tasks. The tasks can then check for the cancellation signal and take appropriate action, such as cleaning up resources, aborting network requests, or gracefully terminating their execution.
|
||||
|
||||
## Cancellation Mechanisms
|
||||
|
||||
<UniversalTabs items={['Python', 'Typescript', 'Go']}>
|
||||
|
||||
<Tabs.Tab>
|
||||
|
||||
Hatchet's Python SDK provides a number of ways to handle cancellations, in addition to some important footguns to be aware of when running tasks.
|
||||
|
||||
For async tasks, Hatchet uses the `cancel` method of [asyncio's `task`](https://docs.python.org/3/library/asyncio-task.html#task-cancellation) to handle cancellation. This means that for any async work, the task will be cancelled at the next `await` point via a `CancelledError` exception.
|
||||
|
||||
For synchronous tasks, cancellation is more involved (and riskier) because of how Hatchet uses threads to run synchronous work without blocking the event loop. Threads cannot be cancelled the same was as async tasks can, which means that synchronous tasks, when running, cannot be cancelled easily. If you need to have tasks be cancellable, we highly recommend making them async. However, if they _really_ need to be both synchronous _and_ cancellable, Hatchet exposes a configuration option called `enable_force_kill_sync_threads`, which can be set by setting the `HATCHET_CLIENT_ENABLE_FORCE_KILL_SYNC_THREADS` to `True`, which will forcibly kill the thread the task is running on and cause it to exit immediately.
|
||||
|
||||
<Callout type="warning" emoji="⚠️">
|
||||
It's important to note that forcibly killing threads is an inherently
|
||||
dangerous operation, which can lead to data loss, data corruption, and so on.
|
||||
As mentioned above, a much preferred option for tasks that need to be
|
||||
cancellable is to make them asynchronous.
|
||||
</Callout>
|
||||
|
||||
While your task is running, you can manage cancellation by:
|
||||
|
||||
1. Checking for cancellation using the `Context.exit_flag`, which indicates whether a task has been cancelled. You can check this flag at any point in your task to determine whether or not to exit. For example:
|
||||
|
||||
<GithubSnippet src={CancelPy} target="Checking exit flag" />
|
||||
|
||||
2. Using `Context.cancel` or `Context.aio_cancel` to cancel the task. This method will set the `exit_flag` to `True` and will notify the engine that it should cancel the task. For example:
|
||||
|
||||
<GithubSnippet src={CancelPy} target="Self-cancelling task" />
|
||||
|
||||
</Tabs.Tab>
|
||||
|
||||
<Tabs.Tab>
|
||||
|
||||
Hatchet uses the standard `AbortController` and `AbortSignal` interfaces from Node.js to handle cancellation. Each task in a workflow has access to a `context.controller` property, which is an instance of `AbortController`. The `AbortController` provides a way to signal cancellation to the task and any asynchronous operations it may be performing.
|
||||
|
||||
Inside a task, you can check for cancellation by accessing the `cancelled` property of the `Context`, which is a boolean value indicating whether the task has been cancelled or not. For example:
|
||||
|
||||
<GithubSnippet src={CancelTs} target="Declaring a Task" />
|
||||
|
||||
In this example, the task checks the `cancelled` flag and aborts.
|
||||
|
||||
Additionally, you can register an event listener for the `'abort'` event on the `signal`. This listener will be called when the cancellation signal is received, allowing you to handle the cancellation asynchronously.
|
||||
|
||||
## Aborting Network Requests
|
||||
|
||||
One common use case for cancellation is aborting network requests that are no longer needed. Hatchet's cancellation mechanism integrates seamlessly with many network libraries that support the `AbortSignal` interface.
|
||||
|
||||
Here's an example of how to pass the `AbortSignal` to an `axios` request:
|
||||
|
||||
<GithubSnippet src={CancelTs} target="Abort Signal" />
|
||||
|
||||
In this example, the `signal` property from the `Context.controller` is passed as an option to the `axios.get` request. If the task is canceled while the request is still pending, the request will be automatically aborted, and an error will be thrown. You can catch the error and check if it was due to cancellation using the `axios.isCancel` method.
|
||||
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab>
|
||||
|
||||
Hatchet uses the standard `context.WithCancel` way of creating a context and passes that to a task. Each task can check for cancellation signals using the `Done()` method that context package provides.
|
||||
|
||||
Here's an example of how to check for cancellation in a task:
|
||||
|
||||
<GithubSnippet src={CancelGo} target="Cancelled task" />
|
||||
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
## Cancellation Best Practices
|
||||
|
||||
When working with cancellation in Hatchet workflows, consider the following best practices:
|
||||
|
||||
1. **Graceful Termination**: When a task receives a cancellation signal, aim to terminate its execution gracefully. Clean up any resources, abort pending operations, and perform any necessary cleanup tasks before returning from the task function.
|
||||
|
||||
2. **Cancellation Checks**: Regularly check for cancellation signals within long-running tasks or loops. This allows the task to respond to cancellation in a timely manner and avoid unnecessary processing.
|
||||
|
||||
3. **Asynchronous Operations**: If a task performs asynchronous operations, such as network requests or file I/O, consider passing the cancellation signal to those operations. Many libraries and APIs support cancellation through the `AbortSignal` interface.
|
||||
|
||||
4. **Error Handling**: Handle cancellation errors appropriately. Distinguish between cancellation errors and other types of errors to provide meaningful error messages and take appropriate actions.
|
||||
|
||||
5. **Cancellation Propagation**: If a task invokes other functions or libraries, consider propagating the cancellation signal to those dependencies. This ensures that cancellation is handled consistently throughout the workflow.
|
||||
|
||||
## Additional Features
|
||||
|
||||
In addition to the methods of cancellation listed here, Hatchet also supports [bulk cancellation](./bulk-retries-and-cancellations.mdx), which allows you to cancel many tasks in bulk using either their IDs or a set of filters, which is often the easiest way to cancel many things at once.
|
||||
|
||||
## Conclusion
|
||||
|
||||
Cancellation is a powerful feature in Hatchet that allows you to gracefully stop workflow executions when needed. Remember to follow best practices when implementing cancellation in your workflows, such as graceful termination, regular cancellation checks, handling asynchronous operations, proper error handling, and cancellation propagation.
|
||||
|
||||
By incorporating cancellation into your Hatchet workflows, you can build more resilient and responsive systems that can adapt to changing circumstances and user needs.
|
||||
|
||||
@@ -1,20 +1,52 @@
|
||||
import { GithubSnippet, getSnippets } from "@/components/code";
|
||||
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
|
||||
import UniversalTabs from "@/components/UniversalTabs";
|
||||
|
||||
export const WaitsPy = {
|
||||
export const ConditionalWorkflowPy = {
|
||||
path: "examples/waits/worker.py",
|
||||
};
|
||||
export const ConditionalWorkflowTs = {
|
||||
path: "src/v1/examples/dag_match_condition/complex-workflow.ts",
|
||||
};
|
||||
export const ConditionalWorkflowGo = {
|
||||
path: "examples/v1/workflows/complex-conditions.go",
|
||||
};
|
||||
|
||||
export const getStaticProps = ({}) => getSnippets([WaitsPy]);
|
||||
export const getStaticProps = ({}) =>
|
||||
getSnippets([
|
||||
ConditionalWorkflowPy,
|
||||
ConditionalWorkflowTs,
|
||||
ConditionalWorkflowGo,
|
||||
]);
|
||||
|
||||
## Introduction
|
||||
|
||||
Hatchet V1 introduces the ability to add conditions to tasks in your workflows that determine whether or not a task should be run, based on a number of conditions. There are three types of `Condition`s in Hatchet V1:
|
||||
Hatchet V1 introduces the ability to add conditions to tasks in your workflows that determine whether or not a task should be run, based on a number of conditions. Conditions unlock a number of new ways to solve problems with Hatchet, such as:
|
||||
|
||||
1. A workflow that reads a feature flag, and then decides how to progress based on its value. In this case, you'd have two tasks that use parent conditions, where one task runs if the flag value is e.g. `True`, while the other runs if it's `False`.
|
||||
2. Any type of human-in-the-loop workflow, where you want to wait for a human to e.g. approve something before continuing the dag.
|
||||
|
||||
## Types of Conditions
|
||||
|
||||
There are three types of `Condition`s in Hatchet V1:
|
||||
|
||||
1. Sleep conditions, which sleep for a specified duration before continuing
|
||||
2. Event conditions, which wait for an event (and optionally a CEL expression evaluated on the payload of that event) before deciding how to continue
|
||||
3. Parent conditions, which wait for a parent task to complete and then decide how to progress based on its output.
|
||||
|
||||
These conditions can also be combined using an `Or` operator into groups of conditions where at least one must be satisfied in order for the group to evaluate to `True`.
|
||||
## Or Groups
|
||||
|
||||
Conditions can also be combined using an `Or` operator into groups of conditions (called "or groups") where at least one must be satisfied in order for the group to evaluate to `True`. An "or group" behaves like a boolean `OR` operator, where the group evaluates to `True` if at least one of its conditions is `True`.
|
||||
|
||||
Or groups are an extremely powerful feature because they let you express arbitrarily complex sets of conditions in [conjunctive normal form](https://en.wikipedia.org/wiki/Conjunctive_normal_form) (CNF) for determining when your tasks should run and when they should not. As a simple example, consider the following conditions:
|
||||
|
||||
- **Condition A**: Checking if the output of a parent task is greater than 50
|
||||
- **Condition B**: Sleeping for 30 seconds
|
||||
- **Condition C**: Receiving the `payment:processed` event
|
||||
|
||||
You might want to progress in your workflow if A _or_ B and C. In this case, we can express this set of conditions in CNF as `A or B` AND `A or C` where both `A or B` and `A or C` are or groups.
|
||||
|
||||
## Usage
|
||||
|
||||
Conditions can be used at task _declaration_ time in three ways:
|
||||
|
||||
@@ -22,12 +54,23 @@ Conditions can be used at task _declaration_ time in three ways:
|
||||
2. They can be used in a `skip_if` fashion, where a task will be skipped if the conditions evaluate to `True`.
|
||||
3. They can be used in a `cancel_if` fashion, where a task will be cancelled if the conditions evaluate to `True`.
|
||||
|
||||
## Use Cases
|
||||
### `wait_for`
|
||||
|
||||
There are a number of use cases that these features unlock. Some examples might be:
|
||||
Declaring a task with conditions to `wait_for` will cause the task to wait before starting for until its conditions evaluate to `True`. For instance, if you use `wait_for` with a 60 second sleep, the workflow will wait for 60 seconds before triggering the task. Similar, if the task is waiting for an event, it will wait until the event is fired before continuing.
|
||||
|
||||
1. A workflow that reads a feature flag, and then decides how to progress based on its value. In this case, you'd have two tasks that use parent conditions, where one task runs if the flag value is e.g. `True`, while the other runs if it's `False`.
|
||||
2. Any type of human-in-the-loop workflow, where you want to wait for a human to e.g. approve something before continuing the run.
|
||||
### `skip_if`
|
||||
|
||||
Declaring a task with conditions to `skip_if` will cause the task to be skipped if the conditions evaluate to `True`. For instance, if you use a parent condition to check if the output of a parent task is equal to some value, the task will be skipped if that condition evaluates to `True`.
|
||||
|
||||
### `cancel_if`
|
||||
|
||||
Declaring a task with conditions to `cancel_if` will cause the task to be cancelled if the conditions evaluate to `True`. For instance, if you use a parent condition to check if the output of a parent task is equal to some value, the task will be cancelled if that condition evaluates to `True`.
|
||||
|
||||
<Callout type="warning">
|
||||
A task cancelled by a `cancel_if` operator will behave the same as any other
|
||||
cancellation in Hatchet, meaning that downstream tasks will be cancelled as
|
||||
well.
|
||||
</Callout>
|
||||
|
||||
## Example Workflow
|
||||
|
||||
@@ -39,38 +82,110 @@ Note the branching logic (`left_branch` and `right_branch`), as well as the use
|
||||
|
||||
To get started, let's declare the workflow.
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Create a workflow" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Create a workflow" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Create a workflow" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Create a workflow" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
Next, we'll start adding tasks to our workflow. First, we'll add a basic task that outputs a random number:
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add base task" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add base task" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add base task" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add base task" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
Next, we'll add a task to the workflow that's a child of the first task, but it has a `wait_for` condition that sleeps for 10 seconds.
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add wait for sleep" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add wait for sleep" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add wait for sleep" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add wait for sleep" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
This task will first wait for the parent task to complete, and then it'll sleep for 10 seconds before executing and returning another random number.
|
||||
|
||||
Next, we'll add a task that will be skipped on an event:
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add skip on event" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add skip on event" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add skip on event" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add skip on event" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
In this case, our task will wait for a 30 second sleep, and then it will be skipped if the `skip_on_event:skip` is fired.
|
||||
|
||||
Next, let's add some branching logic. Here we'll add two more tasks, a left and right branch.
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add branching" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add branching" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add branching" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add branching" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
These two tasks use the `ParentCondition` and `skip_if` together to check if the output of an upstream task was greater or less than `50`, respectively. Only one of the two tasks will run: whichever one's condition evaluates to `True`.
|
||||
|
||||
Next, we'll add a task that waits for an event:
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add wait for event" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add wait for event" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add wait for event" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add wait for event" />
|
||||
</Tabs.Tab>
|
||||
</UniversalTabs>
|
||||
|
||||
And finally, we'll add the last task, which collects all of its parents and sums them up.
|
||||
|
||||
<GithubSnippet src={WaitsPy} target="Add sum" />
|
||||
<UniversalTabs items={["Python", "Typescript", "Go"]}>
|
||||
<Tabs.Tab title="Python">
|
||||
<GithubSnippet src={ConditionalWorkflowPy} target="Add sum" />
|
||||
|
||||
Note that in this task, we rely on `ctx.was_skipped` to determine if a task was skipped.
|
||||
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Typescript">
|
||||
<GithubSnippet src={ConditionalWorkflowTs} target="Add sum" />
|
||||
</Tabs.Tab>
|
||||
<Tabs.Tab title="Go">
|
||||
<GithubSnippet src={ConditionalWorkflowGo} target="Add sum" />
|
||||
</Tabs.Tab>
|
||||
|
||||
</UniversalTabs>
|
||||
|
||||
This workflow demonstrates the power of the new conditional logic in Hatchet V1. You can now create complex workflows that are much more dynamic than workflows in the previous version of Hatchet, and do all of it declaratively (rather than, for example, by dynamically spawning child workflows based on conditions in the parent).
|
||||
|
||||
@@ -222,4 +222,4 @@ When using cron triggers, there are a few considerations to keep in mind:
|
||||
|
||||
3. **Missed Schedules**: If a scheduled workflow is missed (e.g., due to system downtime), Hatchet will not automatically run the missed instances. It will wait for the next scheduled time to trigger the workflow.
|
||||
|
||||
4. **Overlapping Schedules**: If a workflow is still running when the next scheduled time arrives, Hatchet will start a new instance of the workflow or respect [concurrency](../concurrency/overview.mdx) policy.
|
||||
4. **Overlapping Schedules**: If a workflow is still running when the next scheduled time arrives, Hatchet will start a new instance of the workflow or respect [concurrency](./concurrency.mdx) policy.
|
||||
|
||||
@@ -111,6 +111,7 @@ hatchet.rate_limits.put(RATE_LIMIT_KEY, 10, RateLimitDuration.MINUTE)
|
||||
<Tabs.Tab>
|
||||
|
||||
{" "}
|
||||
|
||||
<GithubSnippet src={RateLimitTs} target="Upsert Rate Limit" />
|
||||
|
||||
</Tabs.Tab>
|
||||
|
||||
@@ -127,4 +127,4 @@ In these cases, even though `retries` is set to a non-zero number (meaning the t
|
||||
|
||||
Hatchet's task-level retry feature is a simple and effective way to handle transient failures in your workflow tasks, improving the reliability and resilience of your workflows. By specifying the number of retries for each task, you can ensure that your workflows can recover from temporary issues without requiring complex error handling logic.
|
||||
|
||||
Remember to use retries judiciously and only for tasks that are idempotent and can safely be repeated. For more advanced retry strategies, such as exponential backoff or circuit breaking, stay tuned for future updates to Hatchet's retry capabilities.../../components/UniversalTabs
|
||||
Remember to use retries judiciously and only for tasks that are idempotent and can safely be repeated. For more advanced retry strategies, such as exponential backoff or circuit breaking, stay tuned for future updates to Hatchet's retry capabilities.
|
||||
|
||||
@@ -124,5 +124,5 @@ When using scheduled runs, there are a few considerations to keep in mind:
|
||||
|
||||
3. **Missed Schedules**: If a scheduled workflow is missed (e.g., due to system downtime), Hatchet will not automatically run the missed instances.
|
||||
|
||||
4. **Overlapping Schedules**: If a workflow is still running when a second scheduled run is scheduled to start, Hatchet will start a new instance of the workflow or respect [concurrency](../concurrency/overview.mdx) policy.
|
||||
4. **Overlapping Schedules**: If a workflow is still running when a second scheduled run is scheduled to start, Hatchet will start a new instance of the workflow or respect [concurrency](./concurrency.mdx) policy.
|
||||
```
|
||||
|
||||
@@ -4,7 +4,6 @@ from examples.bulk_fanout.worker import ParentInput, bulk_parent_wf
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
result = await bulk_parent_wf.aio_run(input=ParentInput(n=12))
|
||||
|
||||
@@ -1,11 +1,29 @@
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from examples.cancellation.worker import wf
|
||||
from examples.cancellation.worker import cancellation_workflow
|
||||
from hatchet_sdk import Hatchet
|
||||
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
with pytest.raises(Exception, match="(Task exceeded timeout|TIMED_OUT)"):
|
||||
await wf.aio_run()
|
||||
async def test_cancellation(hatchet: Hatchet) -> None:
|
||||
ref = await cancellation_workflow.aio_run_no_wait()
|
||||
|
||||
"""Sleep for a long time since we only need cancellation to happen _eventually_"""
|
||||
await asyncio.sleep(10)
|
||||
|
||||
for i in range(30):
|
||||
run = await hatchet.runs.aio_get(ref.workflow_run_id)
|
||||
|
||||
if run.run.status == V1TaskStatus.RUNNING:
|
||||
await asyncio.sleep(1)
|
||||
continue
|
||||
|
||||
assert run.run.status == V1TaskStatus.CANCELLED
|
||||
assert not run.run.output
|
||||
|
||||
break
|
||||
else:
|
||||
assert False, "Workflow run did not cancel in time"
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
import time
|
||||
|
||||
from examples.cancellation.worker import cancellation_workflow, hatchet
|
||||
|
||||
id = cancellation_workflow.run_no_wait()
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
hatchet.runs.cancel(id.workflow_run_id)
|
||||
@@ -1,27 +1,49 @@
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
import time
|
||||
|
||||
from hatchet_sdk import Context, EmptyModel, Hatchet
|
||||
|
||||
hatchet = Hatchet(debug=True)
|
||||
|
||||
wf = hatchet.workflow(name="CancelWorkflow")
|
||||
cancellation_workflow = hatchet.workflow(name="CancelWorkflow")
|
||||
|
||||
|
||||
@wf.task(execution_timeout=timedelta(seconds=10), retries=1)
|
||||
async def step1(input: EmptyModel, ctx: Context) -> None:
|
||||
i = 0
|
||||
while not ctx.exit_flag and i < 40:
|
||||
print(f"Waiting for cancellation {i}")
|
||||
await asyncio.sleep(1)
|
||||
i += 1
|
||||
# ❓ Self-cancelling task
|
||||
@cancellation_workflow.task()
|
||||
async def self_cancel(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
await asyncio.sleep(2)
|
||||
|
||||
if ctx.exit_flag:
|
||||
print("Cancelled")
|
||||
## Cancel the task
|
||||
await ctx.aio_cancel()
|
||||
|
||||
await asyncio.sleep(10)
|
||||
|
||||
return {"error": "Task should have been cancelled"}
|
||||
|
||||
|
||||
# !!
|
||||
|
||||
|
||||
# ❓ Checking exit flag
|
||||
@cancellation_workflow.task()
|
||||
def check_flag(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
for i in range(3):
|
||||
time.sleep(1)
|
||||
|
||||
# Note: Checking the status of the exit flag is mostly useful for cancelling
|
||||
# sync tasks without needing to forcibly kill the thread they're running on.
|
||||
if ctx.exit_flag:
|
||||
print("Task has been cancelled")
|
||||
raise ValueError("Task has been cancelled")
|
||||
|
||||
return {"error": "Task should have been cancelled"}
|
||||
|
||||
|
||||
# !!
|
||||
|
||||
|
||||
def main() -> None:
|
||||
worker = hatchet.worker("cancellation-worker", slots=4, workflows=[wf])
|
||||
worker = hatchet.worker("cancellation-worker", workflows=[cancellation_workflow])
|
||||
worker.start()
|
||||
|
||||
|
||||
|
||||
@@ -5,7 +5,6 @@ from hatchet_sdk import Hatchet
|
||||
from hatchet_sdk.workflow_run import WorkflowRunRef
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
@pytest.mark.skip(reason="The timing for this test is not reliable")
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
|
||||
@@ -7,7 +7,6 @@ from hatchet_sdk import Hatchet
|
||||
from hatchet_sdk.workflow_run import WorkflowRunRef
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.skip(reason="The timing for this test is not reliable")
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
|
||||
@@ -4,7 +4,6 @@ from examples.dag.worker import dag_workflow
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
result = await dag_workflow.aio_run()
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
|
||||
# worker = fixture_bg_worker(["poetry", "run", "manual_trigger"])
|
||||
|
||||
# # requires scope module or higher for shared event loop
|
||||
# @pytest.mark.asyncio()
|
||||
# # @pytest.mark.asyncio()
|
||||
# async def test_run(hatchet: Hatchet):
|
||||
# # TODO
|
||||
|
||||
@@ -4,7 +4,6 @@ from hatchet_sdk.clients.events import BulkPushEventOptions, BulkPushEventWithMe
|
||||
from hatchet_sdk.hatchet import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_event_push(hatchet: Hatchet) -> None:
|
||||
e = hatchet.event.push("user:create", {"test": "test"})
|
||||
|
||||
@@ -4,7 +4,6 @@ from examples.fanout.worker import ParentInput, parent_wf
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
result = await parent_wf.aio_run(ParentInput(n=2))
|
||||
|
||||
@@ -4,7 +4,6 @@ from examples.logger.workflow import logging_workflow
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
result = await logging_workflow.aio_run()
|
||||
|
||||
@@ -7,7 +7,6 @@ from hatchet_sdk import Hatchet, Worker
|
||||
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run_timeout(aiohatchet: Hatchet, worker: Worker) -> None:
|
||||
run = on_failure_wf.run_no_wait()
|
||||
|
||||
@@ -7,7 +7,6 @@ from examples.rate_limit.worker import rate_limit_workflow
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.skip(reason="The timing for this test is not reliable")
|
||||
@pytest.mark.asyncio()
|
||||
async def test_run(hatchet: Hatchet) -> None:
|
||||
|
||||
@@ -4,7 +4,6 @@ from examples.timeout.worker import refresh_timeout_wf, timeout_wf
|
||||
from hatchet_sdk import Hatchet
|
||||
|
||||
|
||||
# requires scope module or higher for shared event loop
|
||||
@pytest.mark.asyncio()
|
||||
async def test_execution_timeout(hatchet: Hatchet) -> None:
|
||||
run = timeout_wf.run_no_wait()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from examples.affinity_workers.worker import affinity_worker_workflow
|
||||
from examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf
|
||||
from examples.cancellation.worker import wf
|
||||
from examples.cancellation.worker import cancellation_workflow
|
||||
from examples.concurrency_limit.worker import concurrency_limit_workflow
|
||||
from examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow
|
||||
from examples.dag.worker import dag_workflow
|
||||
@@ -40,7 +40,7 @@ def main() -> None:
|
||||
timeout_wf,
|
||||
refresh_timeout_wf,
|
||||
task_condition_workflow,
|
||||
wf,
|
||||
cancellation_workflow,
|
||||
sync_fanout_parent,
|
||||
sync_fanout_child,
|
||||
non_retryable_workflow,
|
||||
|
||||
@@ -18,6 +18,7 @@ from hatchet_sdk.clients.durable_event_listener import (
|
||||
)
|
||||
from hatchet_sdk.clients.events import EventClient
|
||||
from hatchet_sdk.context.worker_context import WorkerContext
|
||||
from hatchet_sdk.features.runs import RunsClient
|
||||
from hatchet_sdk.logger import logger
|
||||
from hatchet_sdk.utils.timedelta_to_expression import Duration, timedelta_to_expr
|
||||
from hatchet_sdk.utils.typing import JSONSerializableMapping, WorkflowValidator
|
||||
@@ -52,6 +53,7 @@ class Context:
|
||||
event_client: EventClient,
|
||||
durable_event_listener: DurableEventListener | None,
|
||||
worker: WorkerContext,
|
||||
runs_client: RunsClient,
|
||||
validator_registry: dict[str, WorkflowValidator] = {},
|
||||
):
|
||||
self.worker = worker
|
||||
@@ -66,6 +68,7 @@ class Context:
|
||||
self.dispatcher_client = dispatcher_client
|
||||
self.admin_client = admin_client
|
||||
self.event_client = event_client
|
||||
self.runs_client = runs_client
|
||||
self.durable_event_listener = durable_event_listener
|
||||
|
||||
# FIXME: this limits the number of concurrent log requests to 1, which means we can do about
|
||||
@@ -135,6 +138,12 @@ class Context:
|
||||
|
||||
def cancel(self) -> None:
|
||||
logger.debug("cancelling step...")
|
||||
self.runs_client.cancel(self.step_run_id)
|
||||
self.exit_flag = True
|
||||
|
||||
async def aio_cancel(self) -> None:
|
||||
logger.debug("cancelling step...")
|
||||
await self.runs_client.aio_cancel(self.step_run_id)
|
||||
self.exit_flag = True
|
||||
|
||||
# done returns true if the context has been cancelled
|
||||
|
||||
@@ -287,13 +287,14 @@ class Runner:
|
||||
constructor = DurableContext if is_durable else Context
|
||||
|
||||
return constructor(
|
||||
action,
|
||||
self.dispatcher_client,
|
||||
self.admin_client,
|
||||
self.client.event,
|
||||
self.durable_event_listener,
|
||||
self.worker_context,
|
||||
action=action,
|
||||
dispatcher_client=self.dispatcher_client,
|
||||
admin_client=self.admin_client,
|
||||
event_client=self.client.event,
|
||||
durable_event_listener=self.durable_event_listener,
|
||||
worker=self.worker_context,
|
||||
validator_registry=self.validator_registry,
|
||||
runs_client=self.client.runs,
|
||||
)
|
||||
|
||||
## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
|
||||
@@ -415,7 +416,7 @@ class Runner:
|
||||
context = self.contexts.get(run_id)
|
||||
|
||||
if context:
|
||||
context.cancel()
|
||||
await context.aio_cancel()
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "hatchet-sdk"
|
||||
version = "1.2.0"
|
||||
version = "1.2.1"
|
||||
description = ""
|
||||
authors = ["Alexander Belanger <alexander@hatchet.run>"]
|
||||
readme = "README.md"
|
||||
|
||||
@@ -23,7 +23,7 @@ async def test_list_runs(hatchet: Hatchet) -> None:
|
||||
async def test_get_run(hatchet: Hatchet) -> None:
|
||||
dag_ref = await dag_workflow.aio_run_no_wait()
|
||||
|
||||
await asyncio.sleep(3)
|
||||
await asyncio.sleep(5)
|
||||
|
||||
run = await hatchet.runs.aio_get(dag_ref.workflow_run_id)
|
||||
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
// ❓ Declaring a Task
|
||||
import sleep from '@hatchet/util/sleep';
|
||||
import axios from 'axios';
|
||||
import { hatchet } from '../hatchet-client';
|
||||
|
||||
// (optional) Define the input type for the workflow
|
||||
// ❓ Declaring a Task
|
||||
export const cancellation = hatchet.task({
|
||||
name: 'cancellation',
|
||||
fn: async (_, { cancelled }) => {
|
||||
@@ -19,4 +19,25 @@ export const cancellation = hatchet.task({
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Abort Signal
|
||||
export const abortSignal = hatchet.task({
|
||||
name: 'abort-signal',
|
||||
fn: async (_, { controller }) => {
|
||||
try {
|
||||
const response = await axios.get('https://api.example.com/data', {
|
||||
signal: controller.signal,
|
||||
});
|
||||
// Handle the response
|
||||
} catch (error) {
|
||||
if (axios.isCancel(error)) {
|
||||
// Request was canceled
|
||||
console.log('Request canceled');
|
||||
} else {
|
||||
// Handle other errors
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// see ./worker.ts and ./run.ts for how to run the workflow
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
// ❓ Create a workflow
|
||||
import { Or, SleepCondition, UserEventCondition } from '@hatchet/v1/conditions';
|
||||
import { ParentCondition } from '@hatchet/v1/conditions/parent-condition';
|
||||
import { Context } from '@hatchet/step';
|
||||
import { hatchet } from '../hatchet-client';
|
||||
|
||||
export const taskConditionWorkflow = hatchet.workflow({
|
||||
name: 'TaskConditionWorkflow',
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add base task
|
||||
const start = taskConditionWorkflow.task({
|
||||
name: 'start',
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add wait for sleep
|
||||
const waitForSleep = taskConditionWorkflow.task({
|
||||
name: 'waitForSleep',
|
||||
parents: [start],
|
||||
waitFor: [new SleepCondition('10s')],
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add skip on event
|
||||
const skipOnEvent = taskConditionWorkflow.task({
|
||||
name: 'skipOnEvent',
|
||||
parents: [start],
|
||||
waitFor: [new SleepCondition('10s')],
|
||||
skipIf: [new UserEventCondition('skip_on_event:skip', 'true')],
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add branching
|
||||
const leftBranch = taskConditionWorkflow.task({
|
||||
name: 'leftBranch',
|
||||
parents: [waitForSleep],
|
||||
skipIf: [new ParentCondition(waitForSleep, 'output.randomNumber > 50')],
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
const rightBranch = taskConditionWorkflow.task({
|
||||
name: 'rightBranch',
|
||||
parents: [waitForSleep],
|
||||
skipIf: [new ParentCondition(waitForSleep, 'output.randomNumber <= 50')],
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add wait for event
|
||||
const waitForEvent = taskConditionWorkflow.task({
|
||||
name: 'waitForEvent',
|
||||
parents: [start],
|
||||
waitFor: [Or(new SleepCondition('1m'), new UserEventCondition('wait_for_event:start', 'true'))],
|
||||
fn: () => {
|
||||
return {
|
||||
randomNumber: Math.floor(Math.random() * 100) + 1,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
|
||||
// ❓ Add sum
|
||||
taskConditionWorkflow.task({
|
||||
name: 'sum',
|
||||
parents: [start, waitForSleep, waitForEvent, skipOnEvent, leftBranch, rightBranch],
|
||||
fn: async (_, ctx: Context<any, any>) => {
|
||||
const one = (await ctx.parentOutput(start)).randomNumber;
|
||||
const two = (await ctx.parentOutput(waitForEvent)).randomNumber;
|
||||
const three = (await ctx.parentOutput(waitForSleep)).randomNumber;
|
||||
const four = (await ctx.parentOutput(skipOnEvent))?.randomNumber || 0;
|
||||
const five = (await ctx.parentOutput(leftBranch))?.randomNumber || 0;
|
||||
const six = (await ctx.parentOutput(rightBranch))?.randomNumber || 0;
|
||||
|
||||
return {
|
||||
sum: one + two + three + four + five + six,
|
||||
};
|
||||
},
|
||||
});
|
||||
// !!
|
||||
Reference in New Issue
Block a user