mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-02-08 21:18:52 -06:00
to separate controll ower the http and grpc driven services
This commit is contained in:
committed by
Jörn Friedrich Dreyer
parent
65d05bbd5c
commit
9a3fc08dd4
@@ -85,8 +85,6 @@ type Service struct {
|
||||
Log log.Logger
|
||||
|
||||
serviceToken map[string][]suture.ServiceToken
|
||||
context context.Context
|
||||
cancel context.CancelFunc
|
||||
cfg *occfg.Config
|
||||
}
|
||||
|
||||
@@ -108,16 +106,12 @@ func NewService(ctx context.Context, options ...Option) (*Service, error) {
|
||||
log.Level(opts.Config.Log.Level),
|
||||
)
|
||||
|
||||
globalCtx, cancelGlobal := context.WithCancel(ctx)
|
||||
|
||||
s := &Service{
|
||||
Services: make([]serviceFuncMap, len(_waitFuncs)),
|
||||
Additional: make(serviceFuncMap),
|
||||
Log: l,
|
||||
|
||||
serviceToken: make(map[string][]suture.ServiceToken),
|
||||
context: globalCtx,
|
||||
cancel: cancelGlobal,
|
||||
cfg: opts.Config,
|
||||
}
|
||||
|
||||
@@ -362,8 +356,11 @@ func Start(ctx context.Context, o ...Option) error {
|
||||
}
|
||||
|
||||
// cancel the context when a signal is received.
|
||||
notifyCtx, cancel := signal.NotifyContext(ctx, runner.StopSignals...)
|
||||
defer cancel()
|
||||
var cancel context.CancelFunc
|
||||
if ctx == nil {
|
||||
ctx, cancel = signal.NotifyContext(context.Background(), runner.StopSignals...)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
// tolerance controls backoff cycles from the supervisor.
|
||||
tolerance := 5
|
||||
@@ -416,12 +413,12 @@ func Start(ctx context.Context, o ...Option) error {
|
||||
// prepare the set of services to run
|
||||
s.generateRunSet(s.cfg)
|
||||
|
||||
// there are reasons not to do this, but we have race conditions ourselves. Until we resolve them, mind the following disclaimer:
|
||||
// There are reasons not to do this, but we have race conditions ourselves. Until we resolve them, mind the following disclaimer:
|
||||
// Calling ServeBackground will CORRECTLY start the supervisor running in a new goroutine. It is risky to directly run
|
||||
// go supervisor.Serve()
|
||||
// because that will briefly create a race condition as it starts up, if you try to .Add() services immediately afterward.
|
||||
// https://pkg.go.dev/github.com/thejerf/suture/v4@v4.0.0#Supervisor
|
||||
go s.Supervisor.ServeBackground(s.context) // TODO Why does Supervisor uses s.context?
|
||||
go s.Supervisor.ServeBackground(ctx)
|
||||
|
||||
for i, service := range s.Services {
|
||||
scheduleServiceTokens(s, service)
|
||||
@@ -442,7 +439,7 @@ func Start(ctx context.Context, o ...Option) error {
|
||||
}()
|
||||
|
||||
// trapShutdownCtx will block on the context-done channel for interruptions.
|
||||
trapShutdownCtx(s, srv, notifyCtx)
|
||||
trapShutdownCtx(s, srv, ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -516,8 +513,7 @@ func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
// TODO: To discuss the default timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), runner.DefaultInterruptDuration)
|
||||
defer cancel()
|
||||
if err := srv.Shutdown(ctx); err != nil {
|
||||
s.Log.Error().Err(err).Msg("could not shutdown tcp listener")
|
||||
@@ -530,13 +526,12 @@ func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) {
|
||||
for i := range s.serviceToken[sName] {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
s.Log.Warn().Msgf("=== RemoveAndWait for %s", sName)
|
||||
s.Log.Warn().Msgf("call supervisor RemoveAndWait for %s", sName)
|
||||
defer wg.Done()
|
||||
// TODO: To discuss the default timeout
|
||||
if err := s.Supervisor.RemoveAndWait(s.serviceToken[sName][i], 20*time.Second); err != nil && !errors.Is(err, suture.ErrSupervisorNotRunning) {
|
||||
if err := s.Supervisor.RemoveAndWait(s.serviceToken[sName][i], runner.DefaultInterruptDuration); err != nil && !errors.Is(err, suture.ErrSupervisorNotRunning) {
|
||||
s.Log.Error().Err(err).Str("service", sName).Msgf("terminating with signal: %+v", s)
|
||||
}
|
||||
s.Log.Warn().Msgf("=== Done RemoveAndWait for %s", sName)
|
||||
s.Log.Warn().Msgf("done supervisor RemoveAndWait for %s", sName)
|
||||
}()
|
||||
}
|
||||
}
|
||||
@@ -548,8 +543,7 @@ func trapShutdownCtx(s *Service, srv *http.Server, ctx context.Context) {
|
||||
}()
|
||||
|
||||
select {
|
||||
// TODO: To discuss the default timeout
|
||||
case <-time.After(30 * time.Second):
|
||||
case <-time.After(runner.DefaultGroupInterruptDuration):
|
||||
s.Log.Fatal().Msg("ocis graceful shutdown timeout reached, terminating")
|
||||
case <-done:
|
||||
s.Log.Info().Msg("all ocis services gracefully stopped")
|
||||
|
||||
Reference in New Issue
Block a user