This commit is contained in:
Roman Perekhod
2025-05-28 11:28:25 +02:00
committed by Jörn Friedrich Dreyer
parent fbbcf3d833
commit d76afadd4d
2 changed files with 37 additions and 28 deletions

View File

@@ -392,8 +392,18 @@ func Start(ctx context.Context, o ...Option) error {
}
}
// register and run the runtime rpc server
s.runRPCServer()
if err = rpc.Register(s); err != nil {
if s != nil {
s.Log.Fatal().Err(err).Msg("could not register rpc service")
}
}
rpc.HandleHTTP()
l, err := net.Listen("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port))
if err != nil {
s.Log.Fatal().Err(err).Msg("could not start listener")
}
srv := new(http.Server)
// prepare the set of services to run
s.generateRunSet(s.cfg)
@@ -417,8 +427,14 @@ func Start(ctx context.Context, o ...Option) error {
// schedule services that are optional
scheduleServiceTokens(s, s.Additional)
go func() {
if err = srv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.Log.Fatal().Err(err).Msg("could not start rpc server")
}
}()
// trapShutdownCtx will block on the context-done channel for interruptions.
trapShutdownCtx(s, ctx)
trapShutdownCtx(s, srv, ctx)
return nil
}
@@ -486,32 +502,20 @@ func (s *Service) List(_ struct{}, reply *string) error {
return nil
}
// register and run the runtime rpc server
func (s *Service) runRPCServer() {
if err := rpc.Register(s); err != nil {
if s != nil {
s.Log.Fatal().Err(err).Msg("could not register rpc service")
return
}
}
rpc.HandleHTTP()
// run rpc server
srv := new(http.Server)
go func() {
l, err := net.Listen("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port))
if err != nil {
s.Log.Fatal().Err(err).Msg("could not start listener")
return
}
if err = srv.Serve(l); err != nil && !errors.Is(err, http.ErrServerClosed) {
s.Log.Fatal().Err(err).Msg("could not start rpc server")
}
}()
}
func trapShutdownCtx(s *Service, ctx context.Context) {
func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) {
<-ctx.Done()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), _defaultShutdownTimeoutDuration)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
s.Log.Error().Err(err).Msg("could not shutdown tcp listener")
return
}
s.Log.Info().Msg("tcp listener shutdown")
}()
for sName := range s.serviceToken {
for i := range s.serviceToken[sName] {