diff --git a/cmd/hatchet-engine/engine/run.go b/cmd/hatchet-engine/engine/run.go index e02b558c9..5ba21af93 100644 --- a/cmd/hatchet-engine/engine/run.go +++ b/cmd/hatchet-engine/engine/run.go @@ -786,57 +786,61 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro Fn: cleanupRetention, }) - tasks, err := task.New( - task.WithAlerter(sc.Alerter), - task.WithMessageQueue(sc.MessageQueueV1), - task.WithRepository(sc.EngineRepository), - task.WithV1Repository(sc.V1), - task.WithLogger(sc.Logger), - task.WithPartition(p), - task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue), - task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats), - ) + if isControllerActive(sc.PausedControllers, TaskController) { + tasks, err := task.New( + task.WithAlerter(sc.Alerter), + task.WithMessageQueue(sc.MessageQueueV1), + task.WithRepository(sc.EngineRepository), + task.WithV1Repository(sc.V1), + task.WithLogger(sc.Logger), + task.WithPartition(p), + task.WithQueueLoggerConfig(&sc.AdditionalLoggers.Queue), + task.WithPgxStatsLoggerConfig(&sc.AdditionalLoggers.PgxStats), + ) - if err != nil { - return nil, fmt.Errorf("could not create tasks controller: %w", err) + if err != nil { + return nil, fmt.Errorf("could not create tasks controller: %w", err) + } + + cleanupTasks, err := tasks.Start() + + if err != nil { + return nil, fmt.Errorf("could not start tasks controller: %w", err) + } + + teardown = append(teardown, Teardown{ + Name: "tasks controller", + Fn: cleanupTasks, + }) } - cleanupTasks, err := tasks.Start() + if isControllerActive(sc.PausedControllers, OLAPController) { + olap, err := olap.New( + olap.WithAlerter(sc.Alerter), + olap.WithMessageQueue(sc.MessageQueueV1), + olap.WithRepository(sc.V1), + olap.WithLogger(sc.Logger), + olap.WithPartition(p), + olap.WithTenantAlertManager(sc.TenantAlerter), + olap.WithSamplingConfig(sc.Sampling), + ) - if err != nil { - return nil, fmt.Errorf("could not start tasks controller: %w", err) + if err != nil { + return nil, fmt.Errorf("could not create olap controller: %w", err) + } + + cleanupOlap, err := olap.Start() + + if err != nil { + return nil, fmt.Errorf("could not start olap controller: %w", err) + } + + teardown = append(teardown, Teardown{ + Name: "olap controller", + Fn: cleanupOlap, + }) } - teardown = append(teardown, Teardown{ - Name: "tasks controller", - Fn: cleanupTasks, - }) - - olap, err := olap.New( - olap.WithAlerter(sc.Alerter), - olap.WithMessageQueue(sc.MessageQueueV1), - olap.WithRepository(sc.V1), - olap.WithLogger(sc.Logger), - olap.WithPartition(p), - olap.WithTenantAlertManager(sc.TenantAlerter), - olap.WithSamplingConfig(sc.Sampling), - ) - - if err != nil { - return nil, fmt.Errorf("could not create olap controller: %w", err) - } - - cleanupOlap, err := olap.Start() - - if err != nil { - return nil, fmt.Errorf("could not start olap controller: %w", err) - } - - teardown = append(teardown, Teardown{ - Name: "olap controller", - Fn: cleanupOlap, - }) - cleanup1, err := p.StartTenantWorkerPartition(ctx) if err != nil { @@ -1125,3 +1129,18 @@ func startPrometheus(l *zerolog.Logger, c shared.PrometheusConfigFile) Teardown }, } } + +type ControllerName string + +const ( + OLAPController ControllerName = "olap" + TaskController ControllerName = "task" +) + +func isControllerActive(pausedControllers map[string]bool, controllerName ControllerName) bool { + if isPaused, ok := pausedControllers[string(controllerName)]; !ok || !isPaused { + return true + } + + return false +} diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index d39b63f9b..aedf74567 100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -608,6 +608,14 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers services = strings.Split(cf.ServicesString, " ") } + pausedControllers := make(map[string]bool) + + if cf.PausedControllers != "" { + for _, controller := range strings.Split(cf.PausedControllers, " ") { + pausedControllers[controller] = true + } + } + if cf.Runtime.Monitoring.TLSRootCAFile == "" { cf.Runtime.Monitoring.TLSRootCAFile = cf.TLS.TLSRootCAFile } @@ -630,6 +638,7 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers MessageQueue: mq, MessageQueueV1: mqv1, Services: services, + PausedControllers: pausedControllers, InternalClientFactory: internalClientFactory, Logger: &l, TLSConfig: tls, diff --git a/pkg/config/server/server.go b/pkg/config/server/server.go index a739c8997..4af7bf024 100644 --- a/pkg/config/server/server.go +++ b/pkg/config/server/server.go @@ -47,6 +47,8 @@ type ServerConfigFile struct { // Used to bind the environment variable, since the array is not well supported ServicesString string `mapstructure:"servicesString" json:"servicesString,omitempty"` + PausedControllers string `mapstructure:"pausedControllers" json:"pausedControllers,omitempty"` + EnableDataRetention bool `mapstructure:"enableDataRetention" json:"enableDataRetention,omitempty" default:"true"` EnableWorkerRetention bool `mapstructure:"enableWorkerRetention" json:"enableWorkerRetention,omitempty" default:"false"` @@ -481,6 +483,8 @@ type ServerConfig struct { Services []string + PausedControllers map[string]bool + EnableDataRetention bool EnableWorkerRetention bool @@ -548,6 +552,7 @@ func BindAllEnv(v *viper.Viper) { _ = v.BindEnv("runtime.grpcRateLimit", "SERVER_GRPC_RATE_LIMIT") _ = v.BindEnv("runtime.shutdownWait", "SERVER_SHUTDOWN_WAIT") _ = v.BindEnv("servicesString", "SERVER_SERVICES") + _ = v.BindEnv("pausedControllers", "SERVER_PAUSED_CONTROLLERS") _ = v.BindEnv("enableDataRetention", "SERVER_ENABLE_DATA_RETENTION") _ = v.BindEnv("enableWorkerRetention", "SERVER_ENABLE_WORKER_RETENTION") _ = v.BindEnv("runtime.enforceLimits", "SERVER_ENFORCE_LIMITS")