build(deps): bump github.com/nats-io/nats.go from 1.31.0 to 1.32.0

Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.31.0 to 1.32.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.31.0...v1.32.0)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-01-16 12:00:10 +00:00
committed by Ralf Haferkamp
parent 81e000b987
commit fce4d19e3f
20 changed files with 206 additions and 98 deletions

2
go.mod
View File

@@ -61,7 +61,7 @@ require (
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.9
github.com/nats-io/nats.go v1.31.0
github.com/nats-io/nats.go v1.32.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5

4
go.sum
View File

@@ -1744,8 +1744,8 @@ github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI=
github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ=
github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E=
github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8=
github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0=
github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

View File

@@ -5,6 +5,9 @@ issues:
- linters:
- errcheck
text: "Unsubscribe"
- linters:
- errcheck
text: "Drain"
- linters:
- errcheck
text: "msg.Ack"

View File

@@ -14,6 +14,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io
[Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main
[Coverage-image]: https://coveralls.io/repos/github/nats-io/nats.go/badge.svg?branch=main
**Check out [NATS by example](https://natsbyexample.com) - An evolving collection of runnable, cross-client reference examples for NATS.**
## Installation
```bash
@@ -29,7 +31,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.31.0
go get github.com/nats-io/nats.go/@v1.32.0
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

View File

@@ -1,4 +1,4 @@
// Copyright 2016-2022 The NATS Authors
// Copyright 2016-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2019 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2013-2018 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2012-2018 The NATS Authors
// Copyright 2012-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -4,19 +4,19 @@ go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/nkeys v0.4.5
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nkeys v0.4.6
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.13.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
)

View File

@@ -10,32 +10,31 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=

View File

@@ -227,14 +227,16 @@ type js struct {
opts *jsOpts
// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
connStatusCh chan (Status)
replyPrefix string
replyPrefixLen int
}
type jsOpts struct {
@@ -283,6 +285,12 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) {
maxpa: defaultAsyncPubAckInflight,
},
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.replyPrefix = inboxPrefix
js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1
for _, opt := range opts {
if err := opt.configureJSContext(js.opts); err != nil {
@@ -537,7 +545,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}
if err != nil {
for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ {
for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ {
// To protect against small blips in leadership changes etc, if we get a no responders here retry.
if o.ctx != nil {
select {
@@ -559,7 +567,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) {
}
}
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrNoStreamResponse
}
return nil, err
@@ -641,7 +649,6 @@ func (paf *pubAckFuture) Msg() *Msg {
}
// For quick token lookup etc.
const aReplyPreLen = 14
const aReplyTokensize = 6
func (js *js) newAsyncReply() string {
@@ -654,11 +661,7 @@ func (js *js) newAsyncReply() string {
for i := 0; i < aReplyTokensize; i++ {
b[i] = rdigits[int(b[i]%base)]
}
inboxPrefix := InboxPrefix
if js.nc.Opts.InboxPrefix != _EMPTY_ {
inboxPrefix = js.nc.Opts.InboxPrefix + "."
}
js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize])
js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize])
sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply)
if err != nil {
js.mu.Unlock()
@@ -767,10 +770,10 @@ func (js *js) asyncStall() <-chan struct{} {
// Handle an async reply from PublishAsync.
func (js *js) handleAsyncReply(m *Msg) {
if len(m.Subject) <= aReplyPreLen {
if len(m.Subject) <= js.replyPrefixLen {
return
}
id := m.Subject[aReplyPreLen:]
id := m.Subject[js.replyPrefixLen:]
js.mu.Lock()
paf := js.getPAF(id)
@@ -916,7 +919,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) {
return nil, errors.New("nats: error creating async reply handler")
}
id := m.Reply[aReplyPreLen:]
id := m.Reply[js.replyPrefixLen:]
paf := &pubAckFuture{msg: m, st: time.Now()}
numPending, maxPending := js.registerPAF(id, paf)
@@ -1241,6 +1244,10 @@ func (sub *Subscription) deleteConsumer() error {
sub.mu.Unlock()
return nil
}
if jsi.stream == _EMPTY_ || jsi.consumer == _EMPTY_ {
sub.mu.Unlock()
return nil
}
stream, consumer := jsi.stream, jsi.consumer
js := jsi.js
sub.mu.Unlock()
@@ -1594,7 +1601,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if consumer != _EMPTY_ && !o.skipCInfo {
info, err = js.ConsumerInfo(stream, consumer)
notFoundErr = errors.Is(err, ErrConsumerNotFound)
lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded
lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded)
}
switch {
@@ -1808,7 +1815,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if bl < DefaultSubPendingBytesLimit {
bl = DefaultSubPendingBytesLimit
}
sub.SetPendingLimits(maxap, bl)
if err := sub.SetPendingLimits(maxap, bl); err != nil {
return nil, err
}
}
// Do heartbeats last if needed.
@@ -2047,7 +2056,16 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
js := jsi.js
sub.mu.Unlock()
consName := nuid.Next()
sub.mu.Lock()
// Attempt to delete the existing consumer.
// We don't wait for the response since even if it's unsuccessful,
// inactivity threshold will kick in and delete it.
if jsi.consumer != _EMPTY_ {
go js.DeleteConsumer(jsi.stream, jsi.consumer)
}
jsi.consumer = ""
sub.mu.Unlock()
consName := getHash(nuid.Next())
cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg)
if err != nil {
var apiErr *APIError
@@ -2813,7 +2831,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// are no messages.
msg, err = sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
break
@@ -2893,13 +2911,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
usrMsg, err = checkMsg(msg, true, noWait)
if err == nil && usrMsg {
msgs = append(msgs, msg)
} else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 {
} else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 {
// If we have a 404/408 for our "no_wait" request and have
// not collected any message, then resend request to
// wait this time.
noWait = false
err = sendReq()
} else if err == ErrTimeout && len(msgs) == 0 {
} else if errors.Is(err, ErrTimeout) && len(msgs) == 0 {
// If we get a 408, we will bail if we already collected some
// messages, otherwise ignore and go back calling nextMsg.
err = nil
@@ -3082,7 +3100,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// are no messages.
msg, err := sub.nextMsgWithContext(ctx, true, false)
if err != nil {
if err == errNoMessages {
if errors.Is(err, errNoMessages) {
err = nil
}
result.err = err
@@ -3159,7 +3177,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
usrMsg, err = checkMsg(msg, true, false)
if err != nil {
if err == ErrTimeout {
if errors.Is(err, ErrTimeout) {
if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) {
// ignore timeout message from server if it comes from a different pull request
continue
@@ -3188,7 +3206,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout
func (o *pullOpts) checkCtxErr(err error) error {
if o.ctx == nil && err == context.DeadlineExceeded {
if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) {
return ErrTimeout
}
return err
@@ -3204,7 +3222,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer)
resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err

View File

@@ -1,4 +1,4 @@
// Copyright 2020-2022 The NATS Authors
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -252,11 +252,13 @@ type AccountInfo struct {
}
type Tier struct {
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits AccountLimits `json:"limits"`
Memory uint64 `json:"memory"`
Store uint64 `json:"storage"`
ReservedMemory uint64 `json:"reserved_memory"`
ReservedStore uint64 `json:"reserved_storage"`
Streams int `json:"streams"`
Consumers int `json:"consumers"`
Limits AccountLimits `json:"limits"`
}
// APIStats reports on API calls to JetStream for this account.
@@ -297,7 +299,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil)
if err != nil {
// todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
@@ -415,7 +417,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return nil, err
@@ -1623,7 +1625,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {
resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j)
if err != nil {
if err == ErrNoResponders {
if errors.Is(err, ErrNoResponders) {
err = ErrJetStreamNotEnabled
}
return _EMPTY_, err

View File

@@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -65,7 +65,10 @@ type KeyValue interface {
// WatchAll will invoke the callback for all updates.
WatchAll(opts ...WatchOpt) (KeyWatcher, error)
// Keys will return all keys.
// DEPRECATED: Use ListKeys instead to avoid memory issues.
Keys(opts ...WatchOpt) ([]string, error)
// ListKeys will return all keys in a channel.
ListKeys(opts ...WatchOpt) (KeyLister, error)
// History will return all historical values for the key.
History(key string, opts ...WatchOpt) ([]KeyValueEntry, error)
// Bucket returns the current bucket name.
@@ -95,6 +98,9 @@ type KeyValueStatus interface {
// Bytes returns the size in bytes of the bucket
Bytes() uint64
// IsCompressed indicates if the data is compressed on disk
IsCompressed() bool
}
// KeyWatcher is what is returned when doing a watch.
@@ -107,6 +113,12 @@ type KeyWatcher interface {
Stop() error
}
// KeyLister is used to retrieve a list of key value store keys
type KeyLister interface {
Keys() <-chan string
Stop() error
}
type WatchOpt interface {
configureWatcher(opts *watchOpts) error
}
@@ -249,6 +261,10 @@ type KeyValueConfig struct {
RePublish *RePublish
Mirror *StreamSource
Sources []*StreamSource
// Enable underlying stream compression.
// NOTE: Compression is supported for nats-server 2.10.0+
Compression bool
}
// Used to watch all keys.
@@ -343,7 +359,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
si, err := js.StreamInfo(stream)
if err != nil {
if err == ErrStreamNotFound {
if errors.Is(err, ErrStreamNotFound) {
err = ErrBucketNotFound
}
return nil, err
@@ -405,6 +421,10 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
if cfg.TTL > 0 && cfg.TTL < duplicateWindow {
duplicateWindow = cfg.TTL
}
var compression StoreCompression
if cfg.Compression {
compression = S2Compression
}
scfg := &StreamConfig{
Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket),
Description: cfg.Description,
@@ -422,6 +442,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
MaxConsumers: -1,
AllowDirect: true,
RePublish: cfg.RePublish,
Compression: compression,
}
if cfg.Mirror != nil {
// Copy in case we need to make changes so we do not change caller's version.
@@ -465,7 +486,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
// the stream.
// The same logic applies for KVs created pre 2.9.x and
// the AllowDirect setting.
if err == ErrStreamNameAlreadyInUse {
if errors.Is(err, ErrStreamNameAlreadyInUse) {
if si, _ = js.StreamInfo(scfg.Name); si != nil {
// To compare, make the server's stream info discard
// policy same than ours.
@@ -537,7 +558,7 @@ func keyValid(key string) bool {
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
e, err := kv.get(key, kvLatestRevision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
@@ -550,7 +571,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) {
func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) {
e, err := kv.get(key, revision)
if err != nil {
if err == ErrKeyDeleted {
if errors.Is(err, ErrKeyDeleted) {
return nil, ErrKeyNotFound
}
return nil, err
@@ -587,7 +608,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) {
}
}
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrKeyNotFound
}
return nil, err
@@ -654,7 +675,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) {
// TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that
// so we need to double check.
if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted {
if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) {
return kv.Update(key, value, e.Revision())
}
@@ -830,6 +851,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) {
return keys, nil
}
type keyLister struct {
watcher KeyWatcher
keys chan string
}
// ListKeys will return all keys.
func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) {
opts = append(opts, IgnoreDeletes(), MetaOnly())
watcher, err := kv.WatchAll(opts...)
if err != nil {
return nil, err
}
kl := &keyLister{watcher: watcher, keys: make(chan string, 256)}
go func() {
defer close(kl.keys)
defer watcher.Stop()
for entry := range watcher.Updates() {
if entry == nil {
return
}
kl.keys <- entry.Key()
}
}()
return kl, nil
}
func (kl *keyLister) Keys() <-chan string {
return kl.keys
}
func (kl *keyLister) Stop() error {
return kl.watcher.Stop()
}
// History will return all values for the key.
func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) {
opts = append(opts, IncludeHistory())
@@ -1040,6 +1096,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
// Bytes is the size of the stream
func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes }
// IsCompressed indicates if the data is compressed on disk
func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression }
// Status retrieves the status and configuration of a bucket
func (kv *kvs) Status() (KeyValueStatus, error) {
nfo, err := kv.js.StreamInfo(kv.stream)
@@ -1062,7 +1121,7 @@ func (js *js) KeyValueStoreNames() <-chan string {
if !strings.HasPrefix(name, kvBucketNamePre) {
continue
}
ch <- name
ch <- strings.TrimPrefix(name, kvBucketNamePre)
}
}
}()

View File

@@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.31.0"
Version = "1.32.0"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -4298,6 +4298,12 @@ func (nc *Conn) removeSub(s *Subscription) {
}
}
if s.typ != AsyncSubscription {
done := s.pDone
if done != nil {
done(s.Subject)
}
}
// Mark as invalid
s.closed = true
if s.pCond != nil {

View File

@@ -1,4 +1,4 @@
// Copyright 2013-2022 The NATS Authors
// Copyright 2013-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@@ -1,4 +1,4 @@
// Copyright 2021-2022 The NATS Authors
// Copyright 2021-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -377,13 +377,16 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
defer jetStream.(*js).cleanupReplySub()
purgePartial := func() {
purgePartial := func() error {
// wait until all pubs are complete or up to default timeout before attempting purge
select {
case <-jetStream.PublishAsyncComplete():
case <-time.After(obs.js.opts.wait):
}
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj})
if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}); err != nil {
return fmt.Errorf("could not cleanup bucket after erronous put operation: %w", err)
}
return nil
}
m, h := NewMsg(chunkSubj), sha256.New()
@@ -404,7 +407,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
default:
}
if err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
}
@@ -415,7 +420,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Handle all non EOF errors
if readErr != nil && readErr != io.EOF {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(readErr, purgeErr)
}
return nil, readErr
}
@@ -427,11 +434,15 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Send msg itself.
if _, err := jetStream.PublishMsgAsync(m); err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
if err := getErr(); err != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
return nil, err
}
// Update totals.
@@ -455,7 +466,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
mm.Data, err = json.Marshal(info)
if err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@@ -464,7 +477,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
_, err = jetStream.PublishMsgAsync(mm)
if err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@@ -474,7 +489,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
case <-jetStream.PublishAsyncComplete():
if err := getErr(); err != nil {
if r != nil {
purgePartial()
if purgeErr := purgePartial(); purgeErr != nil {
return nil, errors.Join(err, purgeErr)
}
}
return nil, err
}
@@ -487,7 +504,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn
// Delete any original chunks.
if einfo != nil && !einfo.Deleted {
echunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID)
obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj})
if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj}); err != nil {
return info, err
}
}
// TODO would it be okay to do this to return the info with the correct time?
@@ -626,7 +645,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
if ctx != nil {
select {
case <-ctx.Done():
if ctx.Err() == context.Canceled {
if errors.Is(ctx.Err(), context.Canceled) {
err = ctx.Err()
} else {
err = ErrTimeout
@@ -926,7 +945,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err
m, err := obs.js.GetLastMsg(stream, metaSubj)
if err != nil {
if err == ErrMsgNotFound {
if errors.Is(err, ErrMsgNotFound) {
err = ErrObjectNotFound
}
return nil, err

View File

@@ -29,7 +29,7 @@ type timerPool struct {
// Get returns a timer that completes after the given duration.
func (tp *timerPool) Get(d time.Duration) *time.Timer {
if t, _ := tp.p.Get().(*time.Timer); t != nil {
if t, ok := tp.p.Get().(*time.Timer); ok && t != nil {
t.Reset(d)
return t
}

2
vendor/modules.txt vendored
View File

@@ -1401,7 +1401,7 @@ github.com/nats-io/nats-server/v2/server/certidp
github.com/nats-io/nats-server/v2/server/certstore
github.com/nats-io/nats-server/v2/server/pse
github.com/nats-io/nats-server/v2/server/sysmem
# github.com/nats-io/nats.go v1.31.0
# github.com/nats-io/nats.go v1.32.0
## explicit; go 1.20
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin