diff --git a/nats/pkg/command/server.go b/nats/pkg/command/server.go index c1d671916..8a46c2125 100644 --- a/nats/pkg/command/server.go +++ b/nats/pkg/command/server.go @@ -38,14 +38,10 @@ func Server(cfg *config.Config) *cli.Command { natsServer, err := nats.NewNATSServer( ctx, logging.NewLogWrapper(logger), - []nats.NatsOption{ - nats.Host(cfg.Nats.Host), - nats.Port(cfg.Nats.Port), - nats.ClusterID(cfg.Nats.ClusterID), - }, - []nats.JetStreamOption{ - nats.JetStreamStoreDir(cfg.Nats.StoreDir), - }, + nats.Host(cfg.Nats.Host), + nats.Port(cfg.Nats.Port), + nats.ClusterID(cfg.Nats.ClusterID), + nats.StoreDir(cfg.Nats.StoreDir), ) if err != nil { return err diff --git a/nats/pkg/server/nats/nats.go b/nats/pkg/server/nats/nats.go index e7717d644..e346596fd 100644 --- a/nats/pkg/server/nats/nats.go +++ b/nats/pkg/server/nats/nats.go @@ -10,24 +10,21 @@ import ( var NATSListenAndServeLoopTimer = 1 * time.Second type NATSServer struct { - ctx context.Context - jetStreamConfig *nserver.JetStreamConfig - server *nserver.Server + ctx context.Context + server *nserver.Server } -func NewNATSServer(ctx context.Context, logger nserver.Logger, natsOpts []NatsOption, jetstreamOpts []JetStreamOption) (*NATSServer, error) { - natsOptions := &nserver.Options{} - jetStreamOptions := &nserver.JetStreamConfig{} +func NewNATSServer(ctx context.Context, logger nserver.Logger, opts ...NatsOption) (*NATSServer, error) { + natsOpts := &nserver.Options{} - for _, o := range natsOpts { - o(natsOptions) + for _, o := range opts { + o(natsOpts) } - for _, o := range jetstreamOpts { - o(jetStreamOptions) - } + // enable JetStream + natsOpts.JetStream = true - server, err := nserver.NewServer(natsOptions) + server, err := nserver.NewServer(natsOpts) if err != nil { return nil, err } @@ -35,22 +32,14 @@ func NewNATSServer(ctx context.Context, logger nserver.Logger, natsOpts []NatsOp server.SetLoggerV2(logger, true, true, false) return &NATSServer{ - ctx: ctx, - jetStreamConfig: jetStreamOptions, - server: server, + ctx: ctx, + server: server, }, nil } // ListenAndServe runs the NATSServer in a blocking way until the server is shutdown or an error occurs func (n *NATSServer) ListenAndServe() (err error) { - // start NATS first go n.server.Start() - // start NATS JetStream second - n.server.EnableJetStream(n.jetStreamConfig) - if err != nil { - return err - } - <-n.ctx.Done() return nil } diff --git a/nats/pkg/server/nats/options.go b/nats/pkg/server/nats/options.go index cceb83ec4..43816dd4f 100644 --- a/nats/pkg/server/nats/options.go +++ b/nats/pkg/server/nats/options.go @@ -28,12 +28,9 @@ func ClusterID(clusterID string) NatsOption { } } -// NatsOption configures the nats server -type JetStreamOption func(*nserver.JetStreamConfig) - -// ClusterID sets the name for the nats cluster -func JetStreamStoreDir(StoreDir string) JetStreamOption { - return func(o *nserver.JetStreamConfig) { +// StoreDir sets the folder for persistence +func StoreDir(StoreDir string) NatsOption { + return func(o *nserver.Options) { o.StoreDir = StoreDir } }