build(deps): bump github.com/nats-io/nats.go from 1.39.1 to 1.40.0

Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.39.1 to 1.40.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.39.1...v1.40.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2025-03-25 05:58:50 +00:00
committed by GitHub
parent 8bc17593cb
commit fae29f107c
11 changed files with 200 additions and 65 deletions

2
go.mod
View File

@@ -56,7 +56,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.11.0
github.com/nats-io/nats.go v1.39.1
github.com/nats-io/nats.go v1.40.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -829,8 +829,8 @@ github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE=
github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI=
github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk=
github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM=
github.com/nats-io/nats.go v1.40.0 h1:qC3rnVZy15vJ15GSbB+pQtOmqo9q+65wnGVpvmcVv0Q=
github.com/nats-io/nats.go v1.40.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
go get github.com/nats-io/nats.go@latest
# To get a specific version:
go get github.com/nats-io/nats.go@v1.39.1
go get github.com/nats-io/nats.go@v1.40.0
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest

View File

@@ -1,23 +1,24 @@
module github.com/nats-io/nats.go
go 1.22.0
go 1.23.0
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.11
github.com/klauspost/compress v1.18.0
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nats-server/v2 v2.11.0
github.com/nats-io/nkeys v0.4.10
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.21.0
golang.org/x/text v0.23.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/google/go-tpm v0.9.3 // indirect
github.com/minio/highwayhash v1.0.3 // indirect
github.com/nats-io/jwt/v2 v2.7.3 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/time v0.11.0 // indirect
)

View File

@@ -1,3 +1,5 @@
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0=
github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
@@ -9,21 +11,24 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q=
github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ=
github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU=
github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE=
github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,20 +39,19 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34=
golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2024 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -273,6 +273,8 @@ type jsOpts struct {
aecb MsgErrHandler
// Max async pub ack in flight
maxpa int
// ackTimeout is the max time to wait for an ack in async publish.
ackTimeout time.Duration
// the domain that produced the pre
domain string
// enables protocol tracing
@@ -466,13 +468,14 @@ func (opt pubOptFn) configurePublish(opts *pubOpts) error {
}
type pubOpts struct {
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
ctx context.Context
ttl time.Duration
id string
lid string // Expected last msgId
str string // Expected stream name
seq *uint64 // Expected last sequence
lss *uint64 // Expected last sequence per subject
msgTTL time.Duration // Message TTL
// Publish retries for NoResponders err.
rwait time.Duration // Retry wait between attempts
@@ -507,6 +510,7 @@ const (
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
MsgTTLHdr = "Nats-TTL"
)
// Headers for republished messages and direct gets.
@@ -566,6 +570,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.msgTTL > 0 {
m.Header.Set(MsgTTLHdr, o.msgTTL.String())
}
var resp *Msg
var err error
@@ -648,6 +655,7 @@ type pubAckFuture struct {
maxRetries int
retryWait time.Duration
reply string
timeout *time.Timer
}
func (paf *pubAckFuture) Ok() <-chan *PubAck {
@@ -712,13 +720,19 @@ func (js *js) newAsyncReply() string {
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
for {
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
l /= base
}
if _, ok := js.pafs[string(b[:])]; ok {
continue
}
sb.Write(b[:])
break
}
sb.Write(b[:])
js.mu.Unlock()
return sb.String()
}
@@ -894,6 +908,10 @@ func (js *js) handleAsyncReply(m *Msg) {
}
}
if paf.timeout != nil {
paf.timeout.Stop()
}
// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
if paf.retries < paf.maxRetries {
@@ -975,6 +993,15 @@ func PublishAsyncMaxPending(max int) JSOpt {
})
}
// PublishAsyncTimeout sets the timeout for async message publish.
// If not provided, timeout is disabled.
func PublishAsyncTimeout(dur time.Duration) JSOpt {
return jsOptFn(func(opts *jsOpts) error {
opts.ackTimeout = dur
return nil
})
}
// PublishAsync publishes a message to JetStream and returns a PubAckFuture
func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) {
return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...)
@@ -1024,6 +1051,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
if o.lss != nil {
m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10))
}
if o.msgTTL > 0 {
m.Header.Set(MsgTTLHdr, o.msgTTL.String())
}
// Reply
paf := o.pafRetry
@@ -1050,11 +1080,52 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
return nil, ErrTooManyStalledMsgs
}
}
if js.opts.ackTimeout > 0 {
paf.timeout = time.AfterFunc(js.opts.ackTimeout, func() {
js.mu.Lock()
defer js.mu.Unlock()
if _, ok := js.pafs[id]; !ok {
// paf has already been resolved
// while waiting for the lock
return
}
// ack timed out, remove from pending acks
delete(js.pafs, id)
// check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
// send error to user
paf.err = ErrAsyncPublishTimeout
if paf.errCh != nil {
paf.errCh <- paf.err
}
// call error callback if set
if js.opts.aecb != nil {
js.opts.aecb(js, paf.msg, ErrAsyncPublishTimeout)
}
// check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
close(js.dch)
js.dch = nil
}
})
}
} else {
reply = paf.reply
if paf.timeout != nil {
paf.timeout.Reset(js.opts.ackTimeout)
}
id = reply[js.replyPrefixLen:]
}
hdr, err := m.headerBytes()
@@ -1151,6 +1222,15 @@ func StallWait(ttl time.Duration) PubOpt {
})
}
// MsgTTL sets per msg TTL.
// Requires [StreamConfig.AllowMsgTTL] to be enabled.
func MsgTTL(dur time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
opts.msgTTL = dur
return nil
})
}
type ackOpts struct {
ttl time.Duration
ctx context.Context
@@ -2206,6 +2286,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
// retry for insufficient resources, as it may mean that client is connected to a running
// server in cluster while the server hosting R1 JetStream resources is restarting
return
} else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeJetStreamNotAvailable {
// retry if JetStream meta leader is temporarily unavailable
return
}
pushErr(err)
return

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2023 The NATS Authors
// Copyright 2020-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -157,6 +157,13 @@ var (
// Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
// ErrAsyncPublishTimeout is returned when waiting for ack on async publish
ErrAsyncPublishTimeout JetStreamError = &jsError{message: "timeout waiting for ack"}
// ErrTooManyStalledMsgs is returned when too many outstanding async
// messages are waiting for ack.
ErrTooManyStalledMsgs JetStreamError = &jsError{message: "stalled with too many outstanding async published messages"}
)
// Error code represents JetStream error codes returned by the API
@@ -166,6 +173,7 @@ const (
JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039
JSErrCodeJetStreamNotEnabled ErrorCode = 10076
JSErrCodeInsufficientResourcesErr ErrorCode = 10023
JSErrCodeJetStreamNotAvailable ErrorCode = 10008
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058

View File

@@ -243,6 +243,14 @@ type StreamConfig struct {
// Template identifies the template that manages the Stream. Deprecated:
// This feature is no longer supported.
Template string `json:"template_owner,omitempty"`
// AllowMsgTTL allows header initiated per-message TTLs.
// This feature requires nats-server v2.11.0 or later.
AllowMsgTTL bool `json:"allow_msg_ttl"`
// Enables and sets a duration for adding server markers for delete, purge and max age limits.
// This feature requires nats-server v2.11.0 or later.
SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"`
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.

View File

@@ -929,13 +929,14 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
// Implementation for Watch
type watcher struct {
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
ctx context.Context
mu sync.Mutex
updates chan KeyValueEntry
sub *Subscription
initDone bool
initPending uint64
received uint64
ctx context.Context
initDoneTimer *time.Timer
}
// Context returns the context for the watcher if set.
@@ -1044,8 +1045,11 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
w.initPending = delta
}
if w.received > w.initPending || delta == 0 {
w.initDoneTimer.Stop()
w.initDone = true
w.updates <- nil
} else if w.initDoneTimer != nil {
w.initDoneTimer.Reset(kv.js.opts.wait)
}
}
}
@@ -1088,6 +1092,16 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
if sub.jsi != nil && sub.jsi.pending == 0 {
w.initDone = true
w.updates <- nil
} else {
// Set a timer to send the marker if we do not get any messages.
w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.initDone {
w.initDone = true
w.updates <- nil
}
})
}
} else {
// if UpdatesOnly was used, mark initialization as complete
@@ -1097,6 +1111,7 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error
sub.pDone = func(_ string) {
close(w.updates)
}
sub.mu.Unlock()
w.sub = sub

View File

@@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.39.1"
Version = "1.40.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -274,7 +274,6 @@ type InProcessConnProvider interface {
// Options can be used to create a customized connection.
type Options struct {
// Url represents a single NATS server url to which the client
// will be connecting. If the Servers option is also set, it
// then becomes the first server in the Servers array.
@@ -422,6 +421,10 @@ type Options struct {
// AsyncErrorCB sets the async error handler (e.g. slow consumer errors)
AsyncErrorCB ErrHandler
// ReconnectErrCB sets the callback that is invoked whenever a
// reconnect attempt failed
ReconnectErrCB ConnErrHandler
// ReconnectBufSize is the size of the backing bufio during reconnect.
// Once this has been exhausted publish operations will return an error.
// Defaults to 8388608 bytes (8MB).
@@ -1151,6 +1154,14 @@ func ReconnectHandler(cb ConnHandler) Option {
}
}
// ReconnectErrHandler is an Option to set the reconnect error handler.
func ReconnectErrHandler(cb ConnErrHandler) Option {
return func(o *Options) error {
o.ReconnectErrCB = cb
return nil
}
}
// ClosedHandler is an Option to set the closed handler.
func ClosedHandler(cb ConnHandler) Option {
return func(o *Options) error {
@@ -2386,7 +2397,6 @@ func (nc *Conn) setup() {
// Process a connected connection and initialize properly.
func (nc *Conn) processConnectInit() error {
// Set our deadline for the whole connect process
nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout))
defer nc.conn.SetDeadline(time.Time{})
@@ -2535,7 +2545,6 @@ func (nc *Conn) checkForSecure() error {
// processExpectedInfo will look for the expected first INFO message
// sent when a connection is established. The lock should be held entering.
func (nc *Conn) processExpectedInfo() error {
c := &control{}
// Read the protocol
@@ -2640,8 +2649,10 @@ func (nc *Conn) connectProto() (string, error) {
// If our server does not support headers then we can't do them or no responders.
hdrs := nc.info.Headers
cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs}
cinfo := connectInfo{
o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token,
o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs,
}
b, err := json.Marshal(cinfo)
if err != nil {
@@ -2911,10 +2922,13 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// Try to create a new connection
err = nc.createConn()
// Not yet connected, retry...
// Continue to hold the lock
if err != nil {
// Perform appropriate callback for a failed connection attempt.
if nc.Opts.ReconnectErrCB != nil {
nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) })
}
nc.err = nil
continue
}
@@ -3259,7 +3273,7 @@ func (nc *Conn) processMsg(data []byte) {
// It's possible that we end-up not using the message, but that's ok.
// FIXME(dlc): Need to copy, should/can do COW?
var msgPayload = data
msgPayload := data
if !nc.ps.msgCopied {
msgPayload = make([]byte, len(data))
copy(msgPayload, data)
@@ -3450,8 +3464,10 @@ slowConsumer:
}
}
var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)
var (
permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)
)
// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
@@ -3976,7 +3992,7 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error {
// go 1.14 some values strconv faster, may be able to switch over.
var b [12]byte
var i = len(b)
i := len(b)
if hdr != nil {
if len(hdr) > 0 {
@@ -5677,7 +5693,7 @@ func (nc *Conn) IsDraining() bool {
// caller must lock
func (nc *Conn) getServers(implicitOnly bool) []string {
poolSize := len(nc.srvPool)
var servers = make([]string, 0)
servers := make([]string, 0)
for i := 0; i < poolSize; i++ {
if implicitOnly && !nc.srvPool[i].isImplicit {
continue

4
vendor/modules.txt vendored
View File

@@ -1009,8 +1009,8 @@ github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
github.com/nats-io/nats-server/v2/server/thw
github.com/nats-io/nats-server/v2/server/tpm
# github.com/nats-io/nats.go v1.39.1
## explicit; go 1.22.0
# github.com/nats-io/nats.go v1.40.0
## explicit; go 1.23.0
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
github.com/nats-io/nats.go/internal/parser