From fce4d19e3fa13cc53e45cd5768b5753ccd182630 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 16 Jan 2024 12:00:10 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats.go from 1.31.0 to 1.32.0 Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.31.0 to 1.32.0. - [Release notes](https://github.com/nats-io/nats.go/releases) - [Commits](https://github.com/nats-io/nats.go/compare/v1.31.0...v1.32.0) --- updated-dependencies: - dependency-name: github.com/nats-io/nats.go dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- .../github.com/nats-io/nats.go/.golangci.yaml | 3 + vendor/github.com/nats-io/nats.go/README.md | 4 +- vendor/github.com/nats-io/nats.go/context.go | 2 +- vendor/github.com/nats-io/nats.go/enc.go | 2 +- .../nats.go/encoders/builtin/default_enc.go | 2 +- .../nats.go/encoders/builtin/gob_enc.go | 2 +- .../nats.go/encoders/builtin/json_enc.go | 2 +- vendor/github.com/nats-io/nats.go/go_test.mod | 16 ++-- vendor/github.com/nats-io/nats.go/go_test.sum | 33 ++++---- vendor/github.com/nats-io/nats.go/js.go | 76 ++++++++++++------- vendor/github.com/nats-io/nats.go/jserrors.go | 2 +- vendor/github.com/nats-io/nats.go/jsm.go | 20 ++--- vendor/github.com/nats-io/nats.go/kv.go | 75 ++++++++++++++++-- vendor/github.com/nats-io/nats.go/nats.go | 8 +- vendor/github.com/nats-io/nats.go/netchan.go | 2 +- vendor/github.com/nats-io/nats.go/object.go | 45 +++++++---- vendor/github.com/nats-io/nats.go/timer.go | 2 +- vendor/modules.txt | 2 +- 20 files changed, 206 insertions(+), 98 deletions(-) diff --git a/go.mod b/go.mod index 0b3b003bf4..4bf841a186 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/mna/pigeon v1.2.1 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.10.9 - github.com/nats-io/nats.go v1.31.0 + github.com/nats-io/nats.go v1.32.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo v1.16.5 diff --git a/go.sum b/go.sum index 2a8e210ab5..7875677cda 100644 --- a/go.sum +++ b/go.sum @@ -1744,8 +1744,8 @@ github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= github.com/nats-io/nats-server/v2 v2.10.9 h1:VEW43Zz+p+9lARtiPM9ctd6ckun+92ZT2T17HWtwiFI= github.com/nats-io/nats-server/v2 v2.10.9/go.mod h1:oorGiV9j3BOLLO3ejQe+U7pfAGyPo+ppD7rpgNF6KTQ= -github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= -github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nats.go v1.32.0 h1:Bx9BZS+aXYlxW08k8Gd3yR2s73pV5XSoAQUyp1Kwvp0= +github.com/nats-io/nats.go v1.32.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/.golangci.yaml b/vendor/github.com/nats-io/nats.go/.golangci.yaml index be66189ede..fb548e50e4 100644 --- a/vendor/github.com/nats-io/nats.go/.golangci.yaml +++ b/vendor/github.com/nats-io/nats.go/.golangci.yaml @@ -5,6 +5,9 @@ issues: - linters: - errcheck text: "Unsubscribe" + - linters: + - errcheck + text: "Drain" - linters: - errcheck text: "msg.Ack" diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 042733da1d..d7f2e00d64 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -14,6 +14,8 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io [Coverage-Url]: https://coveralls.io/r/nats-io/nats.go?branch=main [Coverage-image]: https://coveralls.io/repos/github/nats-io/nats.go/badge.svg?branch=main +**Check out [NATS by example](https://natsbyexample.com) - An evolving collection of runnable, cross-client reference examples for NATS.** + ## Installation ```bash @@ -29,7 +31,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.31.0 +go get github.com/nats-io/nats.go/@v1.32.0 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff --git a/vendor/github.com/nats-io/nats.go/context.go b/vendor/github.com/nats-io/nats.go/context.go index c4ef4be173..20f1782acf 100644 --- a/vendor/github.com/nats-io/nats.go/context.go +++ b/vendor/github.com/nats-io/nats.go/context.go @@ -1,4 +1,4 @@ -// Copyright 2016-2022 The NATS Authors +// Copyright 2016-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/enc.go b/vendor/github.com/nats-io/nats.go/enc.go index a1c54f246b..4550f618d4 100644 --- a/vendor/github.com/nats-io/nats.go/enc.go +++ b/vendor/github.com/nats-io/nats.go/enc.go @@ -1,4 +1,4 @@ -// Copyright 2012-2019 The NATS Authors +// Copyright 2012-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/encoders/builtin/default_enc.go b/vendor/github.com/nats-io/nats.go/encoders/builtin/default_enc.go index 65c2d68bb8..7e729637ce 100644 --- a/vendor/github.com/nats-io/nats.go/encoders/builtin/default_enc.go +++ b/vendor/github.com/nats-io/nats.go/encoders/builtin/default_enc.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/encoders/builtin/gob_enc.go b/vendor/github.com/nats-io/nats.go/encoders/builtin/gob_enc.go index 4e7cecba29..7ecf85e4da 100644 --- a/vendor/github.com/nats-io/nats.go/encoders/builtin/gob_enc.go +++ b/vendor/github.com/nats-io/nats.go/encoders/builtin/gob_enc.go @@ -1,4 +1,4 @@ -// Copyright 2013-2018 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/encoders/builtin/json_enc.go b/vendor/github.com/nats-io/nats.go/encoders/builtin/json_enc.go index 9b6ffc017a..0540d9850c 100644 --- a/vendor/github.com/nats-io/nats.go/encoders/builtin/json_enc.go +++ b/vendor/github.com/nats-io/nats.go/encoders/builtin/json_enc.go @@ -1,4 +1,4 @@ -// Copyright 2012-2018 The NATS Authors +// Copyright 2012-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 8902c1edd7..d28963c27b 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -4,19 +4,19 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.0 - github.com/nats-io/nats-server/v2 v2.10.0 - github.com/nats-io/nkeys v0.4.5 + github.com/klauspost/compress v1.17.4 + github.com/nats-io/nats-server/v2 v2.10.7 + github.com/nats-io/nkeys v0.4.6 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.2.1 - golang.org/x/text v0.13.0 + golang.org/x/text v0.14.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect - github.com/nats-io/jwt/v2 v2.5.2 // indirect - golang.org/x/crypto v0.13.0 // indirect - golang.org/x/sys v0.12.0 // indirect - golang.org/x/time v0.3.0 // indirect + github.com/nats-io/jwt/v2 v2.5.3 // indirect + golang.org/x/crypto v0.16.0 // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/time v0.5.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index ce4ba9205c..38fe6ef6ff 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -10,32 +10,31 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= -github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= +github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU= -github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg= -github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto= -github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= -github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo= +github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4= +github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y= +github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c= +github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY= +github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= -golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck= -golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= -golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g= +golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY= +golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o= -golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= -golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= +golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index 444278e0f7..0c06730587 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -227,14 +227,16 @@ type js struct { opts *jsOpts // For async publish context. - mu sync.RWMutex - rpre string - rsub *Subscription - pafs map[string]*pubAckFuture - stc chan struct{} - dch chan struct{} - rr *rand.Rand - connStatusCh chan (Status) + mu sync.RWMutex + rpre string + rsub *Subscription + pafs map[string]*pubAckFuture + stc chan struct{} + dch chan struct{} + rr *rand.Rand + connStatusCh chan (Status) + replyPrefix string + replyPrefixLen int } type jsOpts struct { @@ -283,6 +285,12 @@ func (nc *Conn) JetStream(opts ...JSOpt) (JetStreamContext, error) { maxpa: defaultAsyncPubAckInflight, }, } + inboxPrefix := InboxPrefix + if js.nc.Opts.InboxPrefix != _EMPTY_ { + inboxPrefix = js.nc.Opts.InboxPrefix + "." + } + js.replyPrefix = inboxPrefix + js.replyPrefixLen = len(js.replyPrefix) + aReplyTokensize + 1 for _, opt := range opts { if err := opt.configureJSContext(js.opts); err != nil { @@ -537,7 +545,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { } if err != nil { - for r, ttl := 0, o.ttl; err == ErrNoResponders && (r < o.rnum || o.rnum < 0); r++ { + for r, ttl := 0, o.ttl; errors.Is(err, ErrNoResponders) && (r < o.rnum || o.rnum < 0); r++ { // To protect against small blips in leadership changes etc, if we get a no responders here retry. if o.ctx != nil { select { @@ -559,7 +567,7 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { } } if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrNoStreamResponse } return nil, err @@ -641,7 +649,6 @@ func (paf *pubAckFuture) Msg() *Msg { } // For quick token lookup etc. -const aReplyPreLen = 14 const aReplyTokensize = 6 func (js *js) newAsyncReply() string { @@ -654,11 +661,7 @@ func (js *js) newAsyncReply() string { for i := 0; i < aReplyTokensize; i++ { b[i] = rdigits[int(b[i]%base)] } - inboxPrefix := InboxPrefix - if js.nc.Opts.InboxPrefix != _EMPTY_ { - inboxPrefix = js.nc.Opts.InboxPrefix + "." - } - js.rpre = fmt.Sprintf("%s%s.", inboxPrefix, b[:aReplyTokensize]) + js.rpre = fmt.Sprintf("%s%s.", js.replyPrefix, b[:aReplyTokensize]) sub, err := js.nc.Subscribe(fmt.Sprintf("%s*", js.rpre), js.handleAsyncReply) if err != nil { js.mu.Unlock() @@ -767,10 +770,10 @@ func (js *js) asyncStall() <-chan struct{} { // Handle an async reply from PublishAsync. func (js *js) handleAsyncReply(m *Msg) { - if len(m.Subject) <= aReplyPreLen { + if len(m.Subject) <= js.replyPrefixLen { return } - id := m.Subject[aReplyPreLen:] + id := m.Subject[js.replyPrefixLen:] js.mu.Lock() paf := js.getPAF(id) @@ -916,7 +919,7 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { return nil, errors.New("nats: error creating async reply handler") } - id := m.Reply[aReplyPreLen:] + id := m.Reply[js.replyPrefixLen:] paf := &pubAckFuture{msg: m, st: time.Now()} numPending, maxPending := js.registerPAF(id, paf) @@ -1241,6 +1244,10 @@ func (sub *Subscription) deleteConsumer() error { sub.mu.Unlock() return nil } + if jsi.stream == _EMPTY_ || jsi.consumer == _EMPTY_ { + sub.mu.Unlock() + return nil + } stream, consumer := jsi.stream, jsi.consumer js := jsi.js sub.mu.Unlock() @@ -1594,7 +1601,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if consumer != _EMPTY_ && !o.skipCInfo { info, err = js.ConsumerInfo(stream, consumer) notFoundErr = errors.Is(err, ErrConsumerNotFound) - lookupErr = err == ErrJetStreamNotEnabled || err == ErrTimeout || err == context.DeadlineExceeded + lookupErr = err == ErrJetStreamNotEnabled || errors.Is(err, ErrTimeout) || errors.Is(err, context.DeadlineExceeded) } switch { @@ -1808,7 +1815,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if bl < DefaultSubPendingBytesLimit { bl = DefaultSubPendingBytesLimit } - sub.SetPendingLimits(maxap, bl) + if err := sub.SetPendingLimits(maxap, bl); err != nil { + return nil, err + } } // Do heartbeats last if needed. @@ -2047,7 +2056,16 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { js := jsi.js sub.mu.Unlock() - consName := nuid.Next() + sub.mu.Lock() + // Attempt to delete the existing consumer. + // We don't wait for the response since even if it's unsuccessful, + // inactivity threshold will kick in and delete it. + if jsi.consumer != _EMPTY_ { + go js.DeleteConsumer(jsi.stream, jsi.consumer) + } + jsi.consumer = "" + sub.mu.Unlock() + consName := getHash(nuid.Next()) cinfo, err := js.upsertConsumer(jsi.stream, consName, cfg) if err != nil { var apiErr *APIError @@ -2813,7 +2831,7 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { // are no messages. msg, err = sub.nextMsgWithContext(ctx, true, false) if err != nil { - if err == errNoMessages { + if errors.Is(err, errNoMessages) { err = nil } break @@ -2893,13 +2911,13 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) { usrMsg, err = checkMsg(msg, true, noWait) if err == nil && usrMsg { msgs = append(msgs, msg) - } else if noWait && (err == errNoMessages || err == errRequestsPending) && len(msgs) == 0 { + } else if noWait && (errors.Is(err, errNoMessages) || errors.Is(err, errRequestsPending)) && len(msgs) == 0 { // If we have a 404/408 for our "no_wait" request and have // not collected any message, then resend request to // wait this time. noWait = false err = sendReq() - } else if err == ErrTimeout && len(msgs) == 0 { + } else if errors.Is(err, ErrTimeout) && len(msgs) == 0 { // If we get a 408, we will bail if we already collected some // messages, otherwise ignore and go back calling nextMsg. err = nil @@ -3082,7 +3100,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e // are no messages. msg, err := sub.nextMsgWithContext(ctx, true, false) if err != nil { - if err == errNoMessages { + if errors.Is(err, errNoMessages) { err = nil } result.err = err @@ -3159,7 +3177,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e usrMsg, err = checkMsg(msg, true, false) if err != nil { - if err == ErrTimeout { + if errors.Is(err, ErrTimeout) { if reqID != "" && !subjectMatchesReqID(msg.Subject, reqID) { // ignore timeout message from server if it comes from a different pull request continue @@ -3188,7 +3206,7 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e // checkCtxErr is used to determine whether ErrTimeout should be returned in case of context timeout func (o *pullOpts) checkCtxErr(err error) error { - if o.ctx == nil && err == context.DeadlineExceeded { + if o.ctx == nil && errors.Is(err, context.DeadlineExceeded) { return ErrTimeout } return err @@ -3204,7 +3222,7 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin ccInfoSubj := fmt.Sprintf(apiConsumerInfoT, stream, consumer) resp, err := js.apiRequestWithContext(ctx, js.apiSubj(ccInfoSubj), nil) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index c8b1f5fc62..ef5d4af945 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -1,4 +1,4 @@ -// Copyright 2020-2022 The NATS Authors +// Copyright 2020-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 266bf0665f..8f724726b8 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -252,11 +252,13 @@ type AccountInfo struct { } type Tier struct { - Memory uint64 `json:"memory"` - Store uint64 `json:"storage"` - Streams int `json:"streams"` - Consumers int `json:"consumers"` - Limits AccountLimits `json:"limits"` + Memory uint64 `json:"memory"` + Store uint64 `json:"storage"` + ReservedMemory uint64 `json:"reserved_memory"` + ReservedStore uint64 `json:"reserved_storage"` + Streams int `json:"streams"` + Consumers int `json:"consumers"` + Limits AccountLimits `json:"limits"` } // APIStats reports on API calls to JetStream for this account. @@ -297,7 +299,7 @@ func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) { resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(apiAccountInfo), nil) if err != nil { // todo maybe nats server should never have no responder on this subject and always respond if they know there is no js to be had - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err @@ -415,7 +417,7 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return nil, err @@ -1623,7 +1625,7 @@ func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) { resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j) if err != nil { - if err == ErrNoResponders { + if errors.Is(err, ErrNoResponders) { err = ErrJetStreamNotEnabled } return _EMPTY_, err diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index 7382f4d872..0864f30cce 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -65,7 +65,10 @@ type KeyValue interface { // WatchAll will invoke the callback for all updates. WatchAll(opts ...WatchOpt) (KeyWatcher, error) // Keys will return all keys. + // DEPRECATED: Use ListKeys instead to avoid memory issues. Keys(opts ...WatchOpt) ([]string, error) + // ListKeys will return all keys in a channel. + ListKeys(opts ...WatchOpt) (KeyLister, error) // History will return all historical values for the key. History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) // Bucket returns the current bucket name. @@ -95,6 +98,9 @@ type KeyValueStatus interface { // Bytes returns the size in bytes of the bucket Bytes() uint64 + + // IsCompressed indicates if the data is compressed on disk + IsCompressed() bool } // KeyWatcher is what is returned when doing a watch. @@ -107,6 +113,12 @@ type KeyWatcher interface { Stop() error } +// KeyLister is used to retrieve a list of key value store keys +type KeyLister interface { + Keys() <-chan string + Stop() error +} + type WatchOpt interface { configureWatcher(opts *watchOpts) error } @@ -249,6 +261,10 @@ type KeyValueConfig struct { RePublish *RePublish Mirror *StreamSource Sources []*StreamSource + + // Enable underlying stream compression. + // NOTE: Compression is supported for nats-server 2.10.0+ + Compression bool } // Used to watch all keys. @@ -343,7 +359,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) { stream := fmt.Sprintf(kvBucketNameTmpl, bucket) si, err := js.StreamInfo(stream) if err != nil { - if err == ErrStreamNotFound { + if errors.Is(err, ErrStreamNotFound) { err = ErrBucketNotFound } return nil, err @@ -405,6 +421,10 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { if cfg.TTL > 0 && cfg.TTL < duplicateWindow { duplicateWindow = cfg.TTL } + var compression StoreCompression + if cfg.Compression { + compression = S2Compression + } scfg := &StreamConfig{ Name: fmt.Sprintf(kvBucketNameTmpl, cfg.Bucket), Description: cfg.Description, @@ -422,6 +442,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { MaxConsumers: -1, AllowDirect: true, RePublish: cfg.RePublish, + Compression: compression, } if cfg.Mirror != nil { // Copy in case we need to make changes so we do not change caller's version. @@ -465,7 +486,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) { // the stream. // The same logic applies for KVs created pre 2.9.x and // the AllowDirect setting. - if err == ErrStreamNameAlreadyInUse { + if errors.Is(err, ErrStreamNameAlreadyInUse) { if si, _ = js.StreamInfo(scfg.Name); si != nil { // To compare, make the server's stream info discard // policy same than ours. @@ -537,7 +558,7 @@ func keyValid(key string) bool { func (kv *kvs) Get(key string) (KeyValueEntry, error) { e, err := kv.get(key, kvLatestRevision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -550,7 +571,7 @@ func (kv *kvs) Get(key string) (KeyValueEntry, error) { func (kv *kvs) GetRevision(key string, revision uint64) (KeyValueEntry, error) { e, err := kv.get(key, revision) if err != nil { - if err == ErrKeyDeleted { + if errors.Is(err, ErrKeyDeleted) { return nil, ErrKeyNotFound } return nil, err @@ -587,7 +608,7 @@ func (kv *kvs) get(key string, revision uint64) (KeyValueEntry, error) { } } if err != nil { - if err == ErrMsgNotFound { + if errors.Is(err, ErrMsgNotFound) { err = ErrKeyNotFound } return nil, err @@ -654,7 +675,7 @@ func (kv *kvs) Create(key string, value []byte) (revision uint64, err error) { // TODO(dlc) - Since we have tombstones for DEL ops for watchers, this could be from that // so we need to double check. - if e, err := kv.get(key, kvLatestRevision); err == ErrKeyDeleted { + if e, err := kv.get(key, kvLatestRevision); errors.Is(err, ErrKeyDeleted) { return kv.Update(key, value, e.Revision()) } @@ -830,6 +851,41 @@ func (kv *kvs) Keys(opts ...WatchOpt) ([]string, error) { return keys, nil } +type keyLister struct { + watcher KeyWatcher + keys chan string +} + +// ListKeys will return all keys. +func (kv *kvs) ListKeys(opts ...WatchOpt) (KeyLister, error) { + opts = append(opts, IgnoreDeletes(), MetaOnly()) + watcher, err := kv.WatchAll(opts...) + if err != nil { + return nil, err + } + kl := &keyLister{watcher: watcher, keys: make(chan string, 256)} + + go func() { + defer close(kl.keys) + defer watcher.Stop() + for entry := range watcher.Updates() { + if entry == nil { + return + } + kl.keys <- entry.Key() + } + }() + return kl, nil +} + +func (kl *keyLister) Keys() <-chan string { + return kl.keys +} + +func (kl *keyLister) Stop() error { + return kl.watcher.Stop() +} + // History will return all values for the key. func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { opts = append(opts, IncludeHistory()) @@ -1040,6 +1096,9 @@ func (s *KeyValueBucketStatus) StreamInfo() *StreamInfo { return s.nfo } // Bytes is the size of the stream func (s *KeyValueBucketStatus) Bytes() uint64 { return s.nfo.State.Bytes } +// IsCompressed indicates if the data is compressed on disk +func (s *KeyValueBucketStatus) IsCompressed() bool { return s.nfo.Config.Compression != NoCompression } + // Status retrieves the status and configuration of a bucket func (kv *kvs) Status() (KeyValueStatus, error) { nfo, err := kv.js.StreamInfo(kv.stream) @@ -1062,7 +1121,7 @@ func (js *js) KeyValueStoreNames() <-chan string { if !strings.HasPrefix(name, kvBucketNamePre) { continue } - ch <- name + ch <- strings.TrimPrefix(name, kvBucketNamePre) } } }() diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index da13692fd7..a252da2a4b 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.31.0" + Version = "1.32.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -4298,6 +4298,12 @@ func (nc *Conn) removeSub(s *Subscription) { } } + if s.typ != AsyncSubscription { + done := s.pDone + if done != nil { + done(s.Subject) + } + } // Mark as invalid s.closed = true if s.pCond != nil { diff --git a/vendor/github.com/nats-io/nats.go/netchan.go b/vendor/github.com/nats-io/nats.go/netchan.go index 060721eb42..6b13690b4c 100644 --- a/vendor/github.com/nats-io/nats.go/netchan.go +++ b/vendor/github.com/nats-io/nats.go/netchan.go @@ -1,4 +1,4 @@ -// Copyright 2013-2022 The NATS Authors +// Copyright 2013-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/vendor/github.com/nats-io/nats.go/object.go b/vendor/github.com/nats-io/nats.go/object.go index f6ba8fb164..92267918eb 100644 --- a/vendor/github.com/nats-io/nats.go/object.go +++ b/vendor/github.com/nats-io/nats.go/object.go @@ -1,4 +1,4 @@ -// Copyright 2021-2022 The NATS Authors +// Copyright 2021-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -377,13 +377,16 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn defer jetStream.(*js).cleanupReplySub() - purgePartial := func() { + purgePartial := func() error { // wait until all pubs are complete or up to default timeout before attempting purge select { case <-jetStream.PublishAsyncComplete(): case <-time.After(obs.js.opts.wait): } - obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}) + if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: chunkSubj}); err != nil { + return fmt.Errorf("could not cleanup bucket after erronous put operation: %w", err) + } + return nil } m, h := NewMsg(chunkSubj), sha256.New() @@ -404,7 +407,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn default: } if err != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } return nil, err } } @@ -415,7 +420,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn // Handle all non EOF errors if readErr != nil && readErr != io.EOF { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(readErr, purgeErr) + } return nil, readErr } @@ -427,11 +434,15 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn // Send msg itself. if _, err := jetStream.PublishMsgAsync(m); err != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } return nil, err } if err := getErr(); err != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } return nil, err } // Update totals. @@ -455,7 +466,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn mm.Data, err = json.Marshal(info) if err != nil { if r != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } } return nil, err } @@ -464,7 +477,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn _, err = jetStream.PublishMsgAsync(mm) if err != nil { if r != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } } return nil, err } @@ -474,7 +489,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn case <-jetStream.PublishAsyncComplete(): if err := getErr(); err != nil { if r != nil { - purgePartial() + if purgeErr := purgePartial(); purgeErr != nil { + return nil, errors.Join(err, purgeErr) + } } return nil, err } @@ -487,7 +504,9 @@ func (obs *obs) Put(meta *ObjectMeta, r io.Reader, opts ...ObjectOpt) (*ObjectIn // Delete any original chunks. if einfo != nil && !einfo.Deleted { echunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, einfo.NUID) - obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj}) + if err := obs.js.purgeStream(obs.stream, &StreamPurgeRequest{Subject: echunkSubj}); err != nil { + return info, err + } } // TODO would it be okay to do this to return the info with the correct time? @@ -626,7 +645,7 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { if ctx != nil { select { case <-ctx.Done(): - if ctx.Err() == context.Canceled { + if errors.Is(ctx.Err(), context.Canceled) { err = ctx.Err() } else { err = ErrTimeout @@ -926,7 +945,7 @@ func (obs *obs) GetInfo(name string, opts ...GetObjectInfoOpt) (*ObjectInfo, err m, err := obs.js.GetLastMsg(stream, metaSubj) if err != nil { - if err == ErrMsgNotFound { + if errors.Is(err, ErrMsgNotFound) { err = ErrObjectNotFound } return nil, err diff --git a/vendor/github.com/nats-io/nats.go/timer.go b/vendor/github.com/nats-io/nats.go/timer.go index 4fb02ecb41..6edeb4cf89 100644 --- a/vendor/github.com/nats-io/nats.go/timer.go +++ b/vendor/github.com/nats-io/nats.go/timer.go @@ -29,7 +29,7 @@ type timerPool struct { // Get returns a timer that completes after the given duration. func (tp *timerPool) Get(d time.Duration) *time.Timer { - if t, _ := tp.p.Get().(*time.Timer); t != nil { + if t, ok := tp.p.Get().(*time.Timer); ok && t != nil { t.Reset(d) return t } diff --git a/vendor/modules.txt b/vendor/modules.txt index 82cf397d99..3f0e58bf44 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1401,7 +1401,7 @@ github.com/nats-io/nats-server/v2/server/certidp github.com/nats-io/nats-server/v2/server/certstore github.com/nats-io/nats-server/v2/server/pse github.com/nats-io/nats-server/v2/server/sysmem -# github.com/nats-io/nats.go v1.31.0 +# github.com/nats-io/nats.go v1.32.0 ## explicit; go 1.20 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin