feat(go-sdk): add ability to create workflows from the Go SDK more easily, quickstart improvements (#30)

* feat: add initial docs site

* feat: allow workflows to be defined from go sdk

* fix release action

* chore: remove server dependencies from client

* fix: use correct certificate for server

* chore: add port and bind address to grpc config

* fix: add env for grpc config

* fix: nil pointer when output is null

* chore: support variation in output args

* fix unresolve merge conflict

* fix: quickstart improvements

* temp remove database url

* fix: action id not required for event

* fix: actionid validation for events

* Remove deleted files
This commit is contained in:
abelanger5
2024-01-02 09:02:53 -05:00
committed by GitHub
parent ba5e043451
commit 5937b9fd98
50 changed files with 1332 additions and 449 deletions
+1 -1
View File
@@ -5,8 +5,8 @@ import (
"fmt"
"os"
"github.com/hatchet-dev/hatchet/internal/config/loader"
"github.com/hatchet-dev/hatchet/internal/validator"
"github.com/hatchet-dev/hatchet/pkg/client/loader"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/rs/zerolog"
"google.golang.org/grpc"
+2
View File
@@ -32,6 +32,7 @@ const (
// TODO: add validator to client side
type GetActionListenerRequest struct {
WorkerName string
Services []string
Actions []string
}
@@ -157,6 +158,7 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc
TenantId: d.tenantId,
WorkerName: req.WorkerName,
Actions: req.Actions,
Services: req.Services,
})
if err != nil {
+54
View File
@@ -0,0 +1,54 @@
package loader
import (
"fmt"
"path/filepath"
"github.com/hatchet-dev/hatchet/internal/config/client"
"github.com/hatchet-dev/hatchet/internal/config/loader/loaderutils"
)
type ConfigLoader struct {
directory string
}
// LoadClientConfig loads the client configuration
func (c *ConfigLoader) LoadClientConfig() (res *client.ClientConfig, err error) {
sharedFilePath := filepath.Join(c.directory, "client.yaml")
configFileBytes, err := loaderutils.GetConfigBytes(sharedFilePath)
if err != nil {
return nil, err
}
cf, err := LoadClientConfigFile(configFileBytes...)
if err != nil {
return nil, err
}
return GetClientConfigFromConfigFile(cf)
}
// LoadClientConfigFile loads the worker config file via viper
func LoadClientConfigFile(files ...[]byte) (*client.ClientConfigFile, error) {
configFile := &client.ClientConfigFile{}
f := client.BindAllEnv
_, err := loaderutils.LoadConfigFromViper(f, configFile, files...)
return configFile, err
}
func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.ClientConfig, err error) {
tlsConf, err := loaderutils.LoadClientTLSConfig(&cf.TLS)
if err != nil {
return nil, fmt.Errorf("could not load TLS config: %w", err)
}
return &client.ClientConfig{
TenantId: cf.TenantId,
TLSConfig: tlsConf,
}, nil
}
+4
View File
@@ -35,6 +35,10 @@ func ParseActionID(actionID string) (Action, error) {
parts := strings.Split(actionID, ":")
numParts := len(parts)
if numParts < 2 || numParts > 3 {
return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID)
}
integrationId := firstToLower(parts[0])
verb := strings.ToLower(parts[1])
+40
View File
@@ -0,0 +1,40 @@
// Adapted from: https://github.com/hatchet-dev/hatchet-v1-archived/blob/3c2c13168afa1af68d4baaf5ed02c9d49c5f0323/cmd/cmdutils/interrupt.go
package cmdutils
import (
"context"
"os"
"os/signal"
"syscall"
)
func InterruptChan() <-chan interface{} {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
ret := make(chan interface{}, 1)
go func() {
s := <-c
ret <- s
close(ret)
}()
return ret
}
func NewInterruptContext() (context.Context, context.CancelFunc) {
interruptChan := InterruptChan()
return InterruptContextFromChan(interruptChan)
}
func InterruptContextFromChan(interruptChan <-chan interface{}) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-interruptChan
cancel()
}()
return ctx, cancel
}
+66
View File
@@ -32,3 +32,69 @@ func decodeArgsToInterface(fnType reflect.Type) (result interface{}, err error)
return reflect.New(secondArgElem).Interface(), nil
}
func decodeFnArgTypes(fnType reflect.Type) (result []reflect.Type, err error) {
if fnType.Kind() != reflect.Func {
return nil, fmt.Errorf("method must be a function")
}
// if not a function with two arguments, return error
if fnType.NumIn() != 2 {
return nil, fmt.Errorf("method must have exactly two arguments")
}
// if first argument is not a context, return error
firstArg := fnType.In(0)
if firstArg.Kind() != reflect.Interface || !firstArg.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) {
return nil, fmt.Errorf("first argument must be context.Context")
}
// if second argument is not a pointer to a struct, return error
secondArg := fnType.In(1)
if secondArg.Kind() != reflect.Ptr {
return nil, fmt.Errorf("second argument must be a pointer to a struct")
}
secondArgElem := secondArg.Elem()
if secondArgElem.Kind() != reflect.Struct {
return nil, fmt.Errorf("second argument must be a pointer to a struct")
}
return []reflect.Type{firstArg, secondArg}, nil
}
func decodeFnReturnTypes(fnType reflect.Type) (result []reflect.Type, err error) {
if fnType.NumOut() > 2 {
return nil, fmt.Errorf("fn cannot have more than 2 return values")
}
firstOut := fnType.Out(0)
// if there are two args, the first one should be a pointer to a struct
if fnType.NumOut() == 2 {
if firstOut.Kind() != reflect.Ptr {
return nil, fmt.Errorf("first argument must be a pointer to a struct when there are two return values")
}
firstOutElem := firstOut.Elem()
if firstOutElem.Kind() != reflect.Struct {
return nil, fmt.Errorf("first argument must be a pointer to a struct when there are two return values")
}
}
lastOut := fnType.Out(fnType.NumOut() - 1)
if lastOut.Kind() != reflect.Interface || !lastOut.Implements(reflect.TypeOf((*error)(nil)).Elem()) {
return nil, fmt.Errorf("last return value must be error")
}
if fnType.NumOut() == 1 {
return []reflect.Type{firstOut}, nil
}
return []reflect.Type{firstOut, lastOut}, nil
}
+24 -20
View File
@@ -40,28 +40,26 @@ func getFnFromMethod(method any) (result actionFunc, err error) {
}
// if function does not return two values, return error
if methodType.NumOut() != 2 {
return nil, fmt.Errorf("method must return exactly two values")
if methodType.NumOut() == 2 {
// if first return value is not a pointer to a struct, return error
firstReturn := methodType.Out(0)
if firstReturn.Kind() != reflect.Ptr {
return nil, fmt.Errorf("first return value must be a pointer to a struct")
}
firstReturnElem := firstReturn.Elem()
if firstReturnElem.Kind() != reflect.Struct {
return nil, fmt.Errorf("first return value must be a pointer to a struct")
}
}
// if first return value is not a pointer to a struct, return error
// firstReturn := methodType.Out(0)
// if last return value is not an error, return error
lastReturn := methodType.Out(methodType.NumOut() - 1)
// if firstReturn.Kind() != reflect.Ptr {
// return nil, fmt.Errorf("first return value must be a pointer to a struct")
// }
// firstReturnElem := firstReturn.Elem()
// if firstReturnElem.Kind() != reflect.Struct {
// return nil, fmt.Errorf("first return value must be a pointer to a struct")
// }
// if second return value is not an error, return error
secondReturn := methodType.Out(1)
if secondReturn.Kind() != reflect.Interface || !secondReturn.Implements(reflect.TypeOf((*error)(nil)).Elem()) {
return nil, fmt.Errorf("second return value must be an error")
if lastReturn.Kind() != reflect.Interface || !lastReturn.Implements(reflect.TypeOf((*error)(nil)).Elem()) {
return nil, fmt.Errorf("second return value must be of type error")
}
return func(args ...interface{}) []interface{} {
@@ -77,6 +75,12 @@ func getFnFromMethod(method any) (result actionFunc, err error) {
})
// Return the results as an interface slice
return []interface{}{values[0].Interface(), values[1].Interface()}
res := []interface{}{}
for i := range values {
res = append(res, values[i].Interface())
}
return res
}, nil
}
+60
View File
@@ -0,0 +1,60 @@
package worker
import (
"fmt"
"reflect"
"github.com/hatchet-dev/hatchet/pkg/client/types"
)
type Service struct {
Name string
worker *Worker
}
func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
apiWorkflow := workflow.ToWorkflow(s.Name)
wt := &types.WorkflowTriggers{}
t.ToWorkflowTriggers(wt)
apiWorkflow.Triggers = *wt
// create the workflow via the API
err := s.worker.client.Admin().PutWorkflow(&apiWorkflow)
if err != nil {
return err
}
// register all steps as actions
for actionId, fn := range workflow.ToActionMap(s.Name) {
err := s.worker.registerAction(actionId, fn)
if err != nil {
return err
}
}
return nil
}
func (s *Service) RegisterAction(fn any) error {
fnType := reflect.TypeOf(fn)
if fnType.Kind() != reflect.Func {
return fmt.Errorf("method must be a function")
}
if fnType.Name() == "" {
return fmt.Errorf("function cannot be anonymous")
}
fnId := fnType.Name()
actionId := fmt.Sprintf("%s:%s", s.Name, fnId)
return s.worker.registerAction(actionId, fn)
}
+49 -25
View File
@@ -12,7 +12,6 @@ import (
"github.com/hatchet-dev/hatchet/pkg/client"
"github.com/hatchet-dev/hatchet/pkg/integrations"
"github.com/rs/zerolog"
"google.golang.org/grpc"
)
type actionFunc func(args ...any) []any
@@ -47,11 +46,7 @@ func (j *actionImpl) MethodFn() any {
}
type Worker struct {
conn *grpc.ClientConn
client client.DispatcherClient
// The worker id that gets assigned on register
workerId string
client client.Client
name string
@@ -60,12 +55,14 @@ type Worker struct {
l *zerolog.Logger
cancelMap sync.Map
services sync.Map
}
type WorkerOpt func(*WorkerOpts)
type WorkerOpts struct {
client client.DispatcherClient
client client.Client
name string
l *zerolog.Logger
@@ -88,7 +85,7 @@ func WithName(name string) WorkerOpt {
}
}
func WithDispatcherClient(client client.DispatcherClient) WorkerOpt {
func WithClient(client client.Client) WorkerOpt {
return func(opts *WorkerOpts) {
opts.client = client
}
@@ -123,7 +120,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
for _, integrationAction := range actions {
action := fmt.Sprintf("%s:%s", integrationId, integrationAction)
err := w.RegisterAction(action, integration.ActionHandler(integrationAction))
err := w.registerAction(action, integration.ActionHandler(integrationAction))
if err != nil {
return nil, fmt.Errorf("could not register integration action %s: %w", action, err)
@@ -131,10 +128,45 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) {
}
}
w.NewService("default")
return w, nil
}
func (w *Worker) NewService(name string) *Service {
svc := &Service{
Name: name,
worker: w,
}
w.services.Store(name, svc)
return svc
}
func (w *Worker) On(t triggerConverter, workflow workflowConverter) error {
// get the default service
svc, ok := w.services.Load("default")
if !ok {
return fmt.Errorf("could not load default service")
}
return svc.(*Service).On(t, workflow)
}
func (w *Worker) RegisterAction(name string, method any) error {
// get the default service
svc, ok := w.services.Load("default")
if !ok {
return fmt.Errorf("could not load default service")
}
return svc.(*Service).RegisterAction(method)
}
func (w *Worker) registerAction(name string, method any) error {
actionFunc, err := getFnFromMethod(method)
if err != nil {
@@ -163,7 +195,7 @@ func (w *Worker) Start(ctx context.Context) error {
actionNames = append(actionNames, job.Name())
}
listener, err := w.client.GetActionListener(ctx, &client.GetActionListenerRequest{
listener, err := w.client.Dispatcher().GetActionListener(ctx, &client.GetActionListenerRequest{
WorkerName: w.name,
Actions: actionNames,
})
@@ -195,13 +227,10 @@ RunWorker:
}
w.l.Debug().Msgf("action %s completed with result %v", action.ActionId, res)
return
}(action)
case <-ctx.Done():
w.l.Debug().Msgf("worker %s received context done, stopping", w.name)
break RunWorker
default:
}
}
@@ -228,7 +257,7 @@ func (w *Worker) executeAction(ctx context.Context, assignedAction *client.Actio
func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action) (result any, err error) {
// send a message that the step run started
_, err = w.client.SendActionEvent(
_, err = w.client.Dispatcher().SendActionEvent(
ctx,
w.getActionEvent(assignedAction, client.ActionEventTypeStarted),
)
@@ -269,9 +298,11 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action
default:
}
result = runResults[0]
if len(runResults) == 2 {
result = runResults[0]
}
if runResults[1] != nil {
if runResults[len(runResults)-1] != nil {
err = runResults[1].(error)
}
@@ -280,7 +311,7 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action
failureEvent.EventPayload = err.Error()
_, err := w.client.SendActionEvent(
_, err := w.client.Dispatcher().SendActionEvent(
ctx,
failureEvent,
)
@@ -292,13 +323,6 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action
return nil, err
}
// TODO: check last argument for error
// if err != nil {
// // TODO: send a message that the step run failed
// return nil, fmt.Errorf("could not run job: %w", err)
// }
// send a message that the step run completed
finishedEvent, err := w.getActionFinishedEvent(assignedAction, result)
@@ -306,7 +330,7 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action
return nil, fmt.Errorf("could not create finished event: %w", err)
}
_, err = w.client.SendActionEvent(
_, err = w.client.Dispatcher().SendActionEvent(
ctx,
finishedEvent,
)
+201
View File
@@ -0,0 +1,201 @@
package worker
import (
"fmt"
"reflect"
"runtime"
"strings"
"github.com/hatchet-dev/hatchet/pkg/client/types"
)
type triggerConverter interface {
ToWorkflowTriggers(*types.WorkflowTriggers)
}
type Cron string
func (c Cron) ToWorkflowTriggers(wt *types.WorkflowTriggers) {
if wt.Cron == nil {
wt.Cron = []string{}
}
wt.Cron = append(wt.Cron, string(c))
}
type Event string
func (e Event) ToWorkflowTriggers(wt *types.WorkflowTriggers) {
if wt.Events == nil {
wt.Events = []string{}
}
wt.Events = append(wt.Events, string(e))
}
type workflowConverter interface {
ToWorkflow(svcName string) types.Workflow
ToActionMap(svcName string) map[string]any
}
type Workflow struct {
Jobs []WorkflowJob
}
type WorkflowJob struct {
// The name of the job
Name string
Description string
Timeout string
// The steps that are run in the job
Steps []WorkflowStep
}
func (j *WorkflowJob) ToWorkflow(svcName string) types.Workflow {
apiJob, err := j.ToWorkflowJob(svcName)
if err != nil {
panic(err)
}
jobs := map[string]types.WorkflowJob{
j.Name: *apiJob,
}
return types.Workflow{
Name: j.Name,
Version: "v0.1.0",
Jobs: jobs,
}
}
func (j *WorkflowJob) ToWorkflowJob(svcName string) (*types.WorkflowJob, error) {
apiJob := &types.WorkflowJob{
Description: j.Description,
Timeout: j.Timeout,
Steps: []types.WorkflowStep{},
}
var prevStep *step
for i, step := range j.Steps {
newStep, err := step.ToWorkflowStep(prevStep, svcName, i)
if err != nil {
return nil, err
}
apiJob.Steps = append(apiJob.Steps, newStep.APIStep)
prevStep = newStep
}
return apiJob, nil
}
func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res := map[string]any{}
for i, step := range j.Steps {
actionId := step.GetActionId(svcName, i)
res[actionId] = step.Function
}
return res
}
type WorkflowStep struct {
// The step timeout
Timeout string
// The executed function
Function any
}
type step struct {
Id string
// non-ctx input is not optional
NonCtxInput reflect.Type
// non-err output is optional
NonErrOutput *reflect.Type
APIStep types.WorkflowStep
}
func (s *WorkflowStep) ToWorkflowStep(prevStep *step, svcName string, index int) (*step, error) {
fnType := reflect.TypeOf(s.Function)
res := &step{}
res.Id = s.GetStepId(index)
res.APIStep = types.WorkflowStep{
Name: res.Id,
ID: s.GetStepId(index),
Timeout: s.Timeout,
ActionID: s.GetActionId(svcName, index),
}
inputs, err := decodeFnArgTypes(fnType)
if err != nil {
return nil, err
}
res.NonCtxInput = inputs[1]
outputs, err := decodeFnReturnTypes(fnType)
if err != nil {
return nil, err
}
if len(outputs) > 1 {
res.NonErrOutput = &outputs[0]
}
// if the previous step's first output matches the last input of this step, then the data
// is passed through
if prevStep != nil && prevStep.NonErrOutput != nil {
if inputs[1] == *prevStep.NonErrOutput {
res.APIStep.With = map[string]interface{}{
"object": "{{ .steps." + prevStep.Id + ".json }}",
}
}
} else {
res.APIStep.With = map[string]interface{}{
"object": "{{ .input.json }}",
}
}
return res, nil
}
func (s *WorkflowStep) GetStepId(index int) string {
stepId := s.getFnName()
// this can happen if the function is anonymous
if stepId == "" {
stepId = fmt.Sprintf("step%d", index)
}
return stepId
}
func (s *WorkflowStep) GetActionId(svcName string, index int) string {
stepId := s.GetStepId(index)
return fmt.Sprintf("%s:%s", svcName, stepId)
}
func (s *WorkflowStep) getFnName() string {
fnName := runtime.FuncForPC(reflect.ValueOf(s.Function).Pointer()).Name()
return strings.Split(fnName, ".")[1]
}
+48
View File
@@ -0,0 +1,48 @@
package worker
// import (
// "context"
// "testing"
// )
// // type actionInput struct {
// // Message string `json:"message"`
// // }
// // type stepOneOutput struct {
// // Message string `json:"message"`
// // }
// // type stepTwoOutput struct {
// // Message string `json:"message"`
// // }
// // func TestToWorkflowJob(t *testing.T) {
// // testJob := WorkflowJob{
// // Name: "test",
// // Description: "test",
// // Timeout: "1m",
// // Steps: []WorkflowStep{
// // {
// // ActionId: "test:test",
// // Function: func(ctx context.Context, input *actionInput) (result *stepOneOutput, err error) {
// // return nil, nil
// // },
// // },
// // {
// // ActionId: "test:test",
// // Function: func(ctx context.Context, input *stepOneOutput) (result *stepTwoOutput, err error) {
// // return nil, nil
// // },
// // },
// // },
// // }
// // job, err := testJob.ToWorkflowJob()
// // if err != nil {
// // t.Fatalf("could not convert workflow job: %v", err)
// // }
// // t.Fatalf("%v", job)
// // }