diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 578f580cd..4af814642 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -20,7 +20,7 @@ jobs: - name: Pull and push hatchet-api run: | docker pull ghcr.io/hatchet-dev/hatchet/hatchet-api:${{steps.tag_name.outputs.tag}} - docker tag ghcr.io/hatchet-dev/hatchet/hatchet-api:${{steps.tag_name.outputs.tag}} ghcr.io/hatchet-dev/hatchet/hatchet-server:latest + docker tag ghcr.io/hatchet-dev/hatchet/hatchet-api:${{steps.tag_name.outputs.tag}} ghcr.io/hatchet-dev/hatchet/hatchet-api:latest docker push ghcr.io/hatchet-dev/hatchet/hatchet-api:latest - name: Pull and push hatchet-engine run: | diff --git a/.gitignore b/.gitignore index 6fb4aaa9a..768537ecf 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ dump.rdb *.pfx *.cert generated +.next node_modules diff --git a/api-contracts/dispatcher/dispatcher.proto b/api-contracts/dispatcher/dispatcher.proto index 38d903758..66ede681a 100644 --- a/api-contracts/dispatcher/dispatcher.proto +++ b/api-contracts/dispatcher/dispatcher.proto @@ -23,6 +23,9 @@ message WorkerRegisterRequest { // a list of actions that this worker can run repeated string actions = 3; + + // (optional) the services for this worker + repeated string services = 4; } message WorkerRegisterResponse { diff --git a/cmd/hatchet-admin/cli/quickstart.go b/cmd/hatchet-admin/cli/quickstart.go index 3505edf52..4aa94411e 100644 --- a/cmd/hatchet-admin/cli/quickstart.go +++ b/cmd/hatchet-admin/cli/quickstart.go @@ -2,7 +2,6 @@ package cli import ( _ "embed" - "io/ioutil" "fmt" "os" @@ -189,22 +188,25 @@ func setupCerts(generated *generatedConfigFiles) error { return fmt.Errorf("could not create worker-client-cert.conf file: %w", err) } - // run openssl commands - c := exec.Command("bash", "-s", "-", fullPathCertDir) + // if CA files don't exists, run the script to regenerate all certs + if overwrite || (!fileExists(filepath.Join(fullPathCertDir, "./ca.key")) || !fileExists(filepath.Join(fullPathCertDir, "./ca.cert"))) { + // run openssl commands + c := exec.Command("bash", "-s", "-", fullPathCertDir) - c.Stdin = strings.NewReader(GenerateCertsScript) - c.Stdout = os.Stdout - c.Stderr = os.Stderr + c.Stdin = strings.NewReader(GenerateCertsScript) + c.Stdout = os.Stdout + c.Stderr = os.Stderr - err = c.Run() + err = c.Run() - if err != nil { - return err + if err != nil { + return err + } } generated.sc.TLS.TLSRootCAFile = filepath.Join(fullPathCertDir, "ca.cert") - generated.sc.TLS.TLSCertFile = filepath.Join(fullPathCertDir, "client-internal-admin.pem") - generated.sc.TLS.TLSKeyFile = filepath.Join(fullPathCertDir, "client-internal-admin.key") + generated.sc.TLS.TLSCertFile = filepath.Join(fullPathCertDir, "cluster.pem") + generated.sc.TLS.TLSKeyFile = filepath.Join(fullPathCertDir, "cluster.key") return nil } @@ -273,7 +275,7 @@ func getFiles(name string) [][]byte { basePath := filepath.Join(configDirectory, name) if fileExists(basePath) { - configFileBytes, err := ioutil.ReadFile(basePath) + configFileBytes, err := os.ReadFile(basePath) if err != nil { panic(err) @@ -285,7 +287,7 @@ func getFiles(name string) [][]byte { generatedPath := filepath.Join(generatedConfigDir, name) if fileExists(generatedPath) { - generatedFileBytes, err := ioutil.ReadFile(filepath.Join(generatedConfigDir, name)) + generatedFileBytes, err := os.ReadFile(filepath.Join(generatedConfigDir, name)) if err != nil { panic(err) @@ -314,7 +316,7 @@ func writeGeneratedConfig(generated *generatedConfigFiles) error { return err } - err = ioutil.WriteFile(databasePath, databaseConfigBytes, 0666) + err = os.WriteFile(databasePath, databaseConfigBytes, 0666) if err != nil { return fmt.Errorf("could not write database.yaml file: %w", err) @@ -328,7 +330,7 @@ func writeGeneratedConfig(generated *generatedConfigFiles) error { return err } - err = ioutil.WriteFile(serverPath, serverConfigBytes, 0666) + err = os.WriteFile(serverPath, serverConfigBytes, 0666) if err != nil { return fmt.Errorf("could not write server.yaml file: %w", err) diff --git a/cmd/hatchet-api/main.go b/cmd/hatchet-api/main.go index caa6b5854..e0a73b762 100644 --- a/cmd/hatchet-api/main.go +++ b/cmd/hatchet-api/main.go @@ -5,8 +5,8 @@ import ( "os" "github.com/hatchet-dev/hatchet/api/v1/server/run" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/internal/config/loader" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/spf13/cobra" ) @@ -62,7 +62,7 @@ func startServerOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) { panic(err) } - ctx, cancel := cmdutils.InterruptContext(interruptCh) + ctx, cancel := cmdutils.InterruptContextFromChan(interruptCh) defer cancel() runner := run.NewAPIServer(sc) diff --git a/cmd/hatchet-engine/main.go b/cmd/hatchet-engine/main.go index b36f00824..71276c4e1 100644 --- a/cmd/hatchet-engine/main.go +++ b/cmd/hatchet-engine/main.go @@ -5,7 +5,6 @@ import ( "os" "sync" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/internal/config/loader" "github.com/hatchet-dev/hatchet/internal/services/admin" "github.com/hatchet-dev/hatchet/internal/services/dispatcher" @@ -14,6 +13,7 @@ import ( "github.com/hatchet-dev/hatchet/internal/services/ingestor" "github.com/hatchet-dev/hatchet/internal/services/jobscontroller" "github.com/hatchet-dev/hatchet/internal/services/ticker" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/spf13/cobra" ) @@ -69,7 +69,7 @@ func startEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) { } errCh := make(chan error) - ctx, cancel := cmdutils.InterruptContext(interruptCh) + ctx, cancel := cmdutils.InterruptContextFromChan(interruptCh) wg := sync.WaitGroup{} if sc.HasService("grpc") { @@ -121,6 +121,8 @@ func startEngineOrDie(cf *loader.ConfigLoader, interruptCh <-chan interface{}) { grpc.WithAdmin(adminSvc), grpc.WithLogger(sc.Logger), grpc.WithTLSConfig(sc.TLSConfig), + grpc.WithPort(sc.Runtime.GRPCPort), + grpc.WithBindAddress(sc.Runtime.GRPCBindAddress), ) if err != nil { diff --git a/examples/cron/main.go b/examples/cron/main.go index 37c7e799b..21e37aae2 100644 --- a/examples/cron/main.go +++ b/examples/cron/main.go @@ -4,14 +4,21 @@ import ( "context" "fmt" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" ) type printInput struct{} func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + client, err := client.New( client.InitWorkflows(), ) @@ -23,8 +30,8 @@ func main() { // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. worker, err := worker.NewWorker( - worker.WithDispatcherClient( - client.Dispatcher(), + worker.WithClient( + client, ), ) @@ -42,7 +49,7 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContext(cmdutils.InterruptChan()) + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) defer cancel() err = worker.Start(interruptCtx) diff --git a/examples/go-sdk-workflow/main.go b/examples/go-sdk-workflow/main.go new file mode 100644 index 000000000..5738d548e --- /dev/null +++ b/examples/go-sdk-workflow/main.go @@ -0,0 +1,135 @@ +package main + +import ( + "context" + "time" + + "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" + "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" +) + +type userCreateEvent struct { + Username string `json:"username"` + UserId string `json:"user_id"` + Data map[string]string `json:"data"` +} + +type stepOneOutput struct { + Message string `json:"message"` +} + +func StepOne(ctx context.Context, input *userCreateEvent) (result *stepOneOutput, err error) { + return &stepOneOutput{ + Message: "Username is: " + input.Username, + }, nil +} + +func StepTwo(ctx context.Context, input *stepOneOutput) (result *stepOneOutput, err error) { + return &stepOneOutput{ + Message: "Above message is: " + input.Message, + }, nil +} + +func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + + client, err := client.New() + + if err != nil { + panic(err) + } + + // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet + // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. + w, err := worker.NewWorker( + worker.WithClient( + client, + ), + ) + + if err != nil { + panic(err) + } + + err = w.On(worker.Event("user:create"), &worker.WorkflowJob{ + Name: "test-job", + Description: "This is a test job.", + Steps: []worker.WorkflowStep{ + { + Function: StepOne, + }, + { + Function: StepTwo, + }, + }, + }) + + if err != nil { + panic(err) + } + + // err = worker.RegisterAction("echo:echo", func(ctx context.Context, input *actionInput) (result any, err error) { + // return map[string]interface{}{ + // "message": input.Message, + // }, nil + // }) + + // if err != nil { + // panic(err) + // } + + // err = worker.RegisterAction("echo:object", func(ctx context.Context, input *actionInput) (result any, err error) { + // return nil, nil + // }) + + // if err != nil { + // panic(err) + // } + + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) + defer cancel() + + go func() { + err = w.Start(interruptCtx) + + if err != nil { + panic(err) + } + + cancel() + }() + + testEvent := userCreateEvent{ + Username: "echo-test", + UserId: "1234", + Data: map[string]string{ + "test": "test", + }, + } + + // push an event + err = client.Event().Push( + context.Background(), + "user:create", + testEvent, + ) + + if err != nil { + panic(err) + } + + for { + select { + case <-interruptCtx.Done(): + return + default: + time.Sleep(time.Second) + } + } +} diff --git a/examples/requeue/main.go b/examples/requeue/main.go index 203d6fa0e..3dfb98228 100644 --- a/examples/requeue/main.go +++ b/examples/requeue/main.go @@ -4,9 +4,10 @@ import ( "context" "time" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" ) type sampleEvent struct{} @@ -14,6 +15,12 @@ type sampleEvent struct{} type requeueInput struct{} func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + client, err := client.New( client.InitWorkflows(), ) @@ -25,8 +32,8 @@ func main() { // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. worker, err := worker.NewWorker( - worker.WithDispatcherClient( - client.Dispatcher(), + worker.WithClient( + client, ), ) @@ -42,7 +49,7 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContext(cmdutils.InterruptChan()) + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) defer cancel() event := sampleEvent{} diff --git a/examples/schedule-timeout/main.go b/examples/schedule-timeout/main.go index 1a20c75b0..851c118de 100644 --- a/examples/schedule-timeout/main.go +++ b/examples/schedule-timeout/main.go @@ -6,6 +6,7 @@ import ( "time" "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/joho/godotenv" ) type sampleEvent struct{} @@ -13,6 +14,12 @@ type sampleEvent struct{} type timeoutInput struct{} func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + client, err := client.New( client.InitWorkflows(), ) diff --git a/examples/simple/.hatchet/sample-workflow.yaml b/examples/simple/.hatchet/sample-workflow.yaml index 7f30ef30b..749e721f6 100644 --- a/examples/simple/.hatchet/sample-workflow.yaml +++ b/examples/simple/.hatchet/sample-workflow.yaml @@ -21,3 +21,8 @@ jobs: timeout: 60s with: message: "Above message is: {{ .steps.echo2.message }}" + - id: testObject + action: echo:object + timeout: 60s + with: + object: "{{ .steps.echo3.json }}" diff --git a/examples/simple/main.go b/examples/simple/main.go index 6f5e9d60e..8d8b97f74 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -4,9 +4,10 @@ import ( "context" "time" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" ) type userCreateEvent struct { @@ -20,6 +21,12 @@ type actionInput struct { } func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + client, err := client.New( client.InitWorkflows(), ) @@ -31,8 +38,8 @@ func main() { // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. worker, err := worker.NewWorker( - worker.WithDispatcherClient( - client.Dispatcher(), + worker.WithClient( + client, ), ) @@ -50,7 +57,15 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContext(cmdutils.InterruptChan()) + err = worker.RegisterAction("echo:object", func(ctx context.Context, input *actionInput) (result any, err error) { + return nil, nil + }) + + if err != nil { + panic(err) + } + + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) defer cancel() go func() { diff --git a/examples/slack/main.go b/examples/slack/main.go index 1b960aecf..8d4a6de7b 100644 --- a/examples/slack/main.go +++ b/examples/slack/main.go @@ -8,11 +8,12 @@ import ( "context" "time" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/pkg/client" "github.com/hatchet-dev/hatchet/pkg/client/types" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/integrations/slack" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" ) type userCreateEvent struct { @@ -27,6 +28,12 @@ type actionInput struct { var SlackChannelWorkflow []byte func init() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + // initialize the slack channel workflow with SLACK_USER_ID slackUserId := os.Getenv("SLACK_USER_ID") @@ -75,8 +82,8 @@ func main() { // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. worker, err := worker.NewWorker( - worker.WithDispatcherClient( - client.Dispatcher(), + worker.WithClient( + client, ), worker.WithIntegration( slackInt, @@ -87,7 +94,7 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContext(cmdutils.InterruptChan()) + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) defer cancel() go worker.Start(interruptCtx) diff --git a/examples/timeout/main.go b/examples/timeout/main.go index e4a390bd5..95faec9c9 100644 --- a/examples/timeout/main.go +++ b/examples/timeout/main.go @@ -5,9 +5,10 @@ import ( "fmt" "time" - "github.com/hatchet-dev/hatchet/cmd/cmdutils" "github.com/hatchet-dev/hatchet/pkg/client" + "github.com/hatchet-dev/hatchet/pkg/cmdutils" "github.com/hatchet-dev/hatchet/pkg/worker" + "github.com/joho/godotenv" ) type sampleEvent struct{} @@ -15,6 +16,12 @@ type sampleEvent struct{} type timeoutInput struct{} func main() { + err := godotenv.Load() + + if err != nil { + panic(err) + } + client, err := client.New( client.InitWorkflows(), ) @@ -26,8 +33,8 @@ func main() { // Create a worker. This automatically reads in a TemporalClient from .env and workflow files from the .hatchet // directory, but this can be customized with the `worker.WithTemporalClient` and `worker.WithWorkflowFiles` options. worker, err := worker.NewWorker( - worker.WithDispatcherClient( - client.Dispatcher(), + worker.WithClient( + client, ), ) @@ -48,7 +55,7 @@ func main() { panic(err) } - interruptCtx, cancel := cmdutils.InterruptContext(cmdutils.InterruptChan()) + interruptCtx, cancel := cmdutils.InterruptContextFromChan(cmdutils.InterruptChan()) defer cancel() go func() { diff --git a/frontend/app/src/lib/atoms.ts b/frontend/app/src/lib/atoms.ts index 9b7967423..4542cde4d 100644 --- a/frontend/app/src/lib/atoms.ts +++ b/frontend/app/src/lib/atoms.ts @@ -87,7 +87,7 @@ export function useTenantContext(): [ } else if (computedCurrTenant?.metadata.id) { const newSearchParams = new URLSearchParams(searchParams); newSearchParams.set('tenant', computedCurrTenant?.metadata.id); - setSearchParams(newSearchParams); + setSearchParams(newSearchParams, { replace: true }); } } }, [ @@ -116,7 +116,7 @@ export function useTenantContext(): [ const setTenant = (tenant: Tenant) => { const newSearchParams = new URLSearchParams(searchParams); newSearchParams.set('tenant', tenant.metadata.id); - setSearchParams(newSearchParams); + setSearchParams(newSearchParams, { replace: true }); }; return [currTenant || computedCurrTenant, setTenant]; diff --git a/frontend/app/src/pages/main/index.tsx b/frontend/app/src/pages/main/index.tsx index 55a4bbdd1..1b73f8a24 100644 --- a/frontend/app/src/pages/main/index.tsx +++ b/frontend/app/src/pages/main/index.tsx @@ -70,7 +70,7 @@ function Main() {
-
+
diff --git a/frontend/docs/package.json b/frontend/docs/package.json index 7af20ad54..b8c3899b3 100644 --- a/frontend/docs/package.json +++ b/frontend/docs/package.json @@ -29,4 +29,4 @@ "@types/node": "18.11.10", "typescript": "^4.9.3" } -} +} \ No newline at end of file diff --git a/frontend/docs/pages/go-sdk/creating-a-worker.mdx b/frontend/docs/pages/go-sdk/creating-a-worker.mdx index 9d3629c51..9f6cbebef 100644 --- a/frontend/docs/pages/go-sdk/creating-a-worker.mdx +++ b/frontend/docs/pages/go-sdk/creating-a-worker.mdx @@ -55,4 +55,4 @@ The client to use to communicate with the Hatchet instance. This is required. ### `worker.WithName` -The name of the worker. This is used to identify the worker in the Hatchet UI. \ No newline at end of file +The name of the worker. This is used to identify the worker in the Hatchet UI. diff --git a/internal/auth/cookie/sessionstore.go b/internal/auth/cookie/sessionstore.go index ab904eb08..473e6c77f 100644 --- a/internal/auth/cookie/sessionstore.go +++ b/internal/auth/cookie/sessionstore.go @@ -159,6 +159,10 @@ func (store *UserSessionStore) New(r *http.Request, name string) (*sessions.Sess } else { session.IsNew = false } + } else if strings.Contains(err.Error(), "the value is not valid") { + // this error occurs if the encryption keys have been rotated, in which case we'd like a new cookie + err = nil + session.IsNew = true } } diff --git a/internal/config/loader/loader.go b/internal/config/loader/loader.go index f13c66a37..2b0919532 100644 --- a/internal/config/loader/loader.go +++ b/internal/config/loader/loader.go @@ -1,24 +1,18 @@ -// Adapted from: https://github.com/hatchet-dev/hatchet/blob/3c2c13168afa1af68d4baaf5ed02c9d49c5f0323/internal/config/loader/loader.go +// Adapted from: https://github.com/hatchet-dev/hatchet-v1-archived/blob/3c2c13168afa1af68d4baaf5ed02c9d49c5f0323/internal/config/loader/loader.go package loader import ( - "bytes" "context" - "crypto/tls" - "crypto/x509" "fmt" - "io/ioutil" "os" "path/filepath" "strings" - "github.com/creasty/defaults" "github.com/hatchet-dev/hatchet/internal/auth/cookie" - "github.com/hatchet-dev/hatchet/internal/config/client" "github.com/hatchet-dev/hatchet/internal/config/database" + "github.com/hatchet-dev/hatchet/internal/config/loader/loaderutils" "github.com/hatchet-dev/hatchet/internal/config/server" - "github.com/hatchet-dev/hatchet/internal/config/shared" "github.com/hatchet-dev/hatchet/internal/repository/prisma" "github.com/hatchet-dev/hatchet/internal/repository/prisma/db" "github.com/hatchet-dev/hatchet/internal/services/ingestor" @@ -26,7 +20,6 @@ import ( "github.com/hatchet-dev/hatchet/internal/validator" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" - "github.com/spf13/viper" ) // LoadDatabaseConfigFile loads the database config file via viper @@ -34,7 +27,7 @@ func LoadDatabaseConfigFile(files ...[]byte) (*database.ConfigFile, error) { configFile := &database.ConfigFile{} f := database.BindAllEnv - _, err := LoadConfigFromViper(f, configFile, files...) + _, err := loaderutils.LoadConfigFromViper(f, configFile, files...) return configFile, err } @@ -44,45 +37,11 @@ func LoadServerConfigFile(files ...[]byte) (*server.ServerConfigFile, error) { configFile := &server.ServerConfigFile{} f := server.BindAllEnv - _, err := LoadConfigFromViper(f, configFile, files...) + _, err := loaderutils.LoadConfigFromViper(f, configFile, files...) return configFile, err } -// LoadClientConfigFile loads the worker config file via viper -func LoadClientConfigFile(files ...[]byte) (*client.ClientConfigFile, error) { - configFile := &client.ClientConfigFile{} - f := client.BindAllEnv - - _, err := LoadConfigFromViper(f, configFile, files...) - - return configFile, err -} - -func LoadConfigFromViper(bindFunc func(v *viper.Viper), configFile interface{}, files ...[]byte) (*viper.Viper, error) { - v := viper.New() - v.SetConfigType("yaml") - bindFunc(v) - - for _, f := range files { - err := v.MergeConfig(bytes.NewBuffer(f)) - - if err != nil { - return nil, fmt.Errorf("could not load viper config: %w", err) - } - } - - defaults.Set(configFile) - - err := v.Unmarshal(configFile) - - if err != nil { - return nil, fmt.Errorf("could not unmarshal viper config: %w", err) - } - - return v, nil -} - type ConfigLoader struct { directory string } @@ -94,7 +53,7 @@ func NewConfigLoader(directory string) *ConfigLoader { // LoadDatabaseConfig loads the database configuration func (c *ConfigLoader) LoadDatabaseConfig() (res *database.Config, err error) { sharedFilePath := filepath.Join(c.directory, "database.yaml") - configFileBytes, err := getConfigBytes(sharedFilePath) + configFileBytes, err := loaderutils.GetConfigBytes(sharedFilePath) if err != nil { return nil, err @@ -112,7 +71,7 @@ func (c *ConfigLoader) LoadDatabaseConfig() (res *database.Config, err error) { // LoadServerConfig loads the server configuration func (c *ConfigLoader) LoadServerConfig() (res *server.ServerConfig, err error) { sharedFilePath := filepath.Join(c.directory, "server.yaml") - configFileBytes, err := getConfigBytes(sharedFilePath) + configFileBytes, err := loaderutils.GetConfigBytes(sharedFilePath) if err != nil { return nil, err @@ -126,54 +85,13 @@ func (c *ConfigLoader) LoadServerConfig() (res *server.ServerConfig, err error) cf, err := LoadServerConfigFile(configFileBytes...) + if err != nil { + return nil, err + } + return GetServerConfigFromConfigfile(dc, cf) } -// LoadClientConfig loads the client configuration -func (c *ConfigLoader) LoadClientConfig() (res *client.ClientConfig, err error) { - sharedFilePath := filepath.Join(c.directory, "client.yaml") - configFileBytes, err := getConfigBytes(sharedFilePath) - - if err != nil { - return nil, err - } - - cf, err := LoadClientConfigFile(configFileBytes...) - - if err != nil { - return nil, err - } - - return GetClientConfigFromConfigFile(cf) -} - -func getConfigBytes(configFilePath string) ([][]byte, error) { - configFileBytes := make([][]byte, 0) - - if fileExists(configFilePath) { - fileBytes, err := ioutil.ReadFile(configFilePath) // #nosec G304 -- config files are meant to be read from user-supplied directory - - if err != nil { - return nil, fmt.Errorf("could not read config file at path %s: %w", configFilePath, err) - } - - configFileBytes = append(configFileBytes, fileBytes) - } - - return configFileBytes, nil -} - -func fileExists(filename string) bool { - info, err := os.Stat(filename) - if err != nil && os.IsNotExist(err) { - return false - } else if err != nil { - return false - } - - return !info.IsDir() -} - func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile) (res *database.Config, err error) { databaseUrl := fmt.Sprintf( "postgresql://%s:%s@%s:%d/%s?sslmode=%s", @@ -185,7 +103,7 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile) (res *database.Con cf.PostgresSSLMode, ) - os.Setenv("DATABASE_URL", databaseUrl) + // os.Setenv("DATABASE_URL", databaseUrl) client := db.NewClient( // db.WithDatasourceURL(databaseUrl), @@ -211,17 +129,12 @@ func GetDatabaseConfigFromConfigFile(cf *database.ConfigFile) (res *database.Con func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigFile) (res *server.ServerConfig, err error) { l := zerolog.New(os.Stderr) - tls, err := loadServerTLSConfig(&cf.TLS) + tls, err := loaderutils.LoadServerTLSConfig(&cf.TLS) if err != nil { return nil, fmt.Errorf("could not load TLS config: %w", err) } - runtime := server.ServerRuntimeConfig{ - ServerURL: cf.Runtime.ServerURL, - Port: cf.Runtime.Port, - } - ss, err := cookie.NewUserSessionStore( cookie.WithSessionRepository(dc.Repository.UserSession()), cookie.WithCookieAllowInsecure(cf.Auth.Cookie.Insecure), @@ -246,7 +159,7 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF } return &server.ServerConfig{ - Runtime: runtime, + Runtime: cf.Runtime, Auth: cf.Auth, Config: dc, TaskQueue: rabbitmq.New(context.Background(), rabbitmq.WithURL(cf.TaskQueue.RabbitMQ.URL)), @@ -259,78 +172,6 @@ func GetServerConfigFromConfigfile(dc *database.Config, cf *server.ServerConfigF }, nil } -func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.ClientConfig, err error) { - tlsConf, err := loadClientTLSConfig(&cf.TLS) - - if err != nil { - return nil, fmt.Errorf("could not load TLS config: %w", err) - } - - return &client.ClientConfig{ - TenantId: cf.TenantId, - TLSConfig: tlsConf, - }, nil -} - -func loadClientTLSConfig(tlsConfig *client.ClientTLSConfigFile) (*tls.Config, error) { - res, ca, err := LoadBaseTLSConfig(&tlsConfig.Base) - - if err != nil { - return nil, err - } - - res.ServerName = tlsConfig.TLSServerName - res.RootCAs = ca - - return res, nil -} - -func loadServerTLSConfig(tlsConfig *shared.TLSConfigFile) (*tls.Config, error) { - res, ca, err := LoadBaseTLSConfig(tlsConfig) - - if err != nil { - return nil, err - } - - res.ClientAuth = tls.RequireAndVerifyClientCert - res.ClientCAs = ca - - return res, nil -} - -func LoadBaseTLSConfig(tlsConfig *shared.TLSConfigFile) (*tls.Config, *x509.CertPool, error) { - var x509Cert tls.Certificate - var err error - - if tlsConfig.TLSCert != "" && tlsConfig.TLSKey != "" { - x509Cert, err = tls.X509KeyPair([]byte(tlsConfig.TLSCert), []byte(tlsConfig.TLSKey)) - } else if tlsConfig.TLSCertFile != "" && tlsConfig.TLSKeyFile != "" { - x509Cert, err = tls.LoadX509KeyPair(tlsConfig.TLSCertFile, tlsConfig.TLSKeyFile) - } else { - return nil, nil, fmt.Errorf("no cert or key provided") - } - - var caBytes []byte - - if tlsConfig.TLSRootCA != "" { - caBytes = []byte(tlsConfig.TLSRootCA) - } else if tlsConfig.TLSRootCAFile != "" { - caBytes, err = os.ReadFile(tlsConfig.TLSRootCAFile) - } else { - return nil, nil, fmt.Errorf("no root CA provided") - } - - ca := x509.NewCertPool() - - if ok := ca.AppendCertsFromPEM(caBytes); !ok { - return nil, nil, fmt.Errorf("could not append root CA to cert pool: %w", err) - } - - return &tls.Config{ - Certificates: []tls.Certificate{x509Cert}, - }, ca, nil -} - func getStrArr(v string) []string { return strings.Split(v, " ") } diff --git a/internal/config/loader/loaderutils/files.go b/internal/config/loader/loaderutils/files.go new file mode 100644 index 000000000..7c1678d15 --- /dev/null +++ b/internal/config/loader/loaderutils/files.go @@ -0,0 +1,33 @@ +package loaderutils + +import ( + "fmt" + "os" +) + +func GetConfigBytes(configFilePath string) ([][]byte, error) { + configFileBytes := make([][]byte, 0) + + if fileExists(configFilePath) { + fileBytes, err := os.ReadFile(configFilePath) // #nosec G304 -- config files are meant to be read from user-supplied directory + + if err != nil { + return nil, fmt.Errorf("could not read config file at path %s: %w", configFilePath, err) + } + + configFileBytes = append(configFileBytes, fileBytes) + } + + return configFileBytes, nil +} + +func fileExists(filename string) bool { + info, err := os.Stat(filename) + if err != nil && os.IsNotExist(err) { + return false + } else if err != nil { + return false + } + + return !info.IsDir() +} diff --git a/internal/config/loader/loaderutils/tls.go b/internal/config/loader/loaderutils/tls.go new file mode 100644 index 000000000..149e4dcb4 --- /dev/null +++ b/internal/config/loader/loaderutils/tls.go @@ -0,0 +1,70 @@ +package loaderutils + +import ( + "crypto/tls" + "crypto/x509" + "fmt" + "os" + + "github.com/hatchet-dev/hatchet/internal/config/client" + "github.com/hatchet-dev/hatchet/internal/config/shared" +) + +func LoadClientTLSConfig(tlsConfig *client.ClientTLSConfigFile) (*tls.Config, error) { + res, ca, err := LoadBaseTLSConfig(&tlsConfig.Base) + + if err != nil { + return nil, err + } + + res.ServerName = tlsConfig.TLSServerName + res.RootCAs = ca + + return res, nil +} + +func LoadServerTLSConfig(tlsConfig *shared.TLSConfigFile) (*tls.Config, error) { + res, ca, err := LoadBaseTLSConfig(tlsConfig) + + if err != nil { + return nil, err + } + + res.ClientAuth = tls.RequireAndVerifyClientCert + res.ClientCAs = ca + + return res, nil +} + +func LoadBaseTLSConfig(tlsConfig *shared.TLSConfigFile) (*tls.Config, *x509.CertPool, error) { + var x509Cert tls.Certificate + var err error + + if tlsConfig.TLSCert != "" && tlsConfig.TLSKey != "" { + x509Cert, err = tls.X509KeyPair([]byte(tlsConfig.TLSCert), []byte(tlsConfig.TLSKey)) + } else if tlsConfig.TLSCertFile != "" && tlsConfig.TLSKeyFile != "" { + x509Cert, err = tls.LoadX509KeyPair(tlsConfig.TLSCertFile, tlsConfig.TLSKeyFile) + } else { + return nil, nil, fmt.Errorf("no cert or key provided") + } + + var caBytes []byte + + if tlsConfig.TLSRootCA != "" { + caBytes = []byte(tlsConfig.TLSRootCA) + } else if tlsConfig.TLSRootCAFile != "" { + caBytes, err = os.ReadFile(tlsConfig.TLSRootCAFile) + } else { + return nil, nil, fmt.Errorf("no root CA provided") + } + + ca := x509.NewCertPool() + + if ok := ca.AppendCertsFromPEM(caBytes); !ok { + return nil, nil, fmt.Errorf("could not append root CA to cert pool: %w", err) + } + + return &tls.Config{ + Certificates: []tls.Certificate{x509Cert}, + }, ca, nil +} diff --git a/internal/config/loader/loaderutils/viper.go b/internal/config/loader/loaderutils/viper.go new file mode 100644 index 000000000..9adcd9d01 --- /dev/null +++ b/internal/config/loader/loaderutils/viper.go @@ -0,0 +1,33 @@ +package loaderutils + +import ( + "bytes" + "fmt" + + "github.com/creasty/defaults" + "github.com/spf13/viper" +) + +func LoadConfigFromViper(bindFunc func(v *viper.Viper), configFile interface{}, files ...[]byte) (*viper.Viper, error) { + v := viper.New() + v.SetConfigType("yaml") + bindFunc(v) + + for _, f := range files { + err := v.MergeConfig(bytes.NewBuffer(f)) + + if err != nil { + return nil, fmt.Errorf("could not load viper config: %w", err) + } + } + + defaults.Set(configFile) + + err := v.Unmarshal(configFile) + + if err != nil { + return nil, fmt.Errorf("could not unmarshal viper config: %w", err) + } + + return v, nil +} diff --git a/internal/config/server/server.go b/internal/config/server/server.go index 208be1e5e..8ce7d0f7f 100644 --- a/internal/config/server/server.go +++ b/internal/config/server/server.go @@ -32,6 +32,12 @@ type ConfigFileRuntime struct { // ServerURL is the full server URL of the instance, including protocol. ServerURL string `mapstructure:"url" json:"url,omitempty" default:"http://localhost:8080"` + + // GRPCPort is the port that the grpc service listens on + GRPCPort int `mapstructure:"grpcPort" json:"grpcPort,omitempty" default:"7070"` + + // GRPCBindAddress is the address that the grpc server binds to. Should set to 0.0.0.0 if binding in docker container. + GRPCBindAddress string `mapstructure:"grpcBindAddress" json:"grpcBindAddress,omitempty" default:"127.0.0.1"` } type ConfigFileAuth struct { @@ -66,17 +72,12 @@ type RabbitMQConfigFile struct { URL string `mapstructure:"url" json:"url,omitempty" validate:"required" default:"amqp://user:password@localhost:5672/"` } -type ServerRuntimeConfig struct { - ServerURL string - Port int -} - type ServerConfig struct { *database.Config Auth ConfigFileAuth - Runtime ServerRuntimeConfig + Runtime ConfigFileRuntime Services []string @@ -109,6 +110,8 @@ func BindAllEnv(v *viper.Viper) { // runtime options v.BindEnv("runtime.port", "SERVER_PORT") v.BindEnv("runtime.url", "SERVER_URL") + v.BindEnv("runtime.grpcPort", "SERVER_GRPC_PORT") + v.BindEnv("runtime.grpcBindAddress", "SERVER_GRPC_BIND_ADDRESS") v.BindEnv("services", "SERVER_SERVICES") // auth options diff --git a/internal/datautils/job_data.go b/internal/datautils/job_data.go index 6c28c65cb..1fe8f4403 100644 --- a/internal/datautils/job_data.go +++ b/internal/datautils/job_data.go @@ -21,10 +21,12 @@ func NewJobRunLookupDataFromInputBytes(input []byte) (JobRunLookupData, error) { return JobRunLookupData{}, fmt.Errorf("failed to convert input to map: %w", err) } - return NewJobRunLookupData(inputMap), nil + return NewJobRunLookupData(inputMap, input), nil } -func NewJobRunLookupData(input map[string]interface{}) JobRunLookupData { +func NewJobRunLookupData(input map[string]interface{}, rawInput []byte) JobRunLookupData { + input["json"] = string(rawInput) + return JobRunLookupData{ Input: input, } @@ -62,6 +64,9 @@ func AddStepOutput(data *types.JSON, stepReadableId string, stepOutput []byte) ( return nil, fmt.Errorf("failed to convert step output to map: %w", err) } + // add a "json" accessor to the output + outputMap["json"] = unquoted + currData := JobRunLookupData{} err = FromJSONType(data, &currData) diff --git a/internal/datautils/map.go b/internal/datautils/map.go index 5cd9711e9..c92d8e879 100644 --- a/internal/datautils/map.go +++ b/internal/datautils/map.go @@ -34,6 +34,10 @@ func jsonBytesToMap(jsonBytes []byte) (map[string]interface{}, error) { return nil, err } + if dataMap == nil { + return map[string]interface{}{}, nil + } + return dataMap, nil } diff --git a/internal/datautils/render.go b/internal/datautils/render.go index 1dd615e84..cc674edee 100644 --- a/internal/datautils/render.go +++ b/internal/datautils/render.go @@ -2,42 +2,63 @@ package datautils import ( "bytes" + "encoding/json" "fmt" "reflect" "text/template" ) // RenderTemplateFields recursively processes the input map, rendering any string fields using the data map. -func RenderTemplateFields(data map[string]interface{}, input map[string]interface{}) error { +func RenderTemplateFields(data map[string]interface{}, input map[string]interface{}) (map[string]interface{}, error) { + output := map[string]interface{}{} + for key, val := range input { switch v := val.(type) { case string: tmpl, err := template.New(key).Parse(v) if err != nil { - return fmt.Errorf("error creating template for key %s: %v", key, err) + return nil, fmt.Errorf("error creating template for key %s: %v", key, err) } var tpl bytes.Buffer err = tmpl.Execute(&tpl, data) if err != nil { - return fmt.Errorf("error executing template for key %s: %v", key, err) + return nil, fmt.Errorf("error executing template for key %s: %v", key, err) + } + + res := tpl.String() + + // if the string can be unmarshalled into a map[string]interface{}, do so + resMap := map[string]interface{}{} + + if err := json.Unmarshal(tpl.Bytes(), &resMap); err == nil { + output[key] = resMap + + // if the key is "object", the entire input is replaced with the rendered value + if key == "object" { + // note we do not recursively render the new input, as it may contain untrusted data. + return resMap, nil + } + } else { + output[key] = res } - input[key] = tpl.String() case map[string]interface{}: // if we hit a nested map[string]interface{}, render those recursively - err := RenderTemplateFields(data, v) + recOut, err := RenderTemplateFields(data, v) if err != nil { - return err + return nil, err } + + output[key] = recOut default: if reflect.TypeOf(v).Kind() == reflect.Map { // If it's a map but not map[string]interface{}, return an error - return fmt.Errorf("encountered a map that is not map[string]interface{}: %s", key) + return nil, fmt.Errorf("encountered a map that is not map[string]interface{}: %s", key) } } } - return nil + return output, nil } diff --git a/internal/datautils/render_test.go b/internal/datautils/render_test.go new file mode 100644 index 000000000..a906e7cc9 --- /dev/null +++ b/internal/datautils/render_test.go @@ -0,0 +1,88 @@ +package datautils + +import ( + "encoding/json" + "testing" +) + +func TestRenderTemplateFields(t *testing.T) { + tests := []struct { + name string + data map[string]interface{} + input map[string]interface{} + expected map[string]interface{} + wantErr bool + }{ + { + name: "simple string template", + data: map[string]interface{}{"testing": "datavalue"}, + input: map[string]interface{}{ + "render": "{{ .testing }}", + }, + expected: map[string]interface{}{ + "render": "datavalue", + }, + wantErr: false, + }, + { + name: "nested map template", + data: map[string]interface{}{"testing": "nestedvalue"}, + input: map[string]interface{}{ + "nested": map[string]interface{}{ + "render": "{{ .testing }}", + }, + }, + expected: map[string]interface{}{ + "nested": map[string]interface{}{ + "render": "nestedvalue", + }, + }, + wantErr: false, + }, + { + name: "object template", + data: map[string]interface{}{"testing": `{ "nested": "nestedvalue" }`}, + input: map[string]interface{}{ + "nested": map[string]interface{}{ + "render": "{{ .testing }}", + }, + }, + expected: map[string]interface{}{ + "nested": map[string]interface{}{ + "render": map[string]interface{}{ + "nested": "nestedvalue", + }, + }, + }, + wantErr: false, + }, + { + name: "replace object", + data: map[string]interface{}{"testing": `{ "nested": "nestedvalue" }`}, + input: map[string]interface{}{ + "object": "{{ .testing }}", + }, + expected: map[string]interface{}{ + "nested": "nestedvalue", + }, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + output, err := RenderTemplateFields(tt.data, tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("RenderTemplateFields() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr { + jsonExpected, _ := json.Marshal(tt.expected) + jsonResult, _ := json.Marshal(output) + if string(jsonExpected) != string(jsonResult) { + t.Errorf("Expected %v, got %v", string(jsonExpected), string(jsonResult)) + } + } + }) + } +} diff --git a/internal/repository/event.go b/internal/repository/event.go index 5d54e6275..a39dd4a72 100644 --- a/internal/repository/event.go +++ b/internal/repository/event.go @@ -9,8 +9,8 @@ type CreateEventOpts struct { // (required) the tenant id TenantId string `validate:"required,uuid"` - // (required) the event key - must be in actionId form - Key string `validate:"required,actionId"` + // (required) the event key + Key string `validate:"required"` // (optional) the event data Data *db.JSON diff --git a/internal/repository/prisma/worker.go b/internal/repository/prisma/worker.go index 44aaa8b63..d892d61da 100644 --- a/internal/repository/prisma/worker.go +++ b/internal/repository/prisma/worker.go @@ -157,7 +157,43 @@ func (w *workerRepository) CreateNewWorker(tenantId string, opts *repository.Cre txs := []transaction.Param{} - optionals := []db.WorkerSetParam{} + workerId := uuid.New().String() + + createTx := w.client.Worker.CreateOne( + db.Worker.Tenant.Link( + db.Tenant.ID.Equals(tenantId), + ), + db.Worker.Name.Set(opts.Name), + db.Worker.Dispatcher.Link( + db.Dispatcher.ID.Equals(opts.DispatcherId), + ), + db.Worker.ID.Set(workerId), + ).Tx() + + txs = append(txs, createTx) + + for _, svc := range opts.Services { + upsertServiceTx := w.client.Service.UpsertOne( + db.Service.TenantIDName( + db.Service.TenantID.Equals(tenantId), + db.Service.Name.Equals(svc), + ), + ).Create( + db.Service.Name.Set(svc), + db.Service.Tenant.Link( + db.Tenant.ID.Equals(tenantId), + ), + db.Service.Workers.Link( + db.Worker.ID.Equals(workerId), + ), + ).Update( + db.Service.Workers.Link( + db.Worker.ID.Equals(workerId), + ), + ).Tx() + + txs = append(txs, upsertServiceTx) + } if len(opts.Actions) > 0 { for _, action := range opts.Actions { @@ -173,28 +209,22 @@ func (w *workerRepository) CreateNewWorker(tenantId string, opts *repository.Cre ), ).Update().Tx()) - optionals = append(optionals, db.Worker.Actions.Link( - db.Action.TenantIDID( - db.Action.TenantID.Equals(tenantId), - db.Action.ID.Equals(action), + // This is unfortunate but due to https://github.com/steebchen/prisma-client-go/issues/1095, + // we cannot set db.Worker.Actions.Link multiple times, and since Link required a unique action + // where clause, we have to do these in separate transactions + txs = append(txs, w.client.Worker.FindUnique( + db.Worker.ID.Equals(workerId), + ).Update( + db.Worker.Actions.Link( + db.Action.TenantIDID( + db.Action.TenantID.Equals(tenantId), + db.Action.ID.Equals(action), + ), ), - )) + ).Tx()) } } - createTx := w.client.Worker.CreateOne( - db.Worker.Tenant.Link( - db.Tenant.ID.Equals(tenantId), - ), - db.Worker.Name.Set(opts.Name), - db.Worker.Dispatcher.Link( - db.Dispatcher.ID.Equals(opts.DispatcherId), - ), - optionals..., - ).Tx() - - txs = append(txs, createTx) - err := w.client.Prisma.Transaction(txs...).Exec(context.Background()) if err != nil { @@ -235,12 +265,19 @@ func (w *workerRepository) UpdateWorker(tenantId, workerId string, opts *reposit ), ).Update().Tx()) - optionals = append(optionals, db.Worker.Actions.Link( - db.Action.TenantIDID( - db.Action.TenantID.Equals(tenantId), - db.Action.ID.Equals(action), + // This is unfortunate but due to https://github.com/steebchen/prisma-client-go/issues/1095, + // we cannot set db.Worker.Actions.Link multiple times, and since Link required a unique action + // where clause, we have to do these in separate transactions + txs = append(txs, w.client.Worker.FindUnique( + db.Worker.ID.Equals(workerId), + ).Update( + db.Worker.Actions.Link( + db.Action.TenantIDID( + db.Action.TenantID.Equals(tenantId), + db.Action.ID.Equals(action), + ), ), - )) + ).Tx()) } } diff --git a/internal/repository/worker.go b/internal/repository/worker.go index 40cfbf41c..7d72daa50 100644 --- a/internal/repository/worker.go +++ b/internal/repository/worker.go @@ -13,6 +13,9 @@ type CreateWorkerOpts struct { // The name of the worker Name string `validate:"required,hatchetName"` + // The name of the service + Services []string `validate:"dive,hatchetName"` + // A list of actions this worker can run Actions []string `validate:"dive,actionId"` } diff --git a/internal/repository/workflow.go b/internal/repository/workflow.go index 8f008ca4b..6630651cb 100644 --- a/internal/repository/workflow.go +++ b/internal/repository/workflow.go @@ -17,7 +17,7 @@ type CreateWorkflowVersionOpts struct { Version string `validate:"required"` // (optional) event triggers for the workflow - EventTriggers []string `validate:"dive,actionId"` + EventTriggers []string // (optional) cron triggers for the workflow CronTriggers []string `validate:"dive,cron"` diff --git a/internal/repository/workflow_run.go b/internal/repository/workflow_run.go index eb16d0b50..5bffaee78 100644 --- a/internal/repository/workflow_run.go +++ b/internal/repository/workflow_run.go @@ -1,6 +1,7 @@ package repository import ( + "encoding/json" "fmt" "time" @@ -29,14 +30,7 @@ func GetCreateWorkflowRunOptsFromEvent(event *db.EventModel, workflowVersion *db eventId := event.ID data := event.InnerEvent.Data - dataInputMap := make(map[string]interface{}) - err := datautils.FromJSONType(data, &dataInputMap) - - if err != nil { - return nil, fmt.Errorf("could not marshal event data: %w", err) - } - - structuredJobRunData := datautils.NewJobRunLookupData(dataInputMap) + structuredJobRunData, err := datautils.NewJobRunLookupDataFromInputBytes([]byte(json.RawMessage(*data))) if err != nil { return nil, fmt.Errorf("could not create job run lookup data: %w", err) diff --git a/internal/services/dispatcher/contracts/dispatcher.pb.go b/internal/services/dispatcher/contracts/dispatcher.pb.go index c353f300c..79298bb7b 100644 --- a/internal/services/dispatcher/contracts/dispatcher.pb.go +++ b/internal/services/dispatcher/contracts/dispatcher.pb.go @@ -130,6 +130,8 @@ type WorkerRegisterRequest struct { WorkerName string `protobuf:"bytes,2,opt,name=workerName,proto3" json:"workerName,omitempty"` // a list of actions that this worker can run Actions []string `protobuf:"bytes,3,rep,name=actions,proto3" json:"actions,omitempty"` + // (optional) the services for this worker + Services []string `protobuf:"bytes,4,rep,name=services,proto3" json:"services,omitempty"` } func (x *WorkerRegisterRequest) Reset() { @@ -185,6 +187,13 @@ func (x *WorkerRegisterRequest) GetActions() []string { return nil } +func (x *WorkerRegisterRequest) GetServices() []string { + if x != nil { + return x.Services + } + return nil +} + type WorkerRegisterResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -733,115 +742,117 @@ var file_dispatcher_proto_rawDesc = []byte{ 0x0a, 0x10, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0x6d, 0x0a, 0x15, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, - 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x63, 0x74, 0x69, - 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x61, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x22, 0x70, 0x0a, 0x16, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, - 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, + 0x6f, 0x74, 0x6f, 0x22, 0x89, 0x01, 0x0a, 0x15, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, + 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, + 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, + 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x09, 0x52, 0x07, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x73, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x18, + 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x22, + 0x70, 0x0a, 0x16, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, + 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, + 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, + 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, + 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x18, + 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, 0x6d, + 0x65, 0x22, 0x9d, 0x02, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, + 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, + 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x1a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, + 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, + 0x65, 0x70, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, + 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, + 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x2b, + 0x0a, 0x0a, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x18, 0x08, 0x20, 0x01, + 0x28, 0x0e, 0x32, 0x0b, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, + 0x0a, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x61, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x09, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x22, 0x4d, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, 0x74, 0x65, + 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, + 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, + 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, + 0x22, 0x52, 0x0a, 0x18, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, + 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4e, 0x61, - 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, - 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x9d, 0x02, 0x0a, 0x0e, 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, - 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, - 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, - 0x74, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x6a, 0x6f, 0x62, - 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x6a, 0x6f, 0x62, 0x4e, - 0x61, 0x6d, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, - 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, - 0x16, 0x0a, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, - 0x75, 0x6e, 0x49, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x74, 0x65, 0x70, - 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, - 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, - 0x64, 0x12, 0x2b, 0x0a, 0x0a, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x18, - 0x08, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0b, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, - 0x70, 0x65, 0x52, 0x0a, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x24, - 0x0a, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, - 0x09, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0d, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x50, 0x61, 0x79, - 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x4d, 0x0a, 0x13, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, - 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x74, - 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, - 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x49, 0x64, 0x22, 0x52, 0x0a, 0x18, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x55, 0x6e, 0x73, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, - 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, - 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, - 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x22, 0x53, 0x0a, 0x19, 0x57, 0x6f, 0x72, 0x6b, 0x65, - 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, + 0x65, 0x72, 0x49, 0x64, 0x22, 0x53, 0x0a, 0x19, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x55, 0x6e, + 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, + 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x22, 0xe1, 0x02, 0x0a, 0x0b, 0x41, 0x63, + 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, + 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, + 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, + 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, + 0x6e, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6a, 0x6f, 0x62, 0x52, 0x75, + 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x18, 0x05, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x12, 0x1c, 0x0a, 0x09, 0x73, + 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, + 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x61, 0x63, 0x74, + 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, 0x65, 0x6e, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x2e, 0x0a, 0x09, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x10, 0x2e, 0x41, + 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x09, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x22, 0x0a, 0x0c, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x22, 0x4d, 0x0a, + 0x13, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x22, 0xe1, 0x02, 0x0a, - 0x0b, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1a, 0x0a, 0x08, - 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, - 0x65, 0x72, 0x49, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x6a, 0x6f, 0x62, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x6a, 0x6f, - 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x6a, 0x6f, - 0x62, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, - 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x65, 0x70, 0x49, 0x64, 0x12, 0x1c, - 0x0a, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x73, 0x74, 0x65, 0x70, 0x52, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x12, 0x42, 0x0a, 0x0e, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0e, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x2e, 0x0a, 0x09, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0e, 0x32, - 0x10, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, - 0x65, 0x52, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x22, 0x0a, 0x0c, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x0a, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0c, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, - 0x22, 0x4d, 0x0a, 0x13, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, - 0x74, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, - 0x74, 0x49, 0x64, 0x12, 0x1a, 0x0a, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x2a, - 0x35, 0x0a, 0x0a, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, - 0x0e, 0x53, 0x54, 0x41, 0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, - 0x00, 0x12, 0x13, 0x0a, 0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, - 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x01, 0x2a, 0x86, 0x01, 0x0a, 0x0f, 0x41, 0x63, 0x74, 0x69, 0x6f, - 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, - 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, - 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, 0x5f, - 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, - 0x45, 0x44, 0x10, 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, - 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, - 0x44, 0x10, 0x02, 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, - 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x32, - 0x81, 0x02, 0x0a, 0x0a, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, - 0x0a, 0x08, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, - 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, - 0x74, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, - 0x06, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, - 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, - 0x41, 0x73, 0x73, 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, - 0x30, 0x01, 0x12, 0x37, 0x0a, 0x0f, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, - 0x65, 0x6e, 0x74, 0x1a, 0x14, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, - 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x46, 0x0a, 0x0b, 0x55, - 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x19, 0x2e, 0x57, 0x6f, 0x72, - 0x6b, 0x65, 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x55, 0x6e, - 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61, - 0x74, 0x63, 0x68, 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, - 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, - 0x65, 0x72, 0x2f, 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x28, 0x09, 0x52, 0x08, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x64, 0x2a, 0x35, 0x0a, 0x0a, + 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, + 0x41, 0x52, 0x54, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, 0x4e, 0x10, 0x00, 0x12, 0x13, + 0x0a, 0x0f, 0x43, 0x41, 0x4e, 0x43, 0x45, 0x4c, 0x5f, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x52, 0x55, + 0x4e, 0x10, 0x01, 0x2a, 0x86, 0x01, 0x0a, 0x0f, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, 0x5f, + 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, + 0x57, 0x4e, 0x10, 0x00, 0x12, 0x1b, 0x0a, 0x17, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, + 0x4e, 0x54, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, + 0x01, 0x12, 0x1d, 0x0a, 0x19, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x43, 0x4f, 0x4d, 0x50, 0x4c, 0x45, 0x54, 0x45, 0x44, 0x10, 0x02, + 0x12, 0x1a, 0x0a, 0x16, 0x53, 0x54, 0x45, 0x50, 0x5f, 0x45, 0x56, 0x45, 0x4e, 0x54, 0x5f, 0x54, + 0x59, 0x50, 0x45, 0x5f, 0x46, 0x41, 0x49, 0x4c, 0x45, 0x44, 0x10, 0x03, 0x32, 0x81, 0x02, 0x0a, + 0x0a, 0x44, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x12, 0x3d, 0x0a, 0x08, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x12, 0x16, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x17, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x33, 0x0a, 0x06, 0x4c, 0x69, + 0x73, 0x74, 0x65, 0x6e, 0x12, 0x14, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x4c, 0x69, 0x73, + 0x74, 0x65, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0f, 0x2e, 0x41, 0x73, 0x73, + 0x69, 0x67, 0x6e, 0x65, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, + 0x37, 0x0a, 0x0f, 0x53, 0x65, 0x6e, 0x64, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, + 0x6e, 0x74, 0x12, 0x0c, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x1a, 0x14, 0x2e, 0x41, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x46, 0x0a, 0x0b, 0x55, 0x6e, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x19, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, + 0x55, 0x6e, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x55, 0x6e, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, + 0x42, 0x47, 0x5a, 0x45, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, + 0x61, 0x74, 0x63, 0x68, 0x65, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x68, 0x61, 0x74, 0x63, 0x68, + 0x65, 0x74, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x73, 0x2f, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x2f, + 0x63, 0x6f, 0x6e, 0x74, 0x72, 0x61, 0x63, 0x74, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( diff --git a/internal/services/dispatcher/server.go b/internal/services/dispatcher/server.go index eacfa88cd..7a9ebd3e7 100644 --- a/internal/services/dispatcher/server.go +++ b/internal/services/dispatcher/server.go @@ -85,24 +85,18 @@ func (s *DispatcherImpl) Register(ctx context.Context, request *contracts.Worker s.l.Debug().Msgf("Received register request from ID %s with actions %v", request.WorkerName, request.Actions) - // // get a list of step ids based on a list of action ids for the tenant - // steps, err := s.repo.Step().ListStepsByActions(request.TenantId, request.Actions) + svcs := request.Services - // if err != nil { - // return nil, err - // } - - // stepIds := make([]string, len(steps)) - - // for i, step := range steps { - // stepIds[i] = step.ID - // } + if svcs == nil || len(svcs) == 0 { + svcs = []string{"default"} + } // create a worker in the database worker, err := s.repo.Worker().CreateNewWorker(request.TenantId, &repository.CreateWorkerOpts{ DispatcherId: s.dispatcherId, Name: request.WorkerName, Actions: request.Actions, + Services: svcs, }) if err != nil { diff --git a/internal/services/grpc/server.go b/internal/services/grpc/server.go index 1107f04d9..07ebd2947 100644 --- a/internal/services/grpc/server.go +++ b/internal/services/grpc/server.go @@ -23,8 +23,9 @@ type Server struct { dispatchercontracts.UnimplementedDispatcherServer admincontracts.UnimplementedWorkflowServiceServer - l *zerolog.Logger - port int + l *zerolog.Logger + port int + bindAddress string ingestor ingestor.Ingestor dispatcher dispatcher.Dispatcher @@ -35,20 +36,22 @@ type Server struct { type ServerOpt func(*ServerOpts) type ServerOpts struct { - l *zerolog.Logger - port int - ingestor ingestor.Ingestor - dispatcher dispatcher.Dispatcher - admin admin.AdminService - tls *tls.Config + l *zerolog.Logger + port int + bindAddress string + ingestor ingestor.Ingestor + dispatcher dispatcher.Dispatcher + admin admin.AdminService + tls *tls.Config } func defaultServerOpts() *ServerOpts { logger := zerolog.New(os.Stderr) return &ServerOpts{ - l: &logger, - port: 7070, + l: &logger, + port: 7070, + bindAddress: "127.0.0.1", } } @@ -58,6 +61,12 @@ func WithLogger(l *zerolog.Logger) ServerOpt { } } +func WithBindAddress(bindAddress string) ServerOpt { + return func(opts *ServerOpts) { + opts.bindAddress = bindAddress + } +} + func WithPort(port int) ServerOpt { return func(opts *ServerOpts) { opts.port = port @@ -100,12 +109,13 @@ func NewServer(fs ...ServerOpt) (*Server, error) { } return &Server{ - l: opts.l, - port: opts.port, - ingestor: opts.ingestor, - dispatcher: opts.dispatcher, - admin: opts.admin, - tls: opts.tls, + l: opts.l, + port: opts.port, + bindAddress: opts.bindAddress, + ingestor: opts.ingestor, + dispatcher: opts.dispatcher, + admin: opts.admin, + tls: opts.tls, }, nil } @@ -114,9 +124,9 @@ func (s *Server) Start(ctx context.Context) error { } func (s *Server) startGRPC(ctx context.Context) error { - s.l.Debug().Msgf("starting grpc server on port %d", s.port) + s.l.Debug().Msgf("starting grpc server on %s:%d", s.bindAddress, s.port) - lis, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", s.port)) + lis, err := net.Listen("tcp", fmt.Sprintf("%s:%d", s.bindAddress, s.port)) if err != nil { return fmt.Errorf("failed to listen: %w", err) diff --git a/internal/services/jobscontroller/controller.go b/internal/services/jobscontroller/controller.go index 574f8c452..087aeabc1 100644 --- a/internal/services/jobscontroller/controller.go +++ b/internal/services/jobscontroller/controller.go @@ -479,7 +479,7 @@ func (ec *JobsControllerImpl) queueStepRun(ctx context.Context, tenantId, stepId return fmt.Errorf("could not get step inputs: %w", err) } - err = datautils.RenderTemplateFields(lookupDataMap, inputDataMap) + inputDataMap, err = datautils.RenderTemplateFields(lookupDataMap, inputDataMap) if err != nil { return fmt.Errorf("could not render template fields: %w", err) diff --git a/pkg/client/client.go b/pkg/client/client.go index aada5b242..d521d87a5 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -5,8 +5,8 @@ import ( "fmt" "os" - "github.com/hatchet-dev/hatchet/internal/config/loader" "github.com/hatchet-dev/hatchet/internal/validator" + "github.com/hatchet-dev/hatchet/pkg/client/loader" "github.com/hatchet-dev/hatchet/pkg/client/types" "github.com/rs/zerolog" "google.golang.org/grpc" diff --git a/pkg/client/dispatcher.go b/pkg/client/dispatcher.go index b4b2c649d..188b6cf98 100644 --- a/pkg/client/dispatcher.go +++ b/pkg/client/dispatcher.go @@ -32,6 +32,7 @@ const ( // TODO: add validator to client side type GetActionListenerRequest struct { WorkerName string + Services []string Actions []string } @@ -157,6 +158,7 @@ func (d *dispatcherClientImpl) newActionListener(ctx context.Context, req *GetAc TenantId: d.tenantId, WorkerName: req.WorkerName, Actions: req.Actions, + Services: req.Services, }) if err != nil { diff --git a/pkg/client/loader/loader.go b/pkg/client/loader/loader.go new file mode 100644 index 000000000..0b0dfed8f --- /dev/null +++ b/pkg/client/loader/loader.go @@ -0,0 +1,54 @@ +package loader + +import ( + "fmt" + "path/filepath" + + "github.com/hatchet-dev/hatchet/internal/config/client" + "github.com/hatchet-dev/hatchet/internal/config/loader/loaderutils" +) + +type ConfigLoader struct { + directory string +} + +// LoadClientConfig loads the client configuration +func (c *ConfigLoader) LoadClientConfig() (res *client.ClientConfig, err error) { + sharedFilePath := filepath.Join(c.directory, "client.yaml") + configFileBytes, err := loaderutils.GetConfigBytes(sharedFilePath) + + if err != nil { + return nil, err + } + + cf, err := LoadClientConfigFile(configFileBytes...) + + if err != nil { + return nil, err + } + + return GetClientConfigFromConfigFile(cf) +} + +// LoadClientConfigFile loads the worker config file via viper +func LoadClientConfigFile(files ...[]byte) (*client.ClientConfigFile, error) { + configFile := &client.ClientConfigFile{} + f := client.BindAllEnv + + _, err := loaderutils.LoadConfigFromViper(f, configFile, files...) + + return configFile, err +} + +func GetClientConfigFromConfigFile(cf *client.ClientConfigFile) (res *client.ClientConfig, err error) { + tlsConf, err := loaderutils.LoadClientTLSConfig(&cf.TLS) + + if err != nil { + return nil, fmt.Errorf("could not load TLS config: %w", err) + } + + return &client.ClientConfig{ + TenantId: cf.TenantId, + TLSConfig: tlsConf, + }, nil +} diff --git a/pkg/client/types/action.go b/pkg/client/types/action.go index 87bdcfae0..e671a61c1 100644 --- a/pkg/client/types/action.go +++ b/pkg/client/types/action.go @@ -35,6 +35,10 @@ func ParseActionID(actionID string) (Action, error) { parts := strings.Split(actionID, ":") numParts := len(parts) + if numParts < 2 || numParts > 3 { + return Action{}, fmt.Errorf("invalid action id %s, must have at least 2 strings separated : (colon)", actionID) + } + integrationId := firstToLower(parts[0]) verb := strings.ToLower(parts[1]) diff --git a/cmd/cmdutils/interrupt.go b/pkg/cmdutils/interrupt.go similarity index 51% rename from cmd/cmdutils/interrupt.go rename to pkg/cmdutils/interrupt.go index 3849e1ac1..01c4ece33 100644 --- a/cmd/cmdutils/interrupt.go +++ b/pkg/cmdutils/interrupt.go @@ -1,4 +1,4 @@ -// Adapted from: https://github.com/hatchet-dev/hatchet/blob/3c2c13168afa1af68d4baaf5ed02c9d49c5f0323/cmd/cmdutils/interrupt.go +// Adapted from: https://github.com/hatchet-dev/hatchet-v1-archived/blob/3c2c13168afa1af68d4baaf5ed02c9d49c5f0323/cmd/cmdutils/interrupt.go package cmdutils import ( @@ -22,7 +22,13 @@ func InterruptChan() <-chan interface{} { return ret } -func InterruptContext(interruptChan <-chan interface{}) (context.Context, context.CancelFunc) { +func NewInterruptContext() (context.Context, context.CancelFunc) { + interruptChan := InterruptChan() + + return InterruptContextFromChan(interruptChan) +} + +func InterruptContextFromChan(interruptChan <-chan interface{}) (context.Context, context.CancelFunc) { ctx, cancel := context.WithCancel(context.Background()) go func() { diff --git a/pkg/worker/args.go b/pkg/worker/args.go index 70c076aac..b43bf0724 100644 --- a/pkg/worker/args.go +++ b/pkg/worker/args.go @@ -32,3 +32,69 @@ func decodeArgsToInterface(fnType reflect.Type) (result interface{}, err error) return reflect.New(secondArgElem).Interface(), nil } + +func decodeFnArgTypes(fnType reflect.Type) (result []reflect.Type, err error) { + if fnType.Kind() != reflect.Func { + return nil, fmt.Errorf("method must be a function") + } + + // if not a function with two arguments, return error + if fnType.NumIn() != 2 { + return nil, fmt.Errorf("method must have exactly two arguments") + } + + // if first argument is not a context, return error + firstArg := fnType.In(0) + + if firstArg.Kind() != reflect.Interface || !firstArg.Implements(reflect.TypeOf((*context.Context)(nil)).Elem()) { + return nil, fmt.Errorf("first argument must be context.Context") + } + + // if second argument is not a pointer to a struct, return error + secondArg := fnType.In(1) + + if secondArg.Kind() != reflect.Ptr { + return nil, fmt.Errorf("second argument must be a pointer to a struct") + } + + secondArgElem := secondArg.Elem() + + if secondArgElem.Kind() != reflect.Struct { + return nil, fmt.Errorf("second argument must be a pointer to a struct") + } + + return []reflect.Type{firstArg, secondArg}, nil +} + +func decodeFnReturnTypes(fnType reflect.Type) (result []reflect.Type, err error) { + if fnType.NumOut() > 2 { + return nil, fmt.Errorf("fn cannot have more than 2 return values") + } + + firstOut := fnType.Out(0) + + // if there are two args, the first one should be a pointer to a struct + if fnType.NumOut() == 2 { + if firstOut.Kind() != reflect.Ptr { + return nil, fmt.Errorf("first argument must be a pointer to a struct when there are two return values") + } + + firstOutElem := firstOut.Elem() + + if firstOutElem.Kind() != reflect.Struct { + return nil, fmt.Errorf("first argument must be a pointer to a struct when there are two return values") + } + } + + lastOut := fnType.Out(fnType.NumOut() - 1) + + if lastOut.Kind() != reflect.Interface || !lastOut.Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return nil, fmt.Errorf("last return value must be error") + } + + if fnType.NumOut() == 1 { + return []reflect.Type{firstOut}, nil + } + + return []reflect.Type{firstOut, lastOut}, nil +} diff --git a/pkg/worker/method.go b/pkg/worker/method.go index deb11f3d6..242ba33c3 100644 --- a/pkg/worker/method.go +++ b/pkg/worker/method.go @@ -40,28 +40,26 @@ func getFnFromMethod(method any) (result actionFunc, err error) { } // if function does not return two values, return error - if methodType.NumOut() != 2 { - return nil, fmt.Errorf("method must return exactly two values") + if methodType.NumOut() == 2 { + // if first return value is not a pointer to a struct, return error + firstReturn := methodType.Out(0) + + if firstReturn.Kind() != reflect.Ptr { + return nil, fmt.Errorf("first return value must be a pointer to a struct") + } + + firstReturnElem := firstReturn.Elem() + + if firstReturnElem.Kind() != reflect.Struct { + return nil, fmt.Errorf("first return value must be a pointer to a struct") + } } - // if first return value is not a pointer to a struct, return error - // firstReturn := methodType.Out(0) + // if last return value is not an error, return error + lastReturn := methodType.Out(methodType.NumOut() - 1) - // if firstReturn.Kind() != reflect.Ptr { - // return nil, fmt.Errorf("first return value must be a pointer to a struct") - // } - - // firstReturnElem := firstReturn.Elem() - - // if firstReturnElem.Kind() != reflect.Struct { - // return nil, fmt.Errorf("first return value must be a pointer to a struct") - // } - - // if second return value is not an error, return error - secondReturn := methodType.Out(1) - - if secondReturn.Kind() != reflect.Interface || !secondReturn.Implements(reflect.TypeOf((*error)(nil)).Elem()) { - return nil, fmt.Errorf("second return value must be an error") + if lastReturn.Kind() != reflect.Interface || !lastReturn.Implements(reflect.TypeOf((*error)(nil)).Elem()) { + return nil, fmt.Errorf("second return value must be of type error") } return func(args ...interface{}) []interface{} { @@ -77,6 +75,12 @@ func getFnFromMethod(method any) (result actionFunc, err error) { }) // Return the results as an interface slice - return []interface{}{values[0].Interface(), values[1].Interface()} + res := []interface{}{} + + for i := range values { + res = append(res, values[i].Interface()) + } + + return res }, nil } diff --git a/pkg/worker/service.go b/pkg/worker/service.go new file mode 100644 index 000000000..843914ff2 --- /dev/null +++ b/pkg/worker/service.go @@ -0,0 +1,60 @@ +package worker + +import ( + "fmt" + "reflect" + + "github.com/hatchet-dev/hatchet/pkg/client/types" +) + +type Service struct { + Name string + + worker *Worker +} + +func (s *Service) On(t triggerConverter, workflow workflowConverter) error { + apiWorkflow := workflow.ToWorkflow(s.Name) + + wt := &types.WorkflowTriggers{} + + t.ToWorkflowTriggers(wt) + + apiWorkflow.Triggers = *wt + + // create the workflow via the API + err := s.worker.client.Admin().PutWorkflow(&apiWorkflow) + + if err != nil { + return err + } + + // register all steps as actions + for actionId, fn := range workflow.ToActionMap(s.Name) { + err := s.worker.registerAction(actionId, fn) + + if err != nil { + return err + } + } + + return nil +} + +func (s *Service) RegisterAction(fn any) error { + fnType := reflect.TypeOf(fn) + + if fnType.Kind() != reflect.Func { + return fmt.Errorf("method must be a function") + } + + if fnType.Name() == "" { + return fmt.Errorf("function cannot be anonymous") + } + + fnId := fnType.Name() + + actionId := fmt.Sprintf("%s:%s", s.Name, fnId) + + return s.worker.registerAction(actionId, fn) +} diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 47e279871..faddec9dc 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -12,7 +12,6 @@ import ( "github.com/hatchet-dev/hatchet/pkg/client" "github.com/hatchet-dev/hatchet/pkg/integrations" "github.com/rs/zerolog" - "google.golang.org/grpc" ) type actionFunc func(args ...any) []any @@ -47,11 +46,7 @@ func (j *actionImpl) MethodFn() any { } type Worker struct { - conn *grpc.ClientConn - client client.DispatcherClient - - // The worker id that gets assigned on register - workerId string + client client.Client name string @@ -60,12 +55,14 @@ type Worker struct { l *zerolog.Logger cancelMap sync.Map + + services sync.Map } type WorkerOpt func(*WorkerOpts) type WorkerOpts struct { - client client.DispatcherClient + client client.Client name string l *zerolog.Logger @@ -88,7 +85,7 @@ func WithName(name string) WorkerOpt { } } -func WithDispatcherClient(client client.DispatcherClient) WorkerOpt { +func WithClient(client client.Client) WorkerOpt { return func(opts *WorkerOpts) { opts.client = client } @@ -123,7 +120,7 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) { for _, integrationAction := range actions { action := fmt.Sprintf("%s:%s", integrationId, integrationAction) - err := w.RegisterAction(action, integration.ActionHandler(integrationAction)) + err := w.registerAction(action, integration.ActionHandler(integrationAction)) if err != nil { return nil, fmt.Errorf("could not register integration action %s: %w", action, err) @@ -131,10 +128,45 @@ func NewWorker(fs ...WorkerOpt) (*Worker, error) { } } + w.NewService("default") + return w, nil } +func (w *Worker) NewService(name string) *Service { + svc := &Service{ + Name: name, + worker: w, + } + + w.services.Store(name, svc) + + return svc +} + +func (w *Worker) On(t triggerConverter, workflow workflowConverter) error { + // get the default service + svc, ok := w.services.Load("default") + + if !ok { + return fmt.Errorf("could not load default service") + } + + return svc.(*Service).On(t, workflow) +} + func (w *Worker) RegisterAction(name string, method any) error { + // get the default service + svc, ok := w.services.Load("default") + + if !ok { + return fmt.Errorf("could not load default service") + } + + return svc.(*Service).RegisterAction(method) +} + +func (w *Worker) registerAction(name string, method any) error { actionFunc, err := getFnFromMethod(method) if err != nil { @@ -163,7 +195,7 @@ func (w *Worker) Start(ctx context.Context) error { actionNames = append(actionNames, job.Name()) } - listener, err := w.client.GetActionListener(ctx, &client.GetActionListenerRequest{ + listener, err := w.client.Dispatcher().GetActionListener(ctx, &client.GetActionListenerRequest{ WorkerName: w.name, Actions: actionNames, }) @@ -195,13 +227,10 @@ RunWorker: } w.l.Debug().Msgf("action %s completed with result %v", action.ActionId, res) - - return }(action) case <-ctx.Done(): w.l.Debug().Msgf("worker %s received context done, stopping", w.name) break RunWorker - default: } } @@ -228,7 +257,7 @@ func (w *Worker) executeAction(ctx context.Context, assignedAction *client.Actio func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action) (result any, err error) { // send a message that the step run started - _, err = w.client.SendActionEvent( + _, err = w.client.Dispatcher().SendActionEvent( ctx, w.getActionEvent(assignedAction, client.ActionEventTypeStarted), ) @@ -269,9 +298,11 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action default: } - result = runResults[0] + if len(runResults) == 2 { + result = runResults[0] + } - if runResults[1] != nil { + if runResults[len(runResults)-1] != nil { err = runResults[1].(error) } @@ -280,7 +311,7 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action failureEvent.EventPayload = err.Error() - _, err := w.client.SendActionEvent( + _, err := w.client.Dispatcher().SendActionEvent( ctx, failureEvent, ) @@ -292,13 +323,6 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action return nil, err } - // TODO: check last argument for error - - // if err != nil { - // // TODO: send a message that the step run failed - // return nil, fmt.Errorf("could not run job: %w", err) - // } - // send a message that the step run completed finishedEvent, err := w.getActionFinishedEvent(assignedAction, result) @@ -306,7 +330,7 @@ func (w *Worker) startStepRun(ctx context.Context, assignedAction *client.Action return nil, fmt.Errorf("could not create finished event: %w", err) } - _, err = w.client.SendActionEvent( + _, err = w.client.Dispatcher().SendActionEvent( ctx, finishedEvent, ) diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go new file mode 100644 index 000000000..cc679ef4d --- /dev/null +++ b/pkg/worker/workflow.go @@ -0,0 +1,201 @@ +package worker + +import ( + "fmt" + "reflect" + "runtime" + "strings" + + "github.com/hatchet-dev/hatchet/pkg/client/types" +) + +type triggerConverter interface { + ToWorkflowTriggers(*types.WorkflowTriggers) +} + +type Cron string + +func (c Cron) ToWorkflowTriggers(wt *types.WorkflowTriggers) { + if wt.Cron == nil { + wt.Cron = []string{} + } + + wt.Cron = append(wt.Cron, string(c)) +} + +type Event string + +func (e Event) ToWorkflowTriggers(wt *types.WorkflowTriggers) { + if wt.Events == nil { + wt.Events = []string{} + } + + wt.Events = append(wt.Events, string(e)) +} + +type workflowConverter interface { + ToWorkflow(svcName string) types.Workflow + ToActionMap(svcName string) map[string]any +} + +type Workflow struct { + Jobs []WorkflowJob +} + +type WorkflowJob struct { + // The name of the job + Name string + + Description string + + Timeout string + + // The steps that are run in the job + Steps []WorkflowStep +} + +func (j *WorkflowJob) ToWorkflow(svcName string) types.Workflow { + apiJob, err := j.ToWorkflowJob(svcName) + + if err != nil { + panic(err) + } + + jobs := map[string]types.WorkflowJob{ + j.Name: *apiJob, + } + + return types.Workflow{ + Name: j.Name, + Version: "v0.1.0", + Jobs: jobs, + } +} + +func (j *WorkflowJob) ToWorkflowJob(svcName string) (*types.WorkflowJob, error) { + apiJob := &types.WorkflowJob{ + Description: j.Description, + Timeout: j.Timeout, + Steps: []types.WorkflowStep{}, + } + + var prevStep *step + + for i, step := range j.Steps { + newStep, err := step.ToWorkflowStep(prevStep, svcName, i) + + if err != nil { + return nil, err + } + + apiJob.Steps = append(apiJob.Steps, newStep.APIStep) + + prevStep = newStep + } + + return apiJob, nil +} + +func (j *WorkflowJob) ToActionMap(svcName string) map[string]any { + res := map[string]any{} + + for i, step := range j.Steps { + actionId := step.GetActionId(svcName, i) + + res[actionId] = step.Function + } + + return res +} + +type WorkflowStep struct { + // The step timeout + Timeout string + + // The executed function + Function any +} + +type step struct { + Id string + + // non-ctx input is not optional + NonCtxInput reflect.Type + + // non-err output is optional + NonErrOutput *reflect.Type + + APIStep types.WorkflowStep +} + +func (s *WorkflowStep) ToWorkflowStep(prevStep *step, svcName string, index int) (*step, error) { + fnType := reflect.TypeOf(s.Function) + + res := &step{} + + res.Id = s.GetStepId(index) + + res.APIStep = types.WorkflowStep{ + Name: res.Id, + ID: s.GetStepId(index), + Timeout: s.Timeout, + ActionID: s.GetActionId(svcName, index), + } + + inputs, err := decodeFnArgTypes(fnType) + + if err != nil { + return nil, err + } + + res.NonCtxInput = inputs[1] + + outputs, err := decodeFnReturnTypes(fnType) + + if err != nil { + return nil, err + } + + if len(outputs) > 1 { + res.NonErrOutput = &outputs[0] + } + + // if the previous step's first output matches the last input of this step, then the data + // is passed through + if prevStep != nil && prevStep.NonErrOutput != nil { + if inputs[1] == *prevStep.NonErrOutput { + res.APIStep.With = map[string]interface{}{ + "object": "{{ .steps." + prevStep.Id + ".json }}", + } + } + } else { + res.APIStep.With = map[string]interface{}{ + "object": "{{ .input.json }}", + } + } + + return res, nil +} + +func (s *WorkflowStep) GetStepId(index int) string { + stepId := s.getFnName() + + // this can happen if the function is anonymous + if stepId == "" { + stepId = fmt.Sprintf("step%d", index) + } + + return stepId +} + +func (s *WorkflowStep) GetActionId(svcName string, index int) string { + stepId := s.GetStepId(index) + + return fmt.Sprintf("%s:%s", svcName, stepId) +} + +func (s *WorkflowStep) getFnName() string { + fnName := runtime.FuncForPC(reflect.ValueOf(s.Function).Pointer()).Name() + + return strings.Split(fnName, ".")[1] +} diff --git a/pkg/worker/workflow_test.go b/pkg/worker/workflow_test.go new file mode 100644 index 000000000..68b86b960 --- /dev/null +++ b/pkg/worker/workflow_test.go @@ -0,0 +1,48 @@ +package worker + +// import ( +// "context" +// "testing" +// ) + +// // type actionInput struct { +// // Message string `json:"message"` +// // } + +// // type stepOneOutput struct { +// // Message string `json:"message"` +// // } + +// // type stepTwoOutput struct { +// // Message string `json:"message"` +// // } + +// // func TestToWorkflowJob(t *testing.T) { +// // testJob := WorkflowJob{ +// // Name: "test", +// // Description: "test", +// // Timeout: "1m", +// // Steps: []WorkflowStep{ +// // { +// // ActionId: "test:test", +// // Function: func(ctx context.Context, input *actionInput) (result *stepOneOutput, err error) { +// // return nil, nil +// // }, +// // }, +// // { +// // ActionId: "test:test", +// // Function: func(ctx context.Context, input *stepOneOutput) (result *stepTwoOutput, err error) { +// // return nil, nil +// // }, +// // }, +// // }, +// // } + +// // job, err := testJob.ToWorkflowJob() + +// // if err != nil { +// // t.Fatalf("could not convert workflow job: %v", err) +// // } + +// // t.Fatalf("%v", job) +// // } diff --git a/prisma/migrations/20231222174753_init/migration.sql b/prisma/migrations/20231227194816_init/migration.sql similarity index 94% rename from prisma/migrations/20231222174753_init/migration.sql rename to prisma/migrations/20231227194816_init/migration.sql index f4e0934f6..84067f8c3 100644 --- a/prisma/migrations/20231222174753_init/migration.sql +++ b/prisma/migrations/20231227194816_init/migration.sql @@ -318,6 +318,19 @@ CREATE TABLE "Worker" ( CONSTRAINT "Worker_pkey" PRIMARY KEY ("id") ); +-- CreateTable +CREATE TABLE "Service" ( + "id" UUID NOT NULL, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "updatedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + "deletedAt" TIMESTAMP(3), + "name" TEXT NOT NULL, + "description" TEXT, + "tenantId" UUID NOT NULL, + + CONSTRAINT "Service_pkey" PRIMARY KEY ("id") +); + -- CreateTable CREATE TABLE "_WorkflowToWorkflowTag" ( "A" UUID NOT NULL, @@ -330,6 +343,12 @@ CREATE TABLE "_ActionToWorker" ( "B" UUID NOT NULL ); +-- CreateTable +CREATE TABLE "_ServiceToWorker" ( + "A" UUID NOT NULL, + "B" UUID NOT NULL +); + -- CreateIndex CREATE UNIQUE INDEX "User_id_key" ON "User"("id"); @@ -444,6 +463,12 @@ CREATE UNIQUE INDEX "Ticker_id_key" ON "Ticker"("id"); -- CreateIndex CREATE UNIQUE INDEX "Worker_id_key" ON "Worker"("id"); +-- CreateIndex +CREATE UNIQUE INDEX "Service_id_key" ON "Service"("id"); + +-- CreateIndex +CREATE UNIQUE INDEX "Service_tenantId_name_key" ON "Service"("tenantId", "name"); + -- CreateIndex CREATE UNIQUE INDEX "_WorkflowToWorkflowTag_AB_unique" ON "_WorkflowToWorkflowTag"("A", "B"); @@ -456,6 +481,12 @@ CREATE UNIQUE INDEX "_ActionToWorker_AB_unique" ON "_ActionToWorker"("A", "B"); -- CreateIndex CREATE INDEX "_ActionToWorker_B_index" ON "_ActionToWorker"("B"); +-- CreateIndex +CREATE UNIQUE INDEX "_ServiceToWorker_AB_unique" ON "_ServiceToWorker"("A", "B"); + +-- CreateIndex +CREATE INDEX "_ServiceToWorker_B_index" ON "_ServiceToWorker"("B"); + -- AddForeignKey ALTER TABLE "UserPassword" ADD CONSTRAINT "UserPassword_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE; @@ -579,6 +610,9 @@ ALTER TABLE "Worker" ADD CONSTRAINT "Worker_tenantId_fkey" FOREIGN KEY ("tenantI -- AddForeignKey ALTER TABLE "Worker" ADD CONSTRAINT "Worker_dispatcherId_fkey" FOREIGN KEY ("dispatcherId") REFERENCES "Dispatcher"("id") ON DELETE CASCADE ON UPDATE CASCADE; +-- AddForeignKey +ALTER TABLE "Service" ADD CONSTRAINT "Service_tenantId_fkey" FOREIGN KEY ("tenantId") REFERENCES "Tenant"("id") ON DELETE CASCADE ON UPDATE CASCADE; + -- AddForeignKey ALTER TABLE "_WorkflowToWorkflowTag" ADD CONSTRAINT "_WorkflowToWorkflowTag_A_fkey" FOREIGN KEY ("A") REFERENCES "Workflow"("id") ON DELETE CASCADE ON UPDATE CASCADE; @@ -590,3 +624,9 @@ ALTER TABLE "_ActionToWorker" ADD CONSTRAINT "_ActionToWorker_A_fkey" FOREIGN KE -- AddForeignKey ALTER TABLE "_ActionToWorker" ADD CONSTRAINT "_ActionToWorker_B_fkey" FOREIGN KEY ("B") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "_ServiceToWorker" ADD CONSTRAINT "_ServiceToWorker_A_fkey" FOREIGN KEY ("A") REFERENCES "Service"("id") ON DELETE CASCADE ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "_ServiceToWorker" ADD CONSTRAINT "_ServiceToWorker_B_fkey" FOREIGN KEY ("B") REFERENCES "Worker"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index b63a94668..3b030496d 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -83,6 +83,7 @@ model Tenant { members TenantMember[] workflowTags WorkflowTag[] actions Action[] + services Service[] } enum TenantMemberRole { @@ -658,9 +659,33 @@ model Worker { dispatcher Dispatcher @relation(fields: [dispatcherId], references: [id], onDelete: Cascade, onUpdate: Cascade) dispatcherId String @db.Uuid + services Service[] + // the actions this worker can run actions Action[] // the jobs the worker has run stepRuns StepRun[] } + +model Service { + // base fields + id String @id @unique @default(uuid()) @db.Uuid + createdAt DateTime @default(now()) + updatedAt DateTime @default(now()) @updatedAt + deletedAt DateTime? + + // the service name + name String + + // the service description + description String? + + tenant Tenant @relation(fields: [tenantId], references: [id], onDelete: Cascade, onUpdate: Cascade) + tenantId String @db.Uuid + + // the service's workers + workers Worker[] + + @@unique([tenantId, name]) +}