let nats start jetstream by itself

This commit is contained in:
Willy Kloucek
2022-03-24 08:10:07 +01:00
parent 09021c02a1
commit e91cf64a7b
3 changed files with 18 additions and 36 deletions

View File

@@ -38,14 +38,10 @@ func Server(cfg *config.Config) *cli.Command {
natsServer, err := nats.NewNATSServer(
ctx,
logging.NewLogWrapper(logger),
[]nats.NatsOption{
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.ClusterID(cfg.Nats.ClusterID),
},
[]nats.JetStreamOption{
nats.JetStreamStoreDir(cfg.Nats.StoreDir),
},
nats.Host(cfg.Nats.Host),
nats.Port(cfg.Nats.Port),
nats.ClusterID(cfg.Nats.ClusterID),
nats.StoreDir(cfg.Nats.StoreDir),
)
if err != nil {
return err

View File

@@ -10,24 +10,21 @@ import (
var NATSListenAndServeLoopTimer = 1 * time.Second
type NATSServer struct {
ctx context.Context
jetStreamConfig *nserver.JetStreamConfig
server *nserver.Server
ctx context.Context
server *nserver.Server
}
func NewNATSServer(ctx context.Context, logger nserver.Logger, natsOpts []NatsOption, jetstreamOpts []JetStreamOption) (*NATSServer, error) {
natsOptions := &nserver.Options{}
jetStreamOptions := &nserver.JetStreamConfig{}
func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) {
natsOpts := &nserver.Options{}
for _, o := range natsOpts {
o(natsOptions)
for _, o := range opts {
o(natsOpts)
}
for _, o := range jetstreamOpts {
o(jetStreamOptions)
}
// enable JetStream
natsOpts.JetStream = true
server, err := nserver.NewServer(natsOptions)
server, err := nserver.NewServer(natsOpts)
if err != nil {
return nil, err
}
@@ -35,22 +32,14 @@ func NewNATSServer(ctx context.Context, logger nserver.Logger, natsOpts []NatsOp
server.SetLoggerV2(logger, true, true, false)
return &NATSServer{
ctx: ctx,
jetStreamConfig: jetStreamOptions,
server: server,
ctx: ctx,
server: server,
}, nil
}
// ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs
func (n *NATSServer) ListenAndServe() (err error) {
// start NATS first
go n.server.Start()
// start NATS JetStream second
n.server.EnableJetStream(n.jetStreamConfig)
if err != nil {
return err
}
<-n.ctx.Done()
return nil
}

View File

@@ -28,12 +28,9 @@ func ClusterID(clusterID string) NatsOption {
}
}
// NatsOption configures the nats server
type JetStreamOption func(*nserver.JetStreamConfig)
// ClusterID sets the name for the nats cluster
func JetStreamStoreDir(StoreDir string) JetStreamOption {
return func(o *nserver.JetStreamConfig) {
// StoreDir sets the folder for persistence
func StoreDir(StoreDir string) NatsOption {
return func(o *nserver.Options) {
o.StoreDir = StoreDir
}
}