fix(go-sdk): clean up worker create signature (#1434)

* clean up worker create signature

* put together Go migration guide

* add more go docs
This commit is contained in:
abelanger5
2025-03-27 17:48:49 -07:00
committed by GitHub
parent 08f49031be
commit 404eb97771
11 changed files with 409 additions and 265 deletions

View File

@@ -1,29 +0,0 @@
name: "create-slack-onboarding-channel"
version: 0.1.0
triggers:
events:
- team:create
jobs:
create-slack-channel:
steps:
- name: Create onboarding channel
action: slack:create-channel
id: createChannel
timeout: 60s
with:
channelName: "{{ .input.name }}-onboarding"
- name: Add user to channel
action: slack:add-users-to-channel
id: addUserToChannel
timeout: 60s
with:
channelId: "{{ .steps.createChannel.channelId }}"
userIds:
- "$SLACK_USER_ID"
- name: Send message to channel
action: slack:send-message
id: sendMessageToChannel
timeout: 60s
with:
channelId: "{{ .steps.createChannel.channelId }}"
message: "Welcome to your dedicated onboarding channel, {{ .input.name }}!"

View File

@@ -1,47 +0,0 @@
## Simple Workflow Example
This example runs the [slack-channel.yaml](./.hatchet/slack-channel.yaml).
## Explanation
This folder contains a demo example of a workflow that creates a Slack channel, adds a default user to that Slack channel, and send an initial message to the channel. The workflow file showcases the following features:
- Running a simple job with a set of dependent steps
- Variable references within step arguments -- each subsequent step in a workflow can call `.steps.<step_id>.outputs` to access output arguments
While the `main.go` file showcases the following features:
- Using an existing integration called `SlackIntegration` which provides several actions to perform
- Providing a custom workflow file (as the workflow file needs to be populated with an env var `$SLACK_USER_ID`)
## How to run
Navigate to this directory and run the following steps:
1. Make sure you have a Hatchet server running (see the instructions [here](../../README.md)). After running `task seed`, grab the tenant ID which is output to the console.
2. Set your environment variables -- if you're using the bundled Temporal server, this will look like:
```sh
cat > .env <<EOF
SLACK_USER_ID=<TODO>
SLACK_TOKEN=<TODO>
SLACK_TEAM_ID=<TODO>
HATCHET_CLIENT_TENANT_ID=<tenant-id-from-seed-command>
HATCHET_CLIENT_TLS_ROOT_CA_FILE=../../hack/dev/certs/ca.cert
HATCHET_CLIENT_TLS_CERT_FILE=../../hack/dev/certs/client-worker.pem
HATCHET_CLIENT_TLS_KEY_FILE=../../hack/dev/certs/client-worker.key
HATCHET_CLIENT_TLS_SERVER_NAME=cluster
EOF
```
3. Run the following within this directory:
```sh
/bin/bash -c '
set -a
. .env
set +a
go run main.go';
```

View File

@@ -1,119 +0,0 @@
package main
import (
"context"
_ "embed"
"os"
"strings"
"time"
"github.com/joho/godotenv"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/integrations/slack"
"github.com/hatchet-dev/hatchet/pkg/worker"
)
type teamCreateEvent struct {
Name string `json:"name"`
}
//go:embed .hatchet/slack-channel.yaml
var SlackChannelWorkflow []byte
func init() {
err := godotenv.Load()
if err != nil {
panic(err)
}
// initialize the slack channel workflow with SLACK_USER_ID
slackUserId := os.Getenv("SLACK_USER_ID")
if slackUserId == "" {
panic("SLACK_USER_ID environment variable must be set")
}
slackFileWithReplacedEnv := strings.Replace(string(SlackChannelWorkflow), "$SLACK_USER_ID", slackUserId, 1)
SlackChannelWorkflow = []byte(slackFileWithReplacedEnv)
}
func main() {
// read the slack workflow
slackWorkflowFile, err := types.ParseYAML(context.Background(), SlackChannelWorkflow)
if err != nil {
panic(err)
}
// render the slack workflow using the environment variable SLACK_USER_ID
slackToken := os.Getenv("SLACK_TOKEN")
slackTeamId := os.Getenv("SLACK_TEAM_ID")
if slackToken == "" {
panic("SLACK_TOKEN environment variable must be set")
}
if slackTeamId == "" {
panic("SLACK_TEAM_ID environment variable must be set")
}
slackInt := slack.NewSlackIntegration(slackToken, slackTeamId, true)
client, err := client.New(
client.InitWorkflows(),
client.WithWorkflows([]*types.Workflow{
&slackWorkflowFile,
}),
)
if err != nil {
panic(err)
}
worker, err := worker.NewWorker(
worker.WithClient(
client,
),
worker.WithIntegration(
slackInt,
),
)
if err != nil {
panic(err)
}
interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan())
defer cancel()
go worker.Start()
testEvent := teamCreateEvent{
Name: "test-team-2",
}
// push an event
err = client.Event().Push(
context.Background(),
"team:create",
testEvent,
)
if err != nil {
panic(err)
}
for {
select {
case <-interruptCtx.Done():
return
default:
time.Sleep(time.Second)
}
}
}

View File

@@ -7,7 +7,6 @@ import (
"time"
v1_workflows "github.com/hatchet-dev/hatchet/examples/v1/workflows"
"github.com/hatchet-dev/hatchet/pkg/client/create"
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
"github.com/hatchet-dev/hatchet/pkg/v1/worker"
"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
@@ -63,10 +62,10 @@ func main() {
}
worker, err := hatchet.Worker(
create.WorkerOpts{
Name: fmt.Sprintf("%s-worker", workflowName),
worker.WorkerOpts{
Name: fmt.Sprintf("%s-worker", workflowName),
Workflows: workflow,
},
worker.WithWorkflows(workflow...),
)
if err != nil {

View File

@@ -1,3 +1,296 @@
import { Callout } from "nextra/components";
## Hatchet Go V1 Migration Guide
This document will become available on March 27th, 2025.
This guide will help you migrate Hatchet workflows from the V0 SDK to the V1 SDK. Note that the v1 engine will continue to support v0 workflows until September 30th, 2025.
The v1 Go SDK can be found in `github.com/hatchet-dev/hatchet/pkg/v1`. You can instantiate a new Hatchet client via:
```go
package main
import (
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
)
func main() {
hatchet, err := v1.NewHatchetClient()
// ...
}
```
## Declaring Tasks and Workflows
In the new SDKs, tasks and workflows use Go generics to make typing easier. As a result, there are a number of improvements made to the context methods and the way tasks are defined.
### Single-Task Workflows
Single tasks are much easier to define than before using the `factory` package in the V1 SDK. Here's an example of how to define a simple workflow with the V1 SDK:
```go
import "github.com/hatchet-dev/hatchet/pkg/v1/factory"
// ...
type SimpleInput struct {
Message string
}
type SimpleResult struct {
TransformedMessage string
}
simple := factory.NewTask(
create.StandaloneTask{
Name: "simple-task",
}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
return &SimpleResult{
TransformedMessage: strings.ToLower(input.Message),
}, nil
},
hatchet, // a Hatchet client instance
)
```
The `simple` task automatically inherits type definitions from the function signature, so invoking `simple` will automatically provide type-checking when the task is run:
```go
result, err := simple.Run(ctx, SimpleInput{
Message: "Hello, World!",
})
if err != nil {
return err
}
fmt.Println(result.TransformedMessage)
```
### Complex Workflows
Complex, multi-task workflows can be defined using the `NewWorkflow` method on the `factory` package. Here's an example of how to define a workflow with multiple tasks:
```go
import "github.com/hatchet-dev/hatchet/pkg/v1/factory"
// ...
simple := factory.NewWorkflow[DagInput, DagResult](
create.WorkflowCreateOpts[DagInput]{
Name: "simple-dag",
},
hatchet,
)
step1 := simple.Task(
create.WorkflowTask[DagInput, DagResult]{
Name: "Step1",
}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
return &SimpleOutput{
Step: 1,
}, nil
},
)
simple.Task(
create.WorkflowTask[DagInput, DagResult]{
Name: "Step2",
Parents: []create.NamedTask{
step1,
},
}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
var step1Output SimpleOutput
err := ctx.ParentOutput(step1, &step1Output)
if err != nil {
return nil, err
}
return &SimpleOutput{
Step: 2,
}, nil
},
)
return simple
```
<Callout type="warning">
Note that due to limitations with Go's type system, any methods defined on a
workflow from `factory.NewWorkflow` will need to return a `interface{}` type.
</Callout>
### Workflow and Task Configuration
In the V1 SDK, workflows and tasks are configured using the `create` package. The `create` package provides a number of options for configuring tasks and workflows, such as setting the task name, setting the task's parent tasks, and setting the task's wait conditions:
```go
// import the create package
import "github.com/hatchet-dev/hatchet/pkg/client/create"
// utilize the following structs to configure tasks and workflows
type WorkflowTask[I, O any] struct {
// (required) The name of the task and workflow
Name string
// (optional) The version of the workflow
Version string
// (optional) The human-readable description of the workflow
Description string
// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
ExecutionTimeout time.Duration
// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
ScheduleTimeout time.Duration
// (optional) Retries defines the number of times to retry a failed task
Retries int32
// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
RetryBackoffFactor float32
// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
RetryMaxBackoffSeconds int32
// (optional) RateLimits define constraints on how frequently the task can be executed
RateLimits []*types.RateLimit
// (optional) WorkerLabels specify requirements for workers that can execute this task
WorkerLabels map[string]*types.DesiredWorkerLabel
// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
Concurrency []*types.Concurrency
// WaitFor represents a set of conditions which must be satisfied before the task can run.
WaitFor condition.Condition
// SkipIf represents a set of conditions which, if satisfied, will cause the task to be skipped.
SkipIf condition.Condition
// CancelIf represents a set of conditions which, if satisfied, will cause the task to be canceled.
CancelIf condition.Condition
// (optional) Parents are the tasks that must successfully complete before this task can start
Parents []NamedTask
}
type WorkflowOnFailureTask[I, O any] struct {
// (optional) The version of the workflow
Version string
// (optional) The human-readable description of the workflow
Description string
// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
ExecutionTimeout time.Duration
// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
ScheduleTimeout time.Duration
// (optional) Retries defines the number of times to retry a failed task
Retries int32
// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
RetryBackoffFactor float32
// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
RetryMaxBackoffSeconds int32
// (optional) RateLimits define constraints on how frequently the task can be executed
RateLimits []*types.RateLimit
// (optional) WorkerLabels specify requirements for workers that can execute this task
WorkerLabels map[string]*types.DesiredWorkerLabel
// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
Concurrency []*types.Concurrency
}
// TaskCreateOpts defines options for creating a standalone task.
// This combines both workflow and task properties in a single type.
type StandaloneTask struct {
// (required) The name of the task and workflow
Name string
// (optional) The version of the workflow
Version string
// (optional) The human-readable description of the workflow
Description string
// (optional) ExecutionTimeout specifies the maximum duration a task can run before being terminated
ExecutionTimeout time.Duration
// (optional) ScheduleTimeout specifies the maximum time a task can wait to be scheduled
ScheduleTimeout time.Duration
// (optional) Retries defines the number of times to retry a failed task
Retries int32
// (optional) RetryBackoffFactor is the multiplier for increasing backoff between retries
RetryBackoffFactor float32
// (optional) RetryMaxBackoffSeconds is the maximum backoff duration in seconds between retries
RetryMaxBackoffSeconds int32
// (optional) RateLimits define constraints on how frequently the task can be executed
RateLimits []*types.RateLimit
// (optional) WorkerLabels specify requirements for workers that can execute this task
WorkerLabels map[string]*types.DesiredWorkerLabel
// (optional) Concurrency defines constraints on how many instances of this task can run simultaneously
Concurrency []*types.Concurrency
// (optional) The event names that trigger the workflow
OnEvents []string
// (optional) The cron expressions for scheduled workflow runs
OnCron []string
}
```
## Workers
To declare workers, you can use the `Worker` method on the `v1` Hatchet client. Note that instead of calling `RegisterWorkflow` on the worker, you can pass the workflows directly to the `Worker` method:
```go
package main
import (
"context"
"fmt"
v1 "github.com/hatchet-dev/hatchet/pkg/v1"
"github.com/hatchet-dev/hatchet/pkg/v1/worker"
"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
)
func main() {
hatchet, err := v1.NewHatchetClient()
if err != nil {
panic(err)
}
worker, err := hatchet.Worker(
create.WorkerOpts{
Name: fmt.Sprintf("%s-worker", workflowName),
Workflows: []workflow.WorkflowBase{workflow}, // add your workflow here
},
)
if err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
worker.StartBlocking(ctx)
}
```

View File

@@ -74,7 +74,5 @@ There are two ways of bulk cancelling or replaying workflows in both cases:
We've made a number of significant improvements to our SDKs in the V1 release. You can read about the improvements to each SDK in their corresponding migration guides:
- [Python SDK](./migration-guide-python.mdx)
- [TypeScript](./migration-guide-typescript.mdx)
- Go (coming soon)
{/* TODO V1 DOCS - Go migration guide */}
- [TypeScript SDK](./migration-guide-typescript.mdx)
- [Go SDK](./migration-guide-go.mdx)

View File

@@ -65,6 +65,76 @@ There are a handful of other new features that will make interfacing with the SD
{/* TODO V1 Docs */}
</Tabs.Tab>
<Tabs.Tab title="Go">
{/* TODO V1 Docs */}
### Highlights
The Go SDK has a number of notable highlights to showcase for V1. Many of them have been highlighted elsewhere, such as [in the migration guide](./migration-guide-go.mdx), an in various examples. Here, we'll list out each of them, along with their motivations and benefits.
1. Workflows and tasks are now instantiated via a factory pattern which makes it easier to define and run workflows. For example:
```go
type SimpleInput struct {
Message string
}
type SimpleResult struct {
TransformedMessage string
}
simple := factory.NewTask(
create.StandaloneTask{
Name: "simple-task",
}, func(ctx worker.HatchetContext, input SimpleInput) (*SimpleResult, error) {
return &SimpleResult{
TransformedMessage: strings.ToLower(input.Message),
}, nil
},
hatchet, // a Hatchet client instance
)
// somewhere else in your code
result, err := simple.Run(ctx, SimpleInput{
Message: "Hello, World!",
})
// result is fully typed!
```
2. Instead of passing parent references via `[]string`, you can simply pass task references directly to other tasks in a workflow, reducing the fragility of your code. For example:
```go
simple := factory.NewWorkflow[DagInput, DagResult](
create.WorkflowCreateOpts[DagInput]{
Name: "simple-dag",
},
hatchet,
)
step1 := simple.Task(
create.WorkflowTask[DagInput, DagResult]{
Name: "Step1",
}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
// ...
},
)
simple.Task(
create.WorkflowTask[DagInput, DagResult]{
Name: "Step2",
Parents: []create.NamedTask{
step1,
},
}, func(ctx worker.HatchetContext, input DagInput) (interface{}, error) {
// getting parent input also uses the task reference, for example:
var step1Output SimpleOutput
ctx.ParentOutput(step1, &step1Output)
// ...
},
)
```
3. Configuring workflows and tasks is much easier, with all configuration options flattened into a single struct.
</Tabs.Tab>
</UniversalTabs>

View File

@@ -199,16 +199,6 @@ func InitWorkflows() ClientOpt {
}
}
// WithWorkflows sets the workflow files to use for the worker. If this is not passed in, the workflows files will be loaded
// from the .hatchet folder in the current directory.
func WithWorkflows(files []*types.Workflow) ClientOpt {
return func(opts *ClientOpts) {
opts.filesLoader = func() []*types.Workflow {
return files
}
}
}
type sharedClientOpts struct {
tenantId string
namespace string

View File

@@ -1,28 +0,0 @@
package create
import "github.com/rs/zerolog"
// WorkerLabels represents a map of labels that can be assigned to a worker
// for filtering and identification purposes.
type WorkerLabels map[string]interface{}
// CreateOpts defines the options for creating a new worker.
type WorkerOpts struct {
// (required) the friendly name of the worker
Name string
// (optional) maximum number of concurrent runs on this worker, defaults to 100
Slots int
// (optional) labels to set on the worker
Labels WorkerLabels
// (optional) logger to use for the worker
Logger *zerolog.Logger
// (optional) log level
LogLevel string
// (optional) maximum number of concurrent runs for durable tasks, defaults to 1000
DurableSlots int
}

View File

@@ -25,7 +25,7 @@ type HatchetClient interface {
// v1.WithWorkflows(simple)
// )
// ```
Worker(opts create.WorkerOpts, optFns ...func(*worker.WorkerImpl)) (worker.Worker, error)
Worker(opts worker.WorkerOpts) (worker.Worker, error)
// Feature clients
@@ -99,8 +99,8 @@ func (c *v1HatchetClientImpl) Events() client.EventClient {
}
// Worker creates and configures a new worker with the provided options and optional configuration functions.
func (c *v1HatchetClientImpl) Worker(opts create.WorkerOpts, optFns ...func(*worker.WorkerImpl)) (worker.Worker, error) {
return worker.NewWorker(c.workers, c.v0, opts, optFns...)
func (c *v1HatchetClientImpl) Worker(opts worker.WorkerOpts) (worker.Worker, error) {
return worker.NewWorker(c.workers, c.v0, opts)
}
func (c *v1HatchetClientImpl) RateLimits() features.RateLimitsClient {

View File

@@ -9,7 +9,6 @@ import (
"time"
v0Client "github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/create"
"github.com/hatchet-dev/hatchet/pkg/cmdutils"
"github.com/hatchet-dev/hatchet/pkg/v1/features"
"github.com/hatchet-dev/hatchet/pkg/v1/workflow"
@@ -40,6 +39,34 @@ type Worker interface {
Unpause(ctx context.Context) error
}
// WorkerLabels represents a map of labels that can be assigned to a worker
// for filtering and identification purposes.
type WorkerLabels map[string]interface{}
// CreateOpts defines the options for creating a new worker.
type WorkerOpts struct {
// (required) the friendly name of the worker
Name string
// (optional) a list of workflows to register on the worker. If not provided, the worker will not run any workflows.
Workflows []workflow.WorkflowBase
// (optional) maximum number of concurrent runs on this worker, defaults to 100
Slots int
// (optional) labels to set on the worker
Labels WorkerLabels
// (optional) logger to use for the worker
Logger *zerolog.Logger
// (optional) log level
LogLevel string
// (optional) maximum number of concurrent runs for durable tasks, defaults to 1000
DurableSlots int
}
// WorkerImpl is the concrete implementation of the Worker interface.
type WorkerImpl struct {
// v0 is the client used to communicate with the hatchet API.
@@ -70,30 +97,20 @@ type WorkerImpl struct {
logLevel string
// labels are the labels assigned to this worker
labels create.WorkerLabels
}
// WithWorkflows is a functional option that configures a worker with the specified workflows.
func WithWorkflows(workflows ...workflow.WorkflowBase) func(*WorkerImpl) {
return func(w *WorkerImpl) {
w.workflows = workflows
}
labels WorkerLabels
}
// NewWorker creates and configures a new Worker with the provided client and options.
// additional functional options can be provided to further customize the worker configuration.
// returns the created Worker interface and any error encountered during creation.
func NewWorker(workersClient features.WorkersClient, v0 v0Client.Client, opts create.WorkerOpts, optFns ...func(*WorkerImpl)) (Worker, error) {
func NewWorker(workersClient features.WorkersClient, v0 v0Client.Client, opts WorkerOpts) (Worker, error) {
w := &WorkerImpl{
v0: v0,
workers: workersClient,
name: opts.Name,
logLevel: opts.LogLevel,
labels: opts.Labels,
}
for _, optFn := range optFns {
optFn(w)
v0: v0,
workers: workersClient,
name: opts.Name,
logLevel: opts.LogLevel,
labels: opts.Labels,
workflows: opts.Workflows,
}
if opts.Slots == 0 {