From d76afadd4d47b7ec62ad07436363d7933e341205 Mon Sep 17 00:00:00 2001 From: Roman Perekhod Date: Wed, 28 May 2025 11:28:25 +0200 Subject: [PATCH] clean up --- opencloud/pkg/runtime/service/service.go | 58 +++++++++++--------- services/storage-users/pkg/command/server.go | 7 ++- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/opencloud/pkg/runtime/service/service.go b/opencloud/pkg/runtime/service/service.go index 7683c0262..1310f11b7 100644 --- a/opencloud/pkg/runtime/service/service.go +++ b/opencloud/pkg/runtime/service/service.go @@ -392,8 +392,18 @@ func Start(ctx context.Context, o ...Option) error { } } - // register and run the runtime rpc server - s.runRPCServer() + if err = rpc.Register(s); err != nil { + if s != nil { + s.Log.Fatal().Err(err).Msg("could not register rpc service") + } + } + rpc.HandleHTTP() + + l, err := net.Listen("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port)) + if err != nil { + s.Log.Fatal().Err(err).Msg("could not start listener") + } + srv := new(http.Server) // prepare the set of services to run s.generateRunSet(s.cfg) @@ -417,8 +427,14 @@ func Start(ctx context.Context, o ...Option) error { // schedule services that are optional scheduleServiceTokens(s, s.Additional) + go func() { + if err = srv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) { + s.Log.Fatal().Err(err).Msg("could not start rpc server") + } + }() + // trapShutdownCtx will block on the context-done channel for interruptions. - trapShutdownCtx(s, ctx) + trapShutdownCtx(s, srv, ctx) return nil } @@ -486,32 +502,20 @@ func (s *Service) List(_ struct{}, reply *string) error { return nil } -// register and run the runtime rpc server -func (s *Service) runRPCServer() { - if err := rpc.Register(s); err != nil { - if s != nil { - s.Log.Fatal().Err(err).Msg("could not register rpc service") - return - } - } - rpc.HandleHTTP() - // run rpc server - srv := new(http.Server) - go func() { - l, err := net.Listen("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port)) - if err != nil { - s.Log.Fatal().Err(err).Msg("could not start listener") - return - } - if err = srv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) { - s.Log.Fatal().Err(err).Msg("could not start rpc server") - } - }() -} - -func trapShutdownCtx(s *Service, ctx context.Context) { +func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) { <-ctx.Done() wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), _defaultShutdownTimeoutDuration) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + s.Log.Error().Err(err).Msg("could not shutdown tcp listener") + return + } + s.Log.Info().Msg("tcp listener shutdown") + }() for sName := range s.serviceToken { for i := range s.serviceToken[sName] { diff --git a/services/storage-users/pkg/command/server.go b/services/storage-users/pkg/command/server.go index c029c04b7..a374757ac 100644 --- a/services/storage-users/pkg/command/server.go +++ b/services/storage-users/pkg/command/server.go @@ -100,7 +100,12 @@ func Server(cfg *config.Config) *cli.Command { logger.Fatal().Err(err).Msg("can't create event handler") } // The event service Run() function handles the stop signal itself - go eventSVC.Run() + go func() { + err := eventSVC.Run() + if err != nil { + logger.Fatal().Err(err).Msg("can't run event server") + } + }() } grResults := gr.Run(ctx)