build(deps): bump github.com/nats-io/nats.go from 1.45.0 to 1.46.0

Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.45.0 to 1.46.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.45.0...v1.46.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-version: 1.46.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2025-09-24 14:18:32 +00:00
committed by Ralf Haferkamp
parent 93b02c204f
commit 865d4b6980
22 changed files with 362 additions and 69 deletions
+1 -1
View File
@@ -57,7 +57,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.9
github.com/nats-io/nats.go v1.45.0
github.com/nats-io/nats.go v1.46.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v1.0.9
github.com/onsi/ginkgo v1.16.5
+2 -2
View File
@@ -885,8 +885,8 @@ github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.9 h1:k7nzHZjUf51W1b08xiQih63Rdxh0yr5O4K892Mx5gQA=
github.com/nats-io/nats-server/v2 v2.11.9/go.mod h1:1MQgsAQX1tVjpf3Yzrk3x2pzdsZiNL/TVP3Amhp3CR8=
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nats.go v1.46.0 h1:iUcX+MLT0HHXskGkz+Sg20sXrPtJLsOojMDTDzOHSb8=
github.com/nats-io/nats.go v1.46.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+3
View File
@@ -21,6 +21,9 @@ _testmain.go
*.exe
# Git backup files
*.orig
# Emacs
*~
\#*\#
+1 -1
View File
@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.45.0
go get github.com/nats-io/nats.go@v1.46.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest
+8 -8
View File
@@ -1,22 +1,22 @@
module github.com/nats-io/nats.go
go 1.23.0
go 1.24.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.11.2
github.com/nats-io/jwt/v2 v2.8.0
github.com/nats-io/nats-server/v2 v2.12.0
github.com/nats-io/nkeys v0.4.11
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
)
require (
github.com/google/go-tpm v0.9.3 // indirect
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.7.4 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/time v0.11.0 // indirect
golang.org/x/crypto v0.42.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/time v0.13.0 // indirect
)
+12 -21
View File
@@ -12,36 +12,27 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/google/go-tpm v0.9.5 h1:ocUmnDebX54dnW+MQWGQRbdaAcJELsa6PqZhJ48KwVU=
github.com/google/go-tpm v0.9.5/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk=
github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
github.com/nats-io/nats-server/v2 v2.12.0 h1:OIwe8jZUqJFrh+hhiyKu8snNib66qsx806OslqJuo74=
github.com/nats-io/nats-server/v2 v2.12.0/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/time v0.13.0 h1:eUlYslOIt32DgYD6utsuUeHs4d7AsEYLuIAdg7FlYgI=
golang.org/x/time v0.13.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+4
View File
@@ -314,6 +314,10 @@ func upsertConsumer(ctx context.Context, js *jetStream, stream string, cfg Consu
if resp.Error.ErrorCode == JSErrCodeStreamNotFound {
return nil, ErrStreamNotFound
}
if resp.Error.ErrorCode == JSErrCodeMaximumConsumersLimit {
return nil, ErrMaximumConsumersLimit
}
return nil, resp.Error
}
+15
View File
@@ -325,6 +325,12 @@ type (
// associating metadata on the consumer. This feature requires
// nats-server v2.10.0 or later.
Metadata map[string]string `json:"metadata,omitempty"`
// NamePrefix is an optional custom prefix for the consumer name.
// If provided, ordered consumer names will be generated as:
// {NamePrefix}_{sequence_number} (e.g., "custom_1", "custom_2").
// If not provided, a unique ID (NUID) will be used as the prefix.
NamePrefix string `json:"-"`
}
// DeliverPolicy determines from which point to start delivering messages.
@@ -362,6 +368,11 @@ const (
// restricting when a consumer will receive messages based on the number of
// pending messages or acks.
PriorityPolicyOverflow
// PriorityPolicyPrioritized is the priority policy that allows for the
// server to deliver messages to clients based on their priority (instead
// of round-robin). Requires nats-server v2.12.0 or later.
PriorityPolicyPrioritized
)
func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
@@ -372,6 +383,8 @@ func (p *PriorityPolicy) UnmarshalJSON(data []byte) error {
*p = PriorityPolicyPinned
case jsonString("overflow"):
*p = PriorityPolicyOverflow
case jsonString("prioritized"):
*p = PriorityPolicyPrioritized
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
@@ -386,6 +399,8 @@ func (p PriorityPolicy) MarshalJSON() ([]byte, error) {
return json.Marshal("pinned_client")
case PriorityPolicyOverflow:
return json.Marshal("overflow")
case PriorityPolicyPrioritized:
return json.Marshal("prioritized")
}
return nil, fmt.Errorf("nats: unknown priority policy %v", p)
}
+18 -13
View File
@@ -43,27 +43,28 @@ type (
)
const (
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeMaximumConsumersLimit ErrorCode = 10026
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerCreate ErrorCode = 10012
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeConsumerExists ErrorCode = 10148
JSErrCodeConsumerDoesNotExist ErrorCode = 10149
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
var (
@@ -142,6 +143,10 @@ var (
// creating consumer (e.g. illegal update).
ErrConsumerCreate JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerCreate, Description: "could not create consumer", Code: 500}}
// ErrMaximumConsumersLimit is returned when user limit of allowed
// consumers for stream is reached
ErrMaximumConsumersLimit JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeMaximumConsumersLimit, Description: "maximum consumers limit reached", Code: 400}}
// ErrDuplicateFilterSubjects is returned when both FilterSubject and
// FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
+5 -1
View File
@@ -864,11 +864,15 @@ func (js *jetStream) OrderedConsumer(ctx context.Context, stream string, cfg Ord
if err := validateStreamName(stream); err != nil {
return nil, err
}
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: js,
cfg: &cfg,
stream: stream,
namePrefix: nuid.Next(),
namePrefix: namePrefix,
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()
+82
View File
@@ -14,6 +14,7 @@
package jetstream
import (
"context"
"fmt"
"time"
)
@@ -347,6 +348,26 @@ func (min PullMinAckPending) configureMessages(opts *consumeOpts) error {
return nil
}
// PullPrioritized sets the priority used when sending pull requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, PullPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
//
// PullPrioritized implements both PullConsumeOpt and PullMessagesOpt, allowing
// it to configure Consumer.Consume and Consumer.Messages.
type PullPrioritized uint8
func (p PullPrioritized) configureConsume(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
func (p PullPrioritized) configureMessages(opts *consumeOpts) error {
opts.Priority = uint8(p)
return nil
}
// PullPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
//
@@ -468,6 +489,19 @@ func FetchMinAckPending(min int64) FetchOpt {
}
}
// FetchPrioritized sets the priority used when sending fetch requests for consumer with
// PriorityPolicyPrioritized. Lower values indicate higher priority (0 is the
// highest priority). Maximum priority value is 9.
//
// If provided, FetchPriorityGroup must be set as well and the consumer has to
// have PriorityPolicy set to PriorityPolicyPrioritized.
func FetchPrioritized(priority uint8) FetchOpt {
return func(req *pullRequest) error {
req.Priority = priority
return nil
}
}
// FetchPriorityGroup sets the priority group for a consumer.
// It has to match one of the priority groups set on the consumer.
func FetchPriorityGroup(group string) FetchOpt {
@@ -486,6 +520,7 @@ func FetchMaxWait(timeout time.Duration) FetchOpt {
return fmt.Errorf("%w: timeout value must be greater than 0", ErrInvalidOption)
}
req.Expires = timeout
req.maxWaitSet = true
return nil
}
}
@@ -508,6 +543,31 @@ func FetchHeartbeat(hb time.Duration) FetchOpt {
}
}
// FetchContext sets a context for the Fetch operation.
// The Fetch operation will be canceled if the context is canceled.
// If the context has a deadline, it will be used to set expiry on pull request.
func FetchContext(ctx context.Context) FetchOpt {
return func(req *pullRequest) error {
req.ctx = ctx
// If context has a deadline, use it to set expiry
if deadline, ok := ctx.Deadline(); ok {
remaining := time.Until(deadline)
if remaining <= 0 {
return fmt.Errorf("%w: context deadline already exceeded", ErrInvalidOption)
}
// Use 90% of remaining time for server (capped at 1s)
buffer := time.Duration(float64(remaining) * 0.1)
if buffer > time.Second {
buffer = time.Second
}
req.Expires = remaining - buffer
}
return nil
}
}
// WithDeletedDetails can be used to display the information about messages
// deleted from a stream on a stream info request
func WithDeletedDetails(deletedDetails bool) StreamInfoOpt {
@@ -648,3 +708,25 @@ func WithStallWait(ttl time.Duration) PublishOpt {
return nil
}
}
type nextOptFunc func(*nextOpts)
func (fn nextOptFunc) configureNext(opts *nextOpts) {
fn(opts)
}
// NextMaxWait sets a timeout for the Next operation.
// If the timeout is reached before a message is available, a timeout error is returned.
func NextMaxWait(timeout time.Duration) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.timeout = timeout
})
}
// NextContext sets a context for the Next operation.
// The Next operation will be canceled if the context is canceled.
func NextContext(ctx context.Context) NextOpt {
return nextOptFunc(func(opts *nextOpts) {
opts.ctx = ctx
})
}
+14 -1
View File
@@ -256,7 +256,11 @@ type (
// removed by the TTL setting.
// It is required for per-key TTL to work and for watcher to notify
// about TTL expirations (both per key and per bucket)
LimitMarkerTTL time.Duration
LimitMarkerTTL time.Duration `json:"limit_marker_ttl,omitempty"`
// Metadata is a set of application-defined key-value pairs that can be
// used to store arbitrary metadata about the bucket.
Metadata map[string]string `json:"metadata,omitempty"`
}
// KeyLister is used to retrieve a list of key value store keys. It returns
@@ -316,6 +320,9 @@ type (
// LimitMarkerTTL is how long the bucket keeps markers when keys are
// removed by the TTL setting, 0 meaning markers are not supported.
LimitMarkerTTL() time.Duration
// Metadata returns the metadata associated with the bucket.
Metadata() map[string]string
}
// KeyWatcher is what is returned when doing a watch. It can be used to
@@ -667,6 +674,7 @@ func (js *jetStream) prepareKeyValueConfig(ctx context.Context, cfg KeyValueConf
Discard: DiscardNew,
AllowMsgTTL: allowMsgTTL,
SubjectDeleteMarkerTTL: subjectDeleteMarkerTTL,
Metadata: cfg.Metadata,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
@@ -813,6 +821,11 @@ func (s *KeyValueBucketStatus) LimitMarkerTTL() time.Duration {
return s.info.Config.SubjectDeleteMarkerTTL
}
// Metadata returns the metadata associated with the bucket.
func (s *KeyValueBucketStatus) Metadata() map[string]string {
return s.info.Config.Metadata
}
type kvLister struct {
kvs chan KeyValueStatus
kvNames chan string
+13 -2
View File
@@ -282,10 +282,21 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er
return sub, nil
}
func (s *orderedSubscription) Next() (Msg, error) {
func (s *orderedSubscription) Next(opts ...NextOpt) (Msg, error) {
for {
msg, err := s.consumer.currentSub.Next()
msg, err := s.consumer.currentSub.Next(opts...)
if err != nil {
// Check for errors which should be returned directly
// without resetting the consumer
if errors.Is(err, ErrInvalidOption) {
return nil, err
}
if errors.Is(err, nats.ErrTimeout) {
return nil, err
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return nil, err
}
if errors.Is(err, ErrMsgIteratorClosed) {
s.Stop()
return nil, err
+2
View File
@@ -130,6 +130,8 @@ type (
// Domain is the domain the message was published to.
Domain string `json:"domain,omitempty"`
Value string `json:"val,omitempty"`
}
)
+84 -12
View File
@@ -14,6 +14,7 @@
package jetstream
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -34,8 +35,11 @@ type (
MessagesContext interface {
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error.
Next() (Msg, error)
// ErrMsgIteratorClosed error. An optional timeout or context can be
// provided using NextOpt options. If none are provided, Next will block
// indefinitely until a message is available, iterator is closed or a
// heartbeat error occurs.
Next(opts ...NextOpt) (Msg, error)
// Stop unsubscribes from the stream and cancels subscription. Calling
// Next after calling Stop will return ErrMsgIteratorClosed error.
@@ -92,15 +96,18 @@ type (
}
pullRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
MinPending int64 `json:"min_pending,omitempty"`
MinAckPending int64 `json:"min_ack_pending,omitempty"`
PinID string `json:"id,omitempty"`
Group string `json:"group,omitempty"`
Priority uint8 `json:"priority,omitempty"`
ctx context.Context `json:"-"`
maxWaitSet bool `json:"-"`
}
consumeOpts struct {
@@ -110,6 +117,7 @@ type (
LimitSize bool
MinPending int64
MinAckPending int64
Priority uint8
Group string
Heartbeat time.Duration
ErrHandler ConsumeErrHandler
@@ -167,6 +175,16 @@ type (
timer *time.Timer
sync.Mutex
}
// NextOpt is an option for configuring the behavior of MessagesContext.Next.
NextOpt interface {
configureNext(*nextOpts)
}
nextOpts struct {
timeout time.Duration
ctx context.Context
}
)
const (
@@ -314,6 +332,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: consumeOpts.Heartbeat,
MinPending: consumeOpts.MinPending,
MinAckPending: consumeOpts.MinAckPending,
Priority: consumeOpts.Priority,
Group: consumeOpts.Group,
PinID: p.getPinID(),
}, subject); err != nil {
@@ -353,6 +372,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -383,6 +403,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
Heartbeat: sub.consumeOpts.Heartbeat,
MinPending: sub.consumeOpts.MinPending,
MinAckPending: sub.consumeOpts.MinAckPending,
Priority: sub.consumeOpts.Priority,
Group: sub.consumeOpts.Group,
PinID: p.getPinID(),
}
@@ -468,6 +489,7 @@ func (s *pullSubscription) checkPending() {
Group: s.consumeOpts.Group,
MinPending: s.consumeOpts.MinPending,
MinAckPending: s.consumeOpts.MinAckPending,
Priority: s.consumeOpts.Priority,
}
s.pending.msgCount = s.consumeOpts.MaxMessages
@@ -569,7 +591,30 @@ var (
// Next retrieves next message on a stream. It will block until the next
// message is available. If the context is canceled, Next will return
// ErrMsgIteratorClosed error.
func (s *pullSubscription) Next() (Msg, error) {
func (s *pullSubscription) Next(opts ...NextOpt) (Msg, error) {
var nextOpts nextOpts
for _, opt := range opts {
opt.configureNext(&nextOpts)
}
if nextOpts.timeout > 0 && nextOpts.ctx != nil {
return nil, fmt.Errorf("%w: cannot specify both NextMaxWait and NextContext", ErrInvalidOption)
}
// Create timeout channel if needed
var timeoutCh <-chan time.Time
if nextOpts.timeout > 0 {
timer := time.NewTimer(nextOpts.timeout)
defer timer.Stop()
timeoutCh = timer.C
}
// Use context if provided
var ctxDone <-chan struct{}
if nextOpts.ctx != nil {
ctxDone = nextOpts.ctx.Done()
}
s.Lock()
defer s.Unlock()
drainMode := s.draining.Load() == 1
@@ -660,6 +705,10 @@ func (s *pullSubscription) Next() (Msg, error) {
}
isConnected = false
}
case <-timeoutCh:
return nil, nats.ErrTimeout
case <-ctxDone:
return nil, nextOpts.ctx.Err()
}
}
}
@@ -779,6 +828,11 @@ func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error)
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -808,6 +862,11 @@ func (p *pullConsumer) FetchBytes(maxBytes int, opts ...FetchOpt) (MessageBatch,
return nil, err
}
}
if req.ctx != nil && req.maxWaitSet {
return nil, fmt.Errorf("%w: cannot specify both FetchContext and FetchMaxWait", ErrInvalidOption)
}
// if heartbeat was not explicitly set, set it to 5 seconds for longer pulls
// and disable it for shorter pulls
if req.Heartbeat == unset {
@@ -862,6 +921,13 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
var receivedMsgs, receivedBytes int
hbTimer := sub.scheduleHeartbeatCheck(req.Heartbeat)
// Use context if provided
var ctxDone <-chan struct{}
if req.ctx != nil {
ctxDone = req.ctx.Done()
}
go func(res *fetchResult) {
defer sub.subscription.Unsubscribe()
defer close(res.msgs)
@@ -922,6 +988,12 @@ func (p *pullConsumer) fetch(req *pullRequest) (MessageBatch, error) {
res.done = true
res.Unlock()
return
case <-ctxDone:
res.Lock()
res.err = req.ctx.Err()
res.done = true
res.Unlock()
return
}
}
}(res)
+7 -1
View File
@@ -349,11 +349,15 @@ func (s *stream) UpdatePushConsumer(ctx context.Context, cfg ConsumerConfig) (Pu
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
func (s *stream) OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error) {
namePrefix := cfg.NamePrefix
if namePrefix == "" {
namePrefix = nuid.Next()
}
oc := &orderedConsumer{
js: s.js,
cfg: &cfg,
stream: s.name,
namePrefix: nuid.Next(),
namePrefix: namePrefix,
doReset: make(chan struct{}, 1),
}
consCfg := oc.getConsumerConfig()
@@ -528,6 +532,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
if err != nil {
return nil, err
}
var gmSubj string
// handle direct gets
@@ -598,6 +603,7 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) {
}
}
}
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
+75 -1
View File
@@ -201,6 +201,18 @@ type (
// Enables and sets a duration for adding server markers for delete, purge and max age limits.
// This feature requires nats-server v2.11.0 or later.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
// AllowMsgCounter enables the feature
AllowMsgCounter bool `json:"allow_msg_counter"`
// AllowAtomicPublish allows atomic batch publishing into the stream.
AllowAtomicPublish bool `json:"allow_atomic,omitempty"`
// AllowMsgSchedules enables the scheduling of messages
AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty"`
// PersistMode allows to opt-in to different persistence mode settings.
PersistMode PersistModeType `json:"persist_mode,omitempty"`
}
// StreamSourceInfo shows information about an upstream stream
@@ -276,10 +288,25 @@ type (
// Name is the name of the cluster.
Name string `json:"name,omitempty"`
// RaftGroup is the name of the Raft group managing the asset (in
// clustered environments).
RaftGroup string `json:"raft_group,omitempty"`
// Leader is the server name of the RAFT leader.
Leader string `json:"leader,omitempty"`
// Replicas is the list of members of the RAFT cluster
// LeaderSince is the time that it was elected as leader in RFC3339
// format, absent when not the leader.
LeaderSince *time.Time `json:"leader_since,omitempty"`
// SystemAcc indicates if the traffic_account is the system account.
// When true, replication traffic goes over the system account.
SystemAcc bool `json:"system_account,omitempty"`
// TrafficAcc is the account where the replication traffic goes over.
TrafficAcc string `json:"traffic_account,omitempty"`
// Replicas is the list of members of the RAFT cluster.
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
@@ -407,6 +434,9 @@ type (
// StoreCompression determines how messages are compressed.
StoreCompression uint8
// PersistModeType determines what persistence mode the stream uses.
PersistModeType int
)
const (
@@ -438,6 +468,16 @@ const (
workQueuePolicyString = "workqueue"
)
const (
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
// The publish acknowledgement will be sent after the persisting completes.
DefaultPersistMode = PersistModeType(iota)
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
// The publish acknowledgement may be sent before the persisting completes.
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
AsyncPersistMode
)
func (rp RetentionPolicy) String() string {
switch rp {
case LimitsPolicy:
@@ -512,6 +552,40 @@ func (dp *DiscardPolicy) UnmarshalJSON(data []byte) error {
return nil
}
func (pm PersistModeType) String() string {
switch pm {
case DefaultPersistMode:
return "Default"
case AsyncPersistMode:
return "Async"
default:
return "Unknown Persist Mode"
}
}
func (pm PersistModeType) MarshalJSON() ([]byte, error) {
switch pm {
case DefaultPersistMode:
return json.Marshal("default")
case AsyncPersistMode:
return json.Marshal("async")
default:
return nil, fmt.Errorf("nats: can not marshal %v", pm)
}
}
func (pm *PersistModeType) UnmarshalJSON(data []byte) error {
switch strings.ToLower(string(data)) {
case jsonString("default"):
*pm = DefaultPersistMode
case jsonString("async"):
*pm = AsyncPersistMode
default:
return fmt.Errorf("nats: can not unmarshal %q", data)
}
return nil
}
const (
// FileStorage specifies on disk storage. It's the default.
FileStorage StorageType = iota
+4
View File
@@ -2836,6 +2836,10 @@ func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
if sub.jsi == nil || sub.jsi.consumer == _EMPTY_ {
if sub.jsi.ordered {
sub.mu.Unlock()
return nil, ErrConsumerInfoOnOrderedReset
}
sub.mu.Unlock()
return nil, ErrTypeSubscription
}
+3
View File
@@ -150,6 +150,9 @@ var (
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
// ErrConsumerInfoOnOrderedReset is returned when attempting to fetch consumer info for an ordered consumer that is currently being recreated.
ErrConsumerInfoOnOrderedReset JetStreamError = &jsError{message: "cannot fetch consumer info; ordered consumer is being reset"}
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
+7 -3
View File
@@ -1093,9 +1093,13 @@ type StreamState struct {
// ClusterInfo shows information about the underlying set of servers
// that make up the stream or consumer.
type ClusterInfo struct {
Name string `json:"name,omitempty"`
Leader string `json:"leader,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
Name string `json:"name,omitempty"`
RaftGroup string `json:"raft_group,omitempty"`
Leader string `json:"leader,omitempty"`
LeaderSince *time.Time `json:"leader_since,omitempty"`
SystemAcc bool `json:"system_account,omitempty"`
TrafficAcc string `json:"traffic_account,omitempty"`
Replicas []*PeerInfo `json:"replicas,omitempty"`
}
// PeerInfo shows information about all the peers in the cluster that
+1 -1
View File
@@ -48,7 +48,7 @@ import (
// Default Constants
const (
Version = "1.45.0"
Version = "1.46.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
+1 -1
View File
@@ -1125,7 +1125,7 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.45.0
# github.com/nats-io/nats.go v1.46.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin