From de14367d16c86ce1df9a4e4280e592bdaf1df192 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 6 Sep 2023 16:46:43 +0200 Subject: [PATCH] start eventlistener in group Signed-off-by: jkoberg --- services/frontend/pkg/command/events.go | 33 +++++++++++++++---------- services/frontend/pkg/command/server.go | 8 ++++-- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/services/frontend/pkg/command/events.go b/services/frontend/pkg/command/events.go index 24141a8ad..7a4ba1983 100644 --- a/services/frontend/pkg/command/events.go +++ b/services/frontend/pkg/command/events.go @@ -31,21 +31,22 @@ var _registeredEvents = []events.Unmarshaller{ } // ListenForEvents listens for events and acts accordingly -func ListenForEvents(cfg *config.Config, l log.Logger) { +func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error { bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events)) if err != nil { l.Error().Err(err).Msg("cannot connect to nats") - return + return err } evChannel, err := events.Consume(bus, "frontend", _registeredEvents...) if err != nil { l.Error().Err(err).Msg("cannot consume from nats") + return err } tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode) if err != nil { - return + return err } gatewaySelector, err := pool.GatewaySelector( @@ -56,19 +57,19 @@ func ListenForEvents(cfg *config.Config, l log.Logger) { ) if err != nil { l.Error().Err(err).Msg("cannot get gateway selector") - return + return err } gwc, err := gatewaySelector.Next() if err != nil { l.Error().Err(err).Msg("cannot get gateway client") - return + return err } traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { l.Error().Err(err).Msg("cannot initialize tracing") - return + return err } grpcClient, err := grpc.NewClient( @@ -79,17 +80,23 @@ func ListenForEvents(cfg *config.Config, l log.Logger) { ) if err != nil { l.Error().Err(err).Msg("cannot create grpc client") - return + return err } valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient) - for e := range evChannel { - switch ev := e.Event.(type) { - default: - l.Error().Interface("event", e).Msg("unhandled event") - case events.ShareCreated: - AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gwc, valueService, cfg.ServiceAccount) + for { + select { + case e := <-evChannel: + switch ev := e.Event.(type) { + default: + l.Error().Interface("event", e).Msg("unhandled event") + case events.ShareCreated: + AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gwc, valueService, cfg.ServiceAccount) + } + case <-ctx.Done(): + l.Info().Msg("context cancelled") + return ctx.Err() } } } diff --git a/services/frontend/pkg/command/server.go b/services/frontend/pkg/command/server.go index a6a813e55..1aa8a72ac 100644 --- a/services/frontend/pkg/command/server.go +++ b/services/frontend/pkg/command/server.go @@ -90,8 +90,12 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Msg("failed to register the http service") } - // start event handler - go ListenForEvents(cfg, logger) + // add event handler + gr.Add(func() error { + return ListenForEvents(ctx, cfg, logger) + }, func(_ error) { + cancel() + }) return gr.Run() },