feat: fix the graceful shutdown using the new ocis and reva runners

Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
This commit is contained in:
Roman Perekhod
2025-05-12 23:15:07 +02:00
committed by Jörn Friedrich Dreyer
parent 7727c3ff1b
commit 65d05bbd5c
27 changed files with 935 additions and 838 deletions

View File

@@ -26,6 +26,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
"time"
@@ -88,13 +89,18 @@ func NewWatcher(opts ...Option) *Watcher {
// Exit removes the pid file through Clean and then terminates the current
// process with the exit code errc. It does not return.
func (w *Watcher) Exit(errc int) {
w.Clean()
os.Exit(errc)
}
// Clean removes the pid file.
func (w *Watcher) Clean() {
err := w.clean()
if err != nil {
w.log.Warn().Err(err).Msg("error removing pid file")
} else {
w.log.Info().Msgf("pid file %q got removed", w.pidFile)
}
os.Exit(errc)
}
func (w *Watcher) clean() error {
@@ -266,7 +272,7 @@ type Server interface {
// TrapSignals captures the OS signal.
func (w *Watcher) TrapSignals() {
signalCh := make(chan os.Signal, 1024)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT)
signal.Notify(signalCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
for {
s := <-signalCh
w.log.Info().Msgf("%v signal received", s)
@@ -284,14 +290,9 @@ func (w *Watcher) TrapSignals() {
w.log.Info().Msgf("child forked with new pid %d", p.Pid)
w.childPIDs = append(w.childPIDs, p.Pid)
}
case syscall.SIGQUIT:
gracefulShutdown(w)
case syscall.SIGINT, syscall.SIGTERM:
if w.gracefulShutdownTimeout == 0 {
hardShutdown(w)
}
case syscall.SIGQUIT, syscall.SIGINT, syscall.SIGTERM:
gracefulShutdown(w)
return
}
}
}
@@ -300,38 +301,43 @@ func (w *Watcher) TrapSignals() {
// exit() is problematic (i.e. racy) especially when orchestrating multiple
// reva services from some external runtime (like in the "opencloud server" case
func gracefulShutdown(w *Watcher) {
defer w.Clean()
w.log.Info().Int("Timeout", w.gracefulShutdownTimeout).Msg("preparing for a graceful shutdown with deadline")
wg := sync.WaitGroup{}
for _, s := range w.ss {
wg.Add(1)
go func() {
defer wg.Done()
w.log.Info().Msgf("fd to %s:%s gracefully closed", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
}()
}
done := make(chan struct{})
go func() {
count := w.gracefulShutdownTimeout
ticker := time.NewTicker(time.Second)
for ; true; <-ticker.C {
w.log.Info().Msgf("shutting down in %d seconds", count-1)
count--
if count <= 0 {
w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...")
for _, s := range w.ss {
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
w.Exit(1)
wg.Wait()
close(done)
}()
select {
case <-time.After(time.Duration(w.gracefulShutdownTimeout) * time.Second):
w.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown")
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
}
}()
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")
w.Exit(1)
}
return
case <-done:
w.log.Info().Msg("all servers gracefully stopped")
return
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
}
// TODO: Ideally this would call exit() but properly return an error. The

View File

@@ -0,0 +1,178 @@
package runtime
import (
"errors"
"fmt"
"net"
"net/http"
"os"
"time"
"github.com/opencloud-eu/reva/v2/pkg/registry"
"github.com/rs/zerolog"
)
// Protocol selectors passed to newServer to choose which kind of reva server
// is built.
const (
// HTTP selects the reva HTTP server (value 0).
HTTP = iota
// GRPC selects the reva gRPC server (value 1).
GRPC
)
// RevaDrivenServer is an interface that defines the methods for starting and stopping reva HTTP/GRPC services.
// It is the type returned by NewDrivenHTTPServerWithOptions and
// NewDrivenGRPCServerWithOptions, intended for callers that drive the service
// lifecycle themselves instead of using the watcher.
type RevaDrivenServer interface {
// Start brings the service up and returns any error reported while
// listening or serving.
Start() error
// Stop shuts the service down.
Stop() error
}
// revaServer is an interface that defines the methods for starting and stopping a reva server.
// It is the contract the wrapped HTTP and gRPC servers have to fulfil so that
// server can drive them.
type revaServer interface {
// Start serves on the given, already opened listener.
Start(ln net.Listener) error
// Stop stops the server; server.Stop uses it as the hard-stop fallback.
Stop() error
// GracefulStop stops the server gracefully; server.Stop tries it first.
GracefulStop() error
// Network returns the network passed to net.Listen (first argument).
Network() string
// Address returns the address passed to net.Listen (second argument).
Address() string
}
// server represents a generic reva server that implements the RevaDrivenServer interface.
type server struct {
// srv is the wrapped reva HTTP or gRPC server; Start and Stop treat a nil
// value as "not initialized".
srv revaServer
// log receives the lifecycle messages emitted by Start and Stop.
log *zerolog.Logger
// gracefulShutdownTimeout bounds how long Stop waits for srv.GracefulStop
// before it forces srv.Stop.
gracefulShutdownTimeout time.Duration
// protocol names the server kind in log and error messages; newServer sets
// it to "http" or "grpc".
protocol string
}
// NewDrivenHTTPServerWithOptions creates a reva HTTP server from the given
// config and options without the watcher and without OS signal handling, so it
// can be embedded into another runtime that drives Start and Stop itself.
//
// It returns nil when isEnabledHTTP reports that no HTTP server is configured.
// The graceful shutdown timeout defaults to 20 seconds and can be overridden
// in the core config. If the server cannot be created, the failure is logged
// at fatal level (which, as documented for this package, exits the process
// with code 1).
func NewDrivenHTTPServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer {
	if !isEnabledHTTP(mainConf) {
		return nil
	}

	options := newOptions(opts...)
	srv := newServer(HTTP, mainConf, options)
	if srv == nil {
		options.Logger.Fatal().Msg("nothing to do, no http enabled_services declared in config")
		return nil
	}
	return srv
}
// NewDrivenGRPCServerWithOptions creates a reva gRPC server from the given
// config and options without the watcher and without OS signal handling, so it
// can be embedded into another runtime that drives Start and Stop itself.
//
// It returns nil when isEnabledGRPC reports that no gRPC server is configured.
// The graceful shutdown timeout defaults to 20 seconds and can be overridden
// in the core config. If the server cannot be created, the failure is logged
// at fatal level (which, as documented for this package, exits the process
// with code 1).
func NewDrivenGRPCServerWithOptions(mainConf map[string]interface{}, opts ...Option) RevaDrivenServer {
	if !isEnabledGRPC(mainConf) {
		return nil
	}

	options := newOptions(opts...)
	srv := newServer(GRPC, mainConf, options)
	if srv == nil {
		options.Logger.Fatal().Msg("nothing to do, no grpc enabled_services declared in config")
		return nil
	}
	return srv
}
// Start opens a listener on the wrapped server's network and address and hands
// it to the wrapped server.
//
// A missing wrapped server or a failing net.Listen is logged at fatal level and
// the error is returned. Any error from the wrapped server's Start is returned
// as is; it is additionally logged unless it is http.ErrServerClosed, which is
// the expected result of a regular shutdown.
func (s *server) Start() error {
	if s.srv == nil {
		initErr := fmt.Errorf("reva %s server not initialized", s.protocol)
		s.log.Fatal().Err(initErr).Send()
		return initErr
	}

	listener, listenErr := net.Listen(s.srv.Network(), s.srv.Address())
	if listenErr != nil {
		s.log.Fatal().Err(listenErr).Send()
		return listenErr
	}

	serveErr := s.srv.Start(listener)
	switch {
	case serveErr == nil:
		return nil
	case errors.Is(serveErr, http.ErrServerClosed):
		// Regular shutdown: pass the sentinel on without logging it as an error.
		return serveErr
	default:
		s.log.Error().Err(serveErr).Msgf("reva %s server error", s.protocol)
		return serveErr
	}
}
// Stop shuts the reva server down, preferring a graceful stop.
//
// GracefulStop runs in its own goroutine. If it fails, that goroutine falls
// back to a hard Stop. If it has not finished within gracefulShutdownTimeout,
// Stop forces a hard Stop itself. All failures are logged; Stop always returns
// nil, and it is a no-op when no server has been initialized.
func (s *server) Stop() error {
	if s.srv == nil {
		return nil
	}

	// graceful carries whether GracefulStop succeeded. It is buffered so the
	// goroutine can always finish, even if the timeout branch has already won.
	graceful := make(chan bool, 1)
	go func() {
		s.log.Info().Msgf("gracefully stopping %s:%s reva %s server", s.srv.Network(), s.srv.Address(), s.protocol)
		err := s.srv.GracefulStop()
		if err != nil {
			s.log.Error().Err(err).Msgf("error gracefully stopping reva %s server", s.protocol)
			if stopErr := s.srv.Stop(); stopErr != nil {
				s.log.Error().Err(stopErr).Msgf("error stopping reva %s server", s.protocol)
			}
		}
		graceful <- err == nil
	}()

	// Use an explicit timer so it is released as soon as the graceful path wins.
	timer := time.NewTimer(s.gracefulShutdownTimeout)
	defer timer.Stop()

	select {
	case <-timer.C:
		s.log.Info().Msg("graceful shutdown timeout reached. running hard shutdown")
		if err := s.srv.Stop(); err != nil {
			s.log.Error().Err(err).Msgf("error stopping reva %s server", s.protocol)
		}
		return nil
	case ok := <-graceful:
		if ok {
			s.log.Info().Msgf("reva %s server gracefully stopped", s.protocol)
		} else {
			// Do not claim a graceful stop when the fallback hard stop was used.
			s.log.Warn().Msgf("reva %s server stopped after graceful shutdown failed", s.protocol)
		}
		return nil
	}
}
// newServer builds the driven reva server for the requested protocol (HTTP or
// GRPC) from the given config and options, without a watcher.
//
// It parses the shared and core config, initializes the registry client,
// initializes tracing when no trace provider was supplied, applies the CPU
// count setting, and then creates the HTTP or gRPC server. The graceful
// shutdown timeout is 20 seconds unless the core config sets a positive
// value (in seconds). Failures are logged at fatal level and yield nil; an
// unknown protocol yields nil without logging.
func newServer(protocol int, mainConf map[string]interface{}, options Options) RevaDrivenServer {
	parseSharedConfOrDie(mainConf["shared"])
	coreConf := parseCoreConfOrDie(mainConf["core"])

	logger := options.Logger
	if err := registry.Init(options.Registry); err != nil {
		logger.Fatal().Err(err).Msg("failed to initialize registry client")
		return nil
	}

	// The hostname is only logged, so a lookup error is ignored here.
	hostname, _ := os.Hostname()
	logger.Info().Msgf("host info: %s", hostname)

	// Only initialize tracing if we didn't get a tracer provider.
	if options.TraceProvider == nil {
		logger.Debug().Msg("no pre-existing tracer given, initializing tracing")
		options.TraceProvider = initTracing(coreConf)
	}
	initCPUCount(coreConf, logger)

	timeout := 20 * time.Second
	if configured := coreConf.GracefulShutdownTimeout; configured > 0 {
		timeout = time.Duration(configured) * time.Second
	}

	switch protocol {
	case HTTP:
		httpSrv, err := getHTTPServer(mainConf["http"], logger, options.TraceProvider)
		if err != nil {
			logger.Fatal().Err(err).Msg("error creating http server")
			return nil
		}
		return &server{
			srv:                     httpSrv,
			log:                     logger,
			gracefulShutdownTimeout: timeout,
			protocol:                "http",
		}
	case GRPC:
		grpcSrv, err := getGRPCServer(mainConf["grpc"], logger, options.TraceProvider)
		if err != nil {
			logger.Fatal().Err(err).Msg("error creating grpc server")
			return nil
		}
		return &server{
			srv:                     grpcSrv,
			log:                     logger,
			gracefulShutdownTimeout: timeout,
			protocol:                "grpc",
		}
	default:
		return nil
	}
}

View File

@@ -53,7 +53,8 @@ func RunWithOptions(mainConf map[string]interface{}, pidFile string, opts ...Opt
coreConf := parseCoreConfOrDie(mainConf["core"])
if err := registry.Init(options.Registry); err != nil {
panic(err)
options.Logger.Fatal().Err(err).Msg("failed to initialize registry client")
return
}
run(mainConf, coreConf, options.Logger, options.TraceProvider, pidFile)

View File

@@ -279,15 +279,15 @@ func (s *Server) cleanupServices() {
// Stop stops the server.
func (s *Server) Stop() error {
s.cleanupServices()
s.s.Stop()
s.cleanupServices()
return nil
}
// GracefulStop gracefully stops the server.
func (s *Server) GracefulStop() error {
s.cleanupServices()
s.s.GracefulStop()
s.cleanupServices()
return nil
}

View File

@@ -132,10 +132,10 @@ func (s *Server) Start(ln net.Listener) error {
// Stop stops the server.
func (s *Server) Stop() error {
s.closeServices()
// TODO(labkode): set ctx deadline to zero
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
defer s.closeServices()
return s.httpServer.Shutdown(ctx)
}
@@ -164,7 +164,7 @@ func (s *Server) Address() string {
// GracefulStop gracefully stops the server.
func (s *Server) GracefulStop() error {
s.closeServices()
defer s.closeServices()
return s.httpServer.Shutdown(context.Background())
}