From 865d4b6980dcbcca15256f88fd9599edbe91ae3c Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 24 Sep 2025 14:18:32 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats.go from 1.45.0 to 1.46.0 Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.45.0 to 1.46.0. - [Release notes](https://github.com/nats-io/nats.go/releases) - [Commits](https://github.com/nats-io/nats.go/compare/v1.45.0...v1.46.0) --- updated-dependencies: - dependency-name: github.com/nats-io/nats.go dependency-version: 1.46.0 dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- vendor/github.com/nats-io/nats.go/.gitignore | 3 + vendor/github.com/nats-io/nats.go/README.md | 2 +- vendor/github.com/nats-io/nats.go/go_test.mod | 16 ++-- vendor/github.com/nats-io/nats.go/go_test.sum | 33 +++---- .../nats-io/nats.go/jetstream/consumer.go | 4 + .../nats.go/jetstream/consumer_config.go | 15 +++ .../nats-io/nats.go/jetstream/errors.go | 31 +++--- .../nats-io/nats.go/jetstream/jetstream.go | 6 +- .../nats.go/jetstream/jetstream_options.go | 82 ++++++++++++++++ .../nats-io/nats.go/jetstream/kv.go | 15 ++- .../nats-io/nats.go/jetstream/ordered.go | 15 ++- .../nats-io/nats.go/jetstream/publish.go | 2 + .../nats-io/nats.go/jetstream/pull.go | 96 ++++++++++++++++--- .../nats-io/nats.go/jetstream/stream.go | 8 +- .../nats.go/jetstream/stream_config.go | 76 ++++++++++++++- vendor/github.com/nats-io/nats.go/js.go | 4 + vendor/github.com/nats-io/nats.go/jserrors.go | 3 + vendor/github.com/nats-io/nats.go/jsm.go | 10 +- vendor/github.com/nats-io/nats.go/nats.go | 2 +- vendor/modules.txt | 2 +- 22 files changed, 362 insertions(+), 69 deletions(-) diff --git a/go.mod b/go.mod index 7189213087..fcb8162388 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.11.9 - github.com/nats-io/nats.go v1.45.0 + github.com/nats-io/nats.go v1.46.0 github.com/oklog/run v1.2.0 github.com/olekukonko/tablewriter v1.0.9 github.com/onsi/ginkgo v1.16.5 diff --git a/go.sum b/go.sum index 12d68fff3c..58095ee766 100644 --- a/go.sum +++ b/go.sum @@ -885,8 +885,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= github.com/nats-io/nats-server/v2 v2.11.9 h1:k7nzHZjUf51W1b08xiQih63Rdxh0yr5O4K892Mx5gQA= github.com/nats-io/nats-server/v2 v2.11.9/go.mod h1:1MQgsAQX1tVjpf3Yzrk3x2pzdsZiNL/TVP3Amhp3CR8= -github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA= -github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8= +github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/.gitignore b/vendor/github.com/nats-io/nats.go/.gitignore index 02cb0f397e..01716ae1b1 100644 --- a/vendor/github.com/nats-io/nats.go/.gitignore +++ b/vendor/github.com/nats-io/nats.go/.gitignore @@ -21,6 +21,9 @@ _testmain.go *.exe +# Git backup files +*.orig + # Emacs *~ \#*\# diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 80d8ff6beb..5ea5a0aee2 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io go get github.com/nats-io/nats.go@latest # To get a specific version: -go get github.com/nats-io/nats.go@v1.45.0 +go get github.com/nats-io/nats.go@v1.46.0 # Note that the latest major version for NATS Server is v2: go get github.com/nats-io/nats-server/v2@latest diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 5d2111b246..c3b7499e04 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -1,22 +1,22 @@ module github.com/nats-io/nats.go -go 1.23.0 +go 1.24.0 require ( github.com/golang/protobuf v1.4.2 github.com/klauspost/compress v1.18.0 - github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.11.2 + github.com/nats-io/jwt/v2 v2.8.0 + github.com/nats-io/nats-server/v2 v2.12.0 github.com/nats-io/nkeys v0.4.11 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 ) require ( - github.com/google/go-tpm v0.9.3 // indirect + github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect + github.com/google/go-tpm v0.9.5 // indirect github.com/minio/highwayhash v1.0.3 // indirect - github.com/nats-io/jwt/v2 v2.7.4 // indirect - golang.org/x/crypto v0.37.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/time v0.11.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/sys v0.36.0 // indirect + golang.org/x/time v0.13.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index 6d411a20a7..9b1b8c3283 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -12,36 +12,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= -github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU= +github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= -github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= -github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= -github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI= -github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= -github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk= -github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY= -github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= +github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g= +github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA= +github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74= +github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww= github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= -golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= -golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI= +golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go index 2ceb0d5bd5..b7d81aafe5 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer.go @@ -314,6 +314,10 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu if resp.Error.ErrorCode == JSErrCodeStreamNotFound { return nil, ErrStreamNotFound } + if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit { + return nil, ErrMaximumConsumersLimit + } + return nil, resp.Error } diff --git a/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go b/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go index 6f08b0bca2..f543e609a0 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/consumer_config.go @@ -325,6 +325,12 @@ type ( // associating metadata on the consumer. This feature requires // nats-server v2.10.0 or later. Metadata map[string]string `json:"metadata,omitempty"` + + // NamePrefix is an optional custom prefix for the consumer name. + // If provided, ordered consumer names will be generated as: + // {NamePrefix}_{sequence_number} (e.g., "custom_1", "custom_2"). + // If not provided, a unique ID (NUID) will be used as the prefix. + NamePrefix string `json:"-"` } // DeliverPolicy determines from which point to start delivering messages. @@ -362,6 +368,11 @@ const ( // restricting when a consumer will receive messages based on the number of // pending messages or acks. PriorityPolicyOverflow + + // PriorityPolicyPrioritized is the priority policy that allows for the + // server to deliver messages to clients based on their priority (instead + // of round-robin). Requires nats-server v2.12.0 or later. + PriorityPolicyPrioritized ) func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { @@ -372,6 +383,8 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error { *p = PriorityPolicyPinned case jsonString("overflow"): *p = PriorityPolicyOverflow + case jsonString("prioritized"): + *p = PriorityPolicyPrioritized default: return fmt.Errorf("nats: can not unmarshal %q", data) } @@ -386,6 +399,8 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) { return json.Marshal("pinned_client") case PriorityPolicyOverflow: return json.Marshal("overflow") + case PriorityPolicyPrioritized: + return json.Marshal("prioritized") } return nil, fmt.Errorf("nats: unknown priority policy %v", p) } diff --git a/vendor/github.com/nats-io/nats.go/jetstream/errors.go b/vendor/github.com/nats-io/nats.go/jetstream/errors.go index c8ffe236d1..d537d5cca5 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/errors.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/errors.go @@ -43,27 +43,28 @@ type ( ) const ( + JSErrCodeBadRequest ErrorCode = 10003 + JSErrCodeConsumerCreate ErrorCode = 10012 + JSErrCodeConsumerNameExists ErrorCode = 10013 + JSErrCodeConsumerNotFound ErrorCode = 10014 + JSErrCodeMaximumConsumersLimit ErrorCode = 10026 + + JSErrCodeMessageNotFound ErrorCode = 10037 JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 - JSErrCodeJetStreamNotEnabled ErrorCode = 10076 - JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 + JSErrCodeStreamNotFound ErrorCode = 10059 + + JSErrCodeStreamWrongLastSequence ErrorCode = 10071 + JSErrCodeJetStreamNotEnabled ErrorCode = 10076 + + JSErrCodeConsumerAlreadyExists ErrorCode = 10105 - JSErrCodeConsumerCreate ErrorCode = 10012 - JSErrCodeConsumerNotFound ErrorCode = 10014 - JSErrCodeConsumerNameExists ErrorCode = 10013 - JSErrCodeConsumerAlreadyExists ErrorCode = 10105 - JSErrCodeConsumerExists ErrorCode = 10148 JSErrCodeDuplicateFilterSubjects ErrorCode = 10136 JSErrCodeOverlappingFilterSubjects ErrorCode = 10138 JSErrCodeConsumerEmptyFilter ErrorCode = 10139 + JSErrCodeConsumerExists ErrorCode = 10148 JSErrCodeConsumerDoesNotExist ErrorCode = 10149 - - JSErrCodeMessageNotFound ErrorCode = 10037 - - JSErrCodeBadRequest ErrorCode = 10003 - - JSErrCodeStreamWrongLastSequence ErrorCode = 10071 ) var ( @@ -142,6 +143,10 @@ var ( // creating consumer (e.g. illegal update). ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}} + // ErrMaximumConsumersLimit is returned when user limit of allowed + // consumers for stream is reached + ErrMaximumConsumersLimit JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMaximumConsumersLimit, Description: "maximum consumers limit reached", Code: 400}} + // ErrDuplicateFilterSubjects is returned when both FilterSubject and // FilterSubjects are specified when creating consumer. ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}} diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go index dd900d75ee..f0a5e30638 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream.go @@ -864,11 +864,15 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord if err := validateStreamName(stream); err != nil { return nil, err } + namePrefix := cfg.NamePrefix + if namePrefix == "" { + namePrefix = nuid.Next() + } oc := &orderedConsumer{ js: js, cfg: &cfg, stream: stream, - namePrefix: nuid.Next(), + namePrefix: namePrefix, doReset: make(chan struct{}, 1), } consCfg := oc.getConsumerConfig() diff --git a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go index 0fffbc71c9..3a9b16bcdb 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/jetstream_options.go @@ -14,6 +14,7 @@ package jetstream import ( + "context" "fmt" "time" ) @@ -347,6 +348,26 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error { return nil } +// PullPrioritized sets the priority used when sending pull requests for consumer with +// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the +// highest priority). Maximum priority value is 9. +// +// If provided, PullPriorityGroup must be set as well and the consumer has to +// have PriorityPolicy set to PriorityPolicyPrioritized. +// +// PullPrioritized implements both PullConsumeOpt and PullMessagesOpt, allowing +// it to configure Consumer.Consume and Consumer.Messages. +type PullPrioritized uint8 + +func (p PullPrioritized) configureConsume(opts *consumeOpts) error { + opts.Priority = uint8(p) + return nil +} +func (p PullPrioritized) configureMessages(opts *consumeOpts) error { + opts.Priority = uint8(p) + return nil +} + // PullPriorityGroup sets the priority group for a consumer. // It has to match one of the priority groups set on the consumer. // @@ -468,6 +489,19 @@ func FetchMinAckPending(min int64) FetchOpt { } } +// FetchPrioritized sets the priority used when sending fetch requests for consumer with +// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the +// highest priority). Maximum priority value is 9. +// +// If provided, FetchPriorityGroup must be set as well and the consumer has to +// have PriorityPolicy set to PriorityPolicyPrioritized. +func FetchPrioritized(priority uint8) FetchOpt { + return func(req *pullRequest) error { + req.Priority = priority + return nil + } +} + // FetchPriorityGroup sets the priority group for a consumer. // It has to match one of the priority groups set on the consumer. func FetchPriorityGroup(group string) FetchOpt { @@ -486,6 +520,7 @@ func FetchMaxWait(timeout time.Duration) FetchOpt { return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption) } req.Expires = timeout + req.maxWaitSet = true return nil } } @@ -508,6 +543,31 @@ func FetchHeartbeat(hb time.Duration) FetchOpt { } } +// FetchContext sets a context for the Fetch operation. +// The Fetch operation will be canceled if the context is canceled. +// If the context has a deadline, it will be used to set expiry on pull request. +func FetchContext(ctx context.Context) FetchOpt { + return func(req *pullRequest) error { + req.ctx = ctx + + // If context has a deadline, use it to set expiry + if deadline, ok := ctx.Deadline(); ok { + remaining := time.Until(deadline) + if remaining <= 0 { + return fmt.Errorf("%w: context deadline already exceeded", ErrInvalidOption) + } + // Use 90% of remaining time for server (capped at 1s) + buffer := time.Duration(float64(remaining) * 0.1) + if buffer > time.Second { + buffer = time.Second + } + req.Expires = remaining - buffer + } + + return nil + } +} + // WithDeletedDetails can be used to display the information about messages // deleted from a stream on a stream info request func WithDeletedDetails(deletedDetails bool) StreamInfoOpt { @@ -648,3 +708,25 @@ func WithStallWait(ttl time.Duration) PublishOpt { return nil } } + +type nextOptFunc func(*nextOpts) + +func (fn nextOptFunc) configureNext(opts *nextOpts) { + fn(opts) +} + +// NextMaxWait sets a timeout for the Next operation. +// If the timeout is reached before a message is available, a timeout error is returned. +func NextMaxWait(timeout time.Duration) NextOpt { + return nextOptFunc(func(opts *nextOpts) { + opts.timeout = timeout + }) +} + +// NextContext sets a context for the Next operation. +// The Next operation will be canceled if the context is canceled. +func NextContext(ctx context.Context) NextOpt { + return nextOptFunc(func(opts *nextOpts) { + opts.ctx = ctx + }) +} diff --git a/vendor/github.com/nats-io/nats.go/jetstream/kv.go b/vendor/github.com/nats-io/nats.go/jetstream/kv.go index bd57cf75a7..55cd62e036 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/kv.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/kv.go @@ -256,7 +256,11 @@ type ( // removed by the TTL setting. // It is required for per-key TTL to work and for watcher to notify // about TTL expirations (both per key and per bucket) - LimitMarkerTTL time.Duration + LimitMarkerTTL time.Duration `json:"limit_marker_ttl,omitempty"` + + // Metadata is a set of application-defined key-value pairs that can be + // used to store arbitrary metadata about the bucket. + Metadata map[string]string `json:"metadata,omitempty"` } // KeyLister is used to retrieve a list of key value store keys. It returns @@ -316,6 +320,9 @@ type ( // LimitMarkerTTL is how long the bucket keeps markers when keys are // removed by the TTL setting, 0 meaning markers are not supported. LimitMarkerTTL() time.Duration + + // Metadata returns the metadata associated with the bucket. + Metadata() map[string]string } // KeyWatcher is what is returned when doing a watch. It can be used to @@ -667,6 +674,7 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf Discard: DiscardNew, AllowMsgTTL: allowMsgTTL, SubjectDeleteMarkerTTL: subjectDeleteMarkerTTL, + Metadata: cfg.Metadata, } if cfg.Mirror != nil { // Copy in case we need to make changes so we do not change caller's version. @@ -813,6 +821,11 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration { return s.info.Config.SubjectDeleteMarkerTTL } +// Metadata returns the metadata associated with the bucket. +func (s *KeyValueBucketStatus) Metadata() map[string]string { + return s.info.Config.Metadata +} + type kvLister struct { kvs chan KeyValueStatus kvNames chan string diff --git a/vendor/github.com/nats-io/nats.go/jetstream/ordered.go b/vendor/github.com/nats-io/nats.go/jetstream/ordered.go index 35705a5e2e..4d10c29c8c 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/ordered.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/ordered.go @@ -282,10 +282,21 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er return sub, nil } -func (s *orderedSubscription) Next() (Msg, error) { +func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) { for { - msg, err := s.consumer.currentSub.Next() + msg, err := s.consumer.currentSub.Next(opts...) if err != nil { + // Check for errors which should be returned directly + // without resetting the consumer + if errors.Is(err, ErrInvalidOption) { + return nil, err + } + if errors.Is(err, nats.ErrTimeout) { + return nil, err + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return nil, err + } if errors.Is(err, ErrMsgIteratorClosed) { s.Stop() return nil, err diff --git a/vendor/github.com/nats-io/nats.go/jetstream/publish.go b/vendor/github.com/nats-io/nats.go/jetstream/publish.go index 8aaba4e1f2..0692505a3d 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/publish.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/publish.go @@ -130,6 +130,8 @@ type ( // Domain is the domain the message was published to. Domain string `json:"domain,omitempty"` + + Value string `json:"val,omitempty"` } ) diff --git a/vendor/github.com/nats-io/nats.go/jetstream/pull.go b/vendor/github.com/nats-io/nats.go/jetstream/pull.go index 3aacc0b89b..d2b2778935 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/pull.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/pull.go @@ -14,6 +14,7 @@ package jetstream import ( + "context" "encoding/json" "errors" "fmt" @@ -34,8 +35,11 @@ type ( MessagesContext interface { // Next retrieves next message on a stream. It will block until the next // message is available. If the context is canceled, Next will return - // ErrMsgIteratorClosed error. - Next() (Msg, error) + // ErrMsgIteratorClosed error. An optional timeout or context can be + // provided using NextOpt options. If none are provided, Next will block + // indefinitely until a message is available, iterator is closed or a + // heartbeat error occurs. + Next(opts ...NextOpt) (Msg, error) // Stop unsubscribes from the stream and cancels subscription. Calling // Next after calling Stop will return ErrMsgIteratorClosed error. @@ -92,15 +96,18 @@ type ( } pullRequest struct { - Expires time.Duration `json:"expires,omitempty"` - Batch int `json:"batch,omitempty"` - MaxBytes int `json:"max_bytes,omitempty"` - NoWait bool `json:"no_wait,omitempty"` - Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` - MinPending int64 `json:"min_pending,omitempty"` - MinAckPending int64 `json:"min_ack_pending,omitempty"` - PinID string `json:"id,omitempty"` - Group string `json:"group,omitempty"` + Expires time.Duration `json:"expires,omitempty"` + Batch int `json:"batch,omitempty"` + MaxBytes int `json:"max_bytes,omitempty"` + NoWait bool `json:"no_wait,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` + MinPending int64 `json:"min_pending,omitempty"` + MinAckPending int64 `json:"min_ack_pending,omitempty"` + PinID string `json:"id,omitempty"` + Group string `json:"group,omitempty"` + Priority uint8 `json:"priority,omitempty"` + ctx context.Context `json:"-"` + maxWaitSet bool `json:"-"` } consumeOpts struct { @@ -110,6 +117,7 @@ type ( LimitSize bool MinPending int64 MinAckPending int64 + Priority uint8 Group string Heartbeat time.Duration ErrHandler ConsumeErrHandler @@ -167,6 +175,16 @@ type ( timer *time.Timer sync.Mutex } + + // NextOpt is an option for configuring the behavior of MessagesContext.Next. + NextOpt interface { + configureNext(*nextOpts) + } + + nextOpts struct { + timeout time.Duration + ctx context.Context + } ) const ( @@ -314,6 +332,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( Heartbeat: consumeOpts.Heartbeat, MinPending: consumeOpts.MinPending, MinAckPending: consumeOpts.MinAckPending, + Priority: consumeOpts.Priority, Group: consumeOpts.Group, PinID: p.getPinID(), }, subject); err != nil { @@ -353,6 +372,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( Heartbeat: sub.consumeOpts.Heartbeat, MinPending: sub.consumeOpts.MinPending, MinAckPending: sub.consumeOpts.MinAckPending, + Priority: sub.consumeOpts.Priority, Group: sub.consumeOpts.Group, PinID: p.getPinID(), } @@ -383,6 +403,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( Heartbeat: sub.consumeOpts.Heartbeat, MinPending: sub.consumeOpts.MinPending, MinAckPending: sub.consumeOpts.MinAckPending, + Priority: sub.consumeOpts.Priority, Group: sub.consumeOpts.Group, PinID: p.getPinID(), } @@ -468,6 +489,7 @@ func (s *pullSubscription) checkPending() { Group: s.consumeOpts.Group, MinPending: s.consumeOpts.MinPending, MinAckPending: s.consumeOpts.MinAckPending, + Priority: s.consumeOpts.Priority, } s.pending.msgCount = s.consumeOpts.MaxMessages @@ -569,7 +591,30 @@ var ( // Next retrieves next message on a stream. It will block until the next // message is available. If the context is canceled, Next will return // ErrMsgIteratorClosed error. -func (s *pullSubscription) Next() (Msg, error) { +func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) { + var nextOpts nextOpts + for _, opt := range opts { + opt.configureNext(&nextOpts) + } + + if nextOpts.timeout > 0 && nextOpts.ctx != nil { + return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption) + } + + // Create timeout channel if needed + var timeoutCh <-chan time.Time + if nextOpts.timeout > 0 { + timer := time.NewTimer(nextOpts.timeout) + defer timer.Stop() + timeoutCh = timer.C + } + + // Use context if provided + var ctxDone <-chan struct{} + if nextOpts.ctx != nil { + ctxDone = nextOpts.ctx.Done() + } + s.Lock() defer s.Unlock() drainMode := s.draining.Load() == 1 @@ -660,6 +705,10 @@ func (s *pullSubscription) Next() (Msg, error) { } isConnected = false } + case <-timeoutCh: + return nil, nats.ErrTimeout + case <-ctxDone: + return nil, nextOpts.ctx.Err() } } } @@ -779,6 +828,11 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) return nil, err } } + + if req.ctx != nil && req.maxWaitSet { + return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption) + } + // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls // and disable it for shorter pulls if req.Heartbeat == unset { @@ -808,6 +862,11 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch, return nil, err } } + + if req.ctx != nil && req.maxWaitSet { + return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption) + } + // if heartbeat was not explicitly set, set it to 5 seconds for longer pulls // and disable it for shorter pulls if req.Heartbeat == unset { @@ -862,6 +921,13 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { var receivedMsgs, receivedBytes int hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat) + + // Use context if provided + var ctxDone <-chan struct{} + if req.ctx != nil { + ctxDone = req.ctx.Done() + } + go func(res *fetchResult) { defer sub.subscription.Unsubscribe() defer close(res.msgs) @@ -922,6 +988,12 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) { res.done = true res.Unlock() return + case <-ctxDone: + res.Lock() + res.err = req.ctx.Err() + res.done = true + res.Unlock() + return } } }(res) diff --git a/vendor/github.com/nats-io/nats.go/jetstream/stream.go b/vendor/github.com/nats-io/nats.go/jetstream/stream.go index f2600af1a9..ef8e1202c5 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/stream.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/stream.go @@ -349,11 +349,15 @@ func (s *stream) UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (Pu // messages from a stream. Ordered consumers are ephemeral in-memory // pull consumers and are resilient to deletes and restarts. func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) { + namePrefix := cfg.NamePrefix + if namePrefix == "" { + namePrefix = nuid.Next() + } oc := &orderedConsumer{ js: s.js, cfg: &cfg, stream: s.name, - namePrefix: nuid.Next(), + namePrefix: namePrefix, doReset: make(chan struct{}, 1), } consCfg := oc.getConsumerConfig() @@ -528,6 +532,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream if err != nil { return nil, err } + var gmSubj string // handle direct gets @@ -598,6 +603,7 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) { } } } + // Check for headers that give us the required information to // reconstruct the message. if len(r.Header) == 0 { diff --git a/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go b/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go index 28184a3914..e5342b1466 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/stream_config.go @@ -201,6 +201,18 @@ type ( // Enables and sets a duration for adding server markers for delete, purge and max age limits. // This feature requires nats-server v2.11.0 or later. SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"` + + // AllowMsgCounter enables the feature + AllowMsgCounter bool `json:"allow_msg_counter"` + + // AllowAtomicPublish allows atomic batch publishing into the stream. + AllowAtomicPublish bool `json:"allow_atomic,omitempty"` + + // AllowMsgSchedules enables the scheduling of messages + AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"` + + // PersistMode allows to opt-in to different persistence mode settings. + PersistMode PersistModeType `json:"persist_mode,omitempty"` } // StreamSourceInfo shows information about an upstream stream @@ -276,10 +288,25 @@ type ( // Name is the name of the cluster. Name string `json:"name,omitempty"` + // RaftGroup is the name of the Raft group managing the asset (in + // clustered environments). + RaftGroup string `json:"raft_group,omitempty"` + // Leader is the server name of the RAFT leader. Leader string `json:"leader,omitempty"` - // Replicas is the list of members of the RAFT cluster + // LeaderSince is the time that it was elected as leader in RFC3339 + // format, absent when not the leader. + LeaderSince *time.Time `json:"leader_since,omitempty"` + + // SystemAcc indicates if the traffic_account is the system account. + // When true, replication traffic goes over the system account. + SystemAcc bool `json:"system_account,omitempty"` + + // TrafficAcc is the account where the replication traffic goes over. + TrafficAcc string `json:"traffic_account,omitempty"` + + // Replicas is the list of members of the RAFT cluster. Replicas []*PeerInfo `json:"replicas,omitempty"` } @@ -407,6 +434,9 @@ type ( // StoreCompression determines how messages are compressed. StoreCompression uint8 + + // PersistModeType determines what persistence mode the stream uses. + PersistModeType int ) const ( @@ -438,6 +468,16 @@ const ( workQueuePolicyString = "workqueue" ) +const ( + // DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed. + // The publish acknowledgement will be sent after the persisting completes. + DefaultPersistMode = PersistModeType(iota) + // AsyncPersistMode specifies writes to the stream will be flushed asynchronously. + // The publish acknowledgement may be sent before the persisting completes. + // This means writes could be lost if they weren't flushed prior to a hard kill of the server. + AsyncPersistMode +) + func (rp RetentionPolicy) String() string { switch rp { case LimitsPolicy: @@ -512,6 +552,40 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error { return nil } +func (pm PersistModeType) String() string { + switch pm { + case DefaultPersistMode: + return "Default" + case AsyncPersistMode: + return "Async" + default: + return "Unknown Persist Mode" + } +} + +func (pm PersistModeType) MarshalJSON() ([]byte, error) { + switch pm { + case DefaultPersistMode: + return json.Marshal("default") + case AsyncPersistMode: + return json.Marshal("async") + default: + return nil, fmt.Errorf("nats: can not marshal %v", pm) + } +} + +func (pm *PersistModeType) UnmarshalJSON(data []byte) error { + switch strings.ToLower(string(data)) { + case jsonString("default"): + *pm = DefaultPersistMode + case jsonString("async"): + *pm = AsyncPersistMode + default: + return fmt.Errorf("nats: can not unmarshal %q", data) + } + return nil +} + const ( // FileStorage specifies on disk storage. It's the default. FileStorage StorageType = iota diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 6dafb8c45b..90f8df6157 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -2836,6 +2836,10 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ { + if sub.jsi.ordered { + sub.mu.Unlock() + return nil, ErrConsumerInfoOnOrderedReset + } sub.mu.Unlock() return nil, ErrTypeSubscription } diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index 0e56a78000..af9e015b91 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -150,6 +150,9 @@ var ( // ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"} + // ErrConsumerInfoOnOrderedReset is returned when attempting to fetch consumer info for an ordered consumer that is currently being recreated. + ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{message: "cannot fetch consumer info; ordered consumer is being reset"} + // ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer. ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"} diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index bae376e393..eaa2f632b9 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -1093,9 +1093,13 @@ type StreamState struct { // ClusterInfo shows information about the underlying set of servers // that make up the stream or consumer. type ClusterInfo struct { - Name string `json:"name,omitempty"` - Leader string `json:"leader,omitempty"` - Replicas []*PeerInfo `json:"replicas,omitempty"` + Name string `json:"name,omitempty"` + RaftGroup string `json:"raft_group,omitempty"` + Leader string `json:"leader,omitempty"` + LeaderSince *time.Time `json:"leader_since,omitempty"` + SystemAcc bool `json:"system_account,omitempty"` + TrafficAcc string `json:"traffic_account,omitempty"` + Replicas []*PeerInfo `json:"replicas,omitempty"` } // PeerInfo shows information about all the peers in the cluster that diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index facedb251a..0151223c87 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -48,7 +48,7 @@ import ( // Default Constants const ( - Version = "1.45.0" + Version = "1.46.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 diff --git a/vendor/modules.txt b/vendor/modules.txt index d9d85c86bc..76bb0348e0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1125,7 +1125,7 @@ github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem github.com/nats-io/nats-server/v2/server/thw github.com/nats-io/nats-server/v2/server/tpm -# github.com/nats-io/nats.go v1.45.0 +# github.com/nats-io/nats.go v1.46.0 ## explicit; go 1.23.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin