build(deps): bump github.com/nats-io/nats.go from 1.39.1 to 1.41.0

Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.39.1 to 1.41.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.39.1...v1.41.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-version: 1.41.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2025-04-04 10:00:11 +00:00
committed by GitHub
parent b5d87db137
commit 9ec6e3eebf
12 changed files with 288 additions and 75 deletions

2
go.mod
View File

@@ -56,7 +56,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.0
github.com/nats-io/nats.go v1.39.1
github.com/nats-io/nats.go v1.41.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -829,8 +829,8 @@ github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE=
github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI=
github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nats.go v1.41.0 h1:PzxEva7fflkd+n87OtQTXqCTyLfIIMFJBpyccHLE2Ko=
github.com/nats-io/nats.go v1.41.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.39.1
go get github.com/nats-io/nats.go@v1.41.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -1,23 +1,24 @@
module github.com/nats-io/nats.go
go 1.22.0
go 1.23.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.11
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nats-server/v2 v2.11.0
github.com/nats-io/nkeys v0.4.10
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.21.0
golang.org/x/text v0.23.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/google/go-tpm v0.9.3 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.7.3 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/time v0.11.0 // indirect
)

View File

@@ -1,3 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
@@ -9,21 +11,24 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE=
github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,20 +39,19 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2024 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -273,6 +273,8 @@ type jsOpts struct {
aecb MsgErrHandler
// Max async pub ack in flight
maxpa int
// ackTimeout is the max time to wait for an ack in async publish.
ackTimeout time.Duration
// the domain that produced the pre
domain string
// enables protocol tracing
@@ -466,13 +468,14 @@ func (opt pubOptFn) configurePublish(opts *pubOpts) error {
}
type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
msgTTL time.Duration // Message TTL
// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
@@ -507,6 +510,7 @@ const (
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
MsgTTLHdr = "Nats-TTL"
)
// Headers for republished messages and direct gets.
@@ -566,6 +570,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.msgTTL > 0 {
m.Header.Set(MsgTTLHdr, o.msgTTL.String())
}
var resp *Msg
var err error
@@ -648,6 +655,7 @@ type pubAckFuture struct {
maxRetries int
retryWait time.Duration
reply string
timeout *time.Timer
}
func (paf *pubAckFuture) Ok() <-chan *PubAck {
@@ -712,13 +720,19 @@ func (js *js) newAsyncReply() string {
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for {
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
}
if _, ok := js.pafs[string(b[:])]; ok {
continue
}
sb.Write(b[:])
break
}
sb.Write(b[:])
js.mu.Unlock()
return sb.String()
}
@@ -894,6 +908,10 @@ func (js *js) handleAsyncReply(m *Msg) {
}
}
if paf.timeout != nil {
paf.timeout.Stop()
}
// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
if paf.retries < paf.maxRetries {
@@ -975,6 +993,15 @@ func PublishAsyncMaxPending(max int) JSOpt {
})
}
// PublishAsyncTimeout sets the timeout for async message publish.
// If not provided, timeout is disabled.
func PublishAsyncTimeout(dur time.Duration) JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.ackTimeout = dur
return nil
})
}
// PublishAsync publishes a message to JetStream and returns a PubAckFuture
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
@@ -1024,6 +1051,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.msgTTL > 0 {
m.Header.Set(MsgTTLHdr, o.msgTTL.String())
}
// Reply
paf := o.pafRetry
@@ -1050,11 +1080,52 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
return nil, ErrTooManyStalledMsgs
}
}
if js.opts.ackTimeout > 0 {
paf.timeout = time.AfterFunc(js.opts.ackTimeout, func() {
js.mu.Lock()
defer js.mu.Unlock()
if _, ok := js.pafs[id]; !ok {
// paf has already been resolved
// while waiting for the lock
return
}
// ack timed out, remove from pending acks
delete(js.pafs, id)
// check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
// send error to user
paf.err = ErrAsyncPublishTimeout
if paf.errCh != nil {
paf.errCh <- paf.err
}
// call error callback if set
if js.opts.aecb != nil {
js.opts.aecb(js, paf.msg, ErrAsyncPublishTimeout)
}
// check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
close(js.dch)
js.dch = nil
}
})
}
} else {
reply = paf.reply
if paf.timeout != nil {
paf.timeout.Reset(js.opts.ackTimeout)
}
id = reply[js.replyPrefixLen:]
}
hdr, err := m.headerBytes()
@@ -1151,6 +1222,15 @@ func StallWait(ttl time.Duration) PubOpt {
})
}
// MsgTTL sets per msg TTL.
// Requires [StreamConfig.AllowMsgTTL] to be enabled.
func MsgTTL(dur time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.msgTTL = dur
return nil
})
}
type ackOpts struct {
ttl time.Duration
ctx context.Context
@@ -1361,6 +1441,9 @@ type jsSub struct {
fciseq uint64
csfct *time.Timer
// context set on js.Subscribe used e.g. to recreate ordered consumer
ctx context.Context
// Cancellation function to cancel context on drain/unsubscribe.
cancel func()
}
@@ -1833,6 +1916,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
psubj: subj,
cancel: cancel,
ackNone: o.cfg.AckPolicy == AckNonePolicy,
ctx: o.ctx,
}
// Auto acknowledge unless manual ack is set or policy is set to AckNonePolicy
@@ -1864,7 +1948,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
} else if consName == "" {
consName = getHash(nuid.Next())
}
info, err := js.upsertConsumer(stream, consName, ccreq.Config)
var info *ConsumerInfo
if o.ctx != nil {
info, err = js.upsertConsumer(stream, consName, ccreq.Config, Context(o.ctx))
} else {
info, err = js.upsertConsumer(stream, consName, ccreq.Config)
}
if err != nil {
var apiErr *APIError
if ok := errors.As(err, &apiErr); !ok {
@@ -2196,7 +2285,13 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
jsi.consumer = ""
sub.mu.Unlock()
consName := getHash(nuid.Next())
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
var cinfo *ConsumerInfo
var err error
if js.opts.ctx != nil {
cinfo, err = js.upsertConsumer(jsi.stream, consName, cfg, Context(js.opts.ctx))
} else {
cinfo, err = js.upsertConsumer(jsi.stream, consName, cfg)
}
if err != nil {
var apiErr *APIError
if errors.Is(err, ErrJetStreamNotEnabled) || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
@@ -2206,6 +2301,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
// retry for insufficient resources, as it may mean that client is connected to a running
// server in cluster while the server hosting R1 JetStream resources is restarting
return
} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeJetStreamNotAvailable {
// retry if JetStream meta leader is temporarily unavailable
return
}
pushErr(err)
return
@@ -2975,6 +3073,11 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
}
var hbTimer *time.Timer
defer func() {
if hbTimer != nil {
hbTimer.Stop()
}
}()
var hbErr error
sub.mu.Lock()
subClosed := sub.closed || sub.draining
@@ -2983,6 +3086,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
err = errors.Join(ErrBadSubscription, ErrSubscriptionClosed)
}
hbLock := sync.Mutex{}
var disconnected atomic.Bool
if err == nil && len(msgs) < batch && !subClosed {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
@@ -3037,7 +3141,17 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
return nil
}
connStatusChanged := nc.StatusChanged()
go func() {
select {
case <-ctx.Done():
return
case <-connStatusChanged:
disconnected.Store(true)
cancel()
return
}
}()
err = sendReq()
for err == nil && len(msgs) < batch {
// Ask for next message and wait if there are no messages
@@ -3064,9 +3178,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
}
}
if hbTimer != nil {
hbTimer.Stop()
}
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
@@ -3075,6 +3186,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
if hbErr != nil {
return nil, hbErr
}
if disconnected.Load() {
return nil, ErrFetchDisconnected
}
return nil, o.checkCtxErr(err)
}
return msgs, nil
@@ -3285,6 +3399,18 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
expires := ttl - expiresDiff
connStatusChanged := nc.StatusChanged()
var disconnected atomic.Bool
go func() {
select {
case <-ctx.Done():
return
case <-connStatusChanged:
disconnected.Store(true)
cancel()
return
}
}()
requestBatch := batch - len(result.msgs)
req := nextRequest{
Expires: expires,
@@ -3309,6 +3435,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
return result, nil
}
var hbTimer *time.Timer
defer func() {
if hbTimer != nil {
hbTimer.Stop()
}
}()
var hbErr error
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
@@ -3355,6 +3486,8 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
result.Lock()
if hbErr != nil {
result.err = hbErr
} else if disconnected.Load() {
result.err = ErrFetchDisconnected
} else {
result.err = o.checkCtxErr(err)
}

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -157,6 +157,17 @@ var (
// Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
// ErrAsyncPublishTimeout is returned when waiting for ack on async publish
ErrAsyncPublishTimeout JetStreamError = &jsError{message: "timeout waiting for ack"}
// ErrTooManyStalledMsgs is returned when too many outstanding async
// messages are waiting for ack.
ErrTooManyStalledMsgs JetStreamError = &jsError{message: "stalled with too many outstanding async published messages"}
// ErrFetchDisconnected is returned when the connection to the server is lost
// while waiting for messages to be delivered on PullSubscribe.
ErrFetchDisconnected = &jsError{message: "disconnected during fetch"}
)
// Error code represents JetStream error codes returned by the API
@@ -166,6 +177,7 @@ const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeInsufficientResourcesErr ErrorCode = 10023
JSErrCodeJetStreamNotAvailable ErrorCode = 10008
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

View File

@@ -243,6 +243,14 @@ type StreamConfig struct {
// Template identifies the template that manages the Stream. Deprecated:
// This feature is no longer supported.
Template string `json:"template_owner,omitempty"`
// AllowMsgTTL allows header initiated per-message TTLs.
// This feature requires nats-server v2.11.0 or later.
AllowMsgTTL bool `json:"allow_msg_ttl"`
// Enables and sets a duration for adding server markers for delete, purge and max age limits.
// This feature requires nats-server v2.11.0 or later.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.

View File

@@ -826,6 +826,8 @@ func (kv *kvs) PurgeDeletes(opts ...PurgeOpt) error {
deleteMarkers = append(deleteMarkers, entry)
}
}
// Stop watcher here so as we purge we do not have the system continually updating numPending.
watcher.Stop()
var (
pr StreamPurgeRequest
@@ -929,13 +931,14 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
// Implementation for Watch
type watcher struct {
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
ctx context.Context
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
ctx context.Context
initDoneTimer *time.Timer
}
// Context returns the context for the watcher if set.
@@ -1044,8 +1047,11 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
w.initPending = delta
}
if w.received > w.initPending || delta == 0 {
w.initDoneTimer.Stop()
w.initDone = true
w.updates <- nil
} else if w.initDoneTimer != nil {
w.initDoneTimer.Reset(kv.js.opts.wait)
}
}
}
@@ -1088,6 +1094,16 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
if sub.jsi != nil && sub.jsi.pending == 0 {
w.initDone = true
w.updates <- nil
} else {
// Set a timer to send the marker if we do not get any messages.
w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.initDone {
w.initDone = true
w.updates <- nil
}
})
}
} else {
// if UpdatesOnly was used, mark initialization as complete
@@ -1095,8 +1111,14 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
}
// Set us up to close when the waitForMessages func returns.
sub.pDone = func(_ string) {
w.mu.Lock()
defer w.mu.Unlock()
if w.initDoneTimer != nil {
w.initDoneTimer.Stop()
}
close(w.updates)
}
sub.mu.Unlock()
w.sub = sub

View File

@@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.39.1"
Version = "1.41.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -274,7 +274,6 @@ type InProcessConnProvider interface {
// Options can be used to create a customized connection.
type Options struct {
// Url represents a single NATS server url to which the client
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
@@ -422,6 +421,10 @@ type Options struct {
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
AsyncErrorCB ErrHandler
// ReconnectErrCB sets the callback that is invoked whenever a
// reconnect attempt failed
ReconnectErrCB ConnErrHandler
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
// Defaults to 8388608 bytes (8MB).
@@ -1151,6 +1154,14 @@ func ReconnectHandler(cb ConnHandler) Option {
}
}
// ReconnectErrHandler is an Option to set the reconnect error handler.
func ReconnectErrHandler(cb ConnErrHandler) Option {
return func(o *Options) error {
o.ReconnectErrCB = cb
return nil
}
}
// ClosedHandler is an Option to set the closed handler.
func ClosedHandler(cb ConnHandler) Option {
return func(o *Options) error {
@@ -2213,6 +2224,7 @@ func (nc *Conn) ForceReconnect() error {
// even if we're in the middle of a backoff
if nc.rqch != nil {
close(nc.rqch)
nc.rqch = nil
}
return nil
}
@@ -2386,7 +2398,6 @@ func (nc *Conn) setup() {
// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {
// Set our deadline for the whole connect process
nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
defer nc.conn.SetDeadline(time.Time{})
@@ -2535,7 +2546,6 @@ func (nc *Conn) checkForSecure() error {
// processExpectedInfo will look for the expected first INFO message
// sent when a connection is established. The lock should be held entering.
func (nc *Conn) processExpectedInfo() error {
c := &control{}
// Read the protocol
@@ -2640,8 +2650,10 @@ func (nc *Conn) connectProto() (string, error) {
// If our server does not support headers then we can't do them or no responders.
hdrs := nc.info.Headers
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}
cinfo := connectInfo{
o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs,
}
b, err := json.Marshal(cinfo)
if err != nil {
@@ -2832,6 +2844,16 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
var rt *time.Timer
// Channel used to kick routine out of sleep when conn is closed.
rqch := nc.rqch
// if rqch is nil, we need to set it up to signal
// the reconnect loop to reconnect immediately
// this means that `ForceReconnect` was called
// before entering doReconnect
if rqch == nil {
rqch = make(chan struct{})
close(rqch)
}
// Counter that is increased when the whole list of servers has been tried.
var wlf int
@@ -2911,10 +2933,13 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// Try to create a new connection
err = nc.createConn()
// Not yet connected, retry...
// Continue to hold the lock
if err != nil {
// Perform appropriate callback for a failed connection attempt.
if nc.Opts.ReconnectErrCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) })
}
nc.err = nil
continue
}
@@ -3259,7 +3284,7 @@ func (nc *Conn) processMsg(data []byte) {
// It's possible that we end-up not using the message, but that's ok.
// FIXME(dlc): Need to copy, should/can do COW?
var msgPayload = data
msgPayload := data
if !nc.ps.msgCopied {
msgPayload = make([]byte, len(data))
copy(msgPayload, data)
@@ -3450,8 +3475,10 @@ slowConsumer:
}
}
var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)
var (
permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)
)
// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
@@ -3976,7 +4003,7 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
// go 1.14 some values strconv faster, may be able to switch over.
var b [12]byte
var i = len(b)
i := len(b)
if hdr != nil {
if len(hdr) > 0 {
@@ -4584,6 +4611,8 @@ func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus {
statuses = []SubStatus{SubscriptionActive, SubscriptionDraining, SubscriptionClosed, SubscriptionSlowConsumer}
}
ch := make(chan SubStatus, 10)
s.mu.Lock()
defer s.mu.Unlock()
for _, status := range statuses {
s.registerStatusChangeListener(status, ch)
// initial status
@@ -4597,9 +4626,8 @@ func (s *Subscription) StatusChanged(statuses ...SubStatus) <-chan SubStatus {
// registerStatusChangeListener registers a channel waiting for a specific status change event.
// Status change events are non-blocking - if no receiver is waiting for the status change,
// it will not be sent on the channel. Closed channels are ignored.
// Lock should be held entering.
func (s *Subscription) registerStatusChangeListener(status SubStatus, ch chan SubStatus) {
s.mu.Lock()
defer s.mu.Unlock()
if s.statListeners == nil {
s.statListeners = make(map[chan SubStatus][]SubStatus)
}
@@ -5677,7 +5705,7 @@ func (nc *Conn) IsDraining() bool {
// caller must lock
func (nc *Conn) getServers(implicitOnly bool) []string {
poolSize := len(nc.srvPool)
var servers = make([]string, 0)
servers := make([]string, 0)
for i := 0; i < poolSize; i++ {
if implicitOnly && !nc.srvPool[i].isImplicit {
continue
@@ -5877,6 +5905,8 @@ func (nc *Conn) StatusChanged(statuses ...Status) chan Status {
statuses = []Status{CONNECTED, RECONNECTING, DISCONNECTED, CLOSED}
}
ch := make(chan Status, 10)
nc.mu.Lock()
defer nc.mu.Unlock()
for _, s := range statuses {
nc.registerStatusChangeListener(s, ch)
}
@@ -5886,9 +5916,8 @@ func (nc *Conn) StatusChanged(statuses ...Status) chan Status {
// registerStatusChangeListener registers a channel waiting for a specific status change event.
// Status change events are non-blocking - if no receiver is waiting for the status change,
// it will not be sent on the channel. Closed channels are ignored.
// The lock should be held entering.
func (nc *Conn) registerStatusChangeListener(status Status, ch chan Status) {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.statListeners == nil {
nc.statListeners = make(map[Status][]chan Status)
}

View File

@@ -1127,6 +1127,10 @@ func (obs *obs) Watch(opts ...WatchOpt) (ObjectWatcher, error) {
if err != nil {
return nil, err
}
// Set us up to close when the waitForMessages func returns.
sub.pDone = func(_ string) {
close(w.updates)
}
w.sub = sub
return w, nil
}

4
vendor/modules.txt vendored
View File

@@ -1009,8 +1009,8 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.39.1
## explicit; go 1.22.0
# github.com/nats-io/nats.go v1.41.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
github.com/nats-io/nats.go/internal/parser