postprocessing event workers

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Jörn Friedrich Dreyer
2024-10-22 10:21:02 +02:00
parent d6958f3a3e
commit 91dc7699d8
4 changed files with 10 additions and 4 deletions

View File

@@ -25,7 +25,9 @@ type Config struct {
// Postprocessing defines the config options for the postprocessing service.
type Postprocessing struct {
Events Events `yaml:"events"`
Events Events `yaml:"events"`
Workers int `yaml:"workers" env:"POSTPROCESSING_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"6.7"`
Steps []string `yaml:"steps" env:"POSTPROCESSING_STEPS" desc:"A list of postprocessing steps processed in order of their appearance. Currently supported values by the system are: 'virusscan', 'policies' and 'delay'. Custom steps are allowed. See the documentation for instructions. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
Delayprocessing time.Duration `yaml:"delayprocessing" env:"POSTPROCESSING_DELAY" desc:"After uploading a file but before making it available for download, a delay step can be added. Intended for developing purposes only. If a duration is set but the keyword 'delay' is not explicitely added to 'POSTPROCESSING_STEPS', the delay step will be processed as last step. In such a case, a log entry will be written on service startup to remind the admin about that situation. See the Environment Variable Types description for more details." introductionVersion:"pre5.0"`
@@ -35,7 +37,6 @@ type Postprocessing struct {
// Events combines the configuration options for the event bus.
type Events struct {
Workers int `yaml:"workers" env:"POSTPROCESSING_EVENTS_WORKERS" desc:"The number of concurrent go routines that fetch events from the event queue." introductionVersion:"%%NEXT%%"`
Endpoint string `yaml:"endpoint" env:"OCIS_EVENTS_ENDPOINT;POSTPROCESSING_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture." introductionVersion:"pre5.0"`
Cluster string `yaml:"cluster" env:"OCIS_EVENTS_CLUSTER;POSTPROCESSING_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system." introductionVersion:"pre5.0"`

View File

@@ -28,10 +28,10 @@ func DefaultConfig() *config.Config {
},
Postprocessing: config.Postprocessing{
Events: config.Events{
Workers: 3,
Endpoint: "127.0.0.1:9233",
Cluster: "ocis-cluster",
},
Workers: 3,
RetryBackoffDuration: 5 * time.Second,
MaxRetries: 14,
},

View File

@@ -66,7 +66,7 @@ func NewPostprocessingService(ctx context.Context, stream events.Stream, logger
// Run to fulfil Runner interface
func (pps *PostprocessingService) Run() error {
// Spawn workers that'll concurrently work the queue
for i := 0; i < 3; i++ {
for i := 0; i < pps.c.Workers; i++ {
go (func() {
for e := range pps.events {
err := pps.processEvent(e)