feat: initial doc pages (#1020)

* generate initial cloud client

* feat: initial doc pages

* feat: cloud register id, action filtering

* feat:cloud register

* fix: env var

* chore:lint

---------

Co-authored-by: Alexander Belanger <alexander@hatchet.run>
This commit is contained in:
Gabe Ruttner
2024-11-08 07:46:43 -08:00
committed by GitHub
parent 07c8b7cc8d
commit 3850964a98
31 changed files with 6775 additions and 112 deletions
+159
@@ -0,0 +1,159 @@
package main
import (
"context"
"fmt"
"log"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type userCreateEvent struct {
Username string `json:"username"`
UserID string `json:"user_id"`
Data map[string]string `json:"data"`
}
type stepOneOutput struct {
Message string `json:"message"`
}
func main() {
err := godotenv.Load()
if err != nil {
panic(err)
}
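	// Buffered channel that the workflow steps below use to report progress.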
events := make(chan string, 50)
interrupt := cmdutils.InterruptChan()
cleanup, err := run(events)
if err != nil {
panic(err)
}
<-interrupt
if err := cleanup(); err != nil {
panic(fmt.Errorf("error cleaning up: %w", err))
}
}
func run(events chan<- string) (func() error, error) {
c, err := client.New()
if err != nil {
return nil, fmt.Errorf("error creating client: %w", err)
}
w, err := worker.NewWorker(
worker.WithClient(
c,
),
)
if err != nil {
return nil, fmt.Errorf("error creating worker: %w", err)
}
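	// Define two compute pools: a shared-CPU pool for lighter steps and a
	// performance pool for heavier steps.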
pool := "test-pool"
basicCompute := compute.Compute{
Pool: &pool,
NumReplicas: 1,
CPUs: 1,
MemoryMB: 1024,
CPUKind: compute.ComputeKindSharedCPU,
Regions: []compute.Region{compute.Region("ewr")},
}
performancePool := "performance-pool"
performanceCompute := compute.Compute{
Pool: &performancePool,
NumReplicas: 1,
CPUs: 2,
MemoryMB: 1024,
CPUKind: compute.ComputeKindPerformanceCPU,
Regions: []compute.Region{compute.Region("ewr")},
}
err = w.RegisterWorkflow(
&worker.WorkflowJob{
On: worker.Events("user:create:simple"),
Name: "simple",
Description: "This runs after an update to the user model.",
Concurrency: worker.Expression("input.user_id"),
Steps: []*worker.WorkflowStep{
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &userCreateEvent{}
err = ctx.WorkflowInput(input)
if err != nil {
return nil, err
}
log.Printf("step-one")
events <- "step-one"
return &stepOneOutput{
Message: "Username is: " + input.Username,
}, nil
},
).SetName("step-one").SetCompute(&basicCompute),
worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
input := &stepOneOutput{}
err = ctx.StepOutput("step-one", input)
if err != nil {
return nil, err
}
log.Printf("step-two")
events <- "step-two"
return &stepOneOutput{
Message: "Above message is: " + input.Message,
}, nil
}).SetName("step-two").AddParents("step-one").SetCompute(&performanceCompute),
},
},
)
if err != nil {
return nil, fmt.Errorf("error registering workflow: %w", err)
}
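	// Push a test event in the background so the workflow has something to run.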
go func() {
testEvent := userCreateEvent{
Username: "echo-test",
UserID: "1234",
Data: map[string]string{
"test": "test",
},
}
log.Printf("pushing event user:create:simple")
// push an event
err := c.Event().Push(
context.Background(),
"user:create:simple",
testEvent,
client.WithEventMetadata(map[string]string{
"hello": "world",
}),
)
if err != nil {
panic(fmt.Errorf("error pushing event: %w", err))
}
}()
cleanup, err := w.Start()
if err != nil {
panic(err)
}
return cleanup, nil
}
+5
@@ -3,6 +3,11 @@
"title": "User Guide",
"type": "page"
},
"compute": {
"title": "Managed Compute",
"type": "page",
"href": "/compute"
},
"sdks": {
"title": "SDK Reference",
"type": "menu",
+26
@@ -0,0 +1,26 @@
{
"-- Managed Compute": {
"type": "separator",
"title": "Managed Compute"
},
"index": "Overview",
"getting-started": "Getting Started",
"cpu": "CPU Machine Types",
"gpu": "GPU Machine Types",
"-- SDKs": {
"type": "separator",
"title": "SDK Deployment Guides"
},
"python": {
"title": "Python ↗",
"href": "/sdks/python-sdk"
},
"typescript": {
"title": "TypeScript ↗",
"href": "/sdks/typescript-sdk"
},
"golang": {
"title": "Golang ↗",
"href": "/sdks/go-sdk"
}
}
+188
@@ -0,0 +1,188 @@
import { Tabs, Callout } from "nextra/components";
# CPU Instance Configuration
<Callout type="warning">
This feature is currently in beta and may be subject to change.
</Callout>
## Overview
The Hatchet SDK provides a `Compute` class that allows you to define and manage compute resources for your workflows. Each step in your workflow can have its own compute configuration, enabling fine-grained control over resource allocation.
## Basic Configuration
<Tabs items={['Python']}>
<Tabs.Tab>
```python
from hatchet_sdk.compute.configs import Compute
compute = Compute(
cpu_kind="shared", # "shared" or "performance"
cpus=2, # Number of CPU cores
memory_mb=1024, # Memory in MB
num_replicas=2, # Number of instances
regions=["ewr"] # Region codes
)
```
</Tabs.Tab>
</Tabs>
## CPU Types and Memory Scaling
### Shared CPU
- **Memory Ratio**: 2GB per CPU core
- **Minimum Memory**: 256MB per CPU core
- **Use Case**: Development, testing, and lighter workloads
### Performance CPU
- **Memory Ratio**: 8GB per CPU core
- **Minimum Memory**: 2048MB per CPU core
- **Use Case**: Production and compute-intensive workloads
### Memory Calculation Examples
#### Shared CPU
```python
# 4 shared CPUs
max_memory = 2048 * 4 # = 8192 MB (8GB)
min_memory = 256 * 4 # = 1024 MB (1GB)
compute = Compute(
cpu_kind="shared",
cpus=4,
memory_mb=4096, # Must be between min_memory and max_memory
num_replicas=1,
regions=["ewr"]
)
```
#### Performance CPU
```python
# 4 performance CPUs
max_memory = 8192 * 4 # = 32768 MB (32GB)
min_memory = 2048 * 4 # = 8192 MB (8GB)
compute = Compute(
cpu_kind="performance",
cpus=4,
memory_mb=16384, # Must be between min_memory and max_memory
num_replicas=1,
regions=["ewr"]
)
```
## Available Regions
| Region Code | Location |
| ----------- | ---------------------------- |
| ams | Amsterdam, Netherlands |
| arn | Stockholm, Sweden |
| atl | Atlanta, Georgia (US) |
| bog | Bogotá, Colombia |
| bom | Mumbai, India |
| bos | Boston, Massachusetts (US) |
| cdg | Paris, France |
| den | Denver, Colorado (US) |
| dfw | Dallas, Texas (US) |
| ewr | Secaucus, NJ (US) |
| eze | Ezeiza, Argentina |
| fra | Frankfurt, Germany |
| gdl | Guadalajara, Mexico |
| gig | Rio de Janeiro, Brazil |
| gru | Sao Paulo, Brazil |
| hkg | Hong Kong |
| iad | Ashburn, Virginia (US) |
| lax | Los Angeles, California (US) |
| lhr | London, United Kingdom |
| mad | Madrid, Spain |
| mia | Miami, Florida (US) |
| nrt | Tokyo, Japan |
| ord | Chicago, Illinois (US) |
| otp | Bucharest, Romania |
| phx | Phoenix, Arizona (US) |
| qro | Querétaro, Mexico |
| scl | Santiago, Chile |
| sea | Seattle, Washington (US) |
| sin | Singapore |
| sjc | San Jose, California (US) |
| syd | Sydney, Australia |
| waw | Warsaw, Poland |
| yul | Montreal, Canada |
| yyz | Toronto, Canada |
## Replica Configuration
The `num_replicas` parameter determines the total number of machines that will run your workload. These instances are randomly distributed across the specified regions.
### Example Configurations
```python
# Single region, multiple replicas
compute = Compute(
cpu_kind="shared",
cpus=2,
memory_mb=1024,
num_replicas=3,
regions=["ewr"] # All 3 replicas in ewr
)
# Multiple regions, multiple replicas
compute = Compute(
cpu_kind="shared",
cpus=2,
memory_mb=1024,
num_replicas=6,
regions=["ewr", "lax", "lhr"] # 6 replicas randomly distributed across the three regions
)
```
## Usage in Workflows
```python
from hatchet_sdk import Hatchet, Context
hatchet = Hatchet()
@hatchet.workflow()
class MyWorkflow:
@hatchet.step(compute=compute)
def process_data(self, context: Context):
# Your code here
pass
```
## Best Practices
1. **Resource Allocation**
- Start with minimum required resources
- Scale up based on monitoring and performance needs
- Consider using performance CPUs for production workloads
2. **Region Selection**
- Select regions close to your data sources and users
- Include multiple regions for global availability
- Consider selecting regions in different geographical areas for better redundancy
3. **Memory Configuration**
- Stay within the allowed memory ranges for your CPU type
- Monitor memory usage to optimize allocation
- Consider workload memory requirements when selecting CPU type
4. **Replica Strategy**
- Use multiple replicas for high availability
- Set enough replicas to handle your workload across regions
- Account for random distribution when setting replica count
- Consider potential region failures in your replica count
Remember to monitor your workload performance and adjust these configurations as needed to optimize for your specific use case. Keep in mind that replicas are randomly distributed across regions, so you may need to provision more replicas than you would with an even distribution to ensure minimum coverage in all regions.
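To make the last point concrete, here is a small illustrative sketch in plain Python (not part of the Hatchet SDK) that models uniform random placement of replicas and estimates how often at least one region ends up with zero replicas:
```python
import random

def p_some_region_empty(num_replicas: int, regions: list[str], trials: int = 100_000) -> float:
    """Estimate the probability that at least one region receives no replicas,
    assuming each replica is placed uniformly at random (illustrative model only)."""
    misses = 0
    for _ in range(trials):
        used = {random.choice(regions) for _ in range(num_replicas)}
        if len(used) < len(regions):
            misses += 1
    return misses / trials

# 6 replicas over 3 regions: under this model some region is empty roughly
# 26% of the time, which is why over-provisioning can be necessary.
print(p_some_region_empty(6, ["ewr", "lax", "lhr"]))
```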
@@ -0,0 +1,19 @@
# Getting Started with Hatchet Compute
Hatchet Compute is available in Hatchet Cloud.
## Project Setup
This guide will walk you through the process of setting up a Hatchet Compute project and assumes you have a basic understanding of Hatchet.
If you'd like, you can fork the [Hatchet Compute Example Repo](https://github.com/hatchet-dev/hatchet-compute-example) to follow along.
1. Sign in or sign up for Hatchet Cloud [here](https://hatchet.run/cloud).
2. Navigate to the Managed Compute view to configure your compute resources.
3. Ensure your code is committed to a Git repository.
4. Click **+ New Worker** to create a new managed compute worker pool.
5. Connect your Git repository to Hatchet and select the repository and branch you'd like to deploy.
6. Specify the directory where your Dockerfile is located.
7. Select **Infrastructure as Code** to have Hatchet automatically manage your compute resources based on your workflow code, or select **Manual** to manage your compute resources through the Hatchet UI.
8. Click **Deploy** to deploy your workflow.
Your workflow will be deployed and you'll be able to monitor and scale your compute resources through the Hatchet UI.
+198
@@ -0,0 +1,198 @@
import { Callout } from "nextra/components";
# GPU Instance Configuration
<Callout type="warning">
This feature is currently in beta and may be subject to change.
</Callout>
## Overview
Hatchet supports GPU-accelerated workloads using NVIDIA GPUs. This guide covers GPU configuration options, Docker setup, and available regions. For basic compute configuration concepts, please refer to the [CPU Instance Configuration](./cpu.mdx) documentation.
## GPU Types and Availability
Hatchet currently supports the following NVIDIA GPU types:
| GPU Model | Memory | Available Regions |
| ---------------- | ------ | ------------------------------------------------------------------------- |
| NVIDIA A10 | 24GB | ord (Chicago) |
| NVIDIA L40S | 48GB | ord (Chicago) |
| NVIDIA A100-PCIe | 40GB | ord (Chicago) |
| NVIDIA A100-SXM4 | 80GB | ams (Amsterdam), iad (Ashburn), mia (Miami), sjc (San Jose), syd (Sydney) |
## Basic Configuration
```python
from hatchet_sdk.compute.configs import Compute
compute = Compute(
gpu_kind="a100-80gb", # GPU type
gpus=1, # Number of GPUs
memory_mb=163840, # Memory in MB
num_replicas=1, # Number of instances
regions=["ams"] # Must be a region that supports your chosen GPU
)
```
## Docker Configuration
### Example Dockerfile
```dockerfile
# Base image
FROM ubuntu:22.04
# Install CUDA and required libraries
RUN apt-get update && apt-get install -y --no-install-recommends \
cuda-nvcc-12-2 \
libcublas-12-2 \
libcudnn8 \
&& rm -rf /var/lib/apt/lists/*
# Your application setup
WORKDIR /app
COPY . .
# Application entrypoint
CMD ["python", "worker.py"]
```
### Docker Best Practices for GPU Workloads
1. **Selective Library Installation**
```dockerfile
# DO NOT use meta-packages
# ❌ RUN apt-get install cuda-runtime-*
# ✅ Install only required libraries
RUN apt-get install -y \
cuda-nvcc-12-2 \
libcublas-12-2 \
libcudnn8
```
2. **Multi-stage Builds**
```dockerfile
# Build stage
FROM ubuntu:22.04 AS builder
RUN apt-get update && apt-get install -y cuda-nvcc-12-2
# Runtime stage
FROM ubuntu:22.04
COPY --from=builder /usr/local/cuda-12.2 /usr/local/cuda-12.2
```
## Usage in Workflows
```python
from hatchet_sdk import Hatchet, Context
hatchet = Hatchet()
@hatchet.workflow()
class GPUWorkflow:
@hatchet.step(
compute=Compute(
gpu_kind="a100-80gb",
gpus=1,
memory_mb=163840,
num_replicas=1,
regions=["ams"]
)
)
def train_model(self, context: Context):
# GPU-accelerated code here
pass
```
## Memory and Resource Allocation
### Available Memory per GPU Type
- **A10**: 24GB GPU Memory
- **L40S**: 48GB GPU Memory
- **A100-PCIe**: 40GB GPU Memory
- **A100-SXM4**: 80GB GPU Memory
When configuring `memory_mb`, ensure it's sufficient for both system memory and GPU operations.
## Region-Specific Configurations
### A100-80GB Example
```python
# Multi-region A100-80GB configuration
compute = Compute(
gpu_kind="a100-80gb",
gpus=1,
memory_mb=163840,
num_replicas=3,
regions=["ams", "sjc", "syd"] # Replicas will be randomly distributed
)
```
### A10 Example
```python
# Chicago-based A10 configuration
compute = Compute(
gpu_kind="a10",
gpus=1,
memory_mb=49152,
num_replicas=2,
regions=["ord"]
)
```
## Best Practices
1. **GPU Selection**
- Choose GPU type based on workload requirements
- Consider memory requirements for your models
- Factor in regional availability
2. **Docker Optimization**
- Use specific library versions instead of meta-packages
- Implement multi-stage builds to reduce image size
- Only install required CUDA libraries
3. **Region Strategy**
- Select regions based on data locality
- Consider backup regions for redundancy
- Remember that replicas are randomly distributed across specified regions
4. **Resource Management**
- Monitor GPU utilization
- Optimize batch sizes for GPU memory
- Consider CPU and system memory requirements alongside GPU resources
## Common Issues and Solutions
1. **CUDA Version Compatibility**
```dockerfile
# Ensure CUDA toolkit version matches driver
RUN apt-get install -y cuda-nvcc-12-2
```
2. **Library Dependencies**
```dockerfile
# Install commonly needed libraries
RUN apt-get install -y \
libcublas-12-2 \
libcudnn8 \
nvidia-driver-525
```
3. **Region Availability**
Region support is limited while GPUs are in beta. Always verify GPU availability in chosen regions.
Remember to monitor your GPU workload performance and adjust configurations as needed. Consider implementing proper error handling for GPU-related operations and ensure your code gracefully handles scenarios where GPUs might be temporarily unavailable.
+52
@@ -0,0 +1,52 @@
import { Callout } from "nextra/components";
# Hatchet Managed Compute
<Callout type="warning">
This feature is currently in beta and may be subject to change.
</Callout>
## Overview
Hatchet Managed Compute provides the simplicity of serverless while delivering the performance and control of traditional infrastructure, making it ideal for long-lived or data-intensive AI applications and background job processing. It enables dynamic scaling while eliminating common serverless limitations like cold starts and timeouts.
### High-Availability Computing
- **Sub-100ms Instance Provisioning**: Pre-warms instances ahead of resource demand
- **Distributed Architecture**: Built on [Hatchet Queue](../home/) for reliable workload distribution
- **Multi-Region Support**: Deploy across regions for fault tolerance and data locality
### Available Compute Classes
- Shared CPUs
- Performance CPUs
- GPU instances
- Customizable worker pools
### Smart Workload Management
- **State-Aware**: Routes tasks to instances with preloaded models/resources using [worker labels](../home/features/worker-assignment/worker-affinity.mdx)
- **Burstable Capacity**: Scales dynamically based on queue depth
- **Sticky Assignment**: Routes tasks to the same instance when possible using [sticky assignments](../home/features/worker-assignment/sticky-assignment.mdx)
## Infrastructure as Code
Hatchet Managed Compute is defined directly in your workflow code, so compute requirements are versioned and reviewed alongside the workflows that use them.
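As a minimal sketch (using the Python `Compute` class documented on the CPU configuration page), a step's compute requirements are declared right next to the step that uses them:
```python
from hatchet_sdk import Context, Hatchet
from hatchet_sdk.compute import Compute

hatchet = Hatchet()

# The compute plan lives in code, versioned with the workflow it serves.
compute = Compute(
    cpu_kind="shared",
    cpus=1,
    memory_mb=1024,
    num_replicas=1,
    regions=["ewr"],
)

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step(compute=compute)
    def step1(self, context: Context):
        return {"step1": "done"}
```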
## Deployment
- **GitOps Integration**: Automatic builds and deployments on commit
- **Zero-Ops**: Managed infrastructure eliminates operational overhead
- **Version Control**: Infrastructure changes tracked in code
## Advantages Over Serverless
1. No cold starts or execution timeouts
2. Predictable performance
3. Cost-effective for sustained workloads
4. Fine-grained control over compute resources
5. Better suited for AI and data processing tasks
## Getting Started
Reach out to support@hatchet.dev to get access to managed compute.
+7 -3
@@ -22,26 +22,30 @@ To utilize workers effectively, you need to register your workflows with the wor
<Tabs items={['Python', 'Typescript', 'Go']}>
<Tabs.Tab>
```python
workflow = MyWorkflow()
worker = hatchet.worker('test-worker', max_runs=4)
worker.register_workflow(workflow)
worker.start()
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
async function main() {
const worker = await hatchet.worker('example-worker');
const worker = await hatchet.worker("example-worker");
await worker.registerWorkflow(workflow);
worker.start();
}
main();
```
````
</Tabs.Tab>
<Tabs.Tab>
```go
client, err := client.New(
client.InitWorkflows(),
@@ -69,7 +73,7 @@ interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan
defer cancel()
go worker.Start()
````
```
</Tabs.Tab>
</Tabs>
@@ -24,6 +24,7 @@ These relationships are defined in code by specifying required parents in the `p
<Tabs items={['Python', 'Typescript']}>
<Tabs.Tab>
```python
@hatchet.workflow(on_events=["question:create"])
class BasicRagWorkflow:
@@ -60,9 +61,11 @@ class BasicRagWorkflow:
"message": message,
}
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
const workflow: Workflow = {
id: 'basic-rag-workflow',
@@ -117,7 +120,7 @@ const workflow: Workflow = {
],
};
````
```
</Tabs.Tab>
@@ -59,6 +59,7 @@ async function listen_for_files() {
```
</Tabs.Tab>
```go copy
workflowRunId, err := c.Admin().RunWorkflow("stream-event-workflow", &streamEventInput{
Index: 0,
@@ -78,7 +79,8 @@ if err != nil {
panic(err)
}
````
```
</Tabs>
## Streaming from a Step Context
@@ -95,7 +97,7 @@ def step1(self, context: Context):
context.put_stream('hello from step1')
# continue with the step run...
return {"step1": "results"}
````
```
</Tabs.Tab>
<Tabs.Tab>
@@ -117,6 +117,7 @@ Here is an example of how to push a single event:
<Tabs items={['Python', 'Typescript', 'Golang']}>
<Tabs.Tab>
```python
from hatchet_sdk import Hatchet
@@ -129,9 +130,11 @@ hatchet.client.event.push(
}
)
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
import Hatchet from "@hatchet-dev/typescript-sdk";
@@ -140,10 +143,11 @@ const hatchet = Hatchet.init();
hatchet.event.push("user:create", {
test: "test",
});
````
```
</Tabs.Tab>
<Tabs.Tab>
```go
c, err := client.New(
client.WithHostPort("127.0.0.1", 7077),
@@ -161,14 +165,16 @@ Name: "testing",
},
)
````
```
</Tabs.Tab>
</Tabs>
Here is an example of how to push multiple events at once:
<Tabs items={['Python', 'Typescript', 'Golang']}>
<Tabs.Tab>
```python
from hatchet_sdk import Hatchet
@@ -197,35 +203,37 @@ result =
hatchet.client.event.bulk_push(
events
)
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
import Hatchet from "@hatchet-dev/typescript-sdk";
const hatchet = Hatchet.init();
const events = [
{
payload: { test: 'test1' },
additionalMetadata: { user_id: 'user1', source: 'test' },
},
{
payload: { test: 'test2' },
additionalMetadata: { user_id: 'user2', source: 'test' },
},
{
payload: { test: 'test3' },
additionalMetadata: { user_id: 'user3', source: 'test' },
},
{
payload: { test: "test1" },
additionalMetadata: { user_id: "user1", source: "test" },
},
{
payload: { test: "test2" },
additionalMetadata: { user_id: "user2", source: "test" },
},
{
payload: { test: "test3" },
additionalMetadata: { user_id: "user3", source: "test" },
},
];
hatchet.event.bulkPush('user:create:bulk', events);
hatchet.event.bulkPush("user:create:bulk", events);
```
````
</Tabs.Tab>
<Tabs.Tab>
```go
c, err := client.New(
client.WithHostPort("127.0.0.1", 7077),
@@ -256,7 +264,7 @@ c.Event().BulkPush(
context.Background(),
events,
)
````
```
</Tabs.Tab>
</Tabs>
@@ -280,5 +288,3 @@ When working with event-driven workflows, consider the following best practices:
## Events Dashboard
Hatchet provides a visual dashboard for monitoring and managing events. You can view incoming events, inspect event payloads, and configure event triggers directly from the dashboard. This makes it easy to monitor the flow of events and manage your event-driven workflows.
{/* TODO Graphic */}
@@ -32,6 +32,7 @@ There are two strategies for setting sticky assignment:
<Tabs items={['Python', 'Typescript', 'Golang']}>
<Tabs.Tab>
```python
@hatchet.workflow(
on_events=["user:create"],
@@ -50,9 +51,11 @@ class StickyWorkflow:
def step2(self, context: Context):
return {"worker": context.worker.id()}
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
const myWorkflow: Workflow = {
id: "my-workflow",
@@ -65,10 +68,11 @@ const myWorkflow: Workflow = {
// Define your workflow steps here
],
};
````
```
</Tabs.Tab>
<Tabs.Tab>
```go
err = w.RegisterWorkflow(
&worker.WorkflowJob{
@@ -96,6 +100,7 @@ const myWorkflow: Workflow = {
},
)
```
</Tabs.Tab>
</Tabs>
@@ -114,6 +119,7 @@ If either condition is not met, an error will be thrown when the child workflow
<Tabs items={['Python', 'Typescript', 'Golang']}>
<Tabs.Tab>
```python
@hatchet.workflow(
on_events=["sticky:parent"],
@@ -124,7 +130,8 @@ class StickyWorkflow:
ref = context.spawn_workflow('StickyChildWorkflow', {}, options={"sticky": True})
await ref.result()
return {"worker": context.worker.id()}
````
```
```python
@hatchet.workflow(
on_events=["sticky:child"],
@@ -135,37 +142,42 @@ class StickyChildWorkflow:
def child(self, context: Context):
return {"worker": context.worker.id()}
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
const parentWorkflow: Workflow = {
id: 'parent-sticky-workflow',
description: 'test',
id: "parent-sticky-workflow",
description: "test",
steps: [
{
name: 'step1',
name: "step1",
run: async (ctx) => {
const results: Promise<any>[] = [];
for (let i = 0; i < 50; i++) {
const result = await ctx.spawnWorkflow(childWorkflow, {}, { sticky: true });
const result = await ctx.spawnWorkflow(
childWorkflow,
{},
{ sticky: true },
);
results.push(result.result());
}
console.log('Results:', await Promise.all(results));
console.log("Results:", await Promise.all(results));
return { step1: 'step1 results!' };
return { step1: "step1 results!" };
},
},
],
],
};
````
```
</Tabs.Tab>
<Tabs.Tab>
```go
err = w.RegisterWorkflow(
&worker.WorkflowJob{
@@ -194,6 +206,6 @@ const parentWorkflow: Workflow = {
)
```
</Tabs.Tab>
</Tabs>
@@ -159,6 +159,7 @@ Labels can also be set dynamically on workers using the `upsertLabels` method. T
<Tabs items={['Python', 'Typescript', 'Golang']}>
<Tabs.Tab>
```python
@hatchet.step(
desired_worker_labels={
@@ -183,39 +184,42 @@ Labels can also be set dynamically on workers using the `upsertLabels` method. T
return {"worker": context.worker.id()}
````
</Tabs.Tab>
<Tabs.Tab>
```typescript
const affinity: Workflow = {
id: 'dynamic-affinity-workflow',
description: 'test',
steps: [
{
name: 'child-step1',
worker_labels: {
model: {
value: 'fancy-vision-model',
required: false,
},
},
run: async (ctx) => {
if (ctx.worker.labels().model !== 'fancy-vision-model') {
await ctx.worker.upsertLabels({ model: undefined });
await evictModel();
await loadNewModel("fancy-vision-model");
await ctx.worker.upsertLabels({ model: 'fancy-vision-model' });
}
// DO WORK
return { childStep1: 'childStep1 results!' };
},
},
],
};
````
```
</Tabs.Tab>
<Tabs.Tab>
```typescript
const affinity: Workflow = {
id: "dynamic-affinity-workflow",
description: "test",
steps: [
{
name: "child-step1",
worker_labels: {
model: {
value: "fancy-vision-model",
required: false,
},
},
run: async (ctx) => {
if (ctx.worker.labels().model !== "fancy-vision-model") {
await ctx.worker.upsertLabels({ model: undefined });
await evictModel();
await loadNewModel("fancy-vision-model");
await ctx.worker.upsertLabels({ model: "fancy-vision-model" });
}
// DO WORK
return { childStep1: "childStep1 results!" };
},
},
],
};
```
</Tabs.Tab>
<Tabs.Tab>
```go
err = w.RegisterWorkflow(
&worker.WorkflowJob{
@@ -260,6 +264,6 @@ Labels can also be set dynamically on workers using the `upsertLabels` method. T
)
```
</Tabs.Tab>
</Tabs>
@@ -25,6 +25,13 @@
"run-workflow-events": "Event-Triggered Workflows",
"run-workflow-cron": "Cron Workflows",
"run-workflow-schedule": "Scheduled Workflows",
"--- Deploying Workers": {
"type": "separator",
"title": "Deploying Workers"
},
"docker": "Docker",
"managed-compute": "Managed Compute",
"self-hosted": "Self-Hosted",
"--- Getting Workflow Results": {
"type": "separator",
"title": "Getting Workflow Results"
@@ -0,0 +1,101 @@
import { Tabs, Callout } from "nextra/components";
# Dockerizing Hatchet Applications
This guide explains how to create Dockerfiles for Hatchet applications, focusing on both Poetry and PIP implementations. Hatchet workers need proper containerization to ensure reliable execution of workflows in production environments.
## Entry Point Configuration for Hatchet
Before creating your Dockerfile, understand that Hatchet workers require specific entry point configuration:
1. The entry point must start the Hatchet worker, typically by calling the `worker.start()` method. [See the Python SDK docs for more information](./worker.mdx#starting-a-worker)
2. The environment variables required by the Hatchet SDK must be set
3. The worker must have your workflows registered, e.g. via the `worker.register_workflow` method
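A minimal `worker.py` satisfying these requirements might look like the following sketch, modeled on the Python examples elsewhere in these docs (`my_workflows` and `MyWorkflow` are hypothetical placeholders for your own module and workflow class):
```python
from hatchet_sdk import Hatchet

from my_workflows import MyWorkflow  # hypothetical: your own workflow module

# Connection settings come from the Hatchet SDK environment variables
hatchet = Hatchet()

def main():
    worker = hatchet.worker("my-worker", max_runs=4)
    worker.register_workflow(MyWorkflow())
    worker.start()

if __name__ == "__main__":
    main()
```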
## Example Docker Files
<Tabs items={['Poetry', 'PIP']}>
<Tabs.Tab>
```dockerfile
# Use the official Python image as the base
FROM python:3.10-slim
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
POETRY_VERSION=1.4.2 \
HATCHET_ENV=production
# Install system dependencies and Poetry
RUN apt-get update && \
apt-get install -y curl && \
curl -sSL https://install.python-poetry.org | python3 - && \
ln -s /root/.local/bin/poetry /usr/local/bin/poetry && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set work directory
WORKDIR /app
# Copy dependency files first
COPY pyproject.toml poetry.lock* /app/
# Install dependencies
RUN poetry config virtualenvs.create false && \
poetry install --no-interaction --no-ansi
# Copy Hatchet application code
COPY . /app
# Set the entrypoint to run the Hatchet worker
CMD ["poetry", "run", "python", "worker.py"]
```
<Callout type="info">
If you're using a poetry script to run your worker, you can replace `poetry run python worker.py` with `poetry run <script-name>` in the CMD.
</Callout>
</Tabs.Tab>
<Tabs.Tab>
```dockerfile
# Use the official Python image as base
FROM python:3.10-slim
# Set environment variables
ENV PYTHONUNBUFFERED=1 \
HATCHET_ENV=production
# Set work directory
WORKDIR /app
# Copy dependency files first
COPY requirements.txt .
# Install dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy Hatchet application code
COPY . /app
# Set the entrypoint to run the Hatchet worker
CMD ["python", "worker.py"]
```
</Tabs.Tab>
</Tabs>
@@ -0,0 +1,127 @@
import { Tabs, Callout } from "nextra/components";
# Managed Compute in Python
<Callout type="info">
This feature is currently in beta and may be subject to change.
</Callout>
## Overview
Hatchet Managed Compute lets you define and manage compute resources directly in your Python code. This guide shows you how to configure compute instances, create workflows, and manage workers using the Hatchet Python SDK.
This guide assumes you are already familiar with the basics of Hatchet and have a local workflow running using Docker. If not, please see the [Getting Started Guide](../../getting-started.mdx) and the [Docker Guide](./docker.mdx).
## Basic Configuration
### Compute Configuration
The `Compute` class allows you to define resource requirements for your workload directly in your Python code. You can define multiple compute configurations to use in your workflows on a step-by-step basis. This allows you to easily optimize your compute resources for different parts of your workflow.
```python
from hatchet_sdk.compute import Compute
# Define a default compute configuration
default_compute = Compute(
cpu_kind="shared", # CPU type: "shared" or "performance"
cpus=2, # Number of CPU cores
memory_mb=1024, # Memory allocation in MB
num_replicas=2, # Number of instances
regions=["ewr"] # Deployment regions
)
# Define a basic compute configuration
basic = Compute(
cpu_kind="shared",
cpus=1,
memory_mb=1024,
num_replicas=1,
regions=["ewr"]
)
```
For a full list of configuration options, see the [Compute API documentation](../../compute/cpu.mdx#basic-configuration).
## GPU Support
<Callout type="warning">
GPU compute has limited region support and constraints. [See the GPU docs for
more information](../../compute/gpu.mdx).
</Callout>
Hatchet Managed Compute supports GPU instances. You can define GPU compute configurations in the same way as CPU configurations, but with the addition of the `gpu` parameter.
```python
gpu_compute = Compute(
cpu_kind="shared",
gpu_kind="a100",
gpus=1,
cpus=1,
memory_mb=1024,
num_replicas=1,
regions=["ewr"],
)
```
For a full list of configuration options, see the [Compute API documentation](../../compute/infra-as-code.mdx#gpu).
## Defining Compute Requirements in Workflows
### Workflow Definition
The compute configuration can then be passed to the `step` decorator to define compute requirements for each step in your workflow.
```python
import time
from hatchet_sdk import Context, Hatchet
hatchet = Hatchet()
@hatchet.workflow(on_events=["user:create"])
class ManagedWorkflow:
@hatchet.step(
timeout="11s",
retries=3,
compute=default_compute
)
def step1(self, context: Context):
print("executed step1")
time.sleep(10)
return {
"step1": "step1",
}
@hatchet.step(
timeout="11s",
retries=3,
compute=basic
)
def step2(self, context: Context):
print("executed step2")
time.sleep(10)
return {
"step2": "step2",
}
```
## Worker Management
### Setting Up Workers
Configure and start workers to execute your workflows as you would normally do with any other Hatchet worker:
```python
def main():
# Create workflow instance
workflow = ManagedWorkflow()
# Initialize worker with max runs limit
worker = hatchet.worker("test-worker", max_runs=1)
# Register workflow with worker
worker.register_workflow(workflow)
# Start the worker
worker.start()
```
A complete example of a workflow that uses managed compute can be found [here](https://github.com/hatchet-dev/managed-compute-examples/tree/main/python).
@@ -0,0 +1,5 @@
# Self-Hosted Hatchet
Hatchet can be self-hosted by running the Hatchet worker in your own environment.
@@ -25,6 +25,13 @@
"run-workflow-events": "Event-Triggered Workflows",
"run-workflow-cron": "Cron Workflows",
"run-workflow-schedule": "Scheduled Workflows",
"--- Deploying Workers": {
"type": "separator",
"title": "Deploying Workers"
},
"docker": "Docker",
"managed-compute": "Managed Compute",
"self-hosted": "Self-Hosted",
"--- Getting Workflow Results": {
"type": "separator",
"title": "Getting Workflow Results"
@@ -0,0 +1,252 @@
import { Tabs, Callout } from "nextra/components";
# Dockerizing Hatchet TypeScript Applications
This guide explains how to create Dockerfiles for Hatchet TypeScript applications using different package managers.
## Entry Point Configuration for Hatchet
Before creating your Dockerfile, ensure your TypeScript worker:
1. Implements the Hatchet worker startup using `worker.start()`
2. Has proper environment variables configured
3. Registers workflows using the `worker.registerWorkflow` method
## Example Docker Files
<Tabs items={['npm', 'pnpm', 'yarn']}>
<Tabs.Tab>
```dockerfile
# Stage 1: Build
FROM node:18 AS builder
WORKDIR /app
# Copy package files
COPY package*.json ./
# Install dependencies
RUN npm ci
# Copy source code
COPY . .
# Build TypeScript
RUN npm run build
# Stage 2: Production
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY package*.json ./
# Install production dependencies only
RUN npm ci --omit=dev
# Copy built assets from builder
COPY --from=builder /app/dist ./dist
# Set production environment
ENV NODE_ENV=production
# Start the worker
CMD ["node", "dist/worker.js"]
```
<Callout type="info">
Use `npm ci` instead of `npm install` for more reliable builds. It's faster and ensures consistent installs across environments.
</Callout>
</Tabs.Tab>
<Tabs.Tab>
```dockerfile
# Stage 1: Build
FROM node:18 AS builder
WORKDIR /app
# Install pnpm
RUN npm install -g pnpm
# Copy package files
COPY pnpm-lock.yaml package.json ./
# Install dependencies
RUN pnpm install --frozen-lockfile
# Copy source code
COPY . .
# Build TypeScript
RUN pnpm build
# Stage 2: Production
FROM node:18-alpine
WORKDIR /app
# Install pnpm
RUN npm install -g pnpm
# Copy package files
COPY pnpm-lock.yaml package.json ./
# Install production dependencies only
RUN pnpm install --frozen-lockfile --prod
# Copy built assets from builder
COPY --from=builder /app/dist ./dist
# Set production environment
ENV NODE_ENV=production
# Start the worker
CMD ["node", "dist/worker.js"]
```
<Callout type="info">
PNPM's `--frozen-lockfile` flag ensures consistent installs and fails if the lockfile is out of sync with `package.json`.
</Callout>
</Tabs.Tab>
<Tabs.Tab>
```dockerfile
# Stage 1: Build
FROM node:18 AS builder
WORKDIR /app
# Copy package files
COPY package.json yarn.lock ./
# Install dependencies
RUN yarn install --frozen-lockfile
# Copy source code
COPY . .
# Build TypeScript
RUN yarn build
# Stage 2: Production
FROM node:18-alpine
WORKDIR /app
# Copy package files
COPY package.json yarn.lock ./
# Install production dependencies only
RUN yarn install --frozen-lockfile --production
# Copy built assets from builder
COPY --from=builder /app/dist ./dist
# Set production environment
ENV NODE_ENV=production
# Start the worker
CMD ["node", "dist/worker.js"]
```
<Callout type="info">
Yarn's `--frozen-lockfile` ensures your dependencies match the lock file exactly.
</Callout>
</Tabs.Tab>
</Tabs>
## Best Practices
### 1. Multi-stage Builds
Using multi-stage builds helps create smaller production images by excluding build tools and development dependencies.
### 2. Dependency Caching
```dockerfile
# Copy only package files first
COPY package*.json ./
# or
COPY pnpm-lock.yaml package.json ./
# or
COPY package.json yarn.lock ./
# Install dependencies
RUN <package-manager> install
```
### 3. Production Optimizations
```dockerfile
# Set production environment
ENV NODE_ENV=production
# Install only production dependencies
RUN npm ci --omit=dev
# or
RUN pnpm install --frozen-lockfile --prod
# or
RUN yarn install --frozen-lockfile --production
```
### 4. TypeScript Configuration
Ensure your `tsconfig.json` is properly configured:
```json
{
"compilerOptions": {
"outDir": "./dist",
"rootDir": "./src",
"module": "commonjs",
"target": "es2020",
"esModuleInterop": true,
"strict": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist"]
}
```
### 5. `.dockerignore` File
Remember to create a `.dockerignore` file to exclude unnecessary files:
```
node_modules
npm-debug.log
yarn-debug.log
yarn-error.log
.git
.gitignore
.env
dist
coverage
```
@@ -0,0 +1,204 @@
import { Callout } from "nextra/components";
# Managed Compute in TypeScript
<Callout type="info">
This feature is currently in beta and may be subject to change.
</Callout>
## Overview
Hatchet Managed Compute lets you define and manage compute resources directly in your TypeScript code. This guide shows you how to configure compute instances, create workflows, and manage workers using the Hatchet TypeScript SDK.
This guide assumes you are already familiar with the basics of Hatchet and have a local workflow running using Docker. If not, please see the [Getting Started Guide](../../getting-started.mdx) and the [Docker Guide](./docker.mdx).
## Basic Configuration
### Compute Configuration
Hatchet provides TypeScript interfaces for defining compute requirements. You can create multiple compute configurations to use in your workflows on a step-by-step basis, allowing you to optimize resources for different parts of your workflow.
```typescript
import { PerformanceCPUCompute, SharedCPUCompute } from "@hatchet-dev/typescript-sdk/clients/worker/compute/compute-config";
import { ManagedWorkerRegion } from "@hatchet-dev/typescript-sdk/clients/rest/generated/cloud/data-contracts";
// Define a basic compute configuration
const basicCompute: SharedCPUCompute = {
cpuKind: "shared",
memoryMb: 1024,
numReplicas: 1,
cpus: 1,
regions: [ManagedWorkerRegion.Ewr],
};
// Define a more powerful compute configuration
const performanceCompute: PerformanceCPUCompute = {
cpuKind: "performance",
memoryMb: 2048,
numReplicas: 2,
cpus: 2,
regions: [ManagedWorkerRegion.Ewr],
};
```
For a full list of configuration options, see the [Compute API documentation](../../compute/cpu.mdx#basic-configuration).
## GPU Support
<Callout type="warning">
GPU compute has limited region support and constraints. [See the GPU docs for
more information](../../compute/gpu.mdx).
</Callout>
Hatchet Managed Compute supports GPU instances. You can define GPU compute configurations using the `GPUCompute` interface:
```typescript
import { GPUCompute } from "@hatchet-dev/typescript-sdk/clients/worker/compute/compute-config";
const gpuCompute: GPUCompute = {
cpuKind: "shared",
gpuKind: "l40s",
memoryMb: 1024,
numReplicas: 1,
cpus: 2,
gpus: 1,
regions: [ManagedWorkerRegion.Ewr],
};
```
For a full list of GPU configuration options, see the [Compute API documentation](../../compute/infra-as-code.mdx#gpu).
## Defining Workflows with Compute Requirements
The compute configuration can be specified for each step in your workflow:
```typescript
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";
const hatchet = Hatchet.init();
const workflow: Workflow = {
id: "managed-workflow",
description: "Workflow with managed compute",
on: {
event: "user:create",
},
steps: [
{
name: "step1",
compute: basicCompute,
run: async (ctx) => {
console.log("Executing step1 with basic compute");
return { result: "Step 1 complete" };
},
},
{
name: "step2",
parents: ["step1"],
compute: gpuCompute,
run: async (ctx) => {
const step1Result = ctx.stepOutput("step1");
console.log("Executing step2 with GPU compute after", step1Result);
return { result: "Step 2 complete" };
},
},
],
};
```
## Worker Management
### Setting Up Workers
Configure and start workers to execute your workflows:
```typescript
async function main() {
// Initialize worker
const worker = await hatchet.worker("managed-worker");
// Register workflow
await worker.registerWorkflow(workflow);
// Start the worker
worker.start();
}
main();
```
## Complete Example
Here's a complete example of a workflow using different compute configurations:
```typescript
import Hatchet, { Workflow } from "@hatchet-dev/typescript-sdk";
import { ManagedWorkerRegion } from "@hatchet-dev/typescript-sdk/clients/rest/generated/cloud/data-contracts";
import {
GPUCompute,
SharedCPUCompute,
} from "@hatchet-dev/typescript-sdk/clients/worker/compute/compute-config";
const hatchet = Hatchet.init();
// Define compute configurations
const cpuCompute: SharedCPUCompute = {
cpuKind: "shared",
memoryMb: 1024,
numReplicas: 1,
cpus: 1,
regions: [ManagedWorkerRegion.Ewr],
};
const gpuCompute: GPUCompute = {
cpuKind: "shared",
gpuKind: "l40s",
memoryMb: 1024,
numReplicas: 1,
cpus: 2,
gpus: 1,
regions: [ManagedWorkerRegion.Ewr],
};
// Define workflow
const workflow: Workflow = {
id: "simple-workflow",
description: "Mixed compute workflow example",
on: {
event: "user:create",
},
steps: [
{
name: "step1",
compute: cpuCompute,
run: async (ctx) => {
console.log("executed step1!");
return { step1: "step1 results!" };
},
},
{
name: "step2",
parents: ["step1"],
compute: gpuCompute,
run: (ctx) => {
console.log(
"executed step2 after step1 returned ",
ctx.stepOutput("step1"),
);
return { step2: "step2 results!" };
},
},
],
};
// Start worker
async function main() {
const worker = await hatchet.worker("managed-worker");
await worker.registerWorkflow(workflow);
worker.start();
}
main();
```
A complete example of a workflow that uses managed compute can be found [here](https://github.com/hatchet-dev/managed-compute-examples/tree/main/typescript).
@@ -0,0 +1,5 @@
# Self-Hosted Hatchet
Hatchet can be self-hosted by running the Hatchet worker in your own environment.
+58 -19
@@ -16,6 +16,9 @@ import (
"github.com/hatchet-dev/hatchet/pkg/client/loader"
"github.com/hatchet-dev/hatchet/pkg/client/rest"
cloudrest "github.com/hatchet-dev/hatchet/pkg/client/cloud/rest"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/logger"
@@ -28,8 +31,11 @@ type Client interface {
Event() EventClient
Subscribe() SubscribeClient
API() *rest.ClientWithResponses
CloudAPI() *cloudrest.ClientWithResponses
TenantId() string
Namespace() string
CloudRegisterID() *string
RunnableActions() []string
}
type clientImpl struct {
@@ -40,12 +46,16 @@ type clientImpl struct {
event EventClient
subscribe SubscribeClient
rest *rest.ClientWithResponses
cloudrest *cloudrest.ClientWithResponses
// the tenant id
tenantId string
namespace string
cloudRegisterID *string
runnableActions []string
l *zerolog.Logger
v validator.Validator
@@ -65,6 +75,9 @@ type ClientOpts struct {
token string
namespace string
cloudRegisterID *string
runnableActions []string
filesLoader filesLoaderFunc
initWorkflows bool
}
@@ -98,15 +111,17 @@ func defaultClientOpts(token *string, cf *client.ClientConfigFile) *ClientOpts {
logger := logger.NewDefaultLogger("client")
return &ClientOpts{
tenantId: clientConfig.TenantId,
token: clientConfig.Token,
l: &logger,
v: validator.NewDefaultValidator(),
tls: clientConfig.TLSConfig,
hostPort: clientConfig.GRPCBroadcastAddress,
serverURL: clientConfig.ServerURL,
filesLoader: types.DefaultLoader,
namespace: clientConfig.Namespace,
tenantId: clientConfig.TenantId,
token: clientConfig.Token,
l: &logger,
v: validator.NewDefaultValidator(),
tls: clientConfig.TLSConfig,
hostPort: clientConfig.GRPCBroadcastAddress,
serverURL: clientConfig.ServerURL,
filesLoader: types.DefaultLoader,
namespace: clientConfig.Namespace,
cloudRegisterID: clientConfig.CloudRegisterID,
runnableActions: clientConfig.RunnableActions,
}
}
@@ -265,6 +280,15 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
return nil, fmt.Errorf("could not create rest client: %w", err)
}
cloudrest, err := cloudrest.NewClientWithResponses(opts.serverURL, cloudrest.WithRequestEditorFn(func(ctx context.Context, req *http.Request) error {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", opts.token))
return nil
}))
if err != nil {
return nil, fmt.Errorf("could not create cloud REST client: %w", err)
}
// if init workflows is set, then we need to initialize the workflows
if opts.initWorkflows {
if err := initWorkflows(opts.filesLoader, admin); err != nil {
@@ -273,16 +297,19 @@ func newFromOpts(opts *ClientOpts) (Client, error) {
}
return &clientImpl{
conn: conn,
tenantId: opts.tenantId,
l: opts.l,
admin: admin,
dispatcher: dispatcher,
subscribe: subscribe,
event: event,
v: opts.v,
rest: rest,
namespace: opts.namespace,
conn: conn,
tenantId: opts.tenantId,
l: opts.l,
admin: admin,
dispatcher: dispatcher,
subscribe: subscribe,
event: event,
v: opts.v,
rest: rest,
cloudrest: cloudrest,
namespace: opts.namespace,
cloudRegisterID: opts.cloudRegisterID,
runnableActions: opts.runnableActions,
}, nil
}
@@ -306,6 +333,10 @@ func (c *clientImpl) API() *rest.ClientWithResponses {
return c.rest
}
func (c *clientImpl) CloudAPI() *cloudrest.ClientWithResponses {
return c.cloudrest
}
func (c *clientImpl) TenantId() string {
return c.tenantId
}
@@ -314,6 +345,14 @@ func (c *clientImpl) Namespace() string {
return c.namespace
}
func (c *clientImpl) CloudRegisterID() *string {
return c.cloudRegisterID
}
func (c *clientImpl) RunnableActions() []string {
return c.runnableActions
}
func initWorkflows(fl filesLoaderFunc, adminClient AdminClient) error {
files := fl()
File diff suppressed because it is too large
+44
@@ -0,0 +1,44 @@
package compute
import (
"crypto/sha256"
"encoding/json"
"fmt"
"github.com/hatchet-dev/hatchet/pkg/client/cloud/rest"
)
// Region represents a managed worker region
type Region = rest.ManagedWorkerRegion
type GPUKind = rest.CreateManagedWorkerRuntimeConfigRequestGpuKind
// Compute is the base struct for different compute configurations.
type Compute struct {
Pool *string `json:"pool,omitempty" validate:"omitempty"`
NumReplicas int `json:"numReplicas" validate:"min=0,max=1000"`
Regions []Region `json:"regions,omitempty" validate:"omitempty"`
CPUs int `json:"cpus" validate:"min=1,max=64"`
CPUKind CPUKind `json:"computeKind" validate:"required,oneof=shared performance"`
MemoryMB int `json:"memoryMb" validate:"min=256,max=65536"`
// GPU-specific fields
GPUKind *GPUKind `json:"gpuKind,omitempty" validate:"omitempty"`
GPUs *int `json:"gpus,omitempty" validate:"omitempty,min=1,max=8"`
}
// CPUKind represents the type of compute.
type CPUKind string
const (
ComputeKindSharedCPU CPUKind = "shared"
ComputeKindPerformanceCPU CPUKind = "performance"
)
// ComputeHash generates a SHA256 hash of the Compute configuration.
func (c *Compute) ComputeHash() (string, error) {
str, err := json.Marshal(c)
if err != nil {
return "", err
}
hash := sha256.Sum256(str)
return fmt.Sprintf("%x", hash), nil
}
+1 -1
@@ -365,7 +365,7 @@ func (a *actionListenerImpl) Actions(ctx context.Context) (<-chan *Action, error
continue
}
a.l.Debug().Msgf("Received action type: %s", actionType)
a.l.Debug().Msgf("Received action type: %s for action: %s", actionType, assignedAction.ActionId)
unquoted := assignedAction.ActionPayload
+11
@@ -110,6 +110,15 @@ func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.Cli
namespace = strings.ToLower(namespace + "_")
}
var rawRunnableActions []string
if cf.RawRunnableActions != nil {
rawRunnableActions = []string{}
for _, action := range cf.RawRunnableActions {
rawRunnableActions = append(rawRunnableActions, namespace+strings.TrimSpace(action))
}
}
return &client.ClientConfig{
TenantId: cf.TenantId,
TLSConfig: tlsConf,
@@ -117,6 +126,8 @@ func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.Cli
ServerURL: serverURL,
GRPCBroadcastAddress: grpcBroadcastAddress,
Namespace: namespace,
CloudRegisterID: cf.CloudRegisterID,
RunnableActions: rawRunnableActions,
}, nil
}
+9
@@ -18,6 +18,9 @@ type ClientConfigFile struct {
TLS ClientTLSConfigFile `mapstructure:"tls" json:"tls,omitempty"`
Namespace string `mapstructure:"namespace" json:"namespace,omitempty"`
CloudRegisterID *string `mapstructure:"cloudRegisterID" json:"cloudRegisterID,omitempty"`
RawRunnableActions []string `mapstructure:"runnableActions" json:"runnableActions,omitempty"`
}
type ClientTLSConfigFile struct {
@@ -37,6 +40,9 @@ type ClientConfig struct {
TLSConfig *tls.Config
Namespace string
CloudRegisterID *string
RunnableActions []string
}
func BindAllEnv(v *viper.Viper) {
@@ -45,6 +51,9 @@ func BindAllEnv(v *viper.Viper) {
_ = v.BindEnv("hostPort", "HATCHET_CLIENT_HOST_PORT")
_ = v.BindEnv("namespace", "HATCHET_CLIENT_NAMESPACE")
_ = v.BindEnv("cloudRegisterID", "HATCHET_CLOUD_REGISTER_ID")
_ = v.BindEnv("runnableActions", "HATCHET_CLOUD_ACTIONS")
// tls options
_ = v.BindEnv("tls.base.tlsStrategy", "HATCHET_CLIENT_TLS_STRATEGY")
_ = v.BindEnv("tls.base.tlsCertFile", "HATCHET_CLIENT_TLS_CERT_FILE")
+134
@@ -0,0 +1,134 @@
package worker
import (
"context"
"os"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/cloud/rest"
"github.com/hatchet-dev/hatchet/pkg/logger"
)
type ManagedCompute struct {
ActionRegistry *ActionRegistry
Client client.Client
MaxRuns int
RuntimeConfigs []rest.CreateManagedWorkerRuntimeConfigRequest
CloudRegisterID *string
Logger *zerolog.Logger
}
func NewManagedCompute(actionRegistry *ActionRegistry, client client.Client, maxRuns int) *ManagedCompute {
if maxRuns == 0 {
maxRuns = 1
}
runtimeConfigs := getComputeConfigs(actionRegistry, maxRuns)
cloudRegisterID := client.CloudRegisterID()
logger := logger.NewDefaultLogger("managed-compute")
mc := &ManagedCompute{
ActionRegistry: actionRegistry,
Client: client,
MaxRuns: maxRuns,
RuntimeConfigs: runtimeConfigs,
CloudRegisterID: cloudRegisterID,
Logger: &logger,
}
if len(mc.RuntimeConfigs) == 0 {
mc.Logger.Debug().Msg("No compute configs found, skipping cloud registration and running all actions locally.")
return mc
}
if mc.CloudRegisterID == nil {
mc.Logger.Warn().Msg("Managed cloud compute plan:")
for _, runtimeConfig := range mc.RuntimeConfigs {
mc.Logger.Warn().Msg(" ----------------------------")
mc.Logger.Warn().Msgf(" actions: %v", *runtimeConfig.Actions)
mc.Logger.Warn().Msgf(" num replicas: %d", runtimeConfig.NumReplicas)
mc.Logger.Warn().Msgf(" cpu kind: %s", runtimeConfig.CpuKind)
mc.Logger.Warn().Msgf(" cpus: %d", runtimeConfig.Cpus)
mc.Logger.Warn().Msgf(" memory mb: %d", runtimeConfig.MemoryMb)
mc.Logger.Warn().Msgf(" regions: %v", runtimeConfig.Regions)
}
mc.Logger.Warn().Msg("NOTICE: local mode detected, skipping cloud registration and running all actions locally.")
return mc
}
// Register the cloud compute plan
mc.CloudRegister(context.Background())
return mc
}
func getComputeConfigs(actions *ActionRegistry, maxRuns int) []rest.CreateManagedWorkerRuntimeConfigRequest {
computeMap := make(map[string]rest.CreateManagedWorkerRuntimeConfigRequest)
for action, details := range *actions {
compute := details.Compute()
if compute == nil {
continue
}
key, err := compute.ComputeHash()
if err != nil {
panic(err)
}
if _, exists := computeMap[key]; !exists {
computeMap[key] = rest.CreateManagedWorkerRuntimeConfigRequest{
Actions: &[]string{},
NumReplicas: compute.NumReplicas,
CpuKind: string(compute.CPUKind),
Cpus: compute.CPUs,
MemoryMb: compute.MemoryMB,
Regions: &compute.Regions,
Slots: &maxRuns,
Gpus: compute.GPUs,
GpuKind: compute.GPUKind,
}
}
*computeMap[key].Actions = append(*computeMap[key].Actions, action)
}
var configs []rest.CreateManagedWorkerRuntimeConfigRequest
for _, config := range computeMap {
configs = append(configs, config)
}
return configs
}
func (mc *ManagedCompute) CloudRegister(ctx context.Context) {
if mc.CloudRegisterID != nil {
mc.Logger.Info().Msg("Registering cloud compute plan with ID: " + *mc.CloudRegisterID)
if len(mc.RuntimeConfigs) == 0 {
mc.Logger.Warn().Msg("No actions to register, skipping cloud registration.")
os.Exit(0)
}
req := rest.InfraAsCodeRequest{
RuntimeConfigs: mc.RuntimeConfigs,
}
_, err := mc.Client.CloudAPI().InfraAsCodeCreateWithResponse(ctx, uuid.MustParse(*mc.CloudRegisterID), req)
if err != nil {
mc.Logger.Error().Err(err).Msg("Could not register cloud compute plan.")
os.Exit(1)
}
os.Exit(0)
}
}
+12 -4
@@ -3,6 +3,7 @@ package worker
import (
"fmt"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
)
@@ -42,7 +43,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
}
// register all steps as actions
for actionId, fn := range workflow.ToActionMap(s.Name) {
for actionId, action := range workflow.ToActionMap(s.Name) {
parsedAction, err := types.ParseActionID(actionId)
if err != nil {
@@ -56,7 +57,7 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
}
}
err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, fn)
err = s.worker.registerAction(parsedAction.Service, parsedAction.Verb, action.fn, action.compute)
if err != nil {
return err
@@ -67,7 +68,8 @@ func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
}
type registerActionOpts struct {
name string
name string
compute *compute.Compute
}
type RegisterActionOpt func(*registerActionOpts)
@@ -78,6 +80,12 @@ func WithActionName(name string) RegisterActionOpt {
}
}
func WithCompute(compute *compute.Compute) RegisterActionOpt {
return func(opts *registerActionOpts) {
opts.compute = compute
}
}
func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error {
fnOpts := &registerActionOpts{}
@@ -89,7 +97,7 @@ func (s *Service) RegisterAction(fn any, opts ...RegisterActionOpt) error {
fnOpts.name = getFnName(fn)
}
return s.worker.registerAction(s.Name, fnOpts.name, fn)
return s.worker.registerAction(s.Name, fnOpts.name, fn, fnOpts.compute)
}
func (s *Service) Call(verb string) *WorkflowStep {
+31 -5
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"reflect"
"slices"
"strings"
"sync"
"time"
@@ -12,6 +13,7 @@ import (
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/integrations"
@@ -34,6 +36,8 @@ type Action interface {
// Service returns the service that the action belongs to
Service() string
Compute() *compute.Compute
}
type actionImpl struct {
@@ -42,6 +46,8 @@ type actionImpl struct {
runConcurrencyAction GetWorkflowConcurrencyGroupFn
method any
service string
compute *compute.Compute
}
func (j *actionImpl) Name() string {
@@ -64,12 +70,18 @@ func (j *actionImpl) Service() string {
return j.service
}
func (j *actionImpl) Compute() *compute.Compute {
return j.compute
}
type ActionRegistry map[string]Action
type Worker struct {
client client.Client
name string
actions map[string]Action
actions ActionRegistry
registered_workflows map[string]bool
@@ -198,7 +210,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
client: opts.client,
name: opts.name,
l: opts.l,
actions: map[string]Action{},
actions: ActionRegistry{},
alerter: opts.alerter,
middlewares: mws,
maxRuns: opts.maxRuns,
@@ -209,6 +221,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
mws.add(w.panicMiddleware)
// TODO: Remove integrations
// register all integrations
for _, integration := range opts.integrations {
actions := integration.Actions()
@@ -217,7 +230,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
for _, integrationAction := range actions {
action := fmt.Sprintf("%s:%s", integrationId, integrationAction)
err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction))
err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction), nil)
if err != nil {
return nil, fmt.Errorf("could not register integration action %s: %w", action, err)
@@ -290,10 +303,10 @@ func (w *Worker) RegisterAction(actionId string, method any) error {
return fmt.Errorf("could not parse action id: %w", err)
}
return w.registerAction(action.Service, action.Verb, method)
return w.registerAction(action.Service, action.Verb, method, nil)
}
func (w *Worker) registerAction(service, verb string, method any) error {
func (w *Worker) registerAction(service, verb string, method any, compute *compute.Compute) error {
actionId := fmt.Sprintf("%s:%s", service, verb)
// if the service is "concurrency", then this is a special action
@@ -303,6 +316,7 @@ func (w *Worker) registerAction(service, verb string, method any) error {
runConcurrencyAction: method.(GetWorkflowConcurrencyGroupFn),
method: method,
service: service,
compute: compute,
}
return nil
@@ -326,6 +340,7 @@ func (w *Worker) registerAction(service, verb string, method any) error {
run: actionFunc,
method: method,
service: service,
compute: compute,
}
return nil
@@ -338,9 +353,20 @@ func (w *Worker) Start() (func() error, error) {
actionNames := []string{}
for _, action := range w.actions {
if w.client.RunnableActions() != nil {
if !slices.Contains(w.client.RunnableActions(), action.Name()) {
continue
}
}
actionNames = append(actionNames, action.Name())
}
w.l.Debug().Msgf("worker %s is listening for actions: %v", w.name, actionNames)
_ = NewManagedCompute(&w.actions, w.client, 1)
listener, id, err := w.client.Dispatcher().GetActionListener(ctx, &client.GetActionListenerRequest{
WorkerName: w.name,
Actions: actionNames,
+32 -8
@@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
)
@@ -125,7 +126,7 @@ func (e eventsArr) ToWorkflowTriggers(wt *types.WorkflowTriggers, namespace stri
type workflowConverter interface {
ToWorkflow(svcName string, namespace string) types.Workflow
ToActionMap(svcName string) map[string]any
ToActionMap(svcName string) ActionMap
ToWorkflowTrigger() triggerConverter
}
@@ -264,17 +265,30 @@ func (j *WorkflowJob) ToWorkflowTrigger() triggerConverter {
return j.On
}
func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res := map[string]any{}
type ActionWithCompute struct {
fn any
compute *compute.Compute
}
type ActionMap map[string]ActionWithCompute
func (j *WorkflowJob) ToActionMap(svcName string) ActionMap {
res := ActionMap{}
for i, step := range j.Steps {
actionId := step.GetActionId(svcName, i)
res[actionId] = step.Function
res[actionId] = ActionWithCompute{
fn: step.Function,
compute: step.Compute,
}
}
if j.Concurrency != nil && j.Concurrency.fn != nil {
res["concurrency:"+getFnName(j.Concurrency.fn)] = j.Concurrency.fn
res["concurrency:"+getFnName(j.Concurrency.fn)] = ActionWithCompute{
fn: j.Concurrency.fn,
compute: nil, // TODO add compute to concurrency
}
}
if j.OnFailure != nil {
@@ -306,6 +320,8 @@ type WorkflowStep struct {
RateLimit []RateLimit
DesiredLabels map[string]*types.DesiredWorkerLabel
Compute *compute.Compute
}
type RateLimit struct {
@@ -332,6 +348,11 @@ func (w *WorkflowStep) SetName(name string) *WorkflowStep {
return w
}
func (w *WorkflowStep) SetCompute(compute *compute.Compute) *WorkflowStep {
w.Compute = compute
return w
}
func (w *WorkflowStep) SetDesiredLabels(labels map[string]*types.DesiredWorkerLabel) *WorkflowStep {
w.DesiredLabels = labels
return w
@@ -377,11 +398,14 @@ func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workfl
return workflowJob.ToWorkflow(svcName, namespace)
}
func (w *WorkflowStep) ToActionMap(svcName string) map[string]any {
func (w *WorkflowStep) ToActionMap(svcName string) ActionMap {
step := *w
return map[string]any{
step.GetActionId(svcName, 0): w.Function,
return ActionMap{
step.GetActionId(svcName, 0): ActionWithCompute{
fn: w.Function,
compute: w.Compute,
},
}
}