storage-users: pass context to event handling service

This allows to properly shutdown the event handling loop by
cancelling the context.
This commit is contained in:
Ralf Haferkamp
2023-07-13 14:45:31 +02:00
committed by Ralf Haferkamp
parent 2a53dd89f4
commit 7dde3a1175
2 changed files with 56 additions and 35 deletions
+7 -2
View File
@@ -109,12 +109,17 @@ func Server(cfg *config.Config) *cli.Command {
return err
}
eventSVC, err := event.NewService(selector, stream, logger, *cfg)
eventSVC, err := event.NewService(ctx, selector, stream, logger, *cfg)
if err != nil {
logger.Fatal().Err(err).Msg("can't create event service")
logger.Fatal().Err(err).Msg("can't create event handler")
}
gr.Add(eventSVC.Run, func(_ error) {
logger.Error().
Err(err).
Str("server", cfg.Service.Name).
Msg("Shutting down event handler")
cancel()
})
}
+49 -33
View File
@@ -1,6 +1,7 @@
package event
import (
"context"
"time"
apiGateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1"
@@ -21,15 +22,17 @@ type Service struct {
eventStream events.Stream
logger log.Logger
config config.Config
ctx context.Context
}
// NewService prepares and returns a Service implementation.
func NewService(gatewaySelector pool.Selectable[apiGateway.GatewayAPIClient], eventStream events.Stream, logger log.Logger, conf config.Config) (Service, error) {
func NewService(ctx context.Context, gatewaySelector pool.Selectable[apiGateway.GatewayAPIClient], eventStream events.Stream, logger log.Logger, conf config.Config) (Service, error) {
svc := Service{
gatewaySelector: gatewaySelector,
eventStream: eventStream,
logger: logger,
config: conf,
ctx: ctx,
}
return svc, nil
@@ -42,39 +45,52 @@ func (s Service) Run() error {
return err
}
for e := range ch {
var errs []error
switch ev := e.Event.(type) {
case PurgeTrashBin:
executionTime := ev.ExecutionTime
if executionTime.IsZero() {
executionTime = time.Now()
for {
select {
case <-s.ctx.Done():
s.logger.Info().Str("service", s.config.Service.Name).Msg("Context canceled. Shutting down event handler")
return nil
case e, more := <-ch:
if !more {
s.logger.Info().Str("service", s.config.Service.Name).Msg("Event channel closed. Shutting down event handler")
// the channel was closed we can stop here
return nil
}
tasks := map[task.SpaceType]time.Time{
task.Project: executionTime.Add(-s.config.Tasks.PurgeTrashBin.ProjectDeleteBefore),
task.Personal: executionTime.Add(-s.config.Tasks.PurgeTrashBin.PersonalDeleteBefore),
}
for spaceType, deleteBefore := range tasks {
// skip task execution if the deleteBefore time is the same as the now time,
// which indicates that the duration configuration for this space type is set to 0 which is the equivalent to disabled.
if deleteBefore.Equal(executionTime) {
continue
}
if err = task.PurgeTrashBin(s.config.ServiceAccount.ServiceAccountID, deleteBefore, spaceType, s.gatewaySelector, s.config.ServiceAccount.ServiceAccountSecret); err != nil {
errs = append(errs, err)
}
}
}
for _, err := range errs {
s.logger.Error().Err(err).Interface("event", e)
s.handleEvent(e)
}
}
return nil
}
func (s Service) handleEvent(e events.Event) {
var errs []error
switch ev := e.Event.(type) {
case PurgeTrashBin:
executionTime := ev.ExecutionTime
if executionTime.IsZero() {
executionTime = time.Now()
}
tasks := map[task.SpaceType]time.Time{
task.Project: executionTime.Add(-s.config.Tasks.PurgeTrashBin.ProjectDeleteBefore),
task.Personal: executionTime.Add(-s.config.Tasks.PurgeTrashBin.PersonalDeleteBefore),
}
for spaceType, deleteBefore := range tasks {
// skip task execution if the deleteBefore time is the same as the now time,
// which indicates that the duration configuration for this space type is set to 0 which is the equivalent to disabled.
if deleteBefore.Equal(executionTime) {
continue
}
if err := task.PurgeTrashBin(s.config.ServiceAccount.ServiceAccountID, deleteBefore, spaceType, s.gatewaySelector, s.config.ServiceAccount.ServiceAccountSecret); err != nil {
errs = append(errs, err)
}
}
}
for _, err := range errs {
s.logger.Error().Err(err).Interface("event", e).Msg("Error running PurgeTrashBin task")
}
}