diff --git a/pkg/runner/factory.go b/pkg/runner/factory.go new file mode 100644 index 000000000..e7925941e --- /dev/null +++ b/pkg/runner/factory.go @@ -0,0 +1,134 @@ +package runner + +import ( + "context" + "errors" + "net" + "net/http" + "time" + + ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" + ohttp "github.com/opencloud-eu/opencloud/pkg/service/http" + "google.golang.org/grpc" +) + +// NewGoMicroGrpcServerRunner creates a new runner based on the provided go-micro's +// GRPC service. The service is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc".NewService(...) function +// +// The runner will behave as described: +// * The task is to start a server and listen for connections. If the server +// can't start, the task will finish with that error. +// * The stopper will call the server's stop method and send the result to +// the task. +// * The stopper will run asynchronously because the stop method could take a +// while and we don't want to block +func NewGoMicroGrpcServerRunner(name string, server ogrpc.Service, opts ...Option) *Runner { + httpCh := make(chan error, 1) + r := New(name, func() error { + // start the server and return if it fails + if err := server.Server().Start(); err != nil { + return err + } + return <-httpCh // wait for the result + }, func() { + // stop implies deregistering and waiting for request to finish, + // so don't block + go func() { + httpCh <- server.Server().Stop() // stop and send result through channel + close(httpCh) + }() + }, opts...) + return r +} + +// NewGoMicroHttpServerRunner creates a new runner based on the provided go-micro's +// HTTP service. The service is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/http".NewService(...) function +// +// The runner will behave as described: +// * The task is to start a server and listen for connections. If the server +// can't start, the task will finish with that error. +// * The stopper will call the server's stop method and send the result to +// the task. +// * The stopper will run asynchronously because the stop method could take a +// while and we don't want to block +func NewGoMicroHttpServerRunner(name string, server ohttp.Service, opts ...Option) *Runner { + httpCh := make(chan error, 1) + r := New(name, func() error { + // start the server and return if it fails + if err := server.Server().Start(); err != nil { + return err + } + return <-httpCh // wait for the result + }, func() { + // stop implies deregistering and waiting for request to finish, + // so don't block + go func() { + httpCh <- server.Server().Stop() // stop and send result through channel + close(httpCh) + }() + }, opts...) + return r +} + +// NewGolangHttpServerRunner creates a new runner based on the provided HTTP server. +// The HTTP server is expected to be created via +// "github.com/owncloud/ocis/v2/ocis-pkg/service/debug".NewService(...) function +// and it's expected to be a regular golang HTTP server +// +// The runner will behave as described: +// * The task starts a server and listen for connections. If the server +// can't start, the task will finish with that error. If the server is shutdown +// the task will wait for the shutdown to return that result (task won't finish +// immediately, but wait until shutdown returns) +// * The stopper will call the server's shutdown method and send the result to +// the task. The stopper will wait up to 5 secs for the shutdown. +// * The stopper will run asynchronously because the shutdown could take a +// while and we don't want to block +func NewGolangHttpServerRunner(name string, server *http.Server, opts ...Option) *Runner { + debugCh := make(chan error, 1) + r := New(name, func() error { + // start listening and return if the error is NOT ErrServerClosed. + // ListenAndServe will always return a non-nil error. + // We need to wait and get the result of the Shutdown call. + // App shouldn't exit until Shutdown has returned. + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + return err + } + // wait for the shutdown and return the result + return <-debugCh + }, func() { + // Since Shutdown might take some time, don't block + go func() { + // give 5 secs for the shutdown to finish + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + debugCh <- server.Shutdown(shutdownCtx) + close(debugCh) + }() + }, opts...) + + return r +} + +// NewGolangGrpcServerRunner creates a new runner based on the provided GRPC +// server. The GRPC server is expected to be a regular golang GRPC server, +// created via "google.golang.org/grpc".NewServer(...) +// A listener also needs to be provided for the server to listen there. +// +// The runner will just start the GRPC server in the listener, and the server +// will be gracefully stopped when interrupted +func NewGolangGrpcServerRunner(name string, server *grpc.Server, listener net.Listener, opts ...Option) *Runner { + r := New(name, func() error { + return server.Serve(listener) + }, func() { + // Since GracefulStop might take some time, don't block + go func() { + server.GracefulStop() + }() + }, opts...) + + return r +} diff --git a/pkg/runner/types.go b/pkg/runner/types.go index c4914e326..4f226c18c 100644 --- a/pkg/runner/types.go +++ b/pkg/runner/types.go @@ -1,10 +1,16 @@ package runner import ( + "os" "strings" + "syscall" "time" ) +var ( + StopSignals = []os.Signal{syscall.SIGTERM, syscall.SIGINT, syscall.SIGQUIT} +) + // Runable represent a task that can be executed by the Runner. // It expected to be a long running task with an indefinite execution time, // so it's suitable for servers or services. diff --git a/services/antivirus/pkg/command/server.go b/services/antivirus/pkg/command/server.go index 4b05bd8fb..05f3186c0 100644 --- a/services/antivirus/pkg/command/server.go +++ b/services/antivirus/pkg/command/server.go @@ -3,12 +3,13 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/antivirus/pkg/config" "github.com/opencloud-eu/opencloud/services/antivirus/pkg/config/parser" @@ -26,31 +27,38 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - logger = log.NewLogger( - log.Name(cfg.Service.Name), - log.Level(cfg.Log.Level), - log.Pretty(cfg.Log.Pretty), - log.Color(cfg.Log.Color), - log.File(cfg.Log.File), - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), ) - defer cancel() + traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { return err } + + gr := runner.NewGroup() { svc, err := service.NewAntivirus(cfg, logger, traceProvider) if err != nil { return cli.Exit(err.Error(), 1) } - gr.Add(svc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.New("antivirus_svc", func() error { + return svc.Run() + }, func() { + svc.Close() + })) } { @@ -64,13 +72,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("antivirus_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/antivirus/pkg/service/service.go b/services/antivirus/pkg/service/service.go index 6dea06db6..99c78d879 100644 --- a/services/antivirus/pkg/service/service.go +++ b/services/antivirus/pkg/service/service.go @@ -11,6 +11,7 @@ import ( "os" "slices" "sync" + "sync/atomic" "time" "github.com/opencloud-eu/reva/v2/pkg/bytesize" @@ -54,7 +55,15 @@ func NewAntivirus(cfg *config.Config, logger log.Logger, tracerProvider trace.Tr return Antivirus{}, err } - av := Antivirus{config: cfg, log: logger, tracerProvider: tracerProvider, scanner: scanner, client: rhttp.GetHTTPClient(rhttp.Insecure(true))} + av := Antivirus{ + config: cfg, + log: logger, + tracerProvider: tracerProvider, + scanner: scanner, + client: rhttp.GetHTTPClient(rhttp.Insecure(true)), + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), + } switch mode := cfg.MaxScanSizeMode; mode { case config.MaxScanSizeModeSkip, config.MaxScanSizeModePartial: @@ -91,7 +100,9 @@ type Antivirus struct { maxScanSize uint64 tracerProvider trace.TracerProvider - client *http.Client + client *http.Client + stopCh chan struct{} + stopped *atomic.Bool } // Run runs the service @@ -131,26 +142,48 @@ func (av Antivirus) Run() error { wg.Add(1) go func() { defer wg.Done() - for e := range ch { - err := av.processEvent(e, natsStream) - if err != nil { - switch { - case errors.Is(err, ErrFatal): - av.log.Fatal().Err(err).Msg("fatal error - exiting") - case errors.Is(err, ErrEvent): - av.log.Error().Err(err).Msg("continuing") - default: - av.log.Fatal().Err(err).Msg("unknown error - exiting") + + EventLoop: + for { + select { + case e, ok := <-ch: + if !ok { + break EventLoop } + + err := av.processEvent(e, natsStream) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + av.log.Fatal().Err(err).Msg("fatal error - exiting") + case errors.Is(err, ErrEvent): + av.log.Error().Err(err).Msg("continuing") + default: + av.log.Fatal().Err(err).Msg("unknown error - exiting") + } + } + + if av.stopped.Load() { + break EventLoop + } + case <-av.stopCh: + break EventLoop } } }() } + wg.Wait() return nil } +func (av Antivirus) Close() { + if av.stopped.CompareAndSwap(false, true) { + close(av.stopCh) + } +} + func (av Antivirus) processEvent(e events.Event, s events.Publisher) error { ctx, span := av.tracerProvider.Tracer("antivirus").Start(e.GetTraceContext(context.Background()), "processEvent") defer span.End() diff --git a/services/audit/pkg/command/server.go b/services/audit/pkg/command/server.go index 38727520b..78f05df85 100644 --- a/services/audit/pkg/command/server.go +++ b/services/audit/pkg/command/server.go @@ -3,14 +3,15 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/services/audit/pkg/config" "github.com/opencloud-eu/opencloud/services/audit/pkg/config/parser" "github.com/opencloud-eu/opencloud/services/audit/pkg/logging" @@ -29,13 +30,15 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - logger = logging.Configure(cfg.Service.Name, cfg.Log) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - ctx, cancel = context.WithCancel(c.Context) - ) - defer cancel() + logger := logging.Configure(cfg.Service.Name, cfg.Log) + gr := runner.NewGroup() connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) client, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) @@ -47,24 +50,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - svc.AuditLoggerFromConfig(ctx, cfg.Auditlog, evts, logger) - return nil - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } + // we need an additional context for the audit server in order to + // cancel it anytime + svcCtx, svcCancel := context.WithCancel(ctx) + defer svcCancel() - cancel() - }) + gr.Add(runner.New("audit_svc", func() error { + svc.AuditLoggerFromConfig(svcCtx, cfg.Auditlog, evts, logger) + return nil + }, func() { + svcCancel() + })) { debugServer, err := debug.Server( @@ -77,12 +73,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("audit_debug", debugServer)) } - return gr.Run() + + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/audit/pkg/service/service.go b/services/audit/pkg/service/service.go index 9afb079b1..1de8d27e7 100644 --- a/services/audit/pkg/service/service.go +++ b/services/audit/pkg/service/service.go @@ -42,7 +42,11 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge select { case <-ctx.Done(): return - case i := <-ch: + case i, ok := <-ch: + if !ok { + return + } + var auditEvent interface{} switch ev := i.Event.(type) { case events.ShareCreated: @@ -113,6 +117,10 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge auditEvent = types.ScienceMeshInviteTokenGenerated(ev) default: log.Error().Interface("event", ev).Msg(fmt.Sprintf("can't handle event of type '%T'", ev)) + if ctx.Err() != nil { + // if context is done, do not process more events + return + } continue } @@ -120,12 +128,19 @@ func StartAuditLogger(ctx context.Context, ch <-chan events.Event, log log.Logge b, err := marshaller(auditEvent) if err != nil { log.Error().Err(err).Msg("error marshaling the event") + if ctx.Err() != nil { + return + } continue } for _, l := range logto { l(b) } + + if ctx.Err() != nil { + return + } } } diff --git a/services/clientlog/pkg/command/server.go b/services/clientlog/pkg/command/server.go index 9392f3a39..f9e1e8a7c 100644 --- a/services/clientlog/pkg/command/server.go +++ b/services/clientlog/pkg/command/server.go @@ -3,8 +3,8 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" @@ -13,6 +13,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/clientlog/pkg/config" @@ -61,14 +62,16 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) - defer cancel() - connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) s, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { @@ -90,6 +93,7 @@ func Server(cfg *config.Config) *cli.Command { return fmt.Errorf("could not get reva client selector: %s", err) } + gr := runner.NewGroup() { svc, err := service.NewClientlogService( service.Logger(logger), @@ -105,23 +109,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { + gr.Add(runner.New("clientlog_svc", func() error { return svc.Run() - }, func(err error) { - if err != nil { - logger.Info(). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + }, func() { + svc.Close() + })) } { @@ -135,13 +127,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("clientlog_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/clientlog/pkg/service/service.go b/services/clientlog/pkg/service/service.go index 72b6c7a6b..487fbdc80 100644 --- a/services/clientlog/pkg/service/service.go +++ b/services/clientlog/pkg/service/service.go @@ -7,6 +7,7 @@ import ( "fmt" "path/filepath" "reflect" + "sync/atomic" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" @@ -33,6 +34,8 @@ type ClientlogService struct { tracer trace.Tracer publisher events.Publisher ch <-chan events.Event + stopCh chan struct{} + stopped atomic.Bool } // NewClientlogService returns a clientlog service @@ -60,6 +63,7 @@ func NewClientlogService(opts ...Option) (*ClientlogService, error) { tracer: o.TraceProvider.Tracer("github.com/opencloud-eu/opencloud/services/clientlog/pkg/service"), publisher: o.Stream, ch: ch, + stopCh: make(chan struct{}, 1), } for _, e := range o.RegisteredEvents { @@ -72,13 +76,32 @@ func NewClientlogService(opts ...Option) (*ClientlogService, error) { // Run runs the service func (cl *ClientlogService) Run() error { - for event := range cl.ch { - cl.processEvent(event) +EventLoop: + for { + select { + case event, ok := <-cl.ch: + if !ok { + break EventLoop + } + cl.processEvent(event) + + if cl.stopped.Load() { + break EventLoop + } + case <-cl.stopCh: + break EventLoop + } } return nil } +func (cl *ClientlogService) Close() { + if cl.stopped.CompareAndSwap(false, true) { + close(cl.stopCh) + } +} + func (cl *ClientlogService) processEvent(event events.Event) { gwc, err := cl.gatewaySelector.Next() if err != nil { diff --git a/services/collaboration/pkg/command/server.go b/services/collaboration/pkg/command/server.go index a048470b6..2bb369956 100644 --- a/services/collaboration/pkg/command/server.go +++ b/services/collaboration/pkg/command/server.go @@ -4,14 +4,15 @@ import ( "context" "fmt" "net" + "os/signal" "time" - "github.com/oklog/run" "github.com/urfave/cli/v2" microstore "go-micro.dev/v4/store" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/collaboration/pkg/config" "github.com/opencloud-eu/opencloud/services/collaboration/pkg/config/parser" @@ -41,9 +42,12 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } // prepare components if err := helpers.RegisterOpenCloudService(ctx, cfg, logger); err != nil { @@ -89,6 +93,8 @@ func Server(cfg *config.Config) *cli.Command { store.Authentication(cfg.Store.AuthUsername, cfg.Store.AuthPassword), ) + gr := runner.NewGroup() + // start GRPC server grpcServer, teardown, err := grpc.Server( grpc.AppURLs(appUrls), @@ -103,28 +109,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - l, err := net.Listen("tcp", cfg.GRPC.Addr) - if err != nil { - return err - } - return grpcServer.Serve(l) - }, - func(err error) { - if err != nil { - logger.Info(). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + l, err := net.Listen("tcp", cfg.GRPC.Addr) + if err != nil { + return err + } + gr.Add(runner.NewGolangGrpcServerRunner("collaboration_grpc", grpcServer, l)) // start debug server debugServer, err := debug.Server( @@ -136,11 +125,7 @@ func Server(cfg *config.Config) *cli.Command { logger.Error().Err(err).Str("transport", "debug").Msg("Failed to initialize server") return err } - - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("collaboration_debug", debugServer)) // start HTTP server httpServer, err := http.Server( @@ -152,14 +137,20 @@ func Server(cfg *config.Config) *cli.Command { http.Store(st), ) if err != nil { - logger.Error().Err(err).Str("transport", "http").Msg("Failed to initialize server") + logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server") return err } - gr.Add(httpServer.Run, func(_ error) { - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("collaboration_http", httpServer)) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/eventhistory/pkg/command/server.go b/services/eventhistory/pkg/command/server.go index 7d4049191..eaa85e248 100644 --- a/services/eventhistory/pkg/command/server.go +++ b/services/eventhistory/pkg/command/server.go @@ -3,8 +3,8 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/store" "github.com/urfave/cli/v2" @@ -12,6 +12,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -46,16 +47,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - m = metrics.New() - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + m := metrics.New() m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() + connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) consumer, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { @@ -84,21 +87,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.TraceProvider(traceProvider), ) - gr.Add(service.Run, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("eventhistory_grpc", service)) { debugServer, err := debug.Server( @@ -111,13 +100,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("eventhistory_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/graph/pkg/command/server.go b/services/graph/pkg/command/server.go index 9cf76e4ad..4eb5c5c1d 100644 --- a/services/graph/pkg/command/server.go +++ b/services/graph/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/graph/pkg/config" @@ -33,14 +34,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + mtrcs := metrics.New() - - defer cancel() - mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -54,23 +58,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("graph_http", server)) } { @@ -84,13 +72,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("graph_debug", server)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/idm/pkg/command/server.go b/services/idm/pkg/command/server.go index 1789ce98f..85546df7b 100644 --- a/services/idm/pkg/command/server.go +++ b/services/idm/pkg/command/server.go @@ -7,18 +7,19 @@ import ( "fmt" "html/template" "os" + "os/signal" "strings" "github.com/go-ldap/ldif" "github.com/libregraph/idm/pkg/ldappassword" "github.com/libregraph/idm/pkg/ldbbolt" "github.com/libregraph/idm/server" - "github.com/oklog/run" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" pkgcrypto "github.com/opencloud-eu/opencloud/pkg/crypto" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/services/idm" "github.com/opencloud-eu/opencloud/services/idm/pkg/config" "github.com/opencloud-eu/opencloud/services/idm/pkg/config/parser" @@ -36,14 +37,16 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - logger = logging.Configure(cfg.Service.Name, cfg.Log) - ctx, cancel = context.WithCancel(c.Context) - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - defer cancel() + logger := logging.Configure(cfg.Service.Name, cfg.Log) + gr := runner.NewGroup() { servercfg := server.Config{ Logger: log.LogrusWrap(logger.Logger), @@ -75,30 +78,16 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - err := make(chan error, 1) - select { - case <-ctx.Done(): - return nil + // we need an additional context for the idm server in order to + // cancel it anytime + svcCtx, svcCancel := context.WithCancel(ctx) + defer svcCancel() - case err <- svc.Serve(ctx): - return <-err - } - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.New("idm_svc", func() error { + return svc.Serve(svcCtx) + }, func() { + svcCancel() + })) } { @@ -112,14 +101,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("idm_debug", debugServer)) } - return gr.Run() - //return start(ctx, logger, cfg) + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/idp/pkg/command/server.go b/services/idp/pkg/command/server.go index 350d93fdf..db12bcc0c 100644 --- a/services/idp/pkg/command/server.go +++ b/services/idp/pkg/command/server.go @@ -12,10 +12,11 @@ import ( "io" "io/fs" "os" + "os/signal" "path/filepath" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/idp/pkg/config" @@ -57,16 +58,18 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - metrics = metrics.New() - ) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + metrics := metrics.New() metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -84,23 +87,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("idp_http", server)) } { @@ -114,13 +101,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("idp_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/invitations/pkg/command/server.go b/services/invitations/pkg/command/server.go index e4d8046fa..647637bd4 100644 --- a/services/invitations/pkg/command/server.go +++ b/services/invitations/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/invitations/pkg/config" @@ -34,16 +35,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - metrics = metrics.New(metrics.Logger(logger)) - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + metrics := metrics.New(metrics.Logger(logger)) metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { svc, err := service.New( @@ -74,23 +76,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err != nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("invitations_http", server)) } { @@ -104,13 +90,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("invitations_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/nats/pkg/command/server.go b/services/nats/pkg/command/server.go index 14ac3e2ce..56c0618d9 100644 --- a/services/nats/pkg/command/server.go +++ b/services/nats/pkg/command/server.go @@ -4,14 +4,13 @@ import ( "context" "crypto/tls" "fmt" - "time" - - "github.com/oklog/run" + "os/signal" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" pkgcrypto "github.com/opencloud-eu/opencloud/pkg/crypto" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/services/nats/pkg/config" "github.com/opencloud-eu/opencloud/services/nats/pkg/config/parser" "github.com/opencloud-eu/opencloud/services/nats/pkg/logging" @@ -31,11 +30,14 @@ func Server(cfg *config.Config) *cli.Command { Action: func(c *cli.Context) error { logger := logging.Configure(cfg.Service.Name, cfg.Log) - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + gr := runner.NewGroup() { debugServer, err := debug.Server( debug.Logger(logger), @@ -47,10 +49,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("nats_debug", debugServer)) } var tlsConf *tls.Config @@ -77,8 +76,7 @@ func Server(cfg *config.Config) *cli.Command { } } natsServer, err := nats.NewNATSServer( - ctx, - logger, + logging.NewLogWrapper(logger), nats.Host(cfg.Nats.Host), nats.Port(cfg.Nats.Port), nats.ClusterID(cfg.Nats.ClusterID), @@ -90,40 +88,21 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - err := make(chan error, 1) - select { - case <-ctx.Done(): - return nil - case err <- natsServer.ListenAndServe(): - return <-err - } - - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "nats"). - Str("server", cfg.Service.Name). - Msg("letting other services deregister") - - time.Sleep(3 * time.Second) - - logger.Info(). - Str("transport", "nats"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "nats"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - + gr.Add(runner.New("nats_svc", func() error { + return natsServer.ListenAndServe() + }, func() { natsServer.Shutdown() - cancel() - }) + })) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/nats/pkg/server/nats/nats.go b/services/nats/pkg/server/nats/nats.go index a6550e1bc..e13d7f8be 100644 --- a/services/nats/pkg/server/nats/nats.go +++ b/services/nats/pkg/server/nats/nats.go @@ -1,24 +1,19 @@ package nats import ( - "context" "time" nserver "github.com/nats-io/nats-server/v2/server" - "github.com/opencloud-eu/opencloud/pkg/log" - "github.com/opencloud-eu/opencloud/services/nats/pkg/logging" - "github.com/rs/zerolog" ) var NATSListenAndServeLoopTimer = 1 * time.Second type NATSServer struct { - ctx context.Context server *nserver.Server } // NatsOption configures the new NATSServer instance -func NewNATSServer(ctx context.Context, logger log.Logger, opts ...NatsOption) (*NATSServer, error) { +func NewNATSServer(logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) { natsOpts := &nserver.Options{} for _, o := range opts { @@ -35,19 +30,17 @@ func NewNATSServer(ctx context.Context, logger log.Logger, opts ...NatsOption) ( return nil, err } - nLogger := logging.NewLogWrapper(logger) - server.SetLoggerV2(nLogger, logger.GetLevel() <= zerolog.DebugLevel, logger.GetLevel() <= zerolog.TraceLevel, false) + server.SetLoggerV2(logger, true, true, false) return &NATSServer{ - ctx: ctx, server: server, }, nil } // ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs func (n *NATSServer) ListenAndServe() (err error) { - go n.server.Start() - <-n.ctx.Done() + n.server.Start() // it won't block + n.server.WaitForShutdown() // block until the server is fully shutdown return nil } diff --git a/services/notifications/pkg/command/server.go b/services/notifications/pkg/command/server.go index ae14d9486..f49bfd4cc 100644 --- a/services/notifications/pkg/command/server.go +++ b/services/notifications/pkg/command/server.go @@ -3,13 +3,13 @@ package command import ( "context" "fmt" + "os/signal" "reflect" ehsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/eventhistory/v0" "github.com/opencloud-eu/reva/v2/pkg/store" microstore "go-micro.dev/v4/store" - "github.com/oklog/run" "github.com/urfave/cli/v2" "github.com/opencloud-eu/reva/v2/pkg/events" @@ -19,6 +19,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" settingssvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/settings/v0" @@ -57,11 +58,14 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + gr := runner.NewGroup() { debugServer, err := debug.Server( debug.Logger(logger), @@ -73,10 +77,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("notifications_debug", debugServer)) } // evs defines a list of events to subscribe to @@ -139,11 +140,21 @@ func Server(cfg *config.Config) *cli.Command { cfg.Notifications.EmailTemplatePath, cfg.Notifications.DefaultLanguage, cfg.WebUIURL, cfg.Notifications.TranslationPath, cfg.Notifications.SMTP.Sender, notificationStore, historyClient, registeredEvents) - gr.Add(svc.Run, func(error) { - cancel() - }) + gr.Add(runner.New("notifications_svc", func() error { + return svc.Run() + }, func() { + svc.Close() + })) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/notifications/pkg/service/service.go b/services/notifications/pkg/service/service.go index e3b717731..0f27191bd 100644 --- a/services/notifications/pkg/service/service.go +++ b/services/notifications/pkg/service/service.go @@ -5,11 +5,10 @@ import ( "errors" "fmt" "net/url" - "os" - "os/signal" "path" "strings" - "syscall" + "sync" + "sync/atomic" ehsvc "github.com/opencloud-eu/opencloud/protogen/gen/opencloud/services/eventhistory/v0" "go-micro.dev/v4/store" @@ -44,6 +43,7 @@ func init() { // Service should be named `Runner` type Service interface { Run() error + Close() } // NewEventsNotifier provides a new eventsNotifier @@ -62,7 +62,6 @@ func NewEventsNotifier( logger: logger, channel: channel, events: events, - signals: make(chan os.Signal, 1), gatewaySelector: gatewaySelector, valueService: valueService, serviceAccountID: serviceAccountID, @@ -76,6 +75,8 @@ func NewEventsNotifier( splitter: newIntervalSplitter(logger, valueService), userEventStore: newUserEventStore(logger, store, historyClient), registeredEvents: registeredEvents, + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), } } @@ -83,7 +84,6 @@ type eventsNotifier struct { logger log.Logger channel channels.Channel events <-chan events.Event - signals chan os.Signal gatewaySelector pool.Selectable[gateway.GatewayAPIClient] valueService settingssvc.ValueService emailTemplatePath string @@ -97,16 +97,27 @@ type eventsNotifier struct { splitter *intervalSplitter userEventStore *userEventStore registeredEvents map[string]events.Unmarshaller + stopCh chan struct{} + stopped *atomic.Bool } func (s eventsNotifier) Run() error { - signal.Notify(s.signals, syscall.SIGINT, syscall.SIGTERM) + var wg sync.WaitGroup + s.logger.Debug(). Msg("eventsNotifier started") +EventLoop: for { select { - case evt := <-s.events: + case evt, ok := <-s.events: + if !ok { + break EventLoop + } + // TODO: needs to be replaced with a worker pool + wg.Add(1) go func() { + defer wg.Done() + switch e := evt.Event.(type) { case events.SpaceShared: s.handleSpaceShared(e, evt.ID) @@ -124,12 +135,25 @@ func (s eventsNotifier) Run() error { s.sendGroupedEmailsJob(e, evt.ID) } }() - case <-s.signals: + + if s.stopped.Load() { + break EventLoop + } + case <-s.stopCh: s.logger.Debug(). Msg("eventsNotifier stopped") - return nil + break EventLoop } } + // wait until all the goroutines processing events have finished + wg.Wait() + return nil +} + +func (s eventsNotifier) Close() { + if s.stopped.CompareAndSwap(false, true) { + close(s.stopCh) + } } func (s eventsNotifier) render(ctx context.Context, template email.MessageTemplate, diff --git a/services/ocdav/pkg/command/server.go b/services/ocdav/pkg/command/server.go index 6201ddfa9..9b37eaeb2 100644 --- a/services/ocdav/pkg/command/server.go +++ b/services/ocdav/pkg/command/server.go @@ -3,11 +3,13 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/broker" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" + ohttp "github.com/opencloud-eu/opencloud/pkg/service/http" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/ocdav/pkg/config" @@ -34,85 +36,77 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - gr.Add(func() error { - // init reva shared config explicitly as the go-micro based ocdav does not use - // the reva runtime. But we need e.g. the shared client settings to be initialized - sc := map[string]interface{}{ - "jwt_secret": cfg.TokenManager.JWTSecret, - "gatewaysvc": cfg.Reva.Address, - "skip_user_groups_in_token": cfg.SkipUserGroupsInToken, - "grpc_client_options": cfg.Reva.GetGRPCClientConfig(), - } - if err := sharedconf.Decode(sc); err != nil { - logger.Error().Err(err).Msg("error decoding shared config for ocdav") - } - opts := []ocdav.Option{ - ocdav.Name(cfg.HTTP.Namespace + "." + cfg.Service.Name), - ocdav.Version(version.GetString()), - ocdav.Context(ctx), - ocdav.Logger(logger.Logger), - ocdav.Address(cfg.HTTP.Addr), - ocdav.AllowCredentials(cfg.HTTP.CORS.AllowCredentials), - ocdav.AllowedMethods(cfg.HTTP.CORS.AllowedMethods), - ocdav.AllowedHeaders(cfg.HTTP.CORS.AllowedHeaders), - ocdav.AllowedOrigins(cfg.HTTP.CORS.AllowedOrigins), - ocdav.FilesNamespace(cfg.FilesNamespace), - ocdav.WebdavNamespace(cfg.WebdavNamespace), - ocdav.OCMNamespace(cfg.OCMNamespace), - ocdav.AllowDepthInfinity(cfg.AllowPropfindDepthInfinity), - ocdav.SharesNamespace(cfg.SharesNamespace), - ocdav.Timeout(cfg.Timeout), - ocdav.Insecure(cfg.Insecure), - ocdav.PublicURL(cfg.PublicURL), - ocdav.Prefix(cfg.HTTP.Prefix), - ocdav.GatewaySvc(cfg.Reva.Address), - ocdav.JWTSecret(cfg.TokenManager.JWTSecret), - ocdav.ProductName(cfg.Status.ProductName), - ocdav.ProductVersion(cfg.Status.ProductVersion), - ocdav.Product(cfg.Status.Product), - ocdav.Version(cfg.Status.Version), - ocdav.VersionString(cfg.Status.VersionString), - ocdav.Edition(cfg.Status.Edition), - ocdav.MachineAuthAPIKey(cfg.MachineAuthAPIKey), - ocdav.Broker(broker.NoOp{}), - // ocdav.FavoriteManager() // FIXME needs a proper persistence implementation https://github.com/owncloud/ocis/issues/1228 - // ocdav.LockSystem(), // will default to the CS3 lock system - // ocdav.TLSConfig() // tls config for the http server - ocdav.MetricsEnabled(true), - ocdav.MetricsNamespace("opencloud"), - ocdav.Tracing("Adding these strings is a workaround for ->", "https://github.com/cs3org/reva/issues/4131"), - ocdav.WithTraceProvider(traceProvider), - ocdav.RegisterTTL(registry.GetRegisterTTL()), - ocdav.RegisterInterval(registry.GetRegisterInterval()), - ocdav.URLSigningSharedSecret(cfg.URLSigningSharedSecret), - } + gr := runner.NewGroup() - s, err := ocdav.Service(opts...) - if err != nil { - return err - } + // init reva shared config explicitly as the go-micro based ocdav does not use + // the reva runtime. But we need e.g. the shared client settings to be initialized + sc := map[string]interface{}{ + "jwt_secret": cfg.TokenManager.JWTSecret, + "gatewaysvc": cfg.Reva.Address, + "skip_user_groups_in_token": cfg.SkipUserGroupsInToken, + "grpc_client_options": cfg.Reva.GetGRPCClientConfig(), + } + if err := sharedconf.Decode(sc); err != nil { + logger.Error().Err(err).Msg("error decoding shared config for ocdav") + } + opts := []ocdav.Option{ + ocdav.Name(cfg.HTTP.Namespace + "." + cfg.Service.Name), + ocdav.Version(version.GetString()), + ocdav.Context(ctx), + ocdav.Logger(logger.Logger), + ocdav.Address(cfg.HTTP.Addr), + ocdav.AllowCredentials(cfg.HTTP.CORS.AllowCredentials), + ocdav.AllowedMethods(cfg.HTTP.CORS.AllowedMethods), + ocdav.AllowedHeaders(cfg.HTTP.CORS.AllowedHeaders), + ocdav.AllowedOrigins(cfg.HTTP.CORS.AllowedOrigins), + ocdav.FilesNamespace(cfg.FilesNamespace), + ocdav.WebdavNamespace(cfg.WebdavNamespace), + ocdav.OCMNamespace(cfg.OCMNamespace), + ocdav.AllowDepthInfinity(cfg.AllowPropfindDepthInfinity), + ocdav.SharesNamespace(cfg.SharesNamespace), + ocdav.Timeout(cfg.Timeout), + ocdav.Insecure(cfg.Insecure), + ocdav.PublicURL(cfg.PublicURL), + ocdav.Prefix(cfg.HTTP.Prefix), + ocdav.GatewaySvc(cfg.Reva.Address), + ocdav.JWTSecret(cfg.TokenManager.JWTSecret), + ocdav.ProductName(cfg.Status.ProductName), + ocdav.ProductVersion(cfg.Status.ProductVersion), + ocdav.Product(cfg.Status.Product), + ocdav.Version(cfg.Status.Version), + ocdav.VersionString(cfg.Status.VersionString), + ocdav.Edition(cfg.Status.Edition), + ocdav.MachineAuthAPIKey(cfg.MachineAuthAPIKey), + ocdav.Broker(broker.NoOp{}), + // ocdav.FavoriteManager() // FIXME needs a proper persistence implementation https://github.com/owncloud/ocis/issues/1228 + // ocdav.LockSystem(), // will default to the CS3 lock system + // ocdav.TLSConfig() // tls config for the http server + ocdav.MetricsEnabled(true), + ocdav.MetricsNamespace("ocis"), + ocdav.Tracing("Adding these strings is a workaround for ->", "https://github.com/cs3org/reva/issues/4131"), + ocdav.WithTraceProvider(traceProvider), + ocdav.RegisterTTL(registry.GetRegisterTTL()), + ocdav.RegisterInterval(registry.GetRegisterInterval()), + ocdav.URLSigningSharedSecret(cfg.URLSigningSharedSecret), + } - return s.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } + s, err := ocdav.Service(opts...) + if err != nil { + return err + } - cancel() - }) + // creating a runner for a go-micro service is a bit complex, so we'll + // wrap the go-micro service with an ocis service the same way as + // ocis-pkg/service/http is doing in order to reuse the factory. + gr.Add(runner.NewGoMicroHttpServerRunner("ocdav_http", ohttp.Service{Service: s})) debugServer, err := debug.Server( debug.Logger(logger), @@ -125,12 +119,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("ocdav_debug", debugServer)) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/ocs/pkg/command/server.go b/services/ocs/pkg/command/server.go index dbf8c3c5d..a8df3fde1 100644 --- a/services/ocs/pkg/command/server.go +++ b/services/ocs/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -39,16 +40,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - metrics = metrics.New() - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + metrics := metrics.New() metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -67,27 +69,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("ocs_http", server)) } { - server, err := debug.Server( + debugServer, err := debug.Server( debug.Logger(logger), debug.Context(ctx), debug.Config(cfg), @@ -98,13 +84,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("ocs_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/policies/pkg/command/server.go b/services/policies/pkg/command/server.go index ab671a84b..eaf49364b 100644 --- a/services/policies/pkg/command/server.go +++ b/services/policies/pkg/command/server.go @@ -3,14 +3,15 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/urfave/cli/v2" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -33,18 +34,20 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - logger = log.NewLogger( - log.Name(cfg.Service.Name), - log.Level(cfg.Log.Level), - log.Pretty(cfg.Log.Pretty), - log.Color(cfg.Log.Color), - log.File(cfg.Log.File), - ).SubloggerWithRequestID(ctx) - ) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), + ).SubloggerWithRequestID(ctx) traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { @@ -56,6 +59,7 @@ func Server(cfg *config.Config) *cli.Command { return err } + gr := runner.NewGroup() { grpcClient, err := grpc.NewClient( append( @@ -98,9 +102,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(svc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("policies_grpc", svc)) } { @@ -116,9 +118,11 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(eventSvc.Run, func(_ error) { - cancel() - }) + gr.Add(runner.New("policies_svc", func() error { + return eventSvc.Run() + }, func() { + eventSvc.Close() + })) } { @@ -132,13 +136,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("policies_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/policies/pkg/service/event/service.go b/services/policies/pkg/service/event/service.go index 32ca2e067..1561fcfda 100644 --- a/services/policies/pkg/service/event/service.go +++ b/services/policies/pkg/service/event/service.go @@ -2,6 +2,7 @@ package eventSVC import ( "context" + "sync/atomic" "github.com/opencloud-eu/opencloud/pkg/log" "github.com/opencloud-eu/opencloud/services/policies/pkg/engine" @@ -11,23 +12,27 @@ import ( // Service defines the service handlers. type Service struct { - ctx context.Context - query string - log log.Logger - stream events.Stream - engine engine.Engine - tp trace.TracerProvider + ctx context.Context + query string + log log.Logger + stream events.Stream + engine engine.Engine + tp trace.TracerProvider + stopCh chan struct{} + stopped *atomic.Bool } // New returns a service implementation for Service. func New(ctx context.Context, stream events.Stream, logger log.Logger, tp trace.TracerProvider, engine engine.Engine, query string) (Service, error) { svc := Service{ - ctx: ctx, - log: logger, - query: query, - tp: tp, - engine: engine, - stream: stream, + ctx: ctx, + log: logger, + query: query, + tp: tp, + engine: engine, + stream: stream, + stopCh: make(chan struct{}, 1), + stopped: new(atomic.Bool), } return svc, nil @@ -40,16 +45,42 @@ func (s Service) Run() error { return err } - for e := range ch { - err := s.processEvent(e) - if err != nil { - return err +EventLoop: + for { + select { + case <-s.stopCh: + break EventLoop + case e, ok := <-ch: + if !ok { + break EventLoop + } + + err := s.processEvent(e) + if err != nil { + return err + } + + if s.stopped.Load() { + break EventLoop + } } } return nil } +// Close will make the policies service to stop processing, so the `Run` +// method can finish. +// TODO: Underlying services can't be stopped. This means that some goroutines +// will get stuck trying to push events through a channel nobody is reading +// from, so resources won't be freed and there will be memory leaks. For now, +// if the service is stopped, you should close the app soon after. +func (s Service) Close() { + if s.stopped.CompareAndSwap(false, true) { + close(s.stopCh) + } +} + func (s Service) processEvent(e events.Event) error { ctx := e.GetTraceContext(s.ctx) ctx, span := s.tp.Tracer("policies").Start(ctx, "processEvent") diff --git a/services/postprocessing/pkg/command/server.go b/services/postprocessing/pkg/command/server.go index 1cbeafacb..9a8a72d6e 100644 --- a/services/postprocessing/pkg/command/server.go +++ b/services/postprocessing/pkg/command/server.go @@ -4,12 +4,13 @@ import ( "context" "fmt" "os" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/store" "github.com/urfave/cli/v2" microstore "go-micro.dev/v4/store" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config" "github.com/opencloud-eu/opencloud/services/postprocessing/pkg/config/parser" @@ -33,18 +34,21 @@ func Server(cfg *config.Config) *cli.Command { return err }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - logger = logging.Configure(cfg.Service.Name, cfg.Log) - ctx, cancel = context.WithCancel(c.Context) - ) - defer cancel() + logger := logging.Configure(cfg.Service.Name, cfg.Log) + + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } traceProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { return err } + gr := runner.NewGroup() { st := store.Create( store.Store(cfg.Store.Store), @@ -59,30 +63,12 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr.Add(func() error { - err := make(chan error, 1) - select { - case <-ctx.Done(): - return nil - case err <- svc.Run(): - return <-err - } - }, func(err error) { - if err != nil { - logger.Info(). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "stream"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.New("postprocessing_svc", func() error { + return svc.Run() + }, func() { + svc.Close() + })) } { @@ -96,12 +82,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("postprocessing_debug", debugServer)) } - return gr.Run() + + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/postprocessing/pkg/service/service.go b/services/postprocessing/pkg/service/service.go index 9fb8639d1..9fb6b18d0 100644 --- a/services/postprocessing/pkg/service/service.go +++ b/services/postprocessing/pkg/service/service.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/opencloud-eu/opencloud/pkg/generators" @@ -34,6 +35,8 @@ type PostprocessingService struct { c config.Postprocessing tp trace.TracerProvider metrics *metrics.Metrics + stopCh chan struct{} + stopped atomic.Bool } var ( @@ -97,6 +100,7 @@ func NewPostprocessingService(ctx context.Context, logger log.Logger, sto store. c: cfg.Postprocessing, tp: tp, metrics: m, + stopCh: make(chan struct{}, 1), }, nil } @@ -108,26 +112,66 @@ func (pps *PostprocessingService) Run() error { wg.Add(1) go func() { defer wg.Done() - for e := range pps.events { - if err := pps.processEvent(e); err != nil { - switch { - case errors.Is(err, ErrFatal): - pps.log.Fatal().Err(err).Msg("fatal error - exiting") - case errors.Is(err, ErrEvent): - pps.log.Error().Err(err).Msg("continuing") - default: - pps.log.Fatal().Err(err).Msg("unknown error - exiting") + + EventLoop: + for { + select { + case <-pps.stopCh: + // stop requested + // TODO: we might need a way to unsubscribe from the event channel, otherwise + // we'll be leaking a goroutine in reva that will be stuck waiting for + // someone to read from the event channel. + // Note: redis implementation seems to have a timeout, so the goroutine + // will exit if there is nobody processing the events and the timeout + // is reached. The behavior is unclear with natsjs + break EventLoop + case e, ok := <-pps.events: + if !ok { + // event channel is closed, so nothing more to do + break EventLoop + } + + err := pps.processEvent(e) + if err != nil { + switch { + case errors.Is(err, ErrFatal): + pps.log.Fatal().Err(err).Msg("fatal error - exiting") + case errors.Is(err, ErrEvent): + pps.log.Error().Err(err).Msg("continuing") + default: + pps.log.Fatal().Err(err).Msg("unknown error - exiting") + } + } + + if pps.stopped.Load() { + // if stopped, don't process any more events + break EventLoop } } } }() } + wg.Wait() return nil } +// Close will make the postprocessing service to stop processing, so the `Run` +// method can finish. +// TODO: Underlying services can't be stopped. This means that some goroutines +// will get stuck trying to push events through a channel nobody is reading +// from, so resources won't be freed and there will be memory leaks. For now, +// if the service is stopped, you should close the app soon after. +func (pps *PostprocessingService) Close() { + if pps.stopped.CompareAndSwap(false, true) { + close(pps.stopCh) + } +} + func (pps *PostprocessingService) processEvent(e raw.Event) error { + pps.log.Debug().Str("Type", e.Type).Str("ID", e.ID).Msg("processing event received") + var ( next interface{} pp *postprocessing.Postprocessing diff --git a/services/proxy/pkg/command/server.go b/services/proxy/pkg/command/server.go index 14b96513e..31fa3a669 100644 --- a/services/proxy/pkg/command/server.go +++ b/services/proxy/pkg/command/server.go @@ -5,18 +5,19 @@ import ( "crypto/tls" "fmt" "net/http" + "os/signal" "time" gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" chimiddleware "github.com/go-chi/chi/v5/middleware" "github.com/justinas/alice" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" pkgmiddleware "github.com/opencloud-eu/opencloud/pkg/middleware" "github.com/opencloud-eu/opencloud/pkg/oidc" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -107,13 +108,14 @@ func Server(cfg *config.Config) *cli.Command { oidc.WithJWKSOptions(cfg.OIDC.JWKS), ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + m := metrics.New() - - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - - defer cancel() - m.BuildInfo.WithLabelValues(version.GetString()).Set(1) rp, err := proxy.NewMultiHostReverseProxy( @@ -183,6 +185,7 @@ func Server(cfg *config.Config) *cli.Command { return fmt.Errorf("failed to initialize reverse proxy: %w", err) } + gr := runner.NewGroup() { middlewares := loadMiddlewares(logger, cfg, userInfoCache, signingKeyStore, traceProvider, *m, userProvider, publisher, gatewaySelector, serviceSelector) @@ -203,23 +206,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("proxy_http", server)) } { @@ -233,13 +220,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("proxy_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/search/pkg/command/server.go b/services/search/pkg/command/server.go index bbb2c191a..261b65b63 100644 --- a/services/search/pkg/command/server.go +++ b/services/search/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -40,13 +41,19 @@ func Server(cfg *config.Config) *cli.Command { if err != nil { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() + grpcServer, teardown, err := grpc.Server( grpc.Config(cfg), grpc.Logger(logger), @@ -62,21 +69,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(grpcServer.Run, func(_ error) { - if err == nil { - logger.Info(). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("search_grpc", grpcServer)) debugServer, err := debug.Server( debug.Logger(logger), @@ -88,12 +81,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("search_debug", debugServer)) - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/settings/pkg/command/server.go b/services/settings/pkg/command/server.go index ce66284fd..4ae0942c6 100644 --- a/services/settings/pkg/command/server.go +++ b/services/settings/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -42,15 +43,20 @@ func Server(cfg *config.Config) *cli.Command { return err } - servers := run.Group{} - ctx, cancel := context.WithCancel(c.Context) - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) handle := svc.NewDefaultLanguageService(cfg, svc.NewService(cfg, logger)) + servers := runner.NewGroup() + // prepare an HTTP server and add it to the group run. httpServer, err := http.Server( http.Name(cfg.Service.Name), @@ -67,21 +73,7 @@ func Server(cfg *config.Config) *cli.Command { Msg("Error initializing http service") return fmt.Errorf("could not initialize http service: %w", err) } - servers.Add(httpServer.Run, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + servers.Add(runner.NewGoMicroHttpServerRunner("settings_http", httpServer)) // prepare a gRPC server and add it to the group run. grpcServer := grpc.Server( @@ -93,21 +85,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.ServiceHandler(handle), grpc.TraceProvider(traceProvider), ) - servers.Add(grpcServer.Run, func(_ error) { - if err == nil { - logger.Info(). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + servers.Add(runner.NewGoMicroGrpcServerRunner("settings_grpc", grpcServer)) // prepare a debug server and add it to the group run. debugServer, err := debug.Server( @@ -120,12 +98,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - servers.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + servers.Add(runner.NewGolangHttpServerRunner("settings_debug", debugServer)) - return servers.Run() + grResults := servers.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/sse/pkg/command/server.go b/services/sse/pkg/command/server.go index 27fbd576c..4579567d3 100644 --- a/services/sse/pkg/command/server.go +++ b/services/sse/pkg/command/server.go @@ -3,8 +3,8 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/urfave/cli/v2" @@ -12,6 +12,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/log" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/sse/pkg/config" "github.com/opencloud-eu/opencloud/services/sse/pkg/config/parser" @@ -34,24 +35,27 @@ func Server(cfg *config.Config) *cli.Command { return configlog.ReturnFatal(parser.ParseConfig(cfg)) }, Action: func(c *cli.Context) error { - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - logger = log.NewLogger( - log.Name(cfg.Service.Name), - log.Level(cfg.Log.Level), - log.Pretty(cfg.Log.Pretty), - log.Color(cfg.Log.Color), - log.File(cfg.Log.File), - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + + logger := log.NewLogger( + log.Name(cfg.Service.Name), + log.Level(cfg.Log.Level), + log.Pretty(cfg.Log.Pretty), + log.Color(cfg.Log.Color), + log.File(cfg.Log.File), ) - defer cancel() tracerProvider, err := tracing.GetServiceTraceProvider(cfg.Tracing, cfg.Service.Name) if err != nil { return err } + gr := runner.NewGroup() { connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) natsStream, err := stream.NatsFromConfig(connName, true, stream.NatsConfig(cfg.Events)) @@ -71,9 +75,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(server.Run, func(_ error) { - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("sse_http", server)) } { @@ -87,13 +89,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("sse_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/thumbnails/pkg/command/server.go b/services/thumbnails/pkg/command/server.go index d434e60b0..cf1410bd8 100644 --- a/services/thumbnails/pkg/command/server.go +++ b/services/thumbnails/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -40,16 +41,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - m = metrics.New() - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + m := metrics.New() m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() + service := grpc.NewService( grpc.Logger(logger), grpc.Context(ctx), @@ -61,22 +64,7 @@ func Server(cfg *config.Config) *cli.Command { grpc.TraceProvider(traceProvider), grpc.MaxConcurrentRequests(cfg.GRPC.MaxConcurrentRequests), ) - - gr.Add(service.Run, func(_ error) { - if err == nil { - logger.Info(). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "grpc"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroGrpcServerRunner("thumbnails_grpc", service)) server, err := debug.Server( debug.Logger(logger), @@ -87,11 +75,7 @@ func Server(cfg *config.Config) *cli.Command { logger.Info().Err(err).Str("transport", "debug").Msg("Failed to initialize server") return err } - - gr.Add(server.ListenAndServe, func(_ error) { - _ = server.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("thumbnails_debug", server)) httpServer, err := http.Server( http.Logger(logger), @@ -109,24 +93,17 @@ func Server(cfg *config.Config) *cli.Command { return err } + gr.Add(runner.NewGoMicroHttpServerRunner("thumbnails_http", httpServer)) - gr.Add(httpServer.Run, func(_ error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError } - - cancel() - }) - - return gr.Run() + } + return nil }, } } diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index b75e9f7ef..d99c6b312 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -3,8 +3,8 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/reva/v2/pkg/events" "github.com/opencloud-eu/reva/v2/pkg/events/stream" "github.com/opencloud-eu/reva/v2/pkg/rgrpc/todo/pool" @@ -15,6 +15,7 @@ import ( "github.com/opencloud-eu/opencloud/pkg/config/configlog" "github.com/opencloud-eu/opencloud/pkg/generators" "github.com/opencloud-eu/opencloud/pkg/registry" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -69,14 +70,16 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr := run.Group{} - ctx, cancel := context.WithCancel(c.Context) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } mtrcs := metrics.New() mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) - defer cancel() - connName := generators.GenerateConnectionName(cfg.Service.Name, generators.NTypeBus) stream, err := stream.NatsFromConfig(connName, false, stream.NatsConfig(cfg.Events)) if err != nil { @@ -111,6 +114,7 @@ func Server(cfg *config.Config) *cli.Command { vClient := settingssvc.NewValueService("eu.opencloud.api.settings", grpcClient) rClient := settingssvc.NewRoleService("eu.opencloud.api.settings", grpcClient) + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -132,23 +136,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("userlog_http", server)) } { @@ -162,13 +150,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("userlog_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/web/pkg/command/server.go b/services/web/pkg/command/server.go index 25eeec208..9bffe86bd 100644 --- a/services/web/pkg/command/server.go +++ b/services/web/pkg/command/server.go @@ -5,9 +5,10 @@ import ( "encoding/json" "fmt" "os" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/services/web/pkg/config" "github.com/opencloud-eu/opencloud/services/web/pkg/config/parser" @@ -47,14 +48,16 @@ func Server(cfg *config.Config) *cli.Command { } } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - m = metrics.New() - ) + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } - defer cancel() + m := metrics.New() + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -73,30 +76,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - err := server.Run() - if err != nil { - logger.Error(). - Err(err). - Str("transport", "http"). - Msg("Failed to start server") - } - return err - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("web_http", server)) } { @@ -110,13 +90,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(_ error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("web_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/webdav/pkg/command/server.go b/services/webdav/pkg/command/server.go index 51cf58ecd..48f0e0795 100644 --- a/services/webdav/pkg/command/server.go +++ b/services/webdav/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" ogrpc "github.com/opencloud-eu/opencloud/pkg/service/grpc" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" @@ -41,16 +42,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - metrics = metrics.New() - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + metrics := metrics.New() metrics.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { server, err := http.Server( http.Logger(logger), @@ -69,23 +71,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("webdav_http", server)) } { @@ -100,13 +86,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(err error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("webdav_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } } diff --git a/services/webfinger/pkg/command/server.go b/services/webfinger/pkg/command/server.go index 432bc6121..1c58a4ee3 100644 --- a/services/webfinger/pkg/command/server.go +++ b/services/webfinger/pkg/command/server.go @@ -3,9 +3,10 @@ package command import ( "context" "fmt" + "os/signal" - "github.com/oklog/run" "github.com/opencloud-eu/opencloud/pkg/config/configlog" + "github.com/opencloud-eu/opencloud/pkg/runner" "github.com/opencloud-eu/opencloud/pkg/tracing" "github.com/opencloud-eu/opencloud/pkg/version" "github.com/opencloud-eu/opencloud/services/webfinger/pkg/config" @@ -35,16 +36,17 @@ func Server(cfg *config.Config) *cli.Command { return err } - var ( - gr = run.Group{} - ctx, cancel = context.WithCancel(c.Context) - m = metrics.New(metrics.Logger(logger)) - ) - - defer cancel() + var cancel context.CancelFunc + ctx := cfg.Context + if ctx == nil { + ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...) + defer cancel() + } + m := metrics.New(metrics.Logger(logger)) m.BuildInfo.WithLabelValues(version.GetString()).Set(1) + gr := runner.NewGroup() { relationProviders, err := getRelationProviders(cfg) if err != nil { @@ -82,23 +84,7 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(func() error { - return server.Run() - }, func(err error) { - if err == nil { - logger.Info(). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } else { - logger.Error().Err(err). - Str("transport", "http"). - Str("server", cfg.Service.Name). - Msg("Shutting down server") - } - - cancel() - }) + gr.Add(runner.NewGoMicroHttpServerRunner("webfinger_http", server)) } { @@ -113,13 +99,18 @@ func Server(cfg *config.Config) *cli.Command { return err } - gr.Add(debugServer.ListenAndServe, func(err error) { - _ = debugServer.Shutdown(ctx) - cancel() - }) + gr.Add(runner.NewGolangHttpServerRunner("webfinger_debug", debugServer)) } - return gr.Run() + grResults := gr.Run(ctx) + + // return the first non-nil error found in the results + for _, grResult := range grResults { + if grResult.RunnerError != nil { + return grResult.RunnerError + } + } + return nil }, } }