Mirror of https://github.com/hatchet-dev/hatchet.git (synced 2025-12-21 08:40:10 -06:00)
docs: durable execution + update self-hosted defaults to use v1 (#1484)
* docs: update self-hosted defaults to use v1
* docs: durable execution
* rm dep
* lint: run black
* redundant readme
* more wording
* other small things
* isort
@@ -36,7 +36,7 @@ Hatchet is a platform for running background tasks, built on top of Postgres. In

Background tasks are critical for offloading work from your main web application. Usually background tasks are sent through a FIFO (first-in-first-out) queue, which helps guard against traffic spikes (queues can absorb a lot of load) and ensures that tasks are retried when your task handlers error out. Most stacks begin with a library-based queue backed by Redis or RabbitMQ (like Celery or BullMQ). But as your tasks become more complex, these queues become difficult to debug and monitor, and they start to fail in unexpected ways.

This is where Hatchet comes in. Hatchet is a full-featured background task management platform, with built-in support for chaining complex tasks together into workflows, alerting on failures, and building complex workflows which would otherwise take months of engineering effort.

This is where Hatchet comes in. Hatchet is a full-featured background task management platform, with built-in support for chaining complex tasks together into workflows, alerting on failures, making tasks more durable, and viewing tasks in a real-time web dashboard.

### Features

@@ -730,7 +730,7 @@ Most AI frameworks are built to run in-memory, with horizontal scaling and durab

### Issues

Please submit any bugs that you encounter via Github issues. However, please reach out on [Discord](https://hatchet.run/discord) before submitting a feature request - as the project is very early, we'd like to build a solid foundation before adding more complex features.

Please submit any bugs that you encounter via Github issues.

### I'd Like to Contribute
@@ -21,9 +21,10 @@ type DurableEventOutput struct {
}

func DurableEvent(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[DurableEventInput, DurableEventOutput] {
	durableEventWorkflow := factory.NewDurableTask(
	// ❓ Durable Event
	durableEventTask := factory.NewDurableTask(
		create.StandaloneTask{
			Name: "durable-sleep",
			Name: "durable-event",
		},
		func(ctx worker.DurableHatchetContext, input DurableEventInput) (*DurableEventOutput, error) {
			eventData, err := ctx.WaitForEvent("user:update", "")
@@ -45,6 +46,34 @@ func DurableEvent(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[Durable
		},
		hatchet,
	)
	// !!

	return durableEventWorkflow
	factory.NewDurableTask(
		create.StandaloneTask{
			Name: "durable-event",
		},
		func(ctx worker.DurableHatchetContext, input DurableEventInput) (*DurableEventOutput, error) {
			// ❓ Durable Event With Filter
			eventData, err := ctx.WaitForEvent("user:update", "input.user_id == '1234'")
			// !!

			if err != nil {
				return nil, err
			}

			v := EventData{}
			err = eventData.Unmarshal(&v)

			if err != nil {
				return nil, err
			}

			return &DurableEventOutput{
				Data: v,
			}, nil
		},
		hatchet,
	)

	return durableEventTask
}
@@ -20,7 +20,7 @@ type DurableSleepOutput struct {
}

func DurableSleep(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[DurableSleepInput, DurableSleepOutput] {
	// ctx as first param of NewTask
	// ❓ Durable Sleep
	simple := factory.NewDurableTask(
		create.StandaloneTask{
			Name: "durable-sleep",
@@ -38,6 +38,7 @@ func DurableSleep(hatchet v1.HatchetClient) workflow.WorkflowDeclaration[Durable
		},
		hatchet,
	)
	// !!

	return simple
}
@@ -49,15 +49,28 @@ export default {
  },
  "conditional-workflows": "Conditional Workflows",
  "on-failure-tasks": "On Failure Tasks",
  "durable-execution": {
    "title": "Durable Execution"
  },
  "child-spawning": {
    "title": "Child Spawning"
  },
  "additional-metadata": {
    "title": "Additional Metadata"
  },
  "--durable-execution": {
    "title": "Durable Execution",
    "type": "separator"
  },
  "durable-execution": {
    "title": "Durable Execution"
  },
  "durable-events": {
    "title": "Durable Events"
  },
  "durable-sleep": {
    "title": "Durable Sleep"
  },
  "durable-best-practices": {
    "title": "Best Practices"
  },
  "--error-handling": {
    "title": "Error Handling",
    "type": "separator"
19 frontend/docs/pages/home/durable-best-practices.mdx Normal file
@@ -0,0 +1,19 @@
## Durable Execution Best Practices

Durable tasks require a bit of extra work to ensure that they are not misused. An important concept in running a durable task is that the task should be **deterministic**. This means that the task should always perform the same sequence of operations in between retries.

The deterministic nature of durable tasks is what allows Hatchet to replay the task from the last checkpoint. If a task is not deterministic, it may produce different results on each retry, which can lead to unexpected behavior.

## Maintaining Determinism

By following a few simple rules, you can ensure that your durable tasks are deterministic:

1. **Only call methods available on the `DurableContext`**: a very common way to introduce non-determinism is to call methods within your application code which produce side effects. If you need to call a method in your application code which fetches data from a database, performs any sort of I/O, or otherwise interacts with other systems, you should spawn those as a **child task** or **child workflow** using `RunChild` (see the sketch after this list).

2. **When updating durable tasks, always guarantee backwards compatibility**: if you change the order of operations in a durable task, you may break determinism. For example, if you call `SleepFor` followed by `WaitFor` and then change the order of those calls, Hatchet will not be able to replay the task correctly. This is because the task may have already been checkpointed at the first call to `SleepFor`, and if you change the order of operations, that checkpoint becomes meaningless.
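To make these rules concrete, here is a minimal Python sketch in the style of the SDK examples added in this commit. The child-invocation call (`aio_run`) and the untyped `ctx` on the regular task are assumptions for illustration; only the `DurableContext` methods (`aio_sleep_for`, `aio_wait_for`) are taken directly from the examples below.

```python
from datetime import timedelta

from hatchet_sdk import DurableContext, EmptyModel, Hatchet, UserEventCondition

hatchet = Hatchet()


# Side-effecting work (database reads, HTTP calls, ...) lives in a regular task.
@hatchet.task(name="FetchUserTask")
async def fetch_user(input: EmptyModel, ctx) -> dict:
    return {"user_id": "1234"}  # imagine a database call here


@hatchet.durable_task(name="RemindUserTask")
async def remind_user(input: EmptyModel, ctx: DurableContext) -> None:
    # Rule 1: delegate side effects to a child task instead of calling your
    # application code directly. (`aio_run` is an assumed child-invocation
    # helper; use whatever child-spawning method your SDK version exposes.)
    user = await fetch_user.aio_run(input)

    # Rule 2: keep the order of durable operations stable across versions.
    # The sleep below is checkpointed first, then the wait; swapping them in a
    # later deploy would make previously recorded checkpoints meaningless.
    await ctx.aio_sleep_for(timedelta(hours=1))
    await ctx.aio_wait_for(
        "event",
        UserEventCondition(event_key="user:update"),
    )
```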
## Using DAGs instead of durable tasks

[DAGs](./dags) are generally a much easier, more intuitive way to run a durable, deterministic workflow. DAGs are inherently deterministic, as the shape of the work is predefined, and they cache intermediate results. If you are running simple workflows that can be represented as a DAG, you should use DAGs instead of durable tasks. DAGs also have conditional execution primitives which match the behavior of `SleepFor` and `WaitFor` in durable tasks.

Durable tasks are useful if you need to run a workflow that is not easily represented as a DAG.
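For comparison, here is a rough Python sketch of the same kind of flow expressed as a DAG. The `hatchet.workflow(...)` / `@wf.task(parents=[...])` shape is an assumption about the v1 SDK's workflow API, shown only to illustrate that the shape of the work is declared up front; consult the DAG docs for the exact API.

```python
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

# A DAG's shape is declared up front, so ordering is fixed and intermediate
# results are cached between steps. (API shape assumed for illustration.)
wf = hatchet.workflow(name="user-report")


@wf.task()
async def fetch_data(input: EmptyModel, ctx: Context) -> dict:
    return {"rows": 42}


@wf.task(parents=[fetch_data])
async def build_report(input: EmptyModel, ctx: Context) -> dict:
    return {"report": "done"}
```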
66 frontend/docs/pages/home/durable-events.mdx Normal file
@@ -0,0 +1,66 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
import { GithubSnippet, getSnippets } from "@/components/code";

export const DurableGo = {
  path: "examples/v1/workflows/durable-event.go",
};

export const DurableTs = {
  path: "src/v1/examples/durable-event/workflow.ts",
};

export const DurablePy = {
  path: "examples/durable_event/worker.py",
};

export const getStaticProps = ({}) =>
  getSnippets([DurableGo, DurableTs, DurablePy]);
## Durable Events

Durable events are a feature of **durable tasks** which allows tasks to wait for an event to occur before continuing. This is useful in cases where a task needs to wait a long time for an external action. Durable events are useful because even if your task is interrupted and requeued while waiting for an event, the event will still be processed: when the task is resumed, it will read the event from the durable event log and continue processing.
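For instance, here is a condensed version of the Python example added later in this commit (`examples/durable_event/worker.py`): the durable task blocks on `ctx.aio_wait_for` until a `user:update` event arrives, and the wait survives worker restarts.

```python
from hatchet_sdk import DurableContext, EmptyModel, Hatchet, UserEventCondition

hatchet = Hatchet()


@hatchet.durable_task(name="DurableEventTask")
async def durable_event_task(input: EmptyModel, ctx: DurableContext) -> None:
    # Blocks durably: if the worker restarts, the wait resumes from the
    # durable event log instead of starting over.
    res = await ctx.aio_wait_for(
        "event",  # key for this wait, as used in the SDK example below
        UserEventCondition(event_key="user:update"),
    )
    print("got event", res)
```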
## Declaring durable events

Durable events are declared using the context method `WaitFor` (or utility method `WaitForEvent`) on the `DurableContext` object.

<UniversalTabs items={["Python", "Typescript", "Go"]}>
<Tabs.Tab title="Python">

<GithubSnippet src={DurablePy} target="Durable Event" />

</Tabs.Tab>
<Tabs.Tab title="Typescript">

<GithubSnippet src={DurableTs} target="Durable Event" />

</Tabs.Tab>
<Tabs.Tab title="Go">

<GithubSnippet src={DurableGo} target="Durable Event" />

</Tabs.Tab>
</UniversalTabs>
## Durable event filters

Durable events can be filtered using [CEL](https://github.com/google/cel-spec) expressions. For example, to only receive `user:update` events for a specific user, you can use the following filter:

<UniversalTabs items={["Python", "Typescript", "Go"]}>
<Tabs.Tab title="Python">

<GithubSnippet src={DurablePy} target="Durable Event With Filter" />

</Tabs.Tab>
<Tabs.Tab title="Typescript">

<GithubSnippet src={DurableTs} target="Durable Event With Filter" />

</Tabs.Tab>
<Tabs.Tab title="Go">

<GithubSnippet src={DurableGo} target="Durable Event With Filter" />

</Tabs.Tab>
</UniversalTabs>
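To satisfy a filter such as `input.user_id == '1234'`, the pushed event's payload must contain the matching field. A minimal Python trigger, condensed from `sdks/python/examples/durable_event/trigger.py` in this commit:

```python
from hatchet_sdk import Hatchet

hatchet = Hatchet()

# The CEL expression `input.user_id == '1234'` is evaluated against this
# payload, so only events carrying user_id "1234" wake the filtered task.
hatchet.event.push(
    "user:update",
    {
        "user_id": "1234",
    },
)
```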
@@ -21,9 +21,7 @@ This is especially useful in cases such as:

## How Hatchet Runs Durable Tasks

When you register a durable task, Hatchet marks the entire task as durable. Then, when you start your worker, Hatchet will start a second worker in the background for running durable tasks.

If you don't register any durable tasks, the durable worker will not be started. Similarly, if you start a worker with _only_ durable tasks, the "main" worker will not start, and _only_ the durable worker will run. The durable worker will show up as a second worker in the Hatchet Dashboard.
When you register a durable task, Hatchet will start a second worker in the background for running durable tasks. If you don't register any durable workflows, the durable worker will not be started. Similarly, if you start a worker with _only_ durable workflows, the "main" worker will not start, and _only_ the durable worker will run. The durable worker will show up as a second worker in the Hatchet Dashboard.

Tasks that are declared as durable (using `durable_task` instead of `task`) will receive a `DurableContext` object instead of a normal `Context`. The `DurableContext` extends the `Context` by providing some additional tools for working with durable execution features.
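For illustration, here is a minimal Python worker (condensed from the examples added in this commit) where registering a `durable_task` is what causes the background durable worker to start:

```python
from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet()


@hatchet.durable_task(name="MyDurableTask")
async def my_durable_task(input: EmptyModel, ctx: DurableContext) -> None:
    # `ctx` is a DurableContext here, so durable helpers such as
    # `aio_sleep_for` and `aio_wait_for` are available.
    ...


def main() -> None:
    # Because a durable task is registered, this worker also spins up the
    # background durable worker, which appears separately in the dashboard.
    worker = hatchet.worker("my-worker", workflows=[my_durable_task])
    worker.start()


if __name__ == "__main__":
    main()
```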
46 frontend/docs/pages/home/durable-sleep.mdx Normal file
@@ -0,0 +1,46 @@
import { Callout, Card, Cards, Steps, Tabs } from "nextra/components";
import UniversalTabs from "@/components/UniversalTabs";
import { GithubSnippet, getSnippets } from "@/components/code";

export const DurableGo = {
  path: "examples/v1/workflows/durable-sleep.go",
};

export const DurableTs = {
  path: "src/v1/examples/durable-sleep/workflow.ts",
};

export const DurablePy = {
  path: "examples/durable_sleep/worker.py",
};

export const getStaticProps = ({}) =>
  getSnippets([DurableGo, DurableTs, DurablePy]);
## Durable Sleep

Durable sleep is a feature of **durable tasks** which allows tasks to pause execution for a specified amount of time. Unlike a regular `sleep` call in your task, durable sleep is guaranteed to sleep only for the specified amount of time, measured from the first time it was called.

For example, say you'd like to send a notification to a user after 24 hours. With a regular `sleep`, if the task is interrupted after 23 hours, it will restart and call `sleep` for 24 hours again. This means that the task will sleep for 47 hours in total, which is not what you want. With durable sleep, the task will respect the original sleep duration on restart -- that is, if the task calls `ctx.aio_sleep_for` for 24 hours and is interrupted after 23 hours, it will only sleep for 1 more hour on restart.
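A minimal Python sketch of that 24-hour case, based on the `durable_sleep` example added in this commit (the notification delivery itself is just a placeholder `print`):

```python
from datetime import timedelta

from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet()


@hatchet.durable_task(name="NotifyAfter24Hours")
async def notify_after_24_hours(input: EmptyModel, ctx: DurableContext) -> None:
    # The 24-hour clock starts at the first call. If the worker is interrupted
    # after 23 hours, the replayed task only sleeps for the remaining hour.
    await ctx.aio_sleep_for(timedelta(hours=24))

    # Deliver the notification here (ideally via a child task, per the
    # best-practices page).
    print("sending notification")
```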
## Using durable sleep

Durable sleep can be used by calling the `SleepFor` method on the `DurableContext` object. This method takes a duration as an argument and will sleep for that duration.

<UniversalTabs items={["Python", "Typescript", "Go"]}>
<Tabs.Tab title="Python">

<GithubSnippet src={DurablePy} target="Durable Sleep" />

</Tabs.Tab>
<Tabs.Tab title="Typescript">

<GithubSnippet src={DurableTs} target="Durable Sleep" />

</Tabs.Tab>
<Tabs.Tab title="Go">

<GithubSnippet src={DurableGo} target="Durable Sleep" />

</Tabs.Tab>
</UniversalTabs>
@@ -12,6 +12,12 @@ export const getStaticProps = ({}) => getSnippets([SimpleWorker]);

This guide shows how to deploy Hatchet using Docker Compose for a production-ready deployment. If you'd like to get up and running quickly, you can also deploy Hatchet using the `hatchet-lite` image following the tutorial here: [Hatchet Lite Deployment](/self-hosting/hatchet-lite).

This guide uses RabbitMQ as a message broker for Hatchet. This is optional: if you'd like to use Postgres as a message broker, modify the `setup-config` service in the `docker-compose.yml` file with the following env var, and delete all RabbitMQ references:

```sh
SERVER_MSGQUEUE_KIND=postgres
```

## Quickstart

<Steps>
@@ -22,12 +28,11 @@ This deployment requires [Docker](https://docs.docker.com/engine/install/) insta

### Create files

We will be creating 2 files in the root of your repository:
We will be creating a `docker-compose.yml` file in the root of your repository:

<FileTree>
<FileTree.Folder name="root" defaultOpen>
<FileTree.File name="docker-compose.yml" />
<FileTree.File name="Caddyfile" />
</FileTree.Folder>
</FileTree>
@@ -36,7 +41,7 @@ version: "3.8"
services:
  postgres:
    image: postgres:15.6
    command: postgres -c 'max_connections=200'
    command: postgres -c 'max_connections=1000'
    restart: always
    hostname: "postgres"
    environment:
@@ -89,6 +94,7 @@ services:
      SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
      SERVER_GRPC_INSECURE: "t"
      SERVER_GRPC_BROADCAST_ADDRESS: localhost:7077
      SERVER_DEFAULT_ENGINE_VERSION: "V1"
    volumes:
      - hatchet_certs:/hatchet/certs
      - hatchet_config:/hatchet/config
@@ -117,9 +123,11 @@ services:
    volumes:
      - hatchet_certs:/hatchet/certs
      - hatchet_config:/hatchet/config
  hatchet-api:
    image: ghcr.io/hatchet-dev/hatchet/hatchet-api:latest
    command: /hatchet/hatchet-api --config /hatchet/config
  hatchet-dashboard:
    image: ghcr.io/hatchet-dev/hatchet/hatchet-dashboard:latest
    command: sh ./entrypoint.sh --config /hatchet/config
    ports:
      - 8080:80
    restart: on-failure
    depends_on:
      setup-config:
@@ -131,14 +139,6 @@ services:
    volumes:
      - hatchet_certs:/hatchet/certs
      - hatchet_config:/hatchet/config
  hatchet-frontend:
    image: ghcr.io/hatchet-dev/hatchet/hatchet-frontend:latest
  caddy:
    image: caddy:2.7.6-alpine
    ports:
      - 8080:8080
    volumes:
      - ./Caddyfile:/etc/caddy/Caddyfile

volumes:
  hatchet_postgres_data:
@@ -148,18 +148,6 @@ volumes:
  hatchet_certs:
```

```txt filename="Caddyfile" copy
http://localhost:8080 {
  handle /api/* {
    reverse_proxy hatchet-api:8080
  }

  handle /* {
    reverse_proxy hatchet-frontend:80
  }
}
```

### Get Hatchet up and running

To start the services, run the following command in the root of your repository:

@@ -168,7 +156,7 @@ To start the services, run the following command in the root of your repository:
docker compose up
```

Wait for the `hatchet-engine` and `hatchet-api` services to start.
Wait for the `hatchet-engine` and `hatchet-dashboard` services to start.
### Accessing Hatchet

@@ -181,216 +169,36 @@ Email: admin@example.com
Password: Admin123!!
```

### Generate a `.env` file
## Run tasks against the Hatchet instance

You can generate a `.env` file as follows:
To run tasks against this instance, you will first need to create an API token for your worker. There are two ways to do this:

```sh
cat <<EOF > .env
HATCHET_CLIENT_TOKEN="$(docker compose run --no-deps setup-config /hatchet/hatchet-admin token create --config /hatchet/config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52 | xargs)"
HATCHET_CLIENT_TLS_STRATEGY=none
EOF
```
1. **Using a CLI command**:

<Callout type="info" emoji="🪓">
You can run the following command to create a token:

You can also generate an API token by logging in and navigating to the "General" settings page, clicking on the "API Tokens" tab, and then clicking "Create API Token".
```sh
docker compose run --no-deps setup-config /hatchet/hatchet-admin token create --config /hatchet/config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52
```

</Callout>
2. **Using the Hatchet dashboard**:
   - Log in to the Hatchet dashboard.
   - Navigate to the "Settings" page.
   - Click on the "API Tokens" tab.
   - Click on "Create API Token".
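As a quick sanity check (a sketch, not part of the official docs): the Python SDK reads its configuration from environment variables, so once `HATCHET_CLIENT_TOKEN` and `HATCHET_CLIENT_TLS_STRATEGY=none` are set (for example via a `.env` file), instantiating the client is enough to verify the setup.

```python
import os

from hatchet_sdk import Hatchet

# Assumes the token created above is exported, e.g.:
#   HATCHET_CLIENT_TOKEN=<token from the CLI or dashboard>
#   HATCHET_CLIENT_TLS_STRATEGY=none
if not os.environ.get("HATCHET_CLIENT_TOKEN"):
    raise SystemExit("Create an API token first (see the steps above).")

hatchet = Hatchet()
print("Hatchet client configured")
```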
### Run your first worker

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
<Tabs.Tab>
Make sure you have the following dependencies installed:

```sh
pip install hatchet-sdk
```

<Callout type="info" emoji="💡">
  The Python SDK uses [Pydantic
  Settings](https://docs.pydantic.dev/latest/concepts/pydantic_settings/) to
  manage configuration, which will automatically try to load environment
  variables from a `.env` file, if one is present.
</Callout>

Create a `worker.py` file with the following contents:

<GithubSnippet src={SimpleWorker} target="Simple" />

Open a new terminal and start the worker with:

```sh
python3 worker.py
```

</Tabs.Tab>
<Tabs.Tab>
First, install `@hatchet-dev/typescript-sdk` via:

```sh npm2yarn
npm i @hatchet-dev/typescript-sdk
npm i dotenv
```

We also use `dotenv` to load the environment variables from a `.env` file. This isn't required, and you can use your own method to load environment variables.

Next, copy the following code into a `worker.ts` file:

```typescript filename="worker.ts" copy
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";
import dotenv from "dotenv";

dotenv.config();

const hatchet = Hatchet.init();

const workflow: Workflow = {
  id: "first-typescript-workflow",
  description: "This is my first workflow",
  on: {
    event: "user:create",
  },
  steps: [
    {
      name: "step1",
      run: async (ctx) => {
        console.log(
          "starting step1 with the following input",
          ctx.workflowInput(),
        );

        return {
          result: "success!",
        };
      },
    },
  ],
};

const worker = hatchet.worker("my-worker");
await worker.registerWorkflow(workflow);
worker.start();
```

Next, modify your `package.json` to include a script to start:

```json
{
  // ...rest of your `package.json`
  "scripts": {
    // ...existing scripts
    "worker": "npx ts-node worker.ts"
  }
}
```

Now to start the worker, in a new terminal run:

```sh npm2yarn
npm run worker
```

</Tabs.Tab>
<Tabs.Tab>
In a new Go project, copy the following code into a `main.go` file:

```go filename="main.go" copy
package main

import (
	"fmt"

	"github.com/joho/godotenv"

	"github.com/hatchet-dev/hatchet/pkg/client"
	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	"github.com/hatchet-dev/hatchet/pkg/worker"
)

type stepOutput struct{}

func main() {
	err := godotenv.Load()
	if err != nil {
		panic(err)
	}

	c, err := client.New()

	if err != nil {
		panic(fmt.Sprintf("error creating client: %v", err))
	}

	w, err := worker.NewWorker(
		worker.WithClient(
			c,
		),
		worker.WithMaxRuns(1),
	)
	if err != nil {
		panic(fmt.Sprintf("error creating worker: %v", err))
	}

	err = w.RegisterWorkflow(
		&worker.WorkflowJob{
			Name:        "simple-workflow",
			Description: "Simple one-step workflow.",
			On:          worker.Events("simple"),
			Steps: []*worker.WorkflowStep{
				worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
					fmt.Println("executed step 1")

					return &stepOutput{}, nil
				},
				).SetName("step-one"),
			},
		},
	)

	if err != nil {
		panic(fmt.Sprintf("error registering workflow: %v", err))
	}

	interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
	defer cancel()

	cleanup, err := w.Start()
	if err != nil {
		panic(fmt.Sprintf("error starting worker: %v", err))
	}

	<-interruptCtx.Done()
	if err := cleanup(); err != nil {
		panic(err)
	}
}
```

Next, run the following command to start the worker:

```sh
go run main.go
```

</Tabs.Tab>
</UniversalTabs>
### Run your first workflow

The worker is now running and listening for steps to execute. You should see your first worker registered in the `Workers` tab of the Hatchet dashboard:



You can now trigger your first workflow by navigating to the `Workflows` tab, selecting your workflow, and clicking the top right "Trigger workflow" button:



That's it! You've successfully deployed Hatchet and run your first workflow.
Now that you have an API token, see the guide [here](https://docs.hatchet.run/home/setup) for how to run your first task.

</Steps>
## Repulling images

The docker compose file above uses the `latest` tag for all images. This means that if you want to pull the latest version of the images, you can run the following command:

```bash
docker compose pull
```

## Connecting to the engine from within Docker

If you're also running your worker application inside of `docker-compose`, you should modify the `SERVER_GRPC_BROADCAST_ADDRESS` environment variable in the `setup-config` service to use `host.docker.internal` as the hostname. For example:
@@ -65,10 +65,7 @@ services:
      SERVER_GRPC_PORT: "7077"
      SERVER_URL: http://localhost:8888
      SERVER_AUTH_SET_EMAIL_VERIFIED: "t"
      SERVER_LOGGER_LEVEL: warn
      SERVER_LOGGER_FORMAT: console
      DATABASE_LOGGER_LEVEL: warn
      DATABASE_LOGGER_FORMAT: console
      SERVER_DEFAULT_ENGINE_VERSION: "V1"
    volumes:
      - "hatchet_lite_rabbitmq_data:/var/lib/rabbitmq"
      - "hatchet_lite_config:/config"
@@ -110,10 +107,7 @@ services:
      SERVER_GRPC_PORT: "7077"
      SERVER_URL: http://localhost:8888
      SERVER_AUTH_SET_EMAIL_VERIFIED: "t"
      SERVER_LOGGER_LEVEL: warn
      SERVER_LOGGER_FORMAT: console
      DATABASE_LOGGER_LEVEL: warn
      DATABASE_LOGGER_FORMAT: console
      SERVER_DEFAULT_ENGINE_VERSION: "V1"
    volumes:
      - "hatchet_lite_rabbitmq_data:/var/lib/rabbitmq"
      - "hatchet_lite_config:/config"
@@ -138,209 +132,24 @@ Email: admin@example.com
Password: Admin123!!
```

### Generate a `.env` file
## Run tasks against the Hatchet instance

You can generate a `.env` file as follows:
To run tasks against this instance, you will first need to create an API token for your worker. There are two ways to do this:

```sh
cat <<EOF > .env
HATCHET_CLIENT_TOKEN="$(docker compose -f docker-compose.hatchet.yml exec hatchet-lite /hatchet-admin token create --config /config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52 | xargs)"
HATCHET_CLIENT_TLS_STRATEGY=none
EOF
```
1. **Using a CLI command**:

<Callout type="info" emoji="🪓">
You can run the following command to create a token:

You can also generate an API token by logging in and navigating to the "General" settings page, clicking on the "API Tokens" tab, and then clicking "Create API Token".
```sh
docker compose -f docker-compose.hatchet.yml exec hatchet-lite /hatchet-admin token create --config /config --tenant-id 707d0855-80ab-4e1f-a156-f1c4546cbf52 | xargs
```

</Callout>
2. **Using the Hatchet dashboard**:
   - Log in to the Hatchet dashboard.
   - Navigate to the "Settings" page.
   - Click on the "API Tokens" tab.
   - Click on "Create API Token".
### Run your first worker

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
<Tabs.Tab>
Make sure you have the following dependencies installed:

```sh
pip install hatchet-sdk
```

<Callout type="info" emoji="💡">
  The Python SDK uses [Pydantic
  Settings](https://docs.pydantic.dev/latest/concepts/pydantic_settings/) to
  manage configuration, which will automatically try to load environment
  variables from a `.env` file, if one is present.
</Callout>

Create a `worker.py` file with the following contents:

<GithubSnippet src={SimpleWorker} target="Simple" />

Open a new terminal and start the worker with:

```sh
python3 worker.py
```

</Tabs.Tab>
<Tabs.Tab>
First, install `@hatchet-dev/typescript-sdk` via:

```sh npm2yarn
npm i @hatchet-dev/typescript-sdk
npm i dotenv
```

We also use `dotenv` to load the environment variables from a `.env` file. This isn't required, and you can use your own method to load environment variables.

Next, copy the following code into a `worker.ts` file:

```typescript filename="worker.ts" copy
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";
import dotenv from "dotenv";

dotenv.config();

const hatchet = Hatchet.init();

const workflow: Workflow = {
  id: "first-typescript-workflow",
  description: "This is my first workflow",
  on: {
    event: "user:create",
  },
  steps: [
    {
      name: "step1",
      run: async (ctx) => {
        console.log(
          "starting step1 with the following input",
          ctx.workflowInput(),
        );

        return {
          result: "success!",
        };
      },
    },
  ],
};

hatchet.run(workflow);
```

Next, modify your `package.json` to include a script to start:

```json
{
  // ...rest of your `package.json`
  "scripts": {
    // ...existing scripts
    "worker": "npx ts-node worker.ts"
  }
}
```

Now to start the worker, in a new terminal run:

```sh npm2yarn
npm run worker
```

</Tabs.Tab>
<Tabs.Tab>
In a new Go project, copy the following code into a `main.go` file:

```go filename="main.go" copy
package main

import (
	"fmt"

	"github.com/joho/godotenv"

	"github.com/hatchet-dev/hatchet/pkg/client"
	"github.com/hatchet-dev/hatchet/pkg/cmdutils"
	"github.com/hatchet-dev/hatchet/pkg/worker"
)

type stepOutput struct{}

func main() {
	err := godotenv.Load()
	if err != nil {
		panic(err)
	}

	c, err := client.New()

	if err != nil {
		panic(fmt.Sprintf("error creating client: %v", err))
	}

	w, err := worker.NewWorker(
		worker.WithClient(
			c,
		),
		worker.WithMaxRuns(1),
	)
	if err != nil {
		panic(fmt.Sprintf("error creating worker: %v", err))
	}

	err = w.RegisterWorkflow(
		&worker.WorkflowJob{
			Name:        "simple-workflow",
			Description: "Simple one-step workflow.",
			On:          worker.Events("simple"),
			Steps: []*worker.WorkflowStep{
				worker.Fn(func(ctx worker.HatchetContext) (result *stepOutput, err error) {
					fmt.Println("executed step 1")

					return &stepOutput{}, nil
				},
				).SetName("step-one"),
			},
		},
	)
	if err != nil {
		panic(fmt.Sprintf("error registering workflow: %v", err))
	}

	interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
	defer cancel()

	cleanup, err := w.Start()
	if err != nil {
		panic(fmt.Sprintf("error starting worker: %v", err))
	}

	<-interruptCtx.Done()
	if err := cleanup(); err != nil {
		panic(err)
	}
}
```

Next, run the following command to start the worker:

```sh
go run main.go
```

</Tabs.Tab>
</UniversalTabs>
### Run your first workflow

The worker is now running and listening for steps to execute. You should see your first worker registered in the `Workers` tab of the Hatchet dashboard:



You can now trigger your first workflow by navigating to the `Workflows` tab, selecting your workflow, and clicking the top right "Trigger workflow" button:



That's it! You've successfully deployed Hatchet and run your first workflow.
Now that you have an API token, see the guide [here](https://docs.hatchet.run/home/setup) for how to run your first task.

</Steps>
22 sdks/python/examples/durable_event/trigger.py Normal file
@@ -0,0 +1,22 @@
import time

from examples.durable_event.worker import (
    EVENT_KEY,
    durable_event_task,
    durable_event_task_with_filter,
    hatchet,
)

durable_event_task.run_no_wait()
durable_event_task_with_filter.run_no_wait()

print("Sleeping")
time.sleep(2)

print("Pushing event")
hatchet.event.push(
    EVENT_KEY,
    {
        "user_id": "1234",
    },
)
47 sdks/python/examples/durable_event/worker.py Normal file
@@ -0,0 +1,47 @@
from hatchet_sdk import DurableContext, EmptyModel, Hatchet, UserEventCondition

hatchet = Hatchet(debug=True)

EVENT_KEY = "user:update"


# ❓ Durable Event
@hatchet.durable_task(name="DurableEventTask")
async def durable_event_task(input: EmptyModel, ctx: DurableContext) -> None:
    res = await ctx.aio_wait_for(
        "event",
        UserEventCondition(event_key="user:update"),
    )

    print("got event", res)


# !!


@hatchet.durable_task(name="DurableEventWithFilterTask")
async def durable_event_task_with_filter(
    input: EmptyModel, ctx: DurableContext
) -> None:
    # ❓ Durable Event With Filter
    res = await ctx.aio_wait_for(
        "event",
        UserEventCondition(
            event_key="user:update", expression="input.user_id == '1234'"
        ),
    )
    # !!

    print("got event", res)


def main() -> None:
    worker = hatchet.worker(
        "durable-event-worker",
        workflows=[durable_event_task, durable_event_task_with_filter],
    )
    worker.start()


if __name__ == "__main__":
    main()
3 sdks/python/examples/durable_sleep/trigger.py Normal file
@@ -0,0 +1,3 @@
from examples.durable_sleep.worker import durable_sleep_task

durable_sleep_task.run_no_wait()
25 sdks/python/examples/durable_sleep/worker.py Normal file
@@ -0,0 +1,25 @@
from datetime import timedelta

from hatchet_sdk import DurableContext, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)


# ❓ Durable Sleep
@hatchet.durable_task(name="DurableSleepTask")
async def durable_sleep_task(input: EmptyModel, ctx: DurableContext) -> None:
    res = await ctx.aio_sleep_for(timedelta(seconds=5))

    print("got result", res)


# !!


def main() -> None:
    worker = hatchet.worker("durable-sleep-worker", workflows=[durable_sleep_task])
    worker.start()


if __name__ == "__main__":
    main()
17 sdks/typescript/src/v1/examples/durable-event/event.ts Normal file
@@ -0,0 +1,17 @@
import { hatchet } from '../hatchet-client';

async function main() {
  const event = await hatchet.events.push('user:update', {
    userId: '1234',
  });
}

if (require.main === module) {
  main()
    .then(() => process.exit(0))
    .catch((error) => {
      // eslint-disable-next-line no-console
      console.error('Error:', error);
      process.exit(1);
    });
}
19 sdks/typescript/src/v1/examples/durable-event/run.ts Normal file
@@ -0,0 +1,19 @@
import { durableEvent } from './workflow';

async function main() {
  const timeStart = Date.now();
  const res = await durableEvent.run({});
  const timeEnd = Date.now();
  // eslint-disable-next-line no-console
  console.log(`Time taken: ${timeEnd - timeStart}ms`);
}

if (require.main === module) {
  main()
    .then(() => process.exit(0))
    .catch((error) => {
      // eslint-disable-next-line no-console
      console.error('Error:', error);
      process.exit(1);
    });
}
14 sdks/typescript/src/v1/examples/durable-event/worker.ts Normal file
@@ -0,0 +1,14 @@
import { hatchet } from '../hatchet-client';
import { durableEvent } from './workflow';

async function main() {
  const worker = await hatchet.worker('durable-event-worker', {
    workflows: [durableEvent],
  });

  await worker.start();
}

if (require.main === module) {
  main();
}
40 sdks/typescript/src/v1/examples/durable-event/workflow.ts Normal file
@@ -0,0 +1,40 @@
// import sleep from '@hatchet/util/sleep';
import { hatchet } from '../hatchet-client';

// ❓ Durable Event
export const durableEvent = hatchet.durableTask({
  name: 'durable-event',
  executionTimeout: '10m',
  fn: async (_, ctx) => {
    const res = await ctx.waitFor({
      eventKey: 'user:update',
    });

    console.log('res', res);

    return {
      Value: 'done',
    };
  },
});
// !!

export const durableEventWithFilter = hatchet.durableTask({
  name: 'durable-event-with-filter',
  executionTimeout: '10m',
  fn: async (_, ctx) => {
    // ❓ Durable Event With Filter
    const res = await ctx.waitFor({
      eventKey: 'user:update',
      expression: "input.userId == '1234'",
    });
    // !!

    console.log('res', res);

    return {
      Value: 'done',
    };
  },
});
// !!
@@ -1,11 +1,11 @@
// import sleep from '@hatchet/util/sleep';
import { Or } from '@hatchet/v1/conditions';
import { hatchet } from '../hatchet-client';

export const durableSleep = hatchet.workflow({
  name: 'durable-sleep',
});

// ❓ Durable Sleep
durableSleep.durableTask({
  name: 'durable-sleep',
  executionTimeout: '10m',
@@ -14,21 +14,9 @@ durableSleep.durableTask({
    const sleepRes = await ctx.sleepFor('5s');
    console.log('done sleeping for 5s', sleepRes);

    // wait for either an event or a sleep
    const res = await ctx.waitFor(
      Or(
        {
          eventKey: 'user:event',
        },
        {
          sleepFor: '1m',
        }
      )
    );

    console.log('res', res);
    return {
      Value: 'done',
    };
  },
});
// !!