Bump github.com/nats-io/nats.go from 1.37.0 to 1.38.0

Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.37.0 to 1.38.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.37.0...v1.38.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2025-01-24 15:03:40 +00:00
committed by GitHub
parent e428d56443
commit f9a3dc4134
18 changed files with 500 additions and 239 deletions

2
go.mod
View File

@@ -58,7 +58,7 @@ require (
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nats.go v1.37.0
github.com/nats-io/nats.go v1.38.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -835,8 +835,8 @@ github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -1,36 +0,0 @@
language: go
go:
- "1.22.x"
- "1.21.x"
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
go install honnef.co/go/tools/cmd/staticcheck@latest;
go install github.com/client9/misspell/cmd/misspell@latest;
fi
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet -modfile=go_test.mod ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.22 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
jobs:
include:
- name: "Go: 1.22.x (nats-server@main)"
go: "1.22.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main
allow_failures:
- name: "Go: 1.22.x (nats-server@main)"

View File

@@ -7,8 +7,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
[License-Image]: https://img.shields.io/badge/License-Apache2-blue.svg
[ReportCard-Url]: https://goreportcard.com/report/github.com/nats-io/nats.go
[ReportCard-Image]: https://goreportcard.com/badge/github.com/nats-io/nats.go
[Build-Status-Url]: https://travis-ci.com/github/nats-io/nats.go
[Build-Status-Image]: https://travis-ci.com/nats-io/nats.go.svg?branch=main
[Build-Status-Url]: https://github.com/nats-io/nats.go/actions
[Build-Status-Image]: https://github.com/nats-io/nats.go/actions/workflows/ci.yaml/badge.svg?branch=main
[GoDoc-Url]: https://pkg.go.dev/github.com/nats-io/nats.go
[GoDoc-Image]: https://img.shields.io/badge/GoDoc-reference-007d9c
[Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main
@@ -19,25 +19,14 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
## Installation
```bash
# Go client
go get github.com/nats-io/nats.go/
# To get the latest released Go client:
go get github.com/nats-io/nats.go@latest
# Server
go get github.com/nats-io/nats-server
```
# To get a specific version:
go get github.com/nats-io/nats.go@v1.38.0
When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.37.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server
# Note that the latest major version for NATS Server is v2:
go get github.com/nats-io/nats-server/v2@latest
```
## Basic Usage

View File

@@ -88,7 +88,7 @@ func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, dat
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}

8
vendor/github.com/nats-io/nats.go/dependencies.tpl generated vendored Normal file
View File

@@ -0,0 +1,8 @@
# External Dependencies
This file lists the dependencies used in this repository.
| Dependency | License |
|--------------------------------------------------|-----------------------------------------|
{{ range . }}| {{.Name}} | {{.LicenseName}} |
{{ end }}

View File

@@ -258,7 +258,7 @@ func (c *EncodedConn) subscribe(subject, queue string, cb Handler) (*Subscriptio
cbValue.Call(oV)
}
return c.Conn.subscribe(subject, queue, natsCB, nil, false, nil)
return c.Conn.subscribe(subject, queue, natsCB, nil, nil, false, nil)
}
// FlushTimeout allows a Flush operation to have an associated timeout.

View File

@@ -9,17 +9,17 @@ require (
github.com/klauspost/compress v1.17.9
github.com/nats-io/jwt v1.2.2
github.com/nats-io/nats-server/v2 v2.10.17
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.3.0
golang.org/x/text v0.16.0
golang.org/x/text v0.21.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.7 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/time v0.5.0 // indirect
)

View File

@@ -22,8 +22,8 @@ github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wT
github.com/nats-io/nats-server/v2 v2.10.17 h1:PTVObNBD3TZSNUDgzFb1qQsQX4mOgFmOuG9vhT+KBUY=
github.com/nats-io/nats-server/v2 v2.10.17/go.mod h1:5OUyc4zg42s/p2i92zbbqXvUNsbF0ivdTLKshVMn2YQ=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -34,17 +34,17 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=

View File

@@ -479,6 +479,9 @@ type pubOpts struct {
// stallWait is the max wait of a async pub ack.
stallWait time.Duration
// internal option to re-use existing paf in case of retry.
pafRetry *pubAckFuture
}
// pubAckResponse is the ack response from the JetStream API when publishing a message.
@@ -544,7 +547,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
o.ttl = js.opts.wait
}
if o.stallWait > 0 {
return nil, fmt.Errorf("nats: stall wait cannot be set to sync publish")
return nil, errors.New("nats: stall wait cannot be set to sync publish")
}
if o.id != _EMPTY_ {
@@ -633,13 +636,17 @@ type PubAckFuture interface {
}
type pubAckFuture struct {
js *js
msg *Msg
pa *PubAck
st time.Time
err error
errCh chan error
doneCh chan *PubAck
js *js
msg *Msg
pa *PubAck
st time.Time
err error
errCh chan error
doneCh chan *PubAck
retries int
maxRetries int
retryWait time.Duration
reply string
}
func (paf *pubAckFuture) Ok() <-chan *PubAck {
@@ -848,20 +855,30 @@ func (js *js) handleAsyncReply(m *Msg) {
js.mu.Unlock()
return
}
// Remove
delete(js.pafs, id)
// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
closeStc := func() {
// Check on anyone stalled and waiting.
if js.stc != nil && len(js.pafs) < js.opts.maxpa {
close(js.stc)
js.stc = nil
}
}
// Check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
dch := js.dch
js.dch = nil
// Defer here so error is processed and can be checked.
defer close(dch)
closeDchFn := func() func() {
var dch chan struct{}
// Check on anyone one waiting on done status.
if js.dch != nil && len(js.pafs) == 0 {
dch = js.dch
js.dch = nil
}
// Return function to close done channel which
// should be deferred so that error is processed and
// can be checked.
return func() {
if dch != nil {
close(dch)
}
}
}
doErr := func(err error) {
@@ -878,10 +895,39 @@ func (js *js) handleAsyncReply(m *Msg) {
// Process no responders etc.
if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
if paf.retries < paf.maxRetries {
paf.retries++
time.AfterFunc(paf.retryWait, func() {
js.mu.Lock()
paf := js.getPAF(id)
js.mu.Unlock()
if paf == nil {
return
}
_, err := js.PublishMsgAsync(paf.msg, pubOptFn(func(po *pubOpts) error {
po.pafRetry = paf
return nil
}))
if err != nil {
js.mu.Lock()
doErr(err)
}
})
js.mu.Unlock()
return
}
delete(js.pafs, id)
closeStc()
defer closeDchFn()()
doErr(ErrNoResponders)
return
}
//remove
delete(js.pafs, id)
closeStc()
defer closeDchFn()()
var pa pubAckResponse
if err := json.Unmarshal(m.Data, &pa); err != nil {
doErr(ErrInvalidJSAck)
@@ -948,6 +994,10 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
}
}
if o.rnum < 0 {
return nil, fmt.Errorf("%w: retry attempts cannot be negative", ErrInvalidArg)
}
// Timeouts and contexts do not make sense for these.
if o.ttl != 0 || o.ctx != nil {
return nil, ErrContextAndTimeout
@@ -975,30 +1025,42 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
}
// Reply
if m.Reply != _EMPTY_ {
paf := o.pafRetry
if paf == nil && m.Reply != _EMPTY_ {
return nil, errors.New("nats: reply subject should be empty")
}
reply := m.Reply
m.Reply = js.newAsyncReply()
defer func() { m.Reply = reply }()
var id string
var reply string
if m.Reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}
// register new paf if not retrying
if paf == nil {
reply = js.newAsyncReply()
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
if maxPending > 0 && numPending >= maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
if reply == _EMPTY_ {
return nil, errors.New("nats: error creating async reply handler")
}
id = reply[js.replyPrefixLen:]
paf = &pubAckFuture{msg: m, st: time.Now(), maxRetries: o.rnum, retryWait: o.rwait, reply: reply}
numPending, maxPending := js.registerPAF(id, paf)
if maxPending > 0 && numPending > maxPending {
select {
case <-js.asyncStall():
case <-time.After(stallWait):
js.clearPAF(id)
return nil, errors.New("nats: stalled with too many outstanding async published messages")
}
}
} else {
reply = paf.reply
id = reply[js.replyPrefixLen:]
}
if err := js.nc.PublishMsg(m); err != nil {
hdr, err := m.headerBytes()
if err != nil {
return nil, err
}
if err := js.nc.publish(m.Subject, reply, hdr, m.Data); err != nil {
js.clearPAF(id)
return nil, err
}
@@ -1081,7 +1143,7 @@ func RetryAttempts(num int) PubOpt {
func StallWait(ttl time.Duration) PubOpt {
return pubOptFn(func(opts *pubOpts) error {
if ttl <= 0 {
return fmt.Errorf("nats: stall wait should be more than 0")
return errors.New("nats: stall wait should be more than 0")
}
opts.stallWait = ttl
return nil
@@ -1439,11 +1501,11 @@ func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode boo
// Prevent an user from attempting to create a queue subscription on
// a JS consumer that was not created with a deliver group.
if queue != _EMPTY_ {
return _EMPTY_, fmt.Errorf("cannot create a queue subscription for a consumer without a deliver group")
return _EMPTY_, errors.New("cannot create a queue subscription for a consumer without a deliver group")
} else if info.PushBound {
// Need to reject a non queue subscription to a non queue consumer
// if the consumer is already bound.
return _EMPTY_, fmt.Errorf("consumer is already bound to a subscription")
return _EMPTY_, errors.New("consumer is already bound to a subscription")
}
} else {
// If the JS consumer has a deliver group, we need to fail a non queue
@@ -1465,7 +1527,7 @@ func processConsInfo(info *ConsumerInfo, userCfg *ConsumerConfig, isPullMode boo
func checkConfig(s, u *ConsumerConfig) error {
makeErr := func(fieldName string, usrVal, srvVal any) error {
return fmt.Errorf("configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
return fmt.Errorf("nats: configuration requests %s to be %v, but consumer's value is %v", fieldName, usrVal, srvVal)
}
if u.Durable != _EMPTY_ && u.Durable != s.Durable {
@@ -1545,7 +1607,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// If no stream name is specified, the subject cannot be empty.
if subj == _EMPTY_ && o.stream == _EMPTY_ {
return nil, fmt.Errorf("nats: subject required")
return nil, errors.New("nats: subject required")
}
// Note that these may change based on the consumer info response we may get.
@@ -1567,7 +1629,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
// would subscribe to and server would send on.
if o.cfg.Heartbeat > 0 || o.cfg.FlowControl {
// Not making this a public ErrXXX in case we allow in the future.
return nil, fmt.Errorf("nats: queue subscription doesn't support idle heartbeat nor flow control")
return nil, errors.New("nats: queue subscription doesn't support idle heartbeat nor flow control")
}
// If this is a queue subscription and no consumer nor durable name was specified,
@@ -1605,31 +1667,31 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if o.ordered {
// Make sure we are not durable.
if isDurable {
return nil, fmt.Errorf("nats: durable can not be set for an ordered consumer")
return nil, errors.New("nats: durable can not be set for an ordered consumer")
}
// Check ack policy.
if o.cfg.AckPolicy != ackPolicyNotSet {
return nil, fmt.Errorf("nats: ack policy can not be set for an ordered consumer")
return nil, errors.New("nats: ack policy can not be set for an ordered consumer")
}
// Check max deliver.
if o.cfg.MaxDeliver != 1 && o.cfg.MaxDeliver != 0 {
return nil, fmt.Errorf("nats: max deliver can not be set for an ordered consumer")
return nil, errors.New("nats: max deliver can not be set for an ordered consumer")
}
// No deliver subject, we pick our own.
if o.cfg.DeliverSubject != _EMPTY_ {
return nil, fmt.Errorf("nats: deliver subject can not be set for an ordered consumer")
return nil, errors.New("nats: deliver subject can not be set for an ordered consumer")
}
// Queue groups not allowed.
if queue != _EMPTY_ {
return nil, fmt.Errorf("nats: queues not be set for an ordered consumer")
return nil, errors.New("nats: queues not be set for an ordered consumer")
}
// Check for bound consumers.
if consumer != _EMPTY_ {
return nil, fmt.Errorf("nats: can not bind existing consumer for an ordered consumer")
return nil, errors.New("nats: can not bind existing consumer for an ordered consumer")
}
// Check for pull mode.
if isPullMode {
return nil, fmt.Errorf("nats: can not use pull mode for an ordered consumer")
return nil, errors.New("nats: can not use pull mode for an ordered consumer")
}
// Setup how we need it to be here.
o.cfg.FlowControl = true
@@ -1777,7 +1839,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
ocb := cb
cb = func(m *Msg) { ocb(m); m.Ack() }
}
sub, err := nc.subscribe(deliver, queue, cb, ch, isSync, jsi)
sub, err := nc.subscribe(deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
@@ -1848,7 +1910,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
jsi.hbi = info.Config.Heartbeat
// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, nil, isSync, jsi)
if err != nil {
return nil, err
}
@@ -2363,7 +2425,7 @@ func Description(description string) SubOpt {
func Durable(consumer string) SubOpt {
return subOptFn(func(opts *subOpts) error {
if opts.cfg.Durable != _EMPTY_ {
return fmt.Errorf("nats: option Durable set more than once")
return errors.New("nats: option Durable set more than once")
}
if opts.consumer != _EMPTY_ && opts.consumer != consumer {
return fmt.Errorf("nats: duplicate consumer names (%s and %s)", opts.consumer, consumer)
@@ -3052,20 +3114,27 @@ type MessageBatch interface {
}
type messageBatch struct {
sync.Mutex
msgs chan *Msg
err error
done chan struct{}
}
func (mb *messageBatch) Messages() <-chan *Msg {
mb.Lock()
defer mb.Unlock()
return mb.msgs
}
func (mb *messageBatch) Error() error {
mb.Lock()
defer mb.Unlock()
return mb.err
}
func (mb *messageBatch) Done() <-chan struct{} {
mb.Lock()
defer mb.Unlock()
return mb.done
}
@@ -3240,12 +3309,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
var hbTimer *time.Timer
var hbErr error
hbLock := sync.Mutex{}
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbLock.Lock()
result.Lock()
hbErr = ErrNoHeartbeat
hbLock.Unlock()
result.Unlock()
cancel()
})
}
@@ -3276,21 +3344,25 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
break
}
if usrMsg {
result.Lock()
result.msgs <- msg
result.Unlock()
requestMsgs++
}
}
if err != nil {
hbLock.Lock()
result.Lock()
if hbErr != nil {
result.err = hbErr
} else {
result.err = o.checkCtxErr(err)
}
hbLock.Unlock()
result.Unlock()
}
close(result.msgs)
result.Lock()
result.done <- struct{}{}
result.Unlock()
}()
return result, nil
}
@@ -3888,7 +3960,7 @@ func (alg StoreCompression) MarshalJSON() ([]byte, error) {
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
return nil, errors.New("unknown compression algorithm")
}
return json.Marshal(str)
}
@@ -3904,7 +3976,7 @@ func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
return errors.New("unknown compression algorithm")
}
return nil
}

View File

@@ -1330,11 +1330,11 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
// Check for headers that give us the required information to
// reconstruct the message.
if len(r.Header) == 0 {
return nil, fmt.Errorf("nats: response should have headers")
return nil, errors.New("nats: response should have headers")
}
stream := r.Header.Get(JSStream)
if stream == _EMPTY_ {
return nil, fmt.Errorf("nats: missing stream header")
return nil, errors.New("nats: missing stream header")
}
// Mirrors can now answer direct gets, so removing check for name equality.
@@ -1342,7 +1342,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
seqStr := r.Header.Get(JSSequence)
if seqStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing sequence header")
return nil, errors.New("nats: missing sequence header")
}
seq, err := strconv.ParseUint(seqStr, 10, 64)
if err != nil {
@@ -1350,7 +1350,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
}
timeStr := r.Header.Get(JSTimeStamp)
if timeStr == _EMPTY_ {
return nil, fmt.Errorf("nats: missing timestamp header")
return nil, errors.New("nats: missing timestamp header")
}
// Temporary code: the server in main branch is sending with format
// "2006-01-02 15:04:05.999999999 +0000 UTC", but will be changed
@@ -1365,7 +1365,7 @@ func convertDirectGetMsgResponseToMsg(name string, r *Msg) (*RawStreamMsg, error
}
subj := r.Header.Get(JSSubject)
if subj == _EMPTY_ {
return nil, fmt.Errorf("nats: missing subject header")
return nil, errors.New("nats: missing subject header")
}
return &RawStreamMsg{
Subject: subj,

View File

@@ -54,6 +54,7 @@ type KeyValue interface {
// Create will add the key/value pair iff it does not exist.
Create(key string, value []byte) (revision uint64, err error)
// Update will update the value iff the latest revision matches.
// Update also resets the TTL associated with the key (if any).
Update(key string, value []byte, last uint64) (revision uint64, err error)
// Delete will place a delete marker and leave all revisions.
Delete(key string, opts ...DeleteOpt) error
@@ -64,6 +65,9 @@ type KeyValue interface {
Watch(keys string, opts ...WatchOpt) (KeyWatcher, error)
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// WatchFiltered will watch for any updates to keys that match the keys
// argument. It can be configured with the same options as Watch.
WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// Deprecated: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
@@ -963,11 +967,11 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
return kv.Watch(AllKeys, opts...)
}
// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
if !searchKeyValid(keys) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject")
func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error) {
for _, key := range keys {
if !searchKeyValid(key) {
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "key cannot be empty and must be a valid NATS subject")
}
}
var o watchOpts
for _, opt := range opts {
@@ -979,10 +983,20 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
}
// Could be a pattern so don't check for validity as we normally do.
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(keys)
keys = b.String()
for i, key := range keys {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(key)
keys[i] = b.String()
}
// if no keys are provided, watch all keys
if len(keys) == 0 {
var b strings.Builder
b.WriteString(kv.pre)
b.WriteString(AllKeys)
keys = []string{b.String()}
}
// We will block below on placing items on the chan. That is by design.
w := &watcher{updates: make(chan KeyValueEntry, 256), ctx: o.ctx}
@@ -1055,7 +1069,14 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
// update() callback.
w.mu.Lock()
defer w.mu.Unlock()
sub, err := kv.js.Subscribe(keys, update, subOpts...)
var sub *Subscription
var err error
if len(keys) == 1 {
sub, err = kv.js.Subscribe(keys[0], update, subOpts...)
} else {
subOpts = append(subOpts, ConsumerFilterSubjects(keys...))
sub, err = kv.js.Subscribe("", update, subOpts...)
}
if err != nil {
return nil, err
}
@@ -1082,6 +1103,12 @@ func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
return w, nil
}
// Watch will fire the callback when a key that matches the keys pattern is updated.
// keys needs to be a valid NATS subject.
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
return kv.WatchFiltered([]string{keys}, opts...)
}
// Bucket returns the current bucket name (JetStream stream).
func (kv *kvs) Bucket() string {
return kv.name

View File

@@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.37.0"
Version = "1.38.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -86,6 +86,9 @@ const (
// MAX_CONNECTIONS_ERR is for when nats server denies the connection due to server max_connections limit
MAX_CONNECTIONS_ERR = "maximum connections exceeded"
// MAX_SUBSCRIPTIONS_ERR is for when nats server denies the connection due to server subscriptions limit
MAX_SUBSCRIPTIONS_ERR = "maximum subscriptions exceeded"
)
// Errors
@@ -106,6 +109,7 @@ var (
ErrAuthorization = errors.New("nats: authorization violation")
ErrAuthExpired = errors.New("nats: authentication expired")
ErrAuthRevoked = errors.New("nats: authentication revoked")
ErrPermissionViolation = errors.New("nats: permissions violation")
ErrAccountAuthExpired = errors.New("nats: account authentication expired")
ErrNoServers = errors.New("nats: no servers available for connection")
ErrJsonParse = errors.New("nats: connect message, json parse error")
@@ -131,6 +135,7 @@ var (
ErrNkeysNotSupported = errors.New("nats: nkeys not supported by the server")
ErrStaleConnection = errors.New("nats: " + STALE_CONNECTION)
ErrTokenAlreadySet = errors.New("nats: token and token handler both set")
ErrUserInfoAlreadySet = errors.New("nats: cannot set user info callback and user/pass")
ErrMsgNotBound = errors.New("nats: message is not bound to subscription/connection")
ErrMsgNoReply = errors.New("nats: message does not have a reply")
ErrClientIPNotSupported = errors.New("nats: client IP not supported by this server")
@@ -140,6 +145,7 @@ var (
ErrNoResponders = errors.New("nats: no responders available for request")
ErrMaxConnectionsExceeded = errors.New("nats: server maximum connections exceeded")
ErrConnectionNotTLS = errors.New("nats: connection is not tls")
ErrMaxSubscriptionsExceeded = errors.New("nats: server maximum subscriptions exceeded")
)
// GetDefaultOptions returns default configuration options for the client.
@@ -230,6 +236,9 @@ type SignatureHandler func([]byte) ([]byte, error)
// AuthTokenHandler is used to generate a new token.
type AuthTokenHandler func() string
// UserInfoCB is used to pass the username and password when establishing connection.
type UserInfoCB func() (string, string)
// ReconnectDelayHandler is used to get from the user the desired
// delay the library should pause before attempting to reconnect
// again. Note that this is invoked after the library tried the
@@ -443,6 +452,9 @@ type Options struct {
// Password sets the password to be used when connecting to a server.
Password string
// UserInfo sets the callback handler that will fetch the username and password.
UserInfo UserInfoCB
// Token sets the token to be used when connecting to a server.
Token string
@@ -499,6 +511,11 @@ type Options struct {
// SkipHostLookup skips the DNS lookup for the server hostname.
SkipHostLookup bool
// PermissionErrOnSubscribe - if set to true, the client will return ErrPermissionViolation
// from SubscribeSync if the server returns a permissions error for a subscription.
// Defaults to false.
PermissionErrOnSubscribe bool
}
const (
@@ -607,17 +624,19 @@ type Subscription struct {
// For holding information about a JetStream consumer.
jsi *jsSub
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
delivered uint64
max uint64
conn *Conn
mcb MsgHandler
mch chan *Msg
errCh chan (error)
closed bool
sc bool
connClosed bool
draining bool
status SubStatus
statListeners map[chan SubStatus][]SubStatus
permissionsErr error
// Type of Subscription
typ SubscriptionType
@@ -1166,6 +1185,13 @@ func UserInfo(user, password string) Option {
}
}
func UserInfoHandler(cb UserInfoCB) Option {
return func(o *Options) error {
o.UserInfo = cb
return nil
}
}
// Token is an Option to set the token to use
// when a token is not included directly in the URLs
// and when a token handler is not provided.
@@ -1359,7 +1385,7 @@ func ProxyPath(path string) Option {
func CustomInboxPrefix(p string) Option {
return func(o *Options) error {
if p == "" || strings.Contains(p, ">") || strings.Contains(p, "*") || strings.HasSuffix(p, ".") {
return fmt.Errorf("nats: invalid custom prefix")
return errors.New("nats: invalid custom prefix")
}
o.InboxPrefix = p
return nil
@@ -1383,6 +1409,13 @@ func SkipHostLookup() Option {
}
}
func PermissionErrOnSubscribe(enabled bool) Option {
return func(o *Options) error {
o.PermissionErrOnSubscribe = enabled
return nil
}
}
// TLSHandshakeFirst is an Option to perform the TLS handshake first, that is
// before receiving the INFO protocol. This requires the server to also be
// configured with such option, otherwise the connection will fail.
@@ -1814,7 +1847,7 @@ func (nc *Conn) addURLToPool(sURL string, implicit, saveTLSName bool) error {
if len(nc.srvPool) == 0 {
nc.ws = isWS
} else if isWS && !nc.ws || !isWS && nc.ws {
return fmt.Errorf("mixing of websocket and non websocket URLs is not allowed")
return errors.New("mixing of websocket and non websocket URLs is not allowed")
}
var tlsName string
@@ -2563,6 +2596,13 @@ func (nc *Conn) connectProto() (string, error) {
pass = o.Password
token = o.Token
nkey = o.Nkey
if nc.Opts.UserInfo != nil {
if user != _EMPTY_ || pass != _EMPTY_ {
return _EMPTY_, ErrUserInfoAlreadySet
}
user, pass = nc.Opts.UserInfo()
}
}
// Look for user jwt.
@@ -2952,11 +2992,11 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) {
// processOpErr handles errors from reading or parsing the protocol.
// The lock should not be held entering this function.
func (nc *Conn) processOpErr(err error) {
func (nc *Conn) processOpErr(err error) bool {
nc.mu.Lock()
defer nc.mu.Unlock()
if nc.isConnecting() || nc.isClosed() || nc.isReconnecting() {
nc.mu.Unlock()
return
return false
}
if nc.Opts.AllowReconnect && nc.status == CONNECTED {
@@ -2976,14 +3016,12 @@ func (nc *Conn) processOpErr(err error) {
nc.clearPendingFlushCalls()
go nc.doReconnect(err, false)
nc.mu.Unlock()
return
return false
}
nc.changeConnStatus(DISCONNECTED)
nc.err = err
nc.mu.Unlock()
nc.close(CLOSED, true, nil)
return true
}
// dispatch is responsible for calling any async callbacks
@@ -3080,7 +3118,9 @@ func (nc *Conn) readLoop() {
err = nc.parse(buf)
}
if err != nil {
nc.processOpErr(err)
if shouldClose := nc.processOpErr(err); shouldClose {
nc.close(CLOSED, true, nil)
}
break
}
}
@@ -3410,15 +3450,41 @@ slowConsumer:
}
}
// processPermissionsViolation is called when the server signals a subject
// permissions violation on either publish or subscribe.
func (nc *Conn) processPermissionsViolation(err string) {
var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`)
var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`)
// processTransientError is called when the server signals a non terminal error
// which does not close the connection or trigger a reconnect.
// This will trigger the async error callback if set.
// These errors include the following:
// - permissions violation on publish or subscribe
// - maximum subscriptions exceeded
func (nc *Conn) processTransientError(err error) {
nc.mu.Lock()
// create error here so we can pass it as a closure to the async cb dispatcher.
e := errors.New("nats: " + err)
nc.err = e
nc.err = err
if errors.Is(err, ErrPermissionViolation) {
matches := permissionsRe.FindStringSubmatch(err.Error())
if len(matches) >= 2 {
queueMatches := permissionsQueueRe.FindStringSubmatch(err.Error())
var q string
if len(queueMatches) >= 2 {
q = queueMatches[1]
}
subject := matches[1]
for _, sub := range nc.subs {
if sub.Subject == subject && sub.Queue == q && sub.permissionsErr == nil {
sub.mu.Lock()
if sub.errCh != nil {
sub.errCh <- err
}
sub.permissionsErr = err
sub.mu.Unlock()
}
}
}
}
if nc.Opts.AsyncErrorCB != nil {
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, e) })
nc.ach.push(func() { nc.Opts.AsyncErrorCB(nc, nil, err) })
}
nc.mu.Unlock()
}
@@ -3650,15 +3716,17 @@ func (nc *Conn) processErr(ie string) {
// convert to lower case.
e := strings.ToLower(ne)
close := false
var close bool
// FIXME(dlc) - process Slow Consumer signals special.
if e == STALE_CONNECTION {
nc.processOpErr(ErrStaleConnection)
close = nc.processOpErr(ErrStaleConnection)
} else if e == MAX_CONNECTIONS_ERR {
nc.processOpErr(ErrMaxConnectionsExceeded)
close = nc.processOpErr(ErrMaxConnectionsExceeded)
} else if strings.HasPrefix(e, PERMISSIONS_ERR) {
nc.processPermissionsViolation(ne)
nc.processTransientError(fmt.Errorf("%w: %s", ErrPermissionViolation, ne))
} else if strings.HasPrefix(e, MAX_SUBSCRIPTIONS_ERR) {
nc.processTransientError(ErrMaxSubscriptionsExceeded)
} else if authErr := checkAuthError(e); authErr != nil {
nc.mu.Lock()
close = nc.processAuthError(authErr)
@@ -3999,10 +4067,6 @@ func (nc *Conn) respHandler(m *Msg) {
// Helper to setup and send new request style requests. Return the chan to receive the response.
func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Msg, string, error) {
nc.mu.Lock()
// Do setup for the new style if needed.
if nc.respMap == nil {
nc.initNewResp()
}
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := nc.newRespInbox()
@@ -4013,7 +4077,7 @@ func (nc *Conn) createNewRequestAndSend(subj string, hdr, data []byte) (chan *Ms
// Create the response subscription we will use for all new style responses.
// This will be on an _INBOX with an additional terminal token. The subscription
// will be on a wildcard.
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, false, nil)
s, err := nc.subscribeLocked(nc.respSub, _EMPTY_, nc.respHandler, nil, nil, false, nil)
if err != nil {
nc.mu.Unlock()
return nil, token, err
@@ -4111,7 +4175,7 @@ func (nc *Conn) oldRequest(subj string, hdr, data []byte, timeout time.Duration)
inbox := nc.NewInbox()
ch := make(chan *Msg, RequestChanLen)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, nil, true, nil)
if err != nil {
return nil, err
}
@@ -4217,14 +4281,14 @@ func (nc *Conn) respToken(respInbox string) string {
// since it can't match more than one token.
// Messages will be delivered to the associated MsgHandler.
func (nc *Conn) Subscribe(subj string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, cb, nil, false, nil)
return nc.subscribe(subj, _EMPTY_, cb, nil, nil, false, nil)
}
// ChanSubscribe will express interest in the given subject and place
// all messages received on the channel.
// You should not close the channel until sub.Unsubscribe() has been called.
func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, _EMPTY_, nil, ch, false, nil)
return nc.subscribe(subj, _EMPTY_, nil, ch, nil, false, nil)
}
// ChanQueueSubscribe will express interest in the given subject.
@@ -4234,7 +4298,7 @@ func (nc *Conn) ChanSubscribe(subj string, ch chan *Msg) (*Subscription, error)
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than QueueSubscribeSyncWithChan.
func (nc *Conn) ChanQueueSubscribe(subj, group string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, group, nil, ch, false, nil)
return nc.subscribe(subj, group, nil, ch, nil, false, nil)
}
// SubscribeSync will express interest on the given subject. Messages will
@@ -4244,7 +4308,11 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
return nil, ErrInvalidConnection
}
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, _EMPTY_, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
}
return nc.subscribe(subj, _EMPTY_, nil, mch, errCh, true, nil)
}
// QueueSubscribe creates an asynchronous queue subscriber on the given subject.
@@ -4252,7 +4320,7 @@ func (nc *Conn) SubscribeSync(subj string) (*Subscription, error) {
// only one member of the group will be selected to receive any given
// message asynchronously.
func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription, error) {
return nc.subscribe(subj, queue, cb, nil, false, nil)
return nc.subscribe(subj, queue, cb, nil, nil, false, nil)
}
// QueueSubscribeSync creates a synchronous queue subscriber on the given
@@ -4261,7 +4329,11 @@ func (nc *Conn) QueueSubscribe(subj, queue string, cb MsgHandler) (*Subscription
// given message synchronously using Subscription.NextMsg().
func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
mch := make(chan *Msg, nc.Opts.SubChanLen)
return nc.subscribe(subj, queue, nil, mch, true, nil)
var errCh chan error
if nc.Opts.PermissionErrOnSubscribe {
errCh = make(chan error, 100)
}
return nc.subscribe(subj, queue, nil, mch, errCh, true, nil)
}
// QueueSubscribeSyncWithChan will express interest in the given subject.
@@ -4271,7 +4343,7 @@ func (nc *Conn) QueueSubscribeSync(subj, queue string) (*Subscription, error) {
// You should not close the channel until sub.Unsubscribe() has been called.
// Note: This is the same than ChanQueueSubscribe.
func (nc *Conn) QueueSubscribeSyncWithChan(subj, queue string, ch chan *Msg) (*Subscription, error) {
return nc.subscribe(subj, queue, nil, ch, false, nil)
return nc.subscribe(subj, queue, nil, ch, nil, false, nil)
}
// badSubject will do quick test on whether a subject is acceptable.
@@ -4295,16 +4367,16 @@ func badQueue(qname string) bool {
}
// subscribe is the internal subscribe function that indicates interest in a subject.
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
nc.mu.Lock()
defer nc.mu.Unlock()
return nc.subscribeLocked(subj, queue, cb, ch, isSync, js)
return nc.subscribeLocked(subj, queue, cb, ch, errCh, isSync, js)
}
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, isSync bool, js *jsSub) (*Subscription, error) {
func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg, errCh chan (error), isSync bool, js *jsSub) (*Subscription, error) {
if nc == nil {
return nil, ErrInvalidConnection
}
@@ -4355,6 +4427,7 @@ func (nc *Conn) subscribeLocked(subj, queue string, cb MsgHandler, ch chan *Msg,
} else { // Sync Subscription
sub.typ = SyncSubscription
sub.mch = ch
sub.errCh = errCh
}
nc.subsMu.Lock()
@@ -4799,16 +4872,92 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)
if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
case <-t.C:
return nil, ErrTimeout
}
} else {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}
}
return msg, nil
}
// nextMsgNoTimeout works similarly to Subscription.NextMsg() but will not
// time out. It is only used internally for non-timeout subscription iterator.
func (s *Subscription) nextMsgNoTimeout() (*Msg, error) {
if s == nil {
return nil, ErrBadSubscription
}
s.mu.Lock()
err := s.validateNextMsgState(false)
if err != nil {
s.mu.Unlock()
return nil, err
}
// snapshot
mch := s.mch
s.mu.Unlock()
var ok bool
var msg *Msg
// If something is available right away, let's optimize that case.
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
return msg, nil
}
default:
}
if s.errCh != nil {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case err := <-s.errCh:
return nil, err
}
} else {
msg, ok = <-mch
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
case <-t.C:
return nil, ErrTimeout
}
return msg, nil
@@ -4831,6 +4980,12 @@ func (s *Subscription) validateNextMsgState(pullSubInternal bool) error {
if s.mcb != nil {
return ErrSyncSubRequired
}
// if this subscription previously had a permissions error
// and no reconnect has been attempted, return the permissions error
// since the subscription does not exist on the server
if s.conn.Opts.PermissionErrOnSubscribe && s.permissionsErr != nil {
return s.permissionsErr
}
if s.sc {
s.changeSubStatus(SubscriptionActive)
s.sc = false
@@ -5107,7 +5262,9 @@ func (nc *Conn) processPingTimer() {
nc.pout++
if nc.pout > nc.Opts.MaxPingsOut {
nc.mu.Unlock()
nc.processOpErr(ErrStaleConnection)
if shouldClose := nc.processOpErr(ErrStaleConnection); shouldClose {
nc.close(CLOSED, true, nil)
}
return
}
@@ -5204,6 +5361,9 @@ func (nc *Conn) resendSubscriptions() {
for _, s := range subs {
adjustedMax := uint64(0)
s.mu.Lock()
// when resending subscriptions, the permissions error should be cleared
// since the user may have fixed the permissions issue
s.permissionsErr = nil
if s.max > 0 {
if s.delivered < s.max {
adjustedMax = s.max - s.delivered
@@ -5242,9 +5402,6 @@ func (nc *Conn) clearPendingFlushCalls() {
// This will clear any pending Request calls.
// Lock is assumed to be held by the caller.
func (nc *Conn) clearPendingRequestCalls() {
if nc.respMap == nil {
return
}
for key, ch := range nc.respMap {
if ch != nil {
close(ch)
@@ -5792,7 +5949,7 @@ func NkeyOptionFromSeed(seedFile string) (Option, error) {
return nil, err
}
if !nkeys.IsValidPublicUserKey(pub) {
return nil, fmt.Errorf("nats: Not a valid nkey user seed")
return nil, errors.New("nats: Not a valid nkey user seed")
}
sigCB := func(nonce []byte) ([]byte, error) {
return sigHandler(nonce, seedFile)

73
vendor/github.com/nats-io/nats.go/nats_iter.go generated vendored Normal file
View File

@@ -0,0 +1,73 @@
// Copyright 2012-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.23
// +build go1.23
package nats
import (
"errors"
"iter"
"time"
)
// Msgs returns an iter.Seq2[*Msg, error] that can be used to iterate over
// messages. It can only be used with a subscription that has been created with
// SubscribeSync or QueueSubscribeSync, otherwise it will return an error on the
// first iteration.
//
// The iterator will block until a message is available. The
// subscription will not be closed when the iterator is done.
func (sub *Subscription) Msgs() iter.Seq2[*Msg, error] {
return func(yield func(*Msg, error) bool) {
for {
msg, err := sub.nextMsgNoTimeout()
if err != nil {
yield(nil, err)
return
}
if !yield(msg, nil) {
return
}
}
}
}
// MsgsTimeout returns an iter.Seq2[*Msg, error] that can be used to iterate
// over messages. It can only be used with a subscription that has been created
// with SubscribeSync or QueueSubscribeSync, otherwise it will return an error
// on the first iteration.
//
// The iterator will block until a message is available or the timeout is
// reached. If the timeout is reached, the iterator will return nats.ErrTimeout
// but it will not be closed.
func (sub *Subscription) MsgsTimeout(timeout time.Duration) iter.Seq2[*Msg, error] {
return func(yield func(*Msg, error) bool) {
for {
msg, err := sub.NextMsg(timeout)
if err != nil {
if !yield(nil, err) {
return
}
if !errors.Is(err, ErrTimeout) {
return
}
}
if !yield(msg, nil) {
return
}
}
}
}

View File

@@ -113,5 +113,5 @@ func (c *EncodedConn) bindRecvChan(subject, queue string, channel any) (*Subscri
chVal.Send(oPtr)
}
return c.Conn.subscribe(subject, queue, cb, nil, false, nil)
return c.Conn.subscribe(subject, queue, cb, nil, nil, false, nil)
}

View File

@@ -1,29 +0,0 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !go1.20
// +build !go1.20
// A Go client for the NATS messaging system (https://nats.io).
package nats
import (
"math/rand"
"time"
)
func init() {
// This is not needed since Go 1.20 because now rand.Seed always happens
// by default (uses runtime.fastrand64 instead as source).
rand.Seed(time.Now().UnixNano())
}

View File

@@ -237,8 +237,8 @@ func (r *websocketReader) Read(p []byte) (int, error) {
case wsPingMessage, wsPongMessage, wsCloseMessage:
if rem > wsMaxControlPayloadSize {
return 0, fmt.Errorf(
fmt.Sprintf("control frame length bigger than maximum allowed of %v bytes",
wsMaxControlPayloadSize))
"control frame length bigger than maximum allowed of %v bytes",
wsMaxControlPayloadSize)
}
if compressed {
return 0, errors.New("control frame should not be compressed")
@@ -622,7 +622,7 @@ func (nc *Conn) wsInitHandshake(u *url.URL) error {
!strings.EqualFold(resp.Header.Get("Connection"), "upgrade") ||
resp.Header.Get("Sec-Websocket-Accept") != wsAcceptKey(wsKey)) {
err = fmt.Errorf("invalid websocket connection")
err = errors.New("invalid websocket connection")
}
// Check compression extension...
if err == nil && compress {
@@ -634,7 +634,7 @@ func (nc *Conn) wsInitHandshake(u *url.URL) error {
if !srvCompress {
compress = false
} else if !noCtxTakeover {
err = fmt.Errorf("compression negotiation error")
err = errors.New("compression negotiation error")
}
}
if resp != nil {

2
vendor/modules.txt vendored
View File

@@ -1022,7 +1022,7 @@ github.com/nats-io/nats-server/v2/server/certstore
github.com/nats-io/nats-server/v2/server/pse
github.com/nats-io/nats-server/v2/server/stree
github.com/nats-io/nats-server/v2/server/sysmem
# github.com/nats-io/nats.go v1.37.0
# github.com/nats-io/nats.go v1.38.0
## explicit; go 1.20
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin