From de7b659e3d47aabf776f5aa2ed7557cda8b821bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Duffeck?= Date: Tue, 15 Jul 2025 09:33:08 +0200 Subject: [PATCH] Expose the MaxAckPending and AckWait settings for postprocessing --- opencloud/pkg/runtime/service/service.go | 10 +++++++++- .../postprocessing/pkg/command/postprocessing.go | 10 +++++++++- services/postprocessing/pkg/config/config.go | 3 +++ .../pkg/config/defaults/defaultconfig.go | 6 ++++-- services/postprocessing/pkg/service/service.go | 13 ++++++++++++- 5 files changed, 37 insertions(+), 5 deletions(-) diff --git a/opencloud/pkg/runtime/service/service.go b/opencloud/pkg/runtime/service/service.go index 4025c608d7..bbf92aa082 100644 --- a/opencloud/pkg/runtime/service/service.go +++ b/opencloud/pkg/runtime/service/service.go @@ -521,7 +521,15 @@ func trap(s *Service, ctx context.Context) { func pingNats(cfg *occfg.Config) error { // We need to get a natsconfig from somewhere. We can use any one. evcfg := cfg.Postprocessing.Postprocessing.Events - _, err := stream.NatsFromConfig("initial", true, stream.NatsConfig(evcfg)) + _, err := stream.NatsFromConfig("initial", true, stream.NatsConfig{ + Endpoint: evcfg.Endpoint, + Cluster: evcfg.Cluster, + EnableTLS: evcfg.EnableTLS, + TLSInsecure: evcfg.TLSInsecure, + TLSRootCACertificate: evcfg.TLSRootCACertificate, + AuthUsername: evcfg.AuthUsername, + AuthPassword: evcfg.AuthPassword, + }) return err } diff --git a/services/postprocessing/pkg/command/postprocessing.go b/services/postprocessing/pkg/command/postprocessing.go index 8abfa1c5f0..1494b335b0 100644 --- a/services/postprocessing/pkg/command/postprocessing.go +++ b/services/postprocessing/pkg/command/postprocessing.go @@ -40,7 +40,15 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) + stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ + Endpoint: cfg.Postprocessing.Events.Endpoint, + Cluster: cfg.Postprocessing.Events.Cluster, + EnableTLS: cfg.Postprocessing.Events.EnableTLS, + TLSInsecure: cfg.Postprocessing.Events.TLSInsecure, + TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate, + AuthUsername: cfg.Postprocessing.Events.AuthUsername, + AuthPassword: cfg.Postprocessing.Events.AuthPassword, + }) if err != nil { return err } diff --git a/services/postprocessing/pkg/config/config.go b/services/postprocessing/pkg/config/config.go index 004e3ee1d9..5ff29a7ee4 100644 --- a/services/postprocessing/pkg/config/config.go +++ b/services/postprocessing/pkg/config/config.go @@ -45,6 +45,9 @@ type Events struct { EnableTLS bool `yaml:"enable_tls" env:"OC_EVENTS_ENABLE_TLS;POSTPROCESSING_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` AuthUsername string `yaml:"username" env:"OC_EVENTS_AUTH_USERNAME;POSTPROCESSING_EVENTS_AUTH_USERNAME" desc:"The username to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` AuthPassword string `yaml:"password" env:"OC_EVENTS_AUTH_PASSWORD;POSTPROCESSING_EVENTS_AUTH_PASSWORD" desc:"The password to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"` + + MaxAckPending int `yaml:"max_ack_pending" env:"SEARCH_EVENTS_MAX_ACK_PENDING" desc:"The maximum number of unacknowledged messages. This is used to limit the number of messages that can be in flight at the same time." introductionVersion:"%%NEXT%%"` + AckWait time.Duration `yaml:"ack_wait" env:"SEARCH_EVENTS_ACK_WAIT" desc:"The time to wait for an ack before the message is redelivered. This is used to ensure that messages are not lost if the consumer crashes." introductionVersion:"%%NEXT%%"` } // Debug defines the available debug configuration. diff --git a/services/postprocessing/pkg/config/defaults/defaultconfig.go b/services/postprocessing/pkg/config/defaults/defaultconfig.go index 3510c0969c..2a58db25db 100644 --- a/services/postprocessing/pkg/config/defaults/defaultconfig.go +++ b/services/postprocessing/pkg/config/defaults/defaultconfig.go @@ -28,8 +28,10 @@ func DefaultConfig() *config.Config { }, Postprocessing: config.Postprocessing{ Events: config.Events{ - Endpoint: "127.0.0.1:9233", - Cluster: "opencloud-cluster", + Endpoint: "127.0.0.1:9233", + Cluster: "opencloud-cluster", + MaxAckPending: 10_000, + AckWait: 1 * time.Minute, }, Workers: 3, RetryBackoffDuration: 5 * time.Second, diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 8782757ac3..2b41b28880 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -43,7 +43,16 @@ var ( // NewPostprocessingService returns a new instance of a postprocessing service func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) { - pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events)) + pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{ + Endpoint: cfg.Postprocessing.Events.Endpoint, + Cluster: cfg.Postprocessing.Events.Cluster, + EnableTLS: cfg.Postprocessing.Events.EnableTLS, + TLSInsecure: cfg.Postprocessing.Events.TLSInsecure, + TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate, + AuthUsername: cfg.Postprocessing.Events.AuthUsername, + AuthPassword: cfg.Postprocessing.Events.AuthPassword, + }) + if err != nil { return nil, err } @@ -55,6 +64,8 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store. TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate, AuthUsername: cfg.Postprocessing.Events.AuthUsername, AuthPassword: cfg.Postprocessing.Events.AuthPassword, + MaxAckPending: cfg.Postprocessing.Events.MaxAckPending, + AckWait: cfg.Postprocessing.Events.AckWait, }) evs, err := raw.Consume("postprocessing-pull",