mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-01-07 12:50:21 -06:00
check running nats server for errors
This commit is contained in:
@@ -1,16 +1,16 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/run"
|
||||
|
||||
"github.com/cs3org/reva/v2/pkg/events/server"
|
||||
"github.com/owncloud/ocis/nats/pkg/config"
|
||||
"github.com/owncloud/ocis/nats/pkg/config/parser"
|
||||
"github.com/owncloud/ocis/nats/pkg/logging"
|
||||
"github.com/owncloud/ocis/ocis-pkg/log"
|
||||
"github.com/owncloud/ocis/nats/pkg/server/nats"
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
// TODO: .Logger Option on events/server would make this import redundant
|
||||
@@ -28,65 +28,68 @@ func Server(cfg *config.Config) *cli.Command {
|
||||
},
|
||||
Action: func(c *cli.Context) error {
|
||||
logger := logging.Configure(cfg.Service.Name, cfg.Log)
|
||||
ch := make(chan os.Signal, 1)
|
||||
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
|
||||
err := server.RunNatsServer(server.Host(cfg.Nats.Host), server.Port(cfg.Nats.Port), server.StanOpts(func(o *stanServer.Options) {
|
||||
o.CustomLogger = &logWrapper{logger}
|
||||
}))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ch:
|
||||
// TODO: Should we shut down the NatsServer in a proper way here?
|
||||
// That would require a reference to the StanServer instance for being able to call
|
||||
// StanServer.Shutdown() github.com/cs3org/reva/pkg/events/server doesn't provide that
|
||||
// currently
|
||||
return nil
|
||||
|
||||
gr := run.Group{}
|
||||
ctx, cancel := func() (context.Context, context.CancelFunc) {
|
||||
if cfg.Context == nil {
|
||||
return context.WithCancel(context.Background())
|
||||
}
|
||||
}
|
||||
return context.WithCancel(cfg.Context)
|
||||
}()
|
||||
|
||||
defer cancel()
|
||||
|
||||
var natsServer *stanServer.StanServer
|
||||
|
||||
gr.Add(func() error {
|
||||
var err error
|
||||
|
||||
natsServer, err = nats.RunNatsServer(
|
||||
nats.Host(cfg.Nats.Host),
|
||||
nats.Port(cfg.Nats.Port),
|
||||
nats.StanOpts(
|
||||
func(o *stanServer.Options) {
|
||||
o.CustomLogger = logging.NewLogWrapper(logger)
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
errChan := make(chan error)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
// check if NATs server has an encountered an error
|
||||
if err := natsServer.LastError(); err != nil {
|
||||
errChan <- err
|
||||
return
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return // context closed
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case err = <-errChan:
|
||||
return err
|
||||
}
|
||||
|
||||
}, func(_ error) {
|
||||
logger.Info().
|
||||
Msg("Shutting down server")
|
||||
|
||||
natsServer.Shutdown()
|
||||
cancel()
|
||||
})
|
||||
|
||||
return gr.Run()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// we need to wrap our logger so we can pass it to the nats server
|
||||
type logWrapper struct {
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func (l *logWrapper) Noticef(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Info().Msg(msg)
|
||||
}
|
||||
|
||||
// Warnf logs a warning statement
|
||||
func (l *logWrapper) Warnf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Warn().Msg(msg)
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal statement
|
||||
func (l *logWrapper) Fatalf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Fatal().Msg(msg)
|
||||
}
|
||||
|
||||
// Errorf logs an error statement
|
||||
func (l *logWrapper) Errorf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Error().Msg(msg)
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func (l *logWrapper) Debugf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Debug().Msg(msg)
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func (l *logWrapper) Tracef(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Trace().Msg(msg)
|
||||
}
|
||||
|
||||
52
nats/pkg/logging/nats.go
Normal file
52
nats/pkg/logging/nats.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package logging
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/owncloud/ocis/ocis-pkg/log"
|
||||
)
|
||||
|
||||
func NewLogWrapper(logger log.Logger) *LogWrapper {
|
||||
return &LogWrapper{logger}
|
||||
}
|
||||
|
||||
// we need to wrap our logger so we can pass it to the nats server
|
||||
type LogWrapper struct {
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
// Noticef logs a notice statement
|
||||
func (l *LogWrapper) Noticef(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Info().Msg(msg)
|
||||
}
|
||||
|
||||
// Warnf logs a warning statement
|
||||
func (l *LogWrapper) Warnf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Warn().Msg(msg)
|
||||
}
|
||||
|
||||
// Fatalf logs a fatal statement
|
||||
func (l *LogWrapper) Fatalf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Fatal().Msg(msg)
|
||||
}
|
||||
|
||||
// Errorf logs an error statement
|
||||
func (l *LogWrapper) Errorf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Error().Msg(msg)
|
||||
}
|
||||
|
||||
// Debugf logs a debug statement
|
||||
func (l *LogWrapper) Debugf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Debug().Msg(msg)
|
||||
}
|
||||
|
||||
// Tracef logs a trace statement
|
||||
func (l *LogWrapper) Tracef(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.logger.Trace().Msg(msg)
|
||||
}
|
||||
17
nats/pkg/server/nats/nats.go
Normal file
17
nats/pkg/server/nats/nats.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
stanServer "github.com/nats-io/nats-streaming-server/server"
|
||||
)
|
||||
|
||||
// RunNatsServer runs the nats streaming server
|
||||
func RunNatsServer(opts ...Option) (*stanServer.StanServer, error) {
|
||||
natsOpts := stanServer.DefaultNatsServerOptions
|
||||
stanOpts := stanServer.GetDefaultOptions()
|
||||
|
||||
for _, o := range opts {
|
||||
o(&natsOpts, stanOpts)
|
||||
}
|
||||
s, err := stanServer.RunServerWithOpts(stanOpts, &natsOpts)
|
||||
return s, err
|
||||
}
|
||||
37
nats/pkg/server/nats/options.go
Normal file
37
nats/pkg/server/nats/options.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package nats
|
||||
|
||||
import (
|
||||
natsServer "github.com/nats-io/nats-server/v2/server"
|
||||
stanServer "github.com/nats-io/nats-streaming-server/server"
|
||||
)
|
||||
|
||||
// Option configures the nats server
|
||||
type Option func(*natsServer.Options, *stanServer.Options)
|
||||
|
||||
// Host sets the host URL for the nats server
|
||||
func Host(url string) Option {
|
||||
return func(no *natsServer.Options, _ *stanServer.Options) {
|
||||
no.Host = url
|
||||
}
|
||||
}
|
||||
|
||||
// Port sets the host URL for the nats server
|
||||
func Port(port int) Option {
|
||||
return func(no *natsServer.Options, _ *stanServer.Options) {
|
||||
no.Port = port
|
||||
}
|
||||
}
|
||||
|
||||
// NatsOpts allows setting Options from nats package directly
|
||||
func NatsOpts(opt func(*natsServer.Options)) Option {
|
||||
return func(no *natsServer.Options, _ *stanServer.Options) {
|
||||
opt(no)
|
||||
}
|
||||
}
|
||||
|
||||
// StanOpts allows setting Options from stan package directly
|
||||
func StanOpts(opt func(*stanServer.Options)) Option {
|
||||
return func(_ *natsServer.Options, so *stanServer.Options) {
|
||||
opt(so)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user