Bump reva, for 'graceful_shutdown_timeout' setting

This commit is contained in:
Ralf Haferkamp
2023-07-18 12:29:08 +02:00
parent 87a1bf7e26
commit b480fc09b0
7 changed files with 98 additions and 58 deletions
+71 -46
View File
@@ -36,13 +36,14 @@ import (
// Watcher watches a process for a graceful restart
// preserving open network sockets to avoid packet loss.
type Watcher struct {
log zerolog.Logger
graceful bool
ppid int
lns map[string]net.Listener
ss map[string]Server
pidFile string
childPIDs []int
log zerolog.Logger
graceful bool
ppid int
lns map[string]net.Listener
ss map[string]Server
pidFile string
childPIDs []int
gracefulShutdownTimeout int
}
// Option represent an option.
@@ -62,6 +63,12 @@ func WithPIDFile(fn string) Option {
}
}
// WithGracefuleShutdownTimeout sets the deadline (in seconds) the watcher
// allows for draining connections during a graceful shutdown; a value of 0
// makes SIGINT/SIGTERM skip draining and hard-stop immediately.
// NOTE(review): the name contains a typo ("Gracefule"); it is kept as-is
// because it is part of the exported API.
func WithGracefuleShutdownTimeout(seconds int) Option {
	return func(w *Watcher) {
		w.gracefulShutdownTimeout = seconds
	}
}
// NewWatcher creates a Watcher.
func NewWatcher(opts ...Option) *Watcher {
w := &Watcher{
@@ -279,51 +286,69 @@ func (w *Watcher) TrapSignals() {
}
case syscall.SIGQUIT:
w.log.Info().Msg("preparing for a graceful shutdown with deadline of 10 seconds")
go func() {
count := 10
ticker := time.NewTicker(time.Second)
for ; true; <-ticker.C {
w.log.Info().Msgf("shutting down in %d seconds", count-1)
count--
if count <= 0 {
w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...")
for _, s := range w.ss {
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
}
w.Exit(1)
}
}
}()
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address())
err := s.GracefulStop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
w.log.Info().Msg("exit with error code 1")
w.Exit(1)
}
}
w.log.Info().Msg("exit with error code 0")
w.Exit(0)
gracefulShutdown(w)
case syscall.SIGINT, syscall.SIGTERM:
w.log.Info().Msg("preparing for hard shutdown, aborting all conns")
for _, s := range w.ss {
w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
err := s.Stop()
if err != nil {
w.log.Error().Err(err).Msg("error stopping server")
}
if w.gracefulShutdownTimeout == 0 {
hardShutdown(w)
}
w.Exit(0)
gracefulShutdown(w)
}
}
}
// TODO: Ideally this would not call exit() but instead properly return an
// error. The exit() is problematic (i.e. racy), especially when orchestrating
// multiple reva services from some external runtime (like in the "ocis server" case).
// gracefulShutdown drains every registered server via GracefulStop while a
// background watchdog enforces the configured deadline
// (w.gracefulShutdownTimeout seconds). If the deadline expires before the
// drain completes, all servers are hard-stopped and the process exits with
// code 1; a drain error also exits with code 1, a clean drain with code 0.
func gracefulShutdown(w *Watcher) {
	w.log.Info().Int("Timeout", w.gracefulShutdownTimeout).Msg("preparing for a graceful shutdown with deadline")

	// Watchdog: tick once per second, hard-stop when the countdown reaches zero.
	go func() {
		remaining := w.gracefulShutdownTimeout
		ticker := time.NewTicker(time.Second)
		for {
			w.log.Info().Msgf("shutting down in %d seconds", remaining-1)
			remaining--
			if remaining <= 0 {
				w.log.Info().Msg("deadline reached before draining active conns, hard stopping ...")
				for _, s := range w.ss {
					if err := s.Stop(); err != nil {
						w.log.Error().Err(err).Msg("error stopping server")
					}
					w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
				}
				w.Exit(1)
			}
			<-ticker.C
		}
	}()

	// Drain each server; any failure aborts the shutdown with exit code 1.
	for _, s := range w.ss {
		w.log.Info().Msgf("fd to %s:%s gracefully closed ", s.Network(), s.Address())
		if err := s.GracefulStop(); err != nil {
			w.log.Error().Err(err).Msg("error stopping server")
			w.log.Info().Msg("exit with error code 1")
			w.Exit(1)
		}
	}
	w.log.Info().Msg("exit with error code 0")
	w.Exit(0)
}
// TODO: Ideally this would not call exit() but instead properly return an
// error. The exit() is problematic (i.e. racy), especially when orchestrating
// multiple reva services from some external runtime (like in the "ocis server" case).
// hardShutdown aborts all active connections by stopping every registered
// server immediately, then exits the process with code 0. Stop errors are
// logged but do not change the exit code.
func hardShutdown(w *Watcher) {
	w.log.Info().Msg("preparing for hard shutdown, aborting all conns")
	for _, s := range w.ss {
		w.log.Info().Msgf("fd to %s:%s abruptly closed", s.Network(), s.Address())
		if err := s.Stop(); err != nil {
			w.log.Error().Err(err).Msg("error stopping server")
		}
	}
	w.Exit(0)
}
func getListenerFile(ln net.Listener) (*os.File, error) {
switch t := ln.(type) {
case *net.TCPListener:
+10 -8
View File
@@ -72,6 +72,8 @@ type coreConf struct {
// TracingService specifies the service. i.e OpenCensus, OpenTelemetry, OpenTracing...
TracingService string `mapstructure:"tracing_service"`
GracefulShutdownTimeout int `mapstructure:"graceful_shutdown_timeout"`
}
func run(
@@ -92,7 +94,7 @@ func run(
initCPUCount(coreConf, logger)
servers := initServers(mainConf, logger, tp)
watcher, err := initWatcher(logger, filename)
watcher, err := initWatcher(logger, filename, coreConf.GracefulShutdownTimeout)
if err != nil {
log.Panic(err)
}
@@ -110,8 +112,8 @@ func initListeners(watcher *grace.Watcher, servers map[string]grace.Server, log
return listeners
}
func initWatcher(log *zerolog.Logger, filename string) (*grace.Watcher, error) {
watcher, err := handlePIDFlag(log, filename)
func initWatcher(log *zerolog.Logger, filename string, gracefulShutdownTimeout int) (*grace.Watcher, error) {
watcher, err := handlePIDFlag(log, filename, gracefulShutdownTimeout)
// TODO(labkode): maybe pidfile can be created later on? like once a server is going to be created?
if err != nil {
log.Error().Err(err).Msg("error creating grace watcher")
@@ -187,11 +189,11 @@ func initLogger(conf *logConf) *zerolog.Logger {
return log
}
func handlePIDFlag(l *zerolog.Logger, pidFile string) (*grace.Watcher, error) {
var opts []grace.Option
opts = append(opts, grace.WithPIDFile(pidFile))
opts = append(opts, grace.WithLogger(l.With().Str("pkg", "grace").Logger()))
w := grace.NewWatcher(opts...)
func handlePIDFlag(l *zerolog.Logger, pidFile string, gracefulShutdownTimeout int) (*grace.Watcher, error) {
w := grace.NewWatcher(grace.WithPIDFile(pidFile),
grace.WithLogger(l.With().Str("pkg", "grace").Logger()),
grace.WithGracefuleShutdownTimeout(gracefulShutdownTimeout),
)
err := w.WritePID()
if err != nil {
return nil, err
@@ -49,6 +49,16 @@ func (md Attributes) SetInt64(key string, val int64) {
md[key] = []byte(strconv.FormatInt(val, 10))
}
// UInt64 reads a uint64 value stored under key, parsing it from its
// base-10 string representation.
func (md Attributes) UInt64(key string) (uint64, error) {
	raw := string(md[key])
	return strconv.ParseUint(raw, 10, 64)
}
// SetUInt64 sets a uint64 value under key, stored as its base-10 string
// representation.
func (md Attributes) SetUInt64(key string, val uint64) {
	md[key] = []byte(strconv.FormatUint(val, 10))
}
// SetXattrs sets multiple extended attributes on the write-through cache/node
func (n *Node) SetXattrsWithContext(ctx context.Context, attribs map[string][]byte, acquireLock bool) (err error) {
if n.xattrsCache != nil {
@@ -122,6 +122,9 @@ func (fs *Decomposedfs) CreateStorageSpace(ctx context.Context, req *provider.Cr
metadata.SetString(prefixes.NameAttr, req.Name)
metadata.SetString(prefixes.SpaceNameAttr, req.Name)
// This space is empty so set initial treesize to 0
metadata.SetUInt64(prefixes.TreesizeAttr, 0)
if req.Type != "" {
metadata.SetString(prefixes.SpaceTypeAttr, req.Type)
}