Merge pull request #1401 from opencloud-eu/dependabot/go_modules/github.com/nats-io/nats.go-1.45.0

build(deps): bump github.com/nats-io/nats.go from 1.44.0 to 1.45.0
This commit is contained in:
Ralf Haferkamp
2025-08-27 08:02:48 +02:00
committed by GitHub
14 changed files with 197 additions and 98 deletions

2
go.mod
View File

@@ -56,7 +56,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.7
github.com/nats-io/nats.go v1.44.0
github.com/nats-io/nats.go v1.45.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.9
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -821,8 +821,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.7 h1:lINWQ/Hb3cnaoHmWTjj/7WppZnaSh9C/1cD//nHCbms=
github.com/nats-io/nats-server/v2 v2.11.7/go.mod h1:DchDPVzAsAPqhqm7VLedX0L7hjnV/SYtlmsl9F8U53s=
github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M=
github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.44.0
go get github.com/nats-io/nats.go@v1.45.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -317,6 +317,10 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
return nil, resp.Error
}
if resp.Error == nil && resp.ConsumerInfo == nil {
return nil, ErrConsumerCreationResponseEmpty
}
// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(resp.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported

View File

@@ -118,6 +118,11 @@ var (
// does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}
// ErrConsumerCreationResponseEmpty is an error returned when the response from the server
// when creating a consumer is empty. This means that the state of the consumer is unknown and
// the consumer may not have been created successfully.
ErrConsumerCreationResponseEmpty JetStreamError = &jsError{message: "consumer creation response is empty"}
// ErrConsumerExists is returned when attempting to create a consumer with
// CreateConsumer but a consumer with given name already exists.
ErrConsumerExists JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerExists, Description: "consumer already exists", Code: 400}}

View File

@@ -576,27 +576,10 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
if cancel != nil {
defer cancel()
}
ncfg := cfg
// If we have a mirror and an external domain, convert to ext.APIPrefix.
if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" {
// Copy so we do not change the caller's version.
ncfg.Mirror = ncfg.Mirror.copy()
if err := ncfg.Mirror.convertDomain(); err != nil {
return nil, err
}
}
// Check sources for the same.
if len(ncfg.Sources) > 0 {
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
for i, ss := range ncfg.Sources {
if ss.Domain != "" {
ncfg.Sources[i] = ss.copy()
if err := ncfg.Sources[i].convertDomain(); err != nil {
return nil, err
}
}
}
ncfg, err := convertStreamConfigDomains(cfg)
if err != nil {
return nil, err
}
req, err := json.Marshal(ncfg)
@@ -676,6 +659,36 @@ func (ss *StreamSource) copy() *StreamSource {
return &nss
}
// convertStreamConfigDomains converts domain configurations to external configurations
// in both mirror and sources of a StreamConfig. It creates a copy of the config to avoid
// modifying the caller's version.
func convertStreamConfigDomains(cfg StreamConfig) (StreamConfig, error) {
ncfg := cfg
// If we have a mirror and an external domain, convert to ext.APIPrefix.
if ncfg.Mirror != nil && ncfg.Mirror.Domain != "" {
// Copy so we do not change the caller's version.
ncfg.Mirror = ncfg.Mirror.copy()
if err := ncfg.Mirror.convertDomain(); err != nil {
return StreamConfig{}, err
}
}
// Check sources for the same.
if len(ncfg.Sources) > 0 {
ncfg.Sources = append([]*StreamSource(nil), ncfg.Sources...)
for i, ss := range ncfg.Sources {
if ss.Domain != "" {
ncfg.Sources[i] = ss.copy()
if err := ncfg.Sources[i].convertDomain(); err != nil {
return StreamConfig{}, err
}
}
}
}
return ncfg, nil
}
// UpdateStream updates an existing stream. If stream does not exist,
// ErrStreamNotFound is returned.
func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream, error) {
@@ -687,7 +700,12 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
defer cancel()
}
req, err := json.Marshal(cfg)
ncfg, err := convertStreamConfigDomains(cfg)
if err != nil {
return nil, err
}
req, err := json.Marshal(ncfg)
if err != nil {
return nil, err
}

View File

@@ -587,6 +587,21 @@ func WithExpectLastSequencePerSubject(seq uint64) PublishOpt {
}
}
// WithExpectLastSequenceForSubject sets the sequence and subject for which the
// last sequence number should be checked. If the last message on a subject
// has a different sequence number server will reject the message and publish
// will fail.
func WithExpectLastSequenceForSubject(seq uint64, subject string) PublishOpt {
return func(opts *pubOpts) error {
if subject == "" {
return fmt.Errorf("%w: subject cannot be empty", ErrInvalidOption)
}
opts.lastSubjectSeq = &seq
opts.lastSubject = subject
return nil
}
}
// WithExpectLastMsgID sets the expected message ID the last message on a stream
// should have. If the last message has a different message ID server will
// reject the message and publish will fail.

View File

@@ -254,6 +254,8 @@ type (
// LimitMarkerTTL is how long the bucket keeps markers when keys are
// removed by the TTL setting.
// It is required for per-key TTL to work and for watcher to notify
// about TTL expirations (both per key and per bucket)
LimitMarkerTTL time.Duration
}

View File

@@ -191,6 +191,12 @@ const (
// [WithExpectLastSequencePerSubject] option.
ExpectedLastSubjSeqHeader = "Nats-Expected-Last-Subject-Sequence"
// ExpectedLastSubjSeqSubjHeader contains the subject for which the
// expected last sequence number is set. This is used together with
// [ExpectedLastSubjSeqHeader] to apply optimistic concurrency control at
// subject level. Server will reject the message if it is not the case.
ExpectedLastSubjSeqSubjHeader = "Nats-Expected-Last-Subject-Sequence-Subject"
// ExpectedLastMsgIDHeader contains the expected last message ID on the
// subject and can be used to apply optimistic concurrency control at
// stream level. Server will reject the message if it is not the case.

View File

@@ -47,7 +47,8 @@ type (
lastMsgID string // Expected last msgId
stream string // Expected stream name
lastSeq *uint64 // Expected last sequence
lastSubjectSeq *uint64 // Expected last sequence per subject
lastSubjectSeq *uint64 // Expected last sequence for subject
lastSubject string // Expected subject for last sequence
ttl time.Duration // Message TTL
// Publish retries for NoResponders err.
@@ -195,6 +196,10 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.ttl > 0 {
m.Header.Set(MsgTTLHeader, o.ttl.String())
}
@@ -281,6 +286,10 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
if o.lastSubjectSeq != nil {
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.lastSubject != "" {
m.Header.Set(ExpectedLastSubjSeqSubjHeader, o.lastSubject)
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}
if o.ttl > 0 {
m.Header.Set(MsgTTLHeader, o.ttl.String())
}

View File

@@ -60,6 +60,11 @@ var (
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}
// ErrConsumerCreationResponseEmpty is an error returned when the response from the server
// when creating a consumer is empty. This means that the state of the consumer is unknown and
// the consumer may not have been created successfully.
ErrConsumerCreationResponseEmpty JetStreamError = &jsError{message: "consumer creation response is empty"}
// ErrMsgNotFound is returned when message with provided sequence number does npt exist.
ErrMsgNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMessageNotFound, Description: "message not found", Code: 404}}

View File

@@ -553,6 +553,10 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
return nil, info.Error
}
if info.Error == nil && info.ConsumerInfo == nil {
return nil, ErrConsumerCreationResponseEmpty
}
// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported

View File

@@ -33,6 +33,7 @@ import (
"path/filepath"
"regexp"
"runtime"
"slices"
"strconv"
"strings"
"sync"
@@ -47,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.44.0"
Version = "1.45.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -84,68 +85,72 @@ const (
// ACCOUNT_AUTHENTICATION_EXPIRED_ERR is for when nats server account authorization has expired.
ACCOUNT_AUTHENTICATION_EXPIRED_ERR = "account authentication expired"
// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit.
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit
// MAX_ACCOUNT_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit on the account.
MAX_ACCOUNT_CONNECTIONS_ERR = `maximum account active connections exceeded`
// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit.
MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded"
)
// Errors
var (
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrPermissionViolation = errors.New("nats: permissions violation")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
ErrConnectionClosed = errors.New("nats: connection closed")
ErrConnectionDraining = errors.New("nats: connection draining")
ErrDrainTimeout = errors.New("nats: draining connection timed out")
ErrConnectionReconnecting = errors.New("nats: connection reconnecting")
ErrSecureConnRequired = errors.New("nats: secure connection required")
ErrSecureConnWanted = errors.New("nats: secure connection not available")
ErrBadSubscription = errors.New("nats: invalid subscription")
ErrTypeSubscription = errors.New("nats: invalid subscription type")
ErrBadSubject = errors.New("nats: invalid subject")
ErrBadQueueName = errors.New("nats: invalid queue name")
ErrSlowConsumer = errors.New("nats: slow consumer, messages dropped")
ErrTimeout = errors.New("nats: timeout")
ErrBadTimeout = errors.New("nats: timeout invalid")
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrPermissionViolation = errors.New("nats: permissions violation")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
ErrChanArg = errors.New("nats: argument needs to be a channel type")
ErrMaxPayload = errors.New("nats: maximum payload exceeded")
ErrMaxMessages = errors.New("nats: maximum messages delivered")
ErrSyncSubRequired = errors.New("nats: illegal call on an async subscription")
ErrMultipleTLSConfigs = errors.New("nats: multiple tls.Configs not allowed")
ErrClientCertOrRootCAsRequired = errors.New("nats: at least one of certCB or rootCAsCB must be set")
ErrNoInfoReceived = errors.New("nats: protocol exception, INFO not received")
ErrReconnectBufExceeded = errors.New("nats: outbound buffer limit exceeded")
ErrInvalidConnection = errors.New("nats: invalid connection")
ErrInvalidMsg = errors.New("nats: invalid message or message nil")
ErrInvalidArg = errors.New("nats: invalid argument")
ErrInvalidContext = errors.New("nats: invalid context")
ErrNoDeadlineContext = errors.New("nats: context requires a deadline")
ErrNoEchoNotSupported = errors.New("nats: no echo option not supported by this server")
ErrClientIDNotSupported = errors.New("nats: client ID not supported by this server")
ErrUserButNoSigCB = errors.New("nats: user callback defined without a signature handler")
ErrNkeyButNoSigCB = errors.New("nats: nkey defined without a signature handler")
ErrNoUserCB = errors.New("nats: user callback not defined")
ErrNkeyAndUser = errors.New("nats: user callback and nkey defined")
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
ErrDisconnected = errors.New("nats: server is disconnected")
ErrHeadersNotSupported = errors.New("nats: headers not supported by this server")
ErrBadHeaderMsg = errors.New("nats: message could not decode headers")
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrMaxAccountConnectionsExceeded = errors.New("nats: maximum account active connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
)
// GetDefaultOptions returns default configuration options for the client.
@@ -1655,14 +1660,17 @@ func (o Options) Connect() (*Conn, error) {
// Create reader/writer
nc.newReaderWriter()
// Spin up the async cb dispatcher before connect so it's ready
// to handle callbacks, especially when RetryOnFailedConnect is used
// and initial connection fails.
go nc.ach.asyncCBDispatcher()
connectionEstablished, err := nc.connect()
if err != nil {
nc.ach.close()
return nil, err
}
// Spin up the async cb dispatcher on success
go nc.ach.asyncCBDispatcher()
if connectionEstablished && nc.Opts.ConnectedCB != nil {
nc.ach.push(func() { nc.Opts.ConnectedCB(nc) })
}
@@ -2549,7 +2557,7 @@ func (nc *Conn) connect() (bool, error) {
nc.setup()
nc.changeConnStatus(RECONNECTING)
nc.bw.switchToPending()
go nc.doReconnect(ErrNoServers, false)
go nc.doReconnect(err, false)
err = nil
} else {
nc.current = nil
@@ -2872,6 +2880,7 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// Clear any errors.
nc.err = nil
// Perform appropriate callback if needed for a disconnect.
// DisconnectedErrCB has priority over deprecated DisconnectedCB
if !nc.initc {
@@ -2880,6 +2889,12 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
} else if nc.Opts.DisconnectedCB != nil {
nc.ach.push(func() { nc.Opts.DisconnectedCB(nc) })
}
} else if nc.Opts.RetryOnFailedConnect && nc.initc && err != nil {
// For initial connection failure with RetryOnFailedConnect,
// report the error via ReconnectErrCB if available
if nc.Opts.ReconnectErrCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) })
}
}
// This is used to wait on go routines exit if we start them in the loop
@@ -3432,6 +3447,10 @@ func (nc *Conn) processMsg(data []byte) {
if sub.mch != nil {
select {
case sub.mch <- m:
// For ChanSubscribe, track delivered count here
if sub.typ == ChanSubscription {
sub.delivered++
}
default:
goto slowConsumer
}
@@ -3491,6 +3510,19 @@ func (nc *Conn) processMsg(data []byte) {
nc.checkForSequenceMismatch(m, sub, jsi)
}
// Check if we need to auto-unsubscribe for chan subscriptions
if sub.typ == ChanSubscription && sub.max > 0 && !ctrlMsg {
sub.mu.Lock()
if sub.delivered >= sub.max {
sub.mu.Unlock()
nc.mu.Lock()
nc.removeSub(sub)
nc.mu.Unlock()
} else {
sub.mu.Unlock()
}
}
return
slowConsumer:
@@ -3794,6 +3826,8 @@ func (nc *Conn) processErr(ie string) {
close = nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if e == MAX_ACCOUNT_CONNECTIONS_ERR {
close = nc.processOpErr(ErrMaxAccountConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
@@ -4686,7 +4720,7 @@ func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan Su
// will not block. Lock should be held entering.
func (s *Subscription) sendStatusEvent(status SubStatus) {
for ch, statuses := range s.statListeners {
if !containsStatus(statuses, status) {
if !slices.Contains(statuses, status) {
continue
}
// only send event if someone's listening
@@ -4694,21 +4728,18 @@ func (s *Subscription) sendStatusEvent(status SubStatus) {
case ch <- status:
default:
}
if status == SubscriptionClosed {
}
// After sending SubscriptionClosed status to all listeners,
// close all channels and clear the map to prevent future
// sends to closed channels that could cause panics
if status == SubscriptionClosed {
for ch := range s.statListeners {
close(ch)
}
s.statListeners = nil
}
}
func containsStatus(statuses []SubStatus, status SubStatus) bool {
for _, s := range statuses {
if s == status {
return true
}
}
return false
}
// changeSubStatus changes subscription status and sends events
// to all listeners. Lock should be held entering.
func (s *Subscription) changeSubStatus(status SubStatus) {

2
vendor/modules.txt vendored
View File

@@ -961,7 +961,7 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.44.0
# github.com/nats-io/nats.go v1.45.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin