start eventlistener in group

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-09-06 16:46:43 +02:00
parent 8e1b033a63
commit de14367d16
2 changed files with 26 additions and 15 deletions

View File

@@ -31,21 +31,22 @@ var _registeredEvents = []events.Unmarshaller{
}
// ListenForEvents listens for events and acts accordingly
func ListenForEvents(cfg *config.Config, l log.Logger) {
func ListenForEvents(ctx context.Context, cfg *config.Config, l log.Logger) error {
bus, err := stream.NatsFromConfig(cfg.Service.Name, stream.NatsConfig(cfg.Events))
if err != nil {
l.Error().Err(err).Msg("cannot connect to nats")
return
return err
}
evChannel, err := events.Consume(bus, "frontend", _registeredEvents...)
if err != nil {
l.Error().Err(err).Msg("cannot consume from nats")
return err
}
tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode)
if err != nil {
return
return err
}
gatewaySelector, err := pool.GatewaySelector(
@@ -56,19 +57,19 @@ func ListenForEvents(cfg *config.Config, l log.Logger) {
)
if err != nil {
l.Error().Err(err).Msg("cannot get gateway selector")
return
return err
}
gwc, err := gatewaySelector.Next()
if err != nil {
l.Error().Err(err).Msg("cannot get gateway client")
return
return err
}
traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name)
if err != nil {
l.Error().Err(err).Msg("cannot initialize tracing")
return
return err
}
grpcClient, err := grpc.NewClient(
@@ -79,17 +80,23 @@ func ListenForEvents(cfg *config.Config, l log.Logger) {
)
if err != nil {
l.Error().Err(err).Msg("cannot create grpc client")
return
return err
}
valueService := settingssvc.NewValueService("com.owncloud.api.settings", grpcClient)
for e := range evChannel {
switch ev := e.Event.(type) {
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.ShareCreated:
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gwc, valueService, cfg.ServiceAccount)
for {
select {
case e := <-evChannel:
switch ev := e.Event.(type) {
default:
l.Error().Interface("event", e).Msg("unhandled event")
case events.ShareCreated:
AutoAcceptShares(ev, cfg.AutoAcceptShares, l, gwc, valueService, cfg.ServiceAccount)
}
case <-ctx.Done():
l.Info().Msg("context cancelled")
return ctx.Err()
}
}
}

View File

@@ -90,8 +90,12 @@ func Server(cfg *config.Config) *cli.Command {
logger.Fatal().Err(err).Msg("failed to register the http service")
}
// start event handler
go ListenForEvents(cfg, logger)
// add event handler
gr.Add(func() error {
return ListenForEvents(ctx, cfg, logger)
}, func(_ error) {
cancel()
})
return gr.Run()
},