diff --git a/go.mod b/go.mod index 1bb4b56bb..22d6110b9 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.11.7 - github.com/nats-io/nats.go v1.44.0 + github.com/nats-io/nats.go v1.45.0 github.com/oklog/run v1.2.0 github.com/olekukonko/tablewriter v1.0.9 github.com/onsi/ginkgo v1.16.5 diff --git a/go.sum b/go.sum index 7f98ba7b9..809eac525 100644 --- a/go.sum +++ b/go.sum @@ -821,8 +821,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms= github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s= -github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M= -github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= +github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 9da1bb632..80d8ff6be 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io go get github.com/nats-io/nats.go@latest # To get a specific version: -go get github.com/nats-io/nats.go@v1.44.0 +go get github.com/nats-io/nats.go@v1.45.0 # Note that the latest major version for NATS Server is v2: go get github.com/nats-io/nats-server/v2@latest diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go index 791423160..2ceb0d5bd 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go @@ -317,6 +317,10 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu return nil, resp.Error } + if resp.Error == nil && resp.ConsumerInfo == nil { + return nil, ErrConsumerCreationResponseEmpty + } + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 { return nil, ErrConsumerMultipleFilterSubjectsNotSupported diff --git a/vendor/github.com/nats-io/nats.go/jetstream/errors.go b/vendor/github.com/nats-io/nats.go/jetstream/errors.go index 88051af8e..c8ffe236d 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/errors.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/errors.go @@ -118,6 +118,11 @@ var ( // does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} + // ErrConsumerCreationResponseEmpty is an error returned when the response from the server + // when creating a consumer is empty. This means that the state of the consumer is unknown and + // the consumer may not have been created successfully. + ErrConsumerCreationResponseEmpty JetStreamError = &jsError{message: "consumer creation response is empty"} + // ErrConsumerExists is returned when attempting to create a consumer with // CreateConsumer but a consumer with given name already exists. ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}} diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go index be7fec409..dd900d75e 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go @@ -576,27 +576,10 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream if cancel != nil { defer cancel() } - ncfg := cfg - // If we have a mirror and an external domain, convert to ext.APIPrefix. - if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" { - // Copy so we do not change the caller's version. - ncfg.Mirror = ncfg.Mirror.copy() - if err := ncfg.Mirror.convertDomain(); err != nil { - return nil, err - } - } - // Check sources for the same. - if len(ncfg.Sources) > 0 { - ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...) - for i, ss := range ncfg.Sources { - if ss.Domain != "" { - ncfg.Sources[i] = ss.copy() - if err := ncfg.Sources[i].convertDomain(); err != nil { - return nil, err - } - } - } + ncfg, err := convertStreamConfigDomains(cfg) + if err != nil { + return nil, err } req, err := json.Marshal(ncfg) @@ -676,6 +659,36 @@ func (ss *StreamSource) copy() *StreamSource { return &nss } +// convertStreamConfigDomains converts domain configurations to external configurations +// in both mirror and sources of a StreamConfig. It creates a copy of the config to avoid +// modifying the caller's version. +func convertStreamConfigDomains(cfg StreamConfig) (StreamConfig, error) { + ncfg := cfg + // If we have a mirror and an external domain, convert to ext.APIPrefix. + if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" { + // Copy so we do not change the caller's version. + ncfg.Mirror = ncfg.Mirror.copy() + if err := ncfg.Mirror.convertDomain(); err != nil { + return StreamConfig{}, err + } + } + + // Check sources for the same. + if len(ncfg.Sources) > 0 { + ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...) + for i, ss := range ncfg.Sources { + if ss.Domain != "" { + ncfg.Sources[i] = ss.copy() + if err := ncfg.Sources[i].convertDomain(); err != nil { + return StreamConfig{}, err + } + } + } + } + + return ncfg, nil +} + // UpdateStream updates an existing stream. If stream does not exist, // ErrStreamNotFound is returned. func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) { @@ -687,7 +700,12 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream defer cancel() } - req, err := json.Marshal(cfg) + ncfg, err := convertStreamConfigDomains(cfg) + if err != nil { + return nil, err + } + + req, err := json.Marshal(ncfg) if err != nil { return nil, err } diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go index 84c0c3ba1..0fffbc71c 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go @@ -587,6 +587,21 @@ func WithExpectLastSequencePerSubject(seq uint64) PublishOpt { } } +// WithExpectLastSequenceForSubject sets the sequence and subject for which the +// last sequence number should be checked. If the last message on a subject +// has a different sequence number server will reject the message and publish +// will fail. +func WithExpectLastSequenceForSubject(seq uint64, subject string) PublishOpt { + return func(opts *pubOpts) error { + if subject == "" { + return fmt.Errorf("%w: subject cannot be empty", ErrInvalidOption) + } + opts.lastSubjectSeq = &seq + opts.lastSubject = subject + return nil + } +} + // WithExpectLastMsgID sets the expected message ID the last message on a stream // should have. If the last message has a different message ID server will // reject the message and publish will fail. diff --git a/vendor/github.com/nats-io/nats.go/jetstream/kv.go b/vendor/github.com/nats-io/nats.go/jetstream/kv.go index a06ecb922..bd57cf75a 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/kv.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/kv.go @@ -254,6 +254,8 @@ type ( // LimitMarkerTTL is how long the bucket keeps markers when keys are // removed by the TTL setting. + // It is required for per-key TTL to work and for watcher to notify + // about TTL expirations (both per key and per bucket) LimitMarkerTTL time.Duration } diff --git a/vendor/github.com/nats-io/nats.go/jetstream/message.go b/vendor/github.com/nats-io/nats.go/jetstream/message.go index 11792a43b..53c7aab81 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/message.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/message.go @@ -191,6 +191,12 @@ const ( // [WithExpectLastSequencePerSubject] option. ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence" + // ExpectedLastSubjSeqSubjHeader contains the subject for which the + // expected last sequence number is set. This is used together with + // [ExpectedLastSubjSeqHeader] to apply optimistic concurrency control at + // subject level. Server will reject the message if it is not the case. + ExpectedLastSubjSeqSubjHeader = "Nats-Expected-Last-Subject-Sequence-Subject" + // ExpectedLastMsgIDHeader contains the expected last message ID on the // subject and can be used to apply optimistic concurrency control at // stream level. Server will reject the message if it is not the case. diff --git a/vendor/github.com/nats-io/nats.go/jetstream/publish.go b/vendor/github.com/nats-io/nats.go/jetstream/publish.go index 676ae27cd..8aaba4e1f 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/publish.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/publish.go @@ -47,7 +47,8 @@ type ( lastMsgID string // Expected last msgId stream string // Expected stream name lastSeq *uint64 // Expected last sequence - lastSubjectSeq *uint64 // Expected last sequence per subject + lastSubjectSeq *uint64 // Expected last sequence for subject + lastSubject string // Expected subject for last sequence ttl time.Duration // Message TTL // Publish retries for NoResponders err. @@ -195,6 +196,10 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis if o.lastSubjectSeq != nil { m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) } + if o.lastSubject != "" { + m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject) + m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) + } if o.ttl > 0 { m.Header.Set(MsgTTLHeader, o.ttl.String()) } @@ -281,6 +286,10 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut if o.lastSubjectSeq != nil { m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) } + if o.lastSubject != "" { + m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject) + m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) + } if o.ttl > 0 { m.Header.Set(MsgTTLHeader, o.ttl.String()) } diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index 6d72bbcc5..0e56a7800 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -60,6 +60,11 @@ var ( // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} + // ErrConsumerCreationResponseEmpty is an error returned when the response from the server + // when creating a consumer is empty. This means that the state of the consumer is unknown and + // the consumer may not have been created successfully. + ErrConsumerCreationResponseEmpty JetStreamError = &jsError{message: "consumer creation response is empty"} + // ErrMsgNotFound is returned when message with provided sequence number does npt exist. ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}} diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 22bab1205..bae376e39 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -553,6 +553,10 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o return nil, info.Error } + if info.Error == nil && info.ConsumerInfo == nil { + return nil, ErrConsumerCreationResponseEmpty + } + // check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 { return nil, ErrConsumerMultipleFilterSubjectsNotSupported diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index e2d39ee82..facedb251 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -33,6 +33,7 @@ import ( "path/filepath" "regexp" "runtime" + "slices" "strconv" "strings" "sync" @@ -47,7 +48,7 @@ import ( // Default Constants const ( - Version = "1.44.0" + Version = "1.45.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -84,68 +85,72 @@ const ( // ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired. ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired" - // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit + // MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit. MAX_CONNECTIONS_ERR = "maximum connections exceeded" - // MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit + // MAX_ACCOUNT_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit on the account. + MAX_ACCOUNT_CONNECTIONS_ERR = `maximum account active connections exceeded` + + // MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit. MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded" ) // Errors var ( - ErrConnectionClosed = errors.New("nats: connection closed") - ErrConnectionDraining = errors.New("nats: connection draining") - ErrDrainTimeout = errors.New("nats: draining connection timed out") - ErrConnectionReconnecting = errors.New("nats: connection reconnecting") - ErrSecureConnRequired = errors.New("nats: secure connection required") - ErrSecureConnWanted = errors.New("nats: secure connection not available") - ErrBadSubscription = errors.New("nats: invalid subscription") - ErrTypeSubscription = errors.New("nats: invalid subscription type") - ErrBadSubject = errors.New("nats: invalid subject") - ErrBadQueueName = errors.New("nats: invalid queue name") - ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") - ErrTimeout = errors.New("nats: timeout") - ErrBadTimeout = errors.New("nats: timeout invalid") - ErrAuthorization = errors.New("nats: authorization violation") - ErrAuthExpired = errors.New("nats: authentication expired") - ErrAuthRevoked = errors.New("nats: authentication revoked") - ErrPermissionViolation = errors.New("nats: permissions violation") - ErrAccountAuthExpired = errors.New("nats: account authentication expired") - ErrNoServers = errors.New("nats: no servers available for connection") - ErrJsonParse = errors.New("nats: connect message, json parse error") - ErrChanArg = errors.New("nats: argument needs to be a channel type") - ErrMaxPayload = errors.New("nats: maximum payload exceeded") - ErrMaxMessages = errors.New("nats: maximum messages delivered") - ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") - ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") - ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set") - ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") - ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") - ErrInvalidConnection = errors.New("nats: invalid connection") - ErrInvalidMsg = errors.New("nats: invalid message or message nil") - ErrInvalidArg = errors.New("nats: invalid argument") - ErrInvalidContext = errors.New("nats: invalid context") - ErrNoDeadlineContext = errors.New("nats: context requires a deadline") - ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") - ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") - ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") - ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") - ErrNoUserCB = errors.New("nats: user callback not defined") - ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") - ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") - ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) - ErrTokenAlreadySet = errors.New("nats: token and token handler both set") - ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass") - ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") - ErrMsgNoReply = errors.New("nats: message does not have a reply") - ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") - ErrDisconnected = errors.New("nats: server is disconnected") - ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") - ErrBadHeaderMsg = errors.New("nats: message could not decode headers") - ErrNoResponders = errors.New("nats: no responders available for request") - ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") - ErrConnectionNotTLS = errors.New("nats: connection is not tls") - ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") + ErrConnectionClosed = errors.New("nats: connection closed") + ErrConnectionDraining = errors.New("nats: connection draining") + ErrDrainTimeout = errors.New("nats: draining connection timed out") + ErrConnectionReconnecting = errors.New("nats: connection reconnecting") + ErrSecureConnRequired = errors.New("nats: secure connection required") + ErrSecureConnWanted = errors.New("nats: secure connection not available") + ErrBadSubscription = errors.New("nats: invalid subscription") + ErrTypeSubscription = errors.New("nats: invalid subscription type") + ErrBadSubject = errors.New("nats: invalid subject") + ErrBadQueueName = errors.New("nats: invalid queue name") + ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped") + ErrTimeout = errors.New("nats: timeout") + ErrBadTimeout = errors.New("nats: timeout invalid") + ErrAuthorization = errors.New("nats: authorization violation") + ErrAuthExpired = errors.New("nats: authentication expired") + ErrAuthRevoked = errors.New("nats: authentication revoked") + ErrPermissionViolation = errors.New("nats: permissions violation") + ErrAccountAuthExpired = errors.New("nats: account authentication expired") + ErrNoServers = errors.New("nats: no servers available for connection") + ErrJsonParse = errors.New("nats: connect message, json parse error") + ErrChanArg = errors.New("nats: argument needs to be a channel type") + ErrMaxPayload = errors.New("nats: maximum payload exceeded") + ErrMaxMessages = errors.New("nats: maximum messages delivered") + ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription") + ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed") + ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set") + ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received") + ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded") + ErrInvalidConnection = errors.New("nats: invalid connection") + ErrInvalidMsg = errors.New("nats: invalid message or message nil") + ErrInvalidArg = errors.New("nats: invalid argument") + ErrInvalidContext = errors.New("nats: invalid context") + ErrNoDeadlineContext = errors.New("nats: context requires a deadline") + ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server") + ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server") + ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler") + ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler") + ErrNoUserCB = errors.New("nats: user callback not defined") + ErrNkeyAndUser = errors.New("nats: user callback and nkey defined") + ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server") + ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION) + ErrTokenAlreadySet = errors.New("nats: token and token handler both set") + ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass") + ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection") + ErrMsgNoReply = errors.New("nats: message does not have a reply") + ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server") + ErrDisconnected = errors.New("nats: server is disconnected") + ErrHeadersNotSupported = errors.New("nats: headers not supported by this server") + ErrBadHeaderMsg = errors.New("nats: message could not decode headers") + ErrNoResponders = errors.New("nats: no responders available for request") + ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded") + ErrMaxAccountConnectionsExceeded = errors.New("nats: maximum account active connections exceeded") + ErrConnectionNotTLS = errors.New("nats: connection is not tls") + ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded") ) // GetDefaultOptions returns default configuration options for the client. @@ -1655,14 +1660,17 @@ func (o Options) Connect() (*Conn, error) { // Create reader/writer nc.newReaderWriter() + // Spin up the async cb dispatcher before connect so it's ready + // to handle callbacks, especially when RetryOnFailedConnect is used + // and initial connection fails. + go nc.ach.asyncCBDispatcher() + connectionEstablished, err := nc.connect() if err != nil { + nc.ach.close() return nil, err } - // Spin up the async cb dispatcher on success - go nc.ach.asyncCBDispatcher() - if connectionEstablished && nc.Opts.ConnectedCB != nil { nc.ach.push(func() { nc.Opts.ConnectedCB(nc) }) } @@ -2549,7 +2557,7 @@ func (nc *Conn) connect() (bool, error) { nc.setup() nc.changeConnStatus(RECONNECTING) nc.bw.switchToPending() - go nc.doReconnect(ErrNoServers, false) + go nc.doReconnect(err, false) err = nil } else { nc.current = nil @@ -2872,6 +2880,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) { // Clear any errors. nc.err = nil + // Perform appropriate callback if needed for a disconnect. // DisconnectedErrCB has priority over deprecated DisconnectedCB if !nc.initc { @@ -2880,6 +2889,12 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) { } else if nc.Opts.DisconnectedCB != nil { nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) }) } + } else if nc.Opts.RetryOnFailedConnect && nc.initc && err != nil { + // For initial connection failure with RetryOnFailedConnect, + // report the error via ReconnectErrCB if available + if nc.Opts.ReconnectErrCB != nil { + nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) }) + } } // This is used to wait on go routines exit if we start them in the loop @@ -3432,6 +3447,10 @@ func (nc *Conn) processMsg(data []byte) { if sub.mch != nil { select { case sub.mch <- m: + // For ChanSubscribe, track delivered count here + if sub.typ == ChanSubscription { + sub.delivered++ + } default: goto slowConsumer } @@ -3491,6 +3510,19 @@ func (nc *Conn) processMsg(data []byte) { nc.checkForSequenceMismatch(m, sub, jsi) } + // Check if we need to auto-unsubscribe for chan subscriptions + if sub.typ == ChanSubscription && sub.max > 0 && !ctrlMsg { + sub.mu.Lock() + if sub.delivered >= sub.max { + sub.mu.Unlock() + nc.mu.Lock() + nc.removeSub(sub) + nc.mu.Unlock() + } else { + sub.mu.Unlock() + } + } + return slowConsumer: @@ -3794,6 +3826,8 @@ func (nc *Conn) processErr(ie string) { close = nc.processOpErr(ErrStaleConnection) } else if e == MAX_CONNECTIONS_ERR { close = nc.processOpErr(ErrMaxConnectionsExceeded) + } else if e == MAX_ACCOUNT_CONNECTIONS_ERR { + close = nc.processOpErr(ErrMaxAccountConnectionsExceeded) } else if strings.HasPrefix(e, PERMISSIONS_ERR) { nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne)) } else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) { @@ -4686,7 +4720,7 @@ func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan Su // will not block. Lock should be held entering. func (s *Subscription) sendStatusEvent(status SubStatus) { for ch, statuses := range s.statListeners { - if !containsStatus(statuses, status) { + if !slices.Contains(statuses, status) { continue } // only send event if someone's listening @@ -4694,21 +4728,18 @@ func (s *Subscription) sendStatusEvent(status SubStatus) { case ch <- status: default: } - if status == SubscriptionClosed { + } + // After sending SubscriptionClosed status to all listeners, + // close all channels and clear the map to prevent future + // sends to closed channels that could cause panics + if status == SubscriptionClosed { + for ch := range s.statListeners { close(ch) } + s.statListeners = nil } } -func containsStatus(statuses []SubStatus, status SubStatus) bool { - for _, s := range statuses { - if s == status { - return true - } - } - return false -} - // changeSubStatus changes subscription status and sends events // to all listeners. Lock should be held entering. func (s *Subscription) changeSubStatus(status SubStatus) { diff --git a/vendor/modules.txt b/vendor/modules.txt index 2ee0e8c21..292e71773 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -961,7 +961,7 @@ github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem github.com/nats-io/nats-server/v2/server/thw github.com/nats-io/nats-server/v2/server/tpm -# github.com/nats-io/nats.go v1.44.0 +# github.com/nats-io/nats.go v1.45.0 ## explicit; go 1.23.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin