Files
hatchet/pkg/worker/worker.go
Gabe Ruttner 2fdc47a6af feat: multiple slot types (#2927)
* feat: adds support for multiple slot types, primarily motivated by durable slots

---------

Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
2026-02-17 05:43:47 -08:00

938 lines
28 KiB
Go

package worker
import (
"context"
"fmt"
"os"
"reflect"
"slices"
"strings"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/client/compute"
"github.com/hatchet-dev/hatchet/pkg/client/types"
clientconfig "github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/errors"
"github.com/hatchet-dev/hatchet/pkg/integrations"
"github.com/hatchet-dev/hatchet/pkg/logger"
contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
)
// actionFunc is the variadic call signature stored in actionImpl.run and
// invoked by actionImpl.Run: it receives the call arguments as a slice of any
// and hands back the results as a slice of any.
type actionFunc func(args ...any) []any
// Slot type names used as keys of a worker's slot configuration map.
const (
	// slotTypeDefault is the key that NewWorker uses for the WithSlots value
	// and for the fallback configuration.
	slotTypeDefault = "default"

	// slotTypeDurable is the key that NewWorker uses for the WithDurableSlots
	// value.
	slotTypeDurable = "durable"
)
// Action describes one registered unit of work held in an ActionRegistry.
//
// Deprecated: Action is an internal interface used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
type Action interface {
	// Name returns the name of the action.
	Name() string

	// Run invokes the action with the given arguments and returns its results.
	Run(args ...any) []any

	// MethodFn returns the method value the action was registered with.
	MethodFn() any

	// ConcurrencyFn returns the concurrency group function of the action. It
	// is nil for actions that were not registered under the "concurrency"
	// service.
	ConcurrencyFn() GetWorkflowConcurrencyGroupFn

	// Service returns the service that the action belongs to.
	Service() string

	// Compute returns the compute configuration the action was registered
	// with; it may be nil.
	Compute() *compute.Compute
}
// actionImpl is the Action implementation that the worker stores in its
// ActionRegistry.
type actionImpl struct {
	// name is the full action ID returned by Name.
	name string

	// run is the callable invoked by Run; it is left unset for actions
	// registered under the "concurrency" service.
	run actionFunc

	// runConcurrencyAction is returned by ConcurrencyFn; it is only set for
	// actions registered under the "concurrency" service.
	runConcurrencyAction GetWorkflowConcurrencyGroupFn

	// method is the original value passed at registration time.
	method any

	// service is the name of the service the action belongs to.
	service string

	// compute is the optional compute configuration of the action.
	compute *compute.Compute
}
// Name reports the action ID this action was registered under.
//
// Deprecated: Name is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) Name() string {
	return j.name
}
// Run forwards args to the stored action function and returns its results
// unchanged.
//
// Deprecated: Run is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) Run(args ...any) []any {
	return j.run(args...)
}
// MethodFn returns the method value that was supplied when the action was
// registered.
//
// Deprecated: MethodFn is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) MethodFn() any {
	return j.method
}
// ConcurrencyFn returns the stored concurrency group function, which is nil
// unless the action was registered under the "concurrency" service.
//
// Deprecated: ConcurrencyFn is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) ConcurrencyFn() GetWorkflowConcurrencyGroupFn {
	return j.runConcurrencyAction
}
// Service reports the name of the service this action belongs to.
//
// Deprecated: Service is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) Service() string {
	return j.service
}
// Compute returns the compute configuration stored with the action; the
// result may be nil.
//
// Deprecated: Compute is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (j *actionImpl) Compute() *compute.Compute {
	return j.compute
}
// ActionRegistry maps an action ID to its registered Action.
//
// Deprecated: ActionRegistry is an internal type used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
type ActionRegistry map[string]Action
// Worker registers actions and services, obtains an action listener from the
// dispatcher and executes the actions it is assigned.
//
// Deprecated: Worker is an internal type used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
type Worker struct {
	// client is the Hatchet client used for namespace lookup, admin calls and
	// dispatcher calls.
	client client.Client

	// name is sent as the worker name when the action listener is requested.
	name string

	// actions holds every registered action keyed by its lower-cased ID.
	actions ActionRegistry

	// registered_workflows records the names of workflows registered through
	// RegisterWorkflow and RegisterWorkflowV1.
	registered_workflows map[string]bool

	// l is the logger used by the worker.
	l *zerolog.Logger

	// cancelMap maps a step run ID to the cancel function of its run context.
	cancelMap sync.Map

	// cancelConcurrencyMap maps a workflow run ID to the cancel function of
	// its group key evaluation context.
	cancelConcurrencyMap sync.Map

	// services maps a service name to its *Service.
	services sync.Map

	// alerter receives an alert whenever an action fails.
	alerter errors.Alerter

	// middlewares is the worker-level middleware chain.
	middlewares *middlewares

	// slots and durableSlots keep the values given to WithSlots and
	// WithDurableSlots as they were provided.
	slots        *int
	durableSlots *int

	// slotConfig is the slot configuration sent when the action listener is
	// requested.
	slotConfig map[string]int32

	// legacySlots, when non-nil, causes the registration to use the deprecated
	// `slots` proto field instead of `slot_config`. For backward compatibility
	// with engines that do not support multiple slot types.
	legacySlots *int32

	// initActionNames holds the action names supplied via WithInternalData.
	initActionNames []string

	// labels are sent as worker labels when the action listener is requested.
	labels map[string]any

	// id is the worker ID returned together with the action listener.
	id *string

	// panicHandler is the callback installed through SetPanicHandler.
	panicHandler func(ctx HatchetContext, recovered any)
}
// WorkerOpt is a functional option that mutates the WorkerOpts consumed by
// NewWorker.
//
// Deprecated: WorkerOpt is an internal type used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
type WorkerOpt func(*WorkerOpts)
// WorkerOpts collects the settings that the With* options write and that
// NewWorker reads when building a Worker.
//
// Deprecated: WorkerOpts is an internal type used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
type WorkerOpts struct {
	// client is set by WithClient.
	client client.Client

	// name is set by WithName and defaults to the host name.
	name string

	// l is set by WithLogger or WithLogLevel.
	l *zerolog.Logger

	// integrations is appended to by WithIntegration.
	integrations []integrations.Integration

	// alerter is set by WithErrorAlerter and defaults to a no-op alerter.
	alerter errors.Alerter

	// slots is set by WithSlots and WithMaxRuns.
	slots *int

	// durableSlots is set by WithDurableSlots.
	durableSlots *int

	// slotConfig is set by WithSlotConfig; NewWorker rejects it when combined
	// with slots or durableSlots.
	slotConfig map[string]int32

	// legacySlots is set by WithLegacySlots.
	legacySlots *int32

	// actions is set by WithInternalData.
	actions []string

	// labels is set by WithLabels.
	labels map[string]any
}
// defaultWorkerOpts returns the options NewWorker starts from: the host name
// as worker name, no integrations and a no-op alerter.
func defaultWorkerOpts() *WorkerOpts {
	defaults := new(WorkerOpts)
	defaults.name = getHostName()
	defaults.integrations = []integrations.Integration{}
	defaults.alerter = errors.NoOpAlerter{}

	return defaults
}
// WithInternalData stores the given action names on the worker options.
//
// Deprecated: WithInternalData is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithInternalData(actions []string) WorkerOpt {
	return func(o *WorkerOpts) {
		o.actions = actions
	}
}
// WithName overrides the worker name, which otherwise defaults to the host
// name.
//
// Deprecated: WithName is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithName(name string) WorkerOpt {
	return func(o *WorkerOpts) {
		o.name = name
	}
}
// WithClient sets the Hatchet client the worker talks to.
//
// Deprecated: WithClient is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithClient(client client.Client) WorkerOpt {
	return func(o *WorkerOpts) {
		o.client = client
	}
}
// WithIntegration appends an integration whose actions NewWorker registers on
// the worker. It may be used several times.
//
// Deprecated: WithIntegration is part of the legacy v0 workflow definition system.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithIntegration(integration integrations.Integration) WorkerOpt {
	return func(o *WorkerOpts) {
		o.integrations = append(o.integrations, integration)
	}
}
// WithErrorAlerter replaces the default no-op alerter with the given one.
//
// Deprecated: WithErrorAlerter is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithErrorAlerter(alerter errors.Alerter) WorkerOpt {
	return func(o *WorkerOpts) {
		o.alerter = alerter
	}
}
// WithMaxRuns is an alias for WithSlots: maxRuns is passed through unchanged.
//
// Deprecated: WithMaxRuns is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithMaxRuns(maxRuns int) WorkerOpt {
	return WithSlots(maxRuns)
}
// WithSlots sets the number of slots; NewWorker records it under the
// "default" slot type. It cannot be combined with WithSlotConfig.
//
// Deprecated: WithSlots is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithSlots(slots int) WorkerOpt {
	return func(o *WorkerOpts) {
		o.slots = &slots
	}
}
// WithDurableSlots sets the number of durable slots; NewWorker records it
// under the "durable" slot type. It cannot be combined with WithSlotConfig.
//
// Deprecated: WithDurableSlots is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithDurableSlots(durableSlots int) WorkerOpt {
	return func(o *WorkerOpts) {
		o.durableSlots = &durableSlots
	}
}
// WithSlotConfig sets the slot configuration as a map from slot type to slot
// count. NewWorker returns an error when it is combined with WithSlots or
// WithDurableSlots.
//
// Deprecated: WithSlotConfig is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithSlotConfig(slotConfig map[string]int32) WorkerOpt {
	return func(o *WorkerOpts) {
		o.slotConfig = slotConfig
	}
}
// WithLegacySlots makes the worker register through the deprecated `slots`
// proto field rather than `slot_config`. It exists for backward compatibility
// with engines that do not support multiple slot types.
func WithLegacySlots(slots int32) WorkerOpt {
	return func(o *WorkerOpts) {
		o.legacySlots = &slots
	}
}
// WithLabels sets the worker labels. NewWorker validates the label values and
// rejects any whose kind is neither string nor int.
//
// Deprecated: WithLabels is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithLabels(labels map[string]any) WorkerOpt {
	return func(o *WorkerOpts) {
		o.labels = labels
	}
}
// WithLogger sets the worker logger. If a logger is already present on the
// options (an earlier WithLogger or WithLogLevel), the call is ignored and a
// warning is written to the existing logger.
//
// Deprecated: WithLogger is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithLogger(l *zerolog.Logger) WorkerOpt {
	return func(o *WorkerOpts) {
		if o.l == nil {
			o.l = l
			return
		}

		// First logger wins; later ones are only reported.
		o.l.Warn().Msg("WithLogger called multiple times or after WithLogLevel, ignoring")
	}
}
// WithLogLevel applies the given level to the logger already on the options,
// or to a fresh default "worker" logger when none is set. A level string that
// zerolog cannot parse leaves the level unchanged.
//
// Deprecated: WithLogLevel is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func WithLogLevel(lvl string) WorkerOpt {
	return func(o *WorkerOpts) {
		var current zerolog.Logger

		if o.l != nil {
			current = *o.l
		} else {
			current = logger.NewDefaultLogger("worker")
		}

		if parsed, err := zerolog.ParseLevel(lvl); err == nil {
			current = current.Level(parsed)
		}

		o.l = &current
	}
}
// NewWorker builds a Worker from the default options with every given option
// applied in order.
//
// It returns an error when a label value is nil or has a kind other than
// string or int, when WithSlotConfig is combined with WithSlots or
// WithDurableSlots, or when an integration action cannot be registered.
// Without any slot option the worker gets 100 slots of the "default" type.
//
// Deprecated: NewWorker is an internal function used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func NewWorker(fs ...WorkerOpt) (*Worker, error) {
	opts := defaultWorkerOpts()

	for _, f := range fs {
		f(opts)
	}

	mws := newMiddlewares()

	// A nil label value has no dynamic type, so reflect.TypeOf returns nil;
	// reject it explicitly instead of calling Kind on the nil reflect.Type.
	for _, value := range opts.labels {
		valueType := reflect.TypeOf(value)

		if valueType == nil || (valueType.Kind() != reflect.String && valueType.Kind() != reflect.Int) {
			return nil, fmt.Errorf("invalid label value: %v", value)
		}
	}

	if opts.l == nil {
		l := logger.NewDefaultLogger("worker")
		opts.l = &l
	}

	if opts.slotConfig != nil && (opts.slots != nil || opts.durableSlots != nil) {
		return nil, fmt.Errorf("cannot set both slot config and slots/durable slots")
	}

	// Backwards compatibility:
	// If callers used the older slots/durableSlots options, map them to a slot config so that
	// worker registration continues to work with multiple slot types.
	if opts.slotConfig == nil {
		legacySlotConfig := map[string]int32{}

		if opts.slots != nil {
			legacySlotConfig[slotTypeDefault] = int32(*opts.slots) // nolint:gosec
		}

		if opts.durableSlots != nil {
			legacySlotConfig[slotTypeDurable] = int32(*opts.durableSlots) // nolint:gosec
		}

		if len(legacySlotConfig) > 0 {
			opts.slotConfig = legacySlotConfig
		}
	}

	// No slot option at all: fall back to 100 default slots.
	if opts.slotConfig == nil {
		opts.slotConfig = map[string]int32{slotTypeDefault: 100}
	}

	w := &Worker{
		client:               opts.client,
		name:                 opts.name,
		l:                    opts.l,
		actions:              ActionRegistry{},
		alerter:              opts.alerter,
		middlewares:          mws,
		slots:                opts.slots,
		durableSlots:         opts.durableSlots,
		slotConfig:           opts.slotConfig,
		legacySlots:          opts.legacySlots,
		initActionNames:      opts.actions,
		labels:               opts.labels,
		registered_workflows: map[string]bool{},
	}

	mws.add(w.panicMiddleware)

	// FIXME: Remove integrations
	// register all integrations
	for _, integration := range opts.integrations {
		actions := integration.Actions()
		integrationId := integration.GetId()

		for _, integrationAction := range actions {
			action := fmt.Sprintf("%s:%s", integrationId, integrationAction)

			err := w.registerAction(integrationId, action, integration.ActionHandler(integrationAction), nil)
			if err != nil {
				return nil, fmt.Errorf("could not register integration action %s: %w", action, err)
			}
		}
	}

	return w, nil
}
// Use appends the given middleware functions to the worker-level middleware
// chain.
//
// Deprecated: Use is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) Use(mws ...MiddlewareFunc) {
	w.middlewares.add(mws...)
}
// SetPanicHandler stores the callback on the worker, replacing any handler
// set earlier.
//
// Deprecated: SetPanicHandler is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) SetPanicHandler(panicHandler func(ctx HatchetContext, recovered any)) {
	w.panicHandler = panicHandler
}
// NewService creates a service whose name is the lower-cased, namespaced form
// of name, stores it on the worker under that name (replacing any previous
// entry) and returns it.
//
// Deprecated: NewService is part of the legacy v0 workflow definition system.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) NewService(name string) *Service {
	namespace := w.client.Namespace()
	key := strings.ToLower(clientconfig.ApplyNamespace(name, &namespace))

	created := &Service{
		Name:   key,
		worker: w,
		mws:    newMiddlewares(),
	}
	w.services.Store(key, created)

	return created
}
// RegisterWorkflow registers the workflow on the worker using the workflow's
// own trigger.
//
// When workflow is a *WorkflowJob it must have its On field set, otherwise an
// error is returned; its name is then recorded as registered. Any other
// workflowConverter is handed to On without being recorded, because it has no
// WorkflowJob name to read (the previous code dereferenced a nil pointer in
// that case).
//
// Deprecated: RegisterWorkflow is part of the legacy v0 workflow definition system.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) RegisterWorkflow(workflow workflowConverter) error {
	if wf, ok := workflow.(*WorkflowJob); ok {
		if wf.On == nil {
			return fmt.Errorf("workflow must have an trigger defined via the `On` field")
		}

		w.registered_workflows[wf.Name] = true
	}

	return w.On(workflow.ToWorkflowTrigger(), workflow)
}
// RegisterWorkflowV1 records the workflow under its namespace-prefixed name
// and submits it through the admin client, returning the admin client's
// error.
//
// Deprecated: RegisterWorkflowV1 is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) RegisterWorkflowV1(workflow *contracts.CreateWorkflowVersionRequest) error {
	key := w.client.Namespace() + workflow.Name
	w.registered_workflows[key] = true

	return w.client.Admin().PutWorkflowV1(workflow)
}
// On attaches the workflow with the given trigger to the service named after
// the workflow (lower-cased). If that service does not exist yet it is
// created and given the worker-level middleware first.
//
// Deprecated: On is part of the legacy v0 workflow definition system.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) On(t triggerConverter, workflow workflowConverter) error {
	svcName := strings.ToLower(workflow.ToWorkflow("", "").Name)

	// reuse the service when it is already known
	if existing, found := w.services.Load(svcName); found {
		return existing.(*Service).On(t, workflow)
	}

	created := w.NewService(svcName)

	if w.middlewares != nil {
		created.Use(w.middlewares.middlewares...)
	}

	return created.On(t, workflow)
}
// RegisterAction registers method under the given action ID, creating the
// action's service first when the worker does not know it yet.
//
// An action should be of the format <service>:<verb>, for example slack:create-channel.
//
// The method must match the following signatures:
//   - func(ctx context.Context) error
//   - func(ctx context.Context, input *Input) error
//   - func(ctx context.Context, input *Input) (*Output, error)
//   - func(ctx context.Context) (*Output, error)
//
// An error is returned when the action ID cannot be parsed or the action
// cannot be registered.
//
// Deprecated: RegisterAction is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) RegisterAction(actionId string, method any) error {
	parsed, err := types.ParseActionID(actionId)
	if err != nil {
		return fmt.Errorf("could not parse action id: %w", err)
	}

	// make sure the action's service exists before registering
	_, known := w.services.Load(parsed.Service)
	if !known {
		w.NewService(parsed.Service)
	}

	return w.registerAction(parsed.Service, parsed.Verb, method, nil)
}
// registerAction stores method in the action registry under the lower-cased
// ID "<service>:<verb>".
//
// Actions of the "concurrency" service are special: method must be a
// GetWorkflowConcurrencyGroupFn and is stored as the action's concurrency
// function; a value of any other type yields an error instead of a panic.
// For every other service method is converted with getFnFromMethod. If the
// ID is already registered with a different function, an error is returned;
// registering the same function again overwrites the entry.
func (w *Worker) registerAction(service, verb string, method any, compute *compute.Compute) error {
	actionID := strings.ToLower(fmt.Sprintf("%s:%s", service, verb))

	// if the service is "concurrency", then this is a special action
	if service == "concurrency" {
		concurrencyFn, ok := method.(GetWorkflowConcurrencyGroupFn)
		if !ok {
			return fmt.Errorf("concurrency action %s must be a GetWorkflowConcurrencyGroupFn, got %T", actionID, method)
		}

		w.actions[actionID] = &actionImpl{
			name:                 actionID,
			runConcurrencyAction: concurrencyFn,
			method:               method,
			service:              service,
			compute:              compute,
		}

		return nil
	}

	actionFunc, err := getFnFromMethod(method)
	if err != nil {
		return fmt.Errorf("could not get function from method: %w", err)
	}

	// if action has already been registered, ensure that the method is the same
	if currMethod, ok := w.actions[actionID]; ok {
		if reflect.ValueOf(currMethod.MethodFn()).Pointer() != reflect.ValueOf(method).Pointer() {
			return fmt.Errorf("action %s is already registered with function %s", actionID, getFnName(currMethod.MethodFn()))
		}
	}

	w.actions[actionID] = &actionImpl{
		name:    actionID,
		run:     actionFunc,
		method:  method,
		service: service,
		compute: compute,
	}

	return nil
}
// Start runs the worker in a background goroutine and returns a cleanup
// function that cancels the worker's context. The returned error is always
// nil; a failure of the background run panics in that goroutine.
//
// Deprecated: Start is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) Start() (func() error, error) {
	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		// NOTE: this matches the behavior of the old worker, until we change the signature of Start
		if err := w.startBlocking(ctx); err != nil {
			panic(err)
		}
	}()

	return func() error {
		cancel()

		w.l.Debug().Msgf("worker %s is stopping...", w.name)

		return nil
	}, nil
}
// Run runs the worker in the calling goroutine until ctx is done or the
// listener reports an error, and returns the result of that run.
//
// Deprecated: Run is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) Run(ctx context.Context) error {
	return w.startBlocking(ctx)
}
// Logger returns the logger the worker writes to.
//
// Deprecated: Logger is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) Logger() *zerolog.Logger {
	return w.l
}
// ID returns the worker ID received when the action listener was obtained;
// it is nil before the worker has been started.
//
// Deprecated: ID is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of calling this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (w *Worker) ID() *string {
	return w.id
}
// startBlocking requests an action listener for the worker's runnable
// actions and blocks until ctx is done (returning nil) or the listener
// reports an error (returning it). Every received action is executed in its
// own goroutine. The listener is unregistered when the function returns.
func (w *Worker) startBlocking(ctx context.Context) error {
	// Collect the names to listen for; when the client restricts the runnable
	// actions, only names in that list are kept.
	names := make([]string, 0, len(w.actions))

	for _, registered := range w.actions {
		if w.client.RunnableActions() != nil && !slices.Contains(w.client.RunnableActions(), registered.Name()) {
			continue
		}

		names = append(names, registered.Name())
	}

	w.l.Debug().Msgf("worker %s is listening for actions: %v", w.name, names)

	_ = NewManagedCompute(&w.actions, w.client, 1)

	request := &client.GetActionListenerRequest{
		WorkerName:  w.name,
		Actions:     names,
		Labels:      w.labels,
		SlotConfig:  w.slotConfig,
		LegacySlots: w.legacySlots,
	}

	listener, id, err := w.client.Dispatcher().GetActionListener(ctx, request)

	// the ID is stored before the error is inspected
	w.id = id

	if err != nil {
		return fmt.Errorf("could not get action listener: %w", err)
	}

	defer func() {
		if unregisterErr := listener.Unregister(); unregisterErr != nil {
			w.l.Error().Err(unregisterErr).Msg("could not unregister worker")
		}
	}()

	listenerCtx, stopListening := context.WithCancel(ctx)
	defer stopListening()

	actionCh, errCh, err := listener.Actions(listenerCtx)
	if err != nil {
		return fmt.Errorf("could not get action channel: %w", err)
	}

	// dispatch loop: one goroutine per incoming action
	go func() {
		for {
			select {
			case incoming, open := <-actionCh:
				if !open {
					w.l.Debug().Msgf("worker %s received action channel closed, stopping", w.name)
					return
				}

				go func(assigned *client.Action) {
					if execErr := w.executeAction(context.Background(), assigned); execErr != nil {
						w.l.Error().Err(execErr).Msgf("could not execute action: %s", assigned.ActionId)
					}

					w.l.Debug().Msgf("action %s completed", assigned.ActionId)
				}(incoming)
			case <-ctx.Done():
				w.l.Debug().Msgf("worker %s received context done, stopping", w.name)
				return
			}
		}
	}()

	select {
	case <-ctx.Done():
		w.l.Debug().Msgf("worker %s received context done, stopping", w.name)
		return nil
	case listenerErr := <-errCh:
		w.l.Error().Err(listenerErr).Msg("error from listener")
		return listenerErr
	}
}
// executeAction routes the assigned action to the handler for its action
// type and returns that handler's result. An action type without a handler
// yields an error.
func (w *Worker) executeAction(ctx context.Context, assignedAction *client.Action) error {
	switch kind := assignedAction.ActionType; kind {
	case client.ActionTypeStartStepRun:
		return w.startStepRun(ctx, assignedAction)
	case client.ActionTypeCancelStepRun:
		return w.cancelStepRun(ctx, assignedAction)
	case client.ActionTypeStartGetGroupKey:
		return w.startGetGroupKey(ctx, assignedAction)
	default:
		return fmt.Errorf("unknown action type: %s", kind)
	}
}
// startStepRun executes the registered action for an assigned step run.
//
// It sends a "started" event in the background, runs the action inside the
// worker-level and then the service-level middleware, and afterwards sends
// either a "completed" event carrying the action's output or a failure event.
// The run context is stored in cancelMap under the step run ID so that
// cancelStepRun can cancel it; if it was cancelled while the action ran, no
// event is sent and nil is returned.
//
// An error is returned when the action or its service is unknown, when the
// input or hatchet context cannot be prepared, or when an event cannot be
// created or sent.
func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action) error {
	// send a message that the step run started
	startedEvent := w.getActionEvent(assignedAction, client.ActionEventTypeStarted)

	go func() {
		eventCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		if _, err := w.client.Dispatcher().SendStepActionEvent(eventCtx, startedEvent); err != nil {
			w.l.Error().Err(err).Msgf("could not send action event")
		}
	}()

	action, ok := w.actions[assignedAction.ActionId]
	if !ok {
		return fmt.Errorf("job not found")
	}

	arg, err := decodeArgsToInterface(reflect.TypeOf(action.MethodFn()))
	if err != nil {
		return fmt.Errorf("could not decode args to interface: %w", err)
	}

	runContext, cancel := context.WithCancel(context.Background())
	// Release the run context on every exit path, including the early returns
	// below and middleware that never invokes the action.
	defer cancel()

	w.cancelMap.Store(assignedAction.StepRunId, cancel)
	defer w.cancelMap.Delete(assignedAction.StepRunId)

	hCtx, err := newHatchetContext(runContext, assignedAction, w.client, w.l, w)
	if err != nil {
		return fmt.Errorf("could not create hatchet context: %w", err)
	}

	// get the action's service
	svcAny, ok := w.services.Load(action.Service())
	if !ok {
		return fmt.Errorf("could not load service %s", action.Service())
	}

	svc := svcAny.(*Service)

	// wrap the run with middleware. start by wrapping the global worker middleware, then
	// the service-specific middleware
	return w.middlewares.runAll(hCtx, func(ctx HatchetContext) error {
		return svc.mws.runAll(ctx, func(ctx HatchetContext) error {
			// cancel as soon as the action and its reporting are done
			defer cancel()

			args := []any{ctx}
			if arg != nil {
				args = append(args, arg)
			}

			runResults := action.Run(args...)

			// check whether run context was cancelled while action was running
			select {
			case <-ctx.Done():
				w.l.Debug().Msgf("step run %s was cancelled, returning", assignedAction.StepRunId)
				return nil
			default:
			}

			// With two results the first one is the output.
			var result any
			if len(runResults) == 2 {
				result = runResults[0]
			}

			// The last result, when present and non-nil, is the action's
			// error. An empty result slice is treated as success, and a
			// non-error final value is reported as a failure rather than
			// panicking on the type assertion.
			var runErr error
			if n := len(runResults); n > 0 && runResults[n-1] != nil {
				asErr, isErr := runResults[n-1].(error)
				if !isErr {
					asErr = fmt.Errorf("action %s returned a non-error final value of type %T", action.Name(), runResults[n-1])
				}

				runErr = asErr
			}

			if runErr != nil {
				return w.sendFailureEvent(ctx, runErr)
			}

			// send a message that the step run completed
			finishedEvent, err := w.getActionFinishedEvent(assignedAction, result)
			if err != nil {
				return fmt.Errorf("could not create finished event: %w", err)
			}

			if _, err = w.client.Dispatcher().SendStepActionEvent(ctx, finishedEvent); err != nil {
				return fmt.Errorf("could not send action event: %w", err)
			}

			return nil
		})
	})
}
// startGetGroupKey evaluates the concurrency group function of the assigned
// action and reports the outcome to the dispatcher.
//
// It sends a "started" event, runs the action's concurrency function with a
// fresh hatchet context and then sends a "completed" event carrying the
// group key. When the concurrency function fails, an alert is raised and a
// failure event with the error text is sent instead; once that event has
// been delivered the function returns nil, which is what the previous code
// returned on this path.
//
// An error is returned when the action is unknown or is not a concurrency
// action, when the hatchet context cannot be created, or when an event
// cannot be created or sent.
func (w *Worker) startGetGroupKey(ctx context.Context, assignedAction *client.Action) error {
	// send a message that the step run started
	_, err := w.client.Dispatcher().SendGroupKeyActionEvent(
		ctx,
		w.getActionEvent(assignedAction, client.ActionEventTypeStarted),
	)
	if err != nil {
		return fmt.Errorf("could not send action event: %w", err)
	}

	action, ok := w.actions[assignedAction.ActionId]
	if !ok {
		return fmt.Errorf("job not found")
	}

	// action should be concurrency action
	if action.ConcurrencyFn() == nil {
		return fmt.Errorf("action %s is not a concurrency action", action.Name())
	}

	runContext, cancel := context.WithCancel(context.Background())
	w.cancelConcurrencyMap.Store(assignedAction.WorkflowRunId, cancel)

	// Drop the map entry and release the context once the evaluation is over;
	// otherwise both would be retained for every call.
	defer func() {
		w.cancelConcurrencyMap.Delete(assignedAction.WorkflowRunId)
		cancel()
	}()

	hCtx, err := newHatchetContext(runContext, assignedAction, w.client, w.l, w)
	if err != nil {
		return fmt.Errorf("could not create hatchet context: %w", err)
	}

	concurrencyKey, err := action.ConcurrencyFn()(hCtx)
	if err != nil {
		failureEvent := w.getActionEvent(assignedAction, client.ActionEventTypeFailed)

		w.alerter.SendAlert(context.Background(), err, map[string]any{
			"actionId":      assignedAction.ActionId,
			"workerId":      assignedAction.WorkerId,
			"workflowRunId": assignedAction.WorkflowRunId,
			"jobName":       assignedAction.JobName,
			"actionType":    assignedAction.ActionType,
		})

		failureEvent.EventPayload = err.Error()

		// A distinct name keeps the send error from shadowing the group key
		// error above.
		if _, sendErr := w.client.Dispatcher().SendGroupKeyActionEvent(ctx, failureEvent); sendErr != nil {
			return fmt.Errorf("could not send action event: %w", sendErr)
		}

		// The failure has been alerted and reported to the dispatcher.
		return nil
	}

	// send a message that the step run completed
	finishedEvent, err := w.getGroupKeyActionFinishedEvent(assignedAction, concurrencyKey)
	if err != nil {
		return fmt.Errorf("could not create finished event: %w", err)
	}

	if _, err = w.client.Dispatcher().SendGroupKeyActionEvent(ctx, finishedEvent); err != nil {
		return fmt.Errorf("could not send action event: %w", err)
	}

	return nil
}
// cancelStepRun cancels the run context stored for the step run of the
// assigned action. It returns an error when no cancel function is stored for
// that step run ID.
func (w *Worker) cancelStepRun(ctx context.Context, assignedAction *client.Action) error {
	stored, found := w.cancelMap.Load(assignedAction.StepRunId)
	if !found {
		return fmt.Errorf("could not find step run to cancel")
	}

	w.l.Debug().Msgf("cancelling step run %s", assignedAction.StepRunId)

	stored.(context.CancelFunc)()

	return nil
}
// getActionEvent builds an event of the given type for the action, stamped
// with the current time in UTC.
func (w *Worker) getActionEvent(action *client.Action, eventType client.ActionEventType) *client.ActionEvent {
	now := time.Now().UTC()

	event := new(client.ActionEvent)
	event.Action = action
	event.EventTimestamp = &now
	event.EventType = eventType

	return event
}
// getActionFinishedEvent builds a "completed" event for the action with
// output as its payload. The returned error is always nil.
func (w *Worker) getActionFinishedEvent(action *client.Action, output any) (*client.ActionEvent, error) {
	completed := w.getActionEvent(action, client.ActionEventTypeCompleted)
	completed.EventPayload = output

	return completed, nil
}
// getGroupKeyActionFinishedEvent builds a "completed" event for the action
// with the group key string as its payload. The returned error is always nil.
func (w *Worker) getGroupKeyActionFinishedEvent(action *client.Action, output string) (*client.ActionEvent, error) {
	completed := w.getActionEvent(action, client.ActionEventTypeCompleted)
	completed.EventPayload = output

	return completed, nil
}
// sendFailureEvent raises an alert for err and reports the action held by
// ctx as failed, with the error text as payload. Non-retryable errors mark
// the event as not to be retried. The event is sent with its own 30 second
// timeout.
//
// It returns a wrapped error when the event cannot be sent and nil once the
// event has been delivered; err itself is never returned.
func (w *Worker) sendFailureEvent(ctx HatchetContext, err error) error {
	failed := ctx.action()

	event := w.getActionEvent(failed, client.ActionEventTypeFailed)

	alertData := map[string]any{
		"actionId":      failed.ActionId,
		"workerId":      failed.WorkerId,
		"workflowRunId": failed.WorkflowRunId,
		"stepRunId":     failed.StepRunId,
		"jobName":       failed.JobName,
		"actionType":    failed.ActionType,
	}
	w.alerter.SendAlert(context.Background(), err, alertData)

	event.EventPayload = err.Error()

	if IsNonRetryableError(err) {
		noRetry := true
		event.ShouldNotRetry = &noRetry
	}

	sendCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	if _, sendErr := w.client.Dispatcher().SendStepActionEvent(sendCtx, event); sendErr != nil {
		return fmt.Errorf("could not send action event: %w", sendErr)
	}

	return nil
}
// getHostName returns the host name reported by the operating system, or
// "Unknown" when it cannot be determined.
func getHostName() string {
	if name, err := os.Hostname(); err == nil {
		return name
	}

	return "Unknown"
}