mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-12 14:30:19 -05:00
Improve Single Binary Boot time (#8320)
* remove wait times on single binary boot Signed-off-by: jkoberg <jkoberg@owncloud.com> * rework service starting Signed-off-by: jkoberg <jkoberg@owncloud.com> --------- Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
Enhancement: Improve ocis single binary start
|
||||
|
||||
Removes waiting times when starting the single binary. Improves ocis single binary boot time from 8s to 2.5s
|
||||
|
||||
https://github.com/owncloud/ocis/pull/8320
|
||||
+174
-140
@@ -13,12 +13,17 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cs3org/reva/v2/pkg/events/stream"
|
||||
"github.com/cs3org/reva/v2/pkg/logger"
|
||||
"github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool"
|
||||
"github.com/mohae/deepcopy"
|
||||
"github.com/olekukonko/tablewriter"
|
||||
"github.com/thejerf/suture/v4"
|
||||
|
||||
ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/log"
|
||||
ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc"
|
||||
"github.com/owncloud/ocis/v2/ocis-pkg/shared"
|
||||
antivirus "github.com/owncloud/ocis/v2/services/antivirus/pkg/command"
|
||||
appProvider "github.com/owncloud/ocis/v2/services/app-provider/pkg/command"
|
||||
@@ -64,22 +69,19 @@ import (
|
||||
var (
|
||||
// runset keeps track of which services to start supervised.
|
||||
runset map[string]struct{}
|
||||
// time to wait after starting the preliminary services
|
||||
_preliminaryDelay = 6 * time.Second
|
||||
// time to wait between starting service groups (preliminary, main, delayed)
|
||||
_startDelay = 2 * time.Second
|
||||
|
||||
// wait funcs run after the service group has been started.
|
||||
_waitFuncs = []func(*ociscfg.Config) error{pingNats, pingGateway, nil, wait(time.Second), nil}
|
||||
)
|
||||
|
||||
type serviceFuncMap map[string]func(*ociscfg.Config) suture.Service
|
||||
|
||||
// Service represents a RPC service.
|
||||
type Service struct {
|
||||
Supervisor *suture.Supervisor
|
||||
Preliminary serviceFuncMap
|
||||
ServicesRegistry serviceFuncMap
|
||||
Delayed serviceFuncMap
|
||||
Additional serviceFuncMap
|
||||
Log log.Logger
|
||||
Supervisor *suture.Supervisor
|
||||
Services []serviceFuncMap
|
||||
Additional serviceFuncMap
|
||||
Log log.Logger
|
||||
|
||||
serviceToken map[string][]suture.ServiceToken
|
||||
context context.Context
|
||||
@@ -108,11 +110,9 @@ func NewService(options ...Option) (*Service, error) {
|
||||
globalCtx, cancelGlobal := context.WithCancel(context.Background())
|
||||
|
||||
s := &Service{
|
||||
Preliminary: make(serviceFuncMap),
|
||||
ServicesRegistry: make(serviceFuncMap),
|
||||
Delayed: make(serviceFuncMap),
|
||||
Additional: make(serviceFuncMap),
|
||||
Log: l,
|
||||
Services: make([]serviceFuncMap, len(_waitFuncs)),
|
||||
Additional: make(serviceFuncMap),
|
||||
Log: l,
|
||||
|
||||
serviceToken: make(map[string][]suture.ServiceToken),
|
||||
context: globalCtx,
|
||||
@@ -120,162 +120,200 @@ func NewService(options ...Option) (*Service, error) {
|
||||
cfg: opts.Config,
|
||||
}
|
||||
|
||||
// start nats first - it is used as service registry
|
||||
s.Preliminary[opts.Config.Nats.Service.Name] = NewSutureServiceBuilder(func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
// populate services
|
||||
reg := func(priority int, name string, exec func(context.Context, *ociscfg.Config) error) {
|
||||
if s.Services[priority] == nil {
|
||||
s.Services[priority] = make(serviceFuncMap)
|
||||
}
|
||||
s.Services[priority][name] = NewSutureServiceBuilder(exec)
|
||||
}
|
||||
|
||||
// nats is in priority group 0. It needs to start before all other services
|
||||
reg(0, opts.Config.Nats.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Nats.Context = ctx
|
||||
cfg.Nats.Commons = cfg.Commons
|
||||
return nats.Execute(cfg.Nats)
|
||||
})
|
||||
|
||||
// populate services
|
||||
reg := func(name string, exec func(context.Context, *ociscfg.Config) error) {
|
||||
s.ServicesRegistry[name] = NewSutureServiceBuilder(exec)
|
||||
}
|
||||
reg(opts.Config.AppProvider.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AppProvider.Context = ctx
|
||||
cfg.AppProvider.Commons = cfg.Commons
|
||||
return appProvider.Execute(cfg.AppProvider)
|
||||
})
|
||||
reg(opts.Config.AppRegistry.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AppRegistry.Context = ctx
|
||||
cfg.AppRegistry.Commons = cfg.Commons
|
||||
return appRegistry.Execute(cfg.AppRegistry)
|
||||
})
|
||||
reg(opts.Config.AuthBasic.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthBasic.Context = ctx
|
||||
cfg.AuthBasic.Commons = cfg.Commons
|
||||
return authbasic.Execute(cfg.AuthBasic)
|
||||
})
|
||||
reg(opts.Config.AuthMachine.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthMachine.Context = ctx
|
||||
cfg.AuthMachine.Commons = cfg.Commons
|
||||
return authmachine.Execute(cfg.AuthMachine)
|
||||
})
|
||||
reg(opts.Config.AuthService.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthService.Context = ctx
|
||||
cfg.AuthService.Commons = cfg.Commons
|
||||
return authservice.Execute(cfg.AuthService)
|
||||
})
|
||||
reg(opts.Config.Clientlog.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Clientlog.Context = ctx
|
||||
cfg.Clientlog.Commons = cfg.Commons
|
||||
return clientlog.Execute(cfg.Clientlog)
|
||||
})
|
||||
reg(opts.Config.EventHistory.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.EventHistory.Context = ctx
|
||||
cfg.EventHistory.Commons = cfg.Commons
|
||||
return eventhistory.Execute(cfg.EventHistory)
|
||||
})
|
||||
reg(opts.Config.Gateway.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
// gateway is in priority group 1. It needs to start before the reva services
|
||||
reg(1, opts.Config.Gateway.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Gateway.Context = ctx
|
||||
cfg.Gateway.Commons = cfg.Commons
|
||||
return gateway.Execute(cfg.Gateway)
|
||||
})
|
||||
reg(opts.Config.Graph.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
|
||||
// priority group 2 is empty for now
|
||||
|
||||
// most services are in priority group 3
|
||||
reg(3, opts.Config.AppProvider.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AppProvider.Context = ctx
|
||||
cfg.AppProvider.Commons = cfg.Commons
|
||||
return appProvider.Execute(cfg.AppProvider)
|
||||
})
|
||||
reg(3, opts.Config.AppRegistry.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AppRegistry.Context = ctx
|
||||
cfg.AppRegistry.Commons = cfg.Commons
|
||||
return appRegistry.Execute(cfg.AppRegistry)
|
||||
})
|
||||
reg(3, opts.Config.AuthBasic.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthBasic.Context = ctx
|
||||
cfg.AuthBasic.Commons = cfg.Commons
|
||||
return authbasic.Execute(cfg.AuthBasic)
|
||||
})
|
||||
reg(3, opts.Config.AuthMachine.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthMachine.Context = ctx
|
||||
cfg.AuthMachine.Commons = cfg.Commons
|
||||
return authmachine.Execute(cfg.AuthMachine)
|
||||
})
|
||||
reg(3, opts.Config.AuthService.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.AuthService.Context = ctx
|
||||
cfg.AuthService.Commons = cfg.Commons
|
||||
return authservice.Execute(cfg.AuthService)
|
||||
})
|
||||
reg(3, opts.Config.Clientlog.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Clientlog.Context = ctx
|
||||
cfg.Clientlog.Commons = cfg.Commons
|
||||
return clientlog.Execute(cfg.Clientlog)
|
||||
})
|
||||
reg(3, opts.Config.EventHistory.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.EventHistory.Context = ctx
|
||||
cfg.EventHistory.Commons = cfg.Commons
|
||||
return eventhistory.Execute(cfg.EventHistory)
|
||||
})
|
||||
reg(3, opts.Config.Graph.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Graph.Context = ctx
|
||||
cfg.Graph.Commons = cfg.Commons
|
||||
return graph.Execute(cfg.Graph)
|
||||
})
|
||||
reg(opts.Config.Groups.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Groups.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Groups.Context = ctx
|
||||
cfg.Groups.Commons = cfg.Commons
|
||||
return groups.Execute(cfg.Groups)
|
||||
})
|
||||
reg(opts.Config.IDM.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.IDM.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.IDM.Context = ctx
|
||||
cfg.IDM.Commons = cfg.Commons
|
||||
return idm.Execute(cfg.IDM)
|
||||
})
|
||||
reg(opts.Config.Invitations.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Invitations.Context = ctx
|
||||
cfg.Invitations.Commons = cfg.Commons
|
||||
return invitations.Execute(cfg.Invitations)
|
||||
})
|
||||
reg(opts.Config.Notifications.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Notifications.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Notifications.Context = ctx
|
||||
cfg.Notifications.Commons = cfg.Commons
|
||||
return notifications.Execute(cfg.Notifications)
|
||||
})
|
||||
reg(opts.Config.OCDav.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.OCDav.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.OCDav.Context = ctx
|
||||
cfg.OCDav.Commons = cfg.Commons
|
||||
return ocdav.Execute(cfg.OCDav)
|
||||
})
|
||||
reg(opts.Config.OCS.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.OCS.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.OCS.Context = ctx
|
||||
cfg.OCS.Commons = cfg.Commons
|
||||
return ocs.Execute(cfg.OCS)
|
||||
})
|
||||
reg(opts.Config.Postprocessing.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Postprocessing.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Postprocessing.Context = ctx
|
||||
cfg.Postprocessing.Commons = cfg.Commons
|
||||
return postprocessing.Execute(cfg.Postprocessing)
|
||||
})
|
||||
reg(opts.Config.Search.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Search.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Search.Context = ctx
|
||||
cfg.Search.Commons = cfg.Commons
|
||||
return search.Execute(cfg.Search)
|
||||
})
|
||||
reg(opts.Config.Settings.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Settings.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Settings.Context = ctx
|
||||
cfg.Settings.Commons = cfg.Commons
|
||||
return settings.Execute(cfg.Settings)
|
||||
})
|
||||
reg(opts.Config.StoragePublicLink.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.StoragePublicLink.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.StoragePublicLink.Context = ctx
|
||||
cfg.StoragePublicLink.Commons = cfg.Commons
|
||||
return storagepublic.Execute(cfg.StoragePublicLink)
|
||||
})
|
||||
reg(opts.Config.StorageShares.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.StorageShares.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.StorageShares.Context = ctx
|
||||
cfg.StorageShares.Commons = cfg.Commons
|
||||
return storageshares.Execute(cfg.StorageShares)
|
||||
})
|
||||
reg(opts.Config.StorageSystem.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.StorageSystem.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.StorageSystem.Context = ctx
|
||||
cfg.StorageSystem.Commons = cfg.Commons
|
||||
return storageSystem.Execute(cfg.StorageSystem)
|
||||
})
|
||||
reg(opts.Config.StorageUsers.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.StorageUsers.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.StorageUsers.Context = ctx
|
||||
cfg.StorageUsers.Commons = cfg.Commons
|
||||
return storageusers.Execute(cfg.StorageUsers)
|
||||
})
|
||||
reg(opts.Config.Store.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Store.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Store.Context = ctx
|
||||
cfg.Store.Commons = cfg.Commons
|
||||
return store.Execute(cfg.Store)
|
||||
})
|
||||
reg(opts.Config.Thumbnails.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Thumbnails.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Thumbnails.Context = ctx
|
||||
cfg.Thumbnails.Commons = cfg.Commons
|
||||
return thumbnails.Execute(cfg.Thumbnails)
|
||||
})
|
||||
reg(opts.Config.Userlog.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Userlog.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Userlog.Context = ctx
|
||||
cfg.Userlog.Commons = cfg.Commons
|
||||
return userlog.Execute(cfg.Userlog)
|
||||
})
|
||||
reg(opts.Config.Users.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Users.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Users.Context = ctx
|
||||
cfg.Users.Commons = cfg.Commons
|
||||
return users.Execute(cfg.Users)
|
||||
})
|
||||
reg(opts.Config.Web.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Web.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Web.Context = ctx
|
||||
cfg.Web.Commons = cfg.Commons
|
||||
return web.Execute(cfg.Web)
|
||||
})
|
||||
reg(opts.Config.WebDAV.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.WebDAV.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.WebDAV.Context = ctx
|
||||
cfg.WebDAV.Commons = cfg.Commons
|
||||
return webdav.Execute(cfg.WebDAV)
|
||||
})
|
||||
reg(opts.Config.Webfinger.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
reg(3, opts.Config.Webfinger.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Webfinger.Context = ctx
|
||||
cfg.Webfinger.Commons = cfg.Commons
|
||||
return webfinger.Execute(cfg.Webfinger)
|
||||
})
|
||||
reg(3, opts.Config.IDP.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.IDP.Context = ctx
|
||||
cfg.IDP.Commons = cfg.Commons
|
||||
return idp.Execute(cfg.IDP)
|
||||
})
|
||||
reg(3, opts.Config.Proxy.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Proxy.Context = ctx
|
||||
cfg.Proxy.Commons = cfg.Commons
|
||||
return proxy.Execute(cfg.Proxy)
|
||||
})
|
||||
reg(3, opts.Config.Sharing.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Sharing.Context = ctx
|
||||
cfg.Sharing.Commons = cfg.Commons
|
||||
return sharing.Execute(cfg.Sharing)
|
||||
})
|
||||
reg(3, opts.Config.SSE.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.SSE.Context = ctx
|
||||
cfg.SSE.Commons = cfg.Commons
|
||||
return sse.Execute(cfg.SSE)
|
||||
})
|
||||
reg(3, opts.Config.OCM.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.OCM.Context = ctx
|
||||
cfg.OCM.Commons = cfg.Commons
|
||||
return ocm.Execute(cfg.OCM)
|
||||
})
|
||||
|
||||
// out of some unknown reason ci gets angry when frontend service starts in priority group 3
|
||||
// this is not reproducible locally, it can start when nats and gateway are already running
|
||||
// FIXME: find out why
|
||||
reg(4, opts.Config.Frontend.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Frontend.Context = ctx
|
||||
cfg.Frontend.Commons = cfg.Commons
|
||||
return frontend.Execute(cfg.Frontend)
|
||||
})
|
||||
|
||||
// populate optional services
|
||||
areg := func(name string, exec func(context.Context, *ociscfg.Config) error) {
|
||||
@@ -296,40 +334,10 @@ func NewService(options ...Option) (*Service, error) {
|
||||
cfg.Policies.Commons = cfg.Commons
|
||||
return policies.Execute(cfg.Policies)
|
||||
})
|
||||
|
||||
// populate delayed services
|
||||
dreg := func(name string, exec func(context.Context, *ociscfg.Config) error) {
|
||||
s.Delayed[name] = NewSutureServiceBuilder(exec)
|
||||
}
|
||||
dreg(opts.Config.Frontend.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Frontend.Context = ctx
|
||||
cfg.Frontend.Commons = cfg.Commons
|
||||
return frontend.Execute(cfg.Frontend)
|
||||
})
|
||||
dreg(opts.Config.IDP.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.IDP.Context = ctx
|
||||
cfg.IDP.Commons = cfg.Commons
|
||||
return idp.Execute(cfg.IDP)
|
||||
})
|
||||
dreg(opts.Config.Proxy.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Proxy.Context = ctx
|
||||
cfg.Proxy.Commons = cfg.Commons
|
||||
return proxy.Execute(cfg.Proxy)
|
||||
})
|
||||
dreg(opts.Config.Sharing.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Sharing.Context = ctx
|
||||
cfg.Sharing.Commons = cfg.Commons
|
||||
return sharing.Execute(cfg.Sharing)
|
||||
})
|
||||
dreg(opts.Config.SSE.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.SSE.Context = ctx
|
||||
cfg.SSE.Commons = cfg.Commons
|
||||
return sse.Execute(cfg.SSE)
|
||||
})
|
||||
dreg(opts.Config.OCM.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.OCM.Context = ctx
|
||||
cfg.OCM.Commons = cfg.Commons
|
||||
return ocm.Execute(cfg.OCM)
|
||||
areg(opts.Config.Invitations.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
|
||||
cfg.Invitations.Context = ctx
|
||||
cfg.Invitations.Commons = cfg.Commons
|
||||
return invitations.Execute(cfg.Invitations)
|
||||
})
|
||||
|
||||
return s, nil
|
||||
@@ -378,14 +386,14 @@ func Start(o ...Option) error {
|
||||
|
||||
if err = rpc.Register(s); err != nil {
|
||||
if s != nil {
|
||||
s.Log.Fatal().Err(err)
|
||||
s.Log.Fatal().Err(err).Msg("could not register rpc service")
|
||||
}
|
||||
}
|
||||
rpc.HandleHTTP()
|
||||
|
||||
l, err := net.Listen("tcp", net.JoinHostPort(s.cfg.Runtime.Host, s.cfg.Runtime.Port))
|
||||
if err != nil {
|
||||
s.Log.Fatal().Err(err)
|
||||
s.Log.Fatal().Err(err).Msg("could not start listener")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
@@ -402,9 +410,6 @@ func Start(o ...Option) error {
|
||||
// prepare the set of services to run
|
||||
s.generateRunSet(s.cfg)
|
||||
|
||||
// schedule preliminary services first
|
||||
scheduleServiceTokens(s, s.Preliminary)
|
||||
|
||||
// there are reasons not to do this, but we have race conditions ourselves. Until we resolve them, mind the following disclaimer:
|
||||
// Calling ServeBackground will CORRECTLY start the supervisor running in a new goroutine. It is risky to directly run
|
||||
// go supervisor.Serve()
|
||||
@@ -415,19 +420,18 @@ func Start(o ...Option) error {
|
||||
// trap will block on halt channel for interruptions.
|
||||
go trap(s, halt)
|
||||
|
||||
// grace period for preliminary services to get up
|
||||
time.Sleep(_preliminaryDelay)
|
||||
|
||||
// schedule services that we are sure don't have interdependencies.
|
||||
scheduleServiceTokens(s, s.ServicesRegistry)
|
||||
for i, service := range s.Services {
|
||||
scheduleServiceTokens(s, service)
|
||||
if _waitFuncs[i] != nil {
|
||||
if err := _waitFuncs[i](s.cfg); err != nil {
|
||||
s.Log.Fatal().Err(err).Msg("wait func failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// schedule services that are optional
|
||||
scheduleServiceTokens(s, s.Additional)
|
||||
|
||||
// add services with delayed execution.
|
||||
time.Sleep(_startDelay)
|
||||
scheduleServiceTokens(s, s.Delayed)
|
||||
|
||||
return http.Serve(l, nil)
|
||||
}
|
||||
|
||||
@@ -454,16 +458,10 @@ func (s *Service) generateRunSet(cfg *ociscfg.Config) {
|
||||
return
|
||||
}
|
||||
|
||||
for name := range s.Preliminary {
|
||||
runset[name] = struct{}{}
|
||||
}
|
||||
|
||||
for name := range s.ServicesRegistry {
|
||||
runset[name] = struct{}{}
|
||||
}
|
||||
|
||||
for name := range s.Delayed {
|
||||
runset[name] = struct{}{}
|
||||
for _, service := range s.Services {
|
||||
for name := range service {
|
||||
runset[name] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// add additional services if explicitly added by config
|
||||
@@ -516,3 +514,39 @@ func trap(s *Service, halt chan os.Signal) {
|
||||
s.Log.Debug().Str("service", "runtime service").Msgf("terminating with signal: %v", s)
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// pingNats will attempt to connect to nats, blocking until a connection is established
|
||||
func pingNats(cfg *ociscfg.Config) error {
|
||||
// We need to get a natsconfig from somewhere. We can use any one.
|
||||
evcfg := cfg.Postprocessing.Postprocessing.Events
|
||||
_, err := stream.NatsFromConfig("initial", true, stream.NatsConfig(evcfg))
|
||||
return err
|
||||
}
|
||||
|
||||
func pingGateway(_ *ociscfg.Config) error {
|
||||
// init grpc connection
|
||||
_, err := ogrpc.NewClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
b := backoff.NewExponentialBackOff()
|
||||
o := func() error {
|
||||
n := b.NextBackOff()
|
||||
_, err := pool.GetGatewayServiceClient("com.owncloud.api.gateway")
|
||||
if err != nil && n > time.Second {
|
||||
logger.New().Error().Err(err).Msgf("can't connect to gateway service, retrying in %s", n)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err = backoff.Retry(o, b)
|
||||
return err
|
||||
}
|
||||
|
||||
func wait(d time.Duration) func(cfg *ociscfg.Config) error {
|
||||
return func(cfg *ociscfg.Config) error {
|
||||
time.Sleep(d)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user