Expose the MaxAckPending and AckWait settings for postprocessing

This commit is contained in:
André Duffeck
2025-07-15 09:33:08 +02:00
parent ad8ac8ee25
commit de7b659e3d
5 changed files with 37 additions and 5 deletions

View File

@@ -521,7 +521,15 @@ func trap(s *Service, ctx context.Context) {
func pingNats(cfg *occfg.Config) error {
// We need to get a natsconfig from somewhere. We can use any one.
evcfg := cfg.Postprocessing.Postprocessing.Events
_, err := stream.NatsFromConfig("initial", true, stream.NatsConfig(evcfg))
_, err := stream.NatsFromConfig("initial", true, stream.NatsConfig{
Endpoint: evcfg.Endpoint,
Cluster: evcfg.Cluster,
EnableTLS: evcfg.EnableTLS,
TLSInsecure: evcfg.TLSInsecure,
TLSRootCACertificate: evcfg.TLSRootCACertificate,
AuthUsername: evcfg.AuthUsername,
AuthPassword: evcfg.AuthPassword,
})
return err
}

View File

@@ -40,7 +40,15 @@ func RestartPostprocessing(cfg *config.Config) *cli.Command {
return configlog.ReturnFatal(parser.ParseConfig(cfg))
},
Action: func(c *cli.Context) error {
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
stream, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
TLSInsecure: cfg.Postprocessing.Events.TLSInsecure,
TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
})
if err != nil {
return err
}

View File

@@ -45,6 +45,9 @@ type Events struct {
EnableTLS bool `yaml:"enable_tls" env:"OC_EVENTS_ENABLE_TLS;POSTPROCESSING_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"`
AuthUsername string `yaml:"username" env:"OC_EVENTS_AUTH_USERNAME;POSTPROCESSING_EVENTS_AUTH_USERNAME" desc:"The username to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"`
AuthPassword string `yaml:"password" env:"OC_EVENTS_AUTH_PASSWORD;POSTPROCESSING_EVENTS_AUTH_PASSWORD" desc:"The password to authenticate with the events broker. The events broker is the OpenCloud service which receives and delivers events between the services." introductionVersion:"1.0.0"`
MaxAckPending int `yaml:"max_ack_pending" env:"SEARCH_EVENTS_MAX_ACK_PENDING" desc:"The maximum number of unacknowledged messages. This is used to limit the number of messages that can be in flight at the same time." introductionVersion:"%%NEXT%%"`
AckWait time.Duration `yaml:"ack_wait" env:"SEARCH_EVENTS_ACK_WAIT" desc:"The time to wait for an ack before the message is redelivered. This is used to ensure that messages are not lost if the consumer crashes." introductionVersion:"%%NEXT%%"`
}
// Debug defines the available debug configuration.

View File

@@ -28,8 +28,10 @@ func DefaultConfig() *config.Config {
},
Postprocessing: config.Postprocessing{
Events: config.Events{
Endpoint: "127.0.0.1:9233",
Cluster: "opencloud-cluster",
Endpoint: "127.0.0.1:9233",
Cluster: "opencloud-cluster",
MaxAckPending: 10_000,
AckWait: 1 * time.Minute,
},
Workers: 3,
RetryBackoffDuration: 5 * time.Second,

View File

@@ -43,7 +43,16 @@ var (
// NewPostprocessingService returns a new instance of a postprocessing service
func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.Store, tp trace.TracerProvider, cfg *config.Config) (*PostprocessingService, error) {
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig(cfg.Postprocessing.Events))
pub, err := stream.NatsFromConfig(cfg.Service.Name, false, stream.NatsConfig{
Endpoint: cfg.Postprocessing.Events.Endpoint,
Cluster: cfg.Postprocessing.Events.Cluster,
EnableTLS: cfg.Postprocessing.Events.EnableTLS,
TLSInsecure: cfg.Postprocessing.Events.TLSInsecure,
TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
})
if err != nil {
return nil, err
}
@@ -55,6 +64,8 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store.
TLSRootCACertificate: cfg.Postprocessing.Events.TLSRootCACertificate,
AuthUsername: cfg.Postprocessing.Events.AuthUsername,
AuthPassword: cfg.Postprocessing.Events.AuthPassword,
MaxAckPending: cfg.Postprocessing.Events.MaxAckPending,
AckWait: cfg.Postprocessing.Events.AckWait,
})
evs, err := raw.Consume("postprocessing-pull",