start nats before other services

Signed-off-by: jkoberg <jkoberg@owncloud.com>
This commit is contained in:
jkoberg
2023-10-17 15:32:14 +02:00
parent 73c99cce9e
commit 55013c3d0d

View File

@@ -71,6 +71,7 @@ type serviceFuncMap map[string]func(*ociscfg.Config) suture.Service
// Service represents a RPC service.
type Service struct {
Supervisor *suture.Supervisor
Preliminary serviceFuncMap
ServicesRegistry serviceFuncMap
Delayed serviceFuncMap
Additional serviceFuncMap
@@ -103,6 +104,7 @@ func NewService(options ...Option) (*Service, error) {
globalCtx, cancelGlobal := context.WithCancel(context.Background())
s := &Service{
Preliminary: make(serviceFuncMap),
ServicesRegistry: make(serviceFuncMap),
Delayed: make(serviceFuncMap),
Additional: make(serviceFuncMap),
@@ -114,6 +116,13 @@ func NewService(options ...Option) (*Service, error) {
cfg: opts.Config,
}
// start nats first - it is used as service registry
s.Preliminary[opts.Config.Nats.Service.Name] = NewSutureServiceBuilder(func(ctx context.Context, cfg *ociscfg.Config) error {
cfg.Nats.Context = ctx
cfg.Nats.Commons = cfg.Commons
return nats.Execute(cfg.Nats)
})
// populate services
reg := func(name string, exec func(context.Context, *ociscfg.Config) error) {
s.ServicesRegistry[name] = NewSutureServiceBuilder(exec)
@@ -183,11 +192,6 @@ func NewService(options ...Option) (*Service, error) {
cfg.Invitations.Commons = cfg.Commons
return invitations.Execute(cfg.Invitations)
})
reg(opts.Config.Nats.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
cfg.Nats.Context = ctx
cfg.Nats.Commons = cfg.Commons
return nats.Execute(cfg.Nats)
})
reg(opts.Config.Notifications.Service.Name, func(ctx context.Context, cfg *ociscfg.Config) error {
cfg.Notifications.Context = ctx
cfg.Notifications.Commons = cfg.Commons
@@ -394,11 +398,8 @@ func Start(o ...Option) error {
// prepare the set of services to run
s.generateRunSet(s.cfg)
// schedule services that we are sure don't have interdependencies.
scheduleServiceTokens(s, s.ServicesRegistry)
// schedule services that are optional
scheduleServiceTokens(s, s.Additional)
// schedule preliminary services first
scheduleServiceTokens(s, s.Preliminary)
// there are reasons not to do this, but we have race conditions ourselves. Until we resolve them, mind the following disclaimer:
// Calling ServeBackground will CORRECTLY start the supervisor running in a new goroutine. It is risky to directly run
@@ -410,6 +411,15 @@ func Start(o ...Option) error {
// trap will block on halt channel for interruptions.
go trap(s, halt)
// grace period for supervisor to get up
time.Sleep(time.Second)
// schedule services that we are sure don't have interdependencies.
scheduleServiceTokens(s, s.ServicesRegistry)
// schedule services that are optional
scheduleServiceTokens(s, s.Additional)
// add services with delayed execution.
time.Sleep(1 * time.Second)
scheduleServiceTokens(s, s.Delayed)
@@ -440,6 +450,10 @@ func (s *Service) generateRunSet(cfg *ociscfg.Config) {
return
}
for name := range s.Preliminary {
runset[name] = struct{}{}
}
for name := range s.ServicesRegistry {
runset[name] = struct{}{}
}