From fae29f107c6fa6c1654022b70ab1525ea0514bc5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 25 Mar 2025 05:58:50 +0000 Subject: [PATCH] build(deps): bump github.com/nats-io/nats.go from 1.39.1 to 1.40.0 Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.39.1 to 1.40.0. - [Release notes](https://github.com/nats-io/nats.go/releases) - [Commits](https://github.com/nats-io/nats.go/compare/v1.39.1...v1.40.0) --- updated-dependencies: - dependency-name: github.com/nats-io/nats.go dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 +- vendor/github.com/nats-io/nats.go/README.md | 2 +- vendor/github.com/nats-io/nats.go/go_test.mod | 17 +-- vendor/github.com/nats-io/nats.go/go_test.sum | 36 +++--- vendor/github.com/nats-io/nats.go/js.go | 113 +++++++++++++++--- vendor/github.com/nats-io/nats.go/jserrors.go | 10 +- vendor/github.com/nats-io/nats.go/jsm.go | 8 ++ vendor/github.com/nats-io/nats.go/kv.go | 29 +++-- vendor/github.com/nats-io/nats.go/nats.go | 40 +++++-- vendor/modules.txt | 4 +- 11 files changed, 200 insertions(+), 65 deletions(-) diff --git a/go.mod b/go.mod index b7c3ecd3b..493e09001 100644 --- a/go.mod +++ b/go.mod @@ -56,7 +56,7 @@ require ( github.com/mna/pigeon v1.3.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 github.com/nats-io/nats-server/v2 v2.11.0 - github.com/nats-io/nats.go v1.39.1 + github.com/nats-io/nats.go v1.40.0 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo v1.16.5 diff --git a/go.sum b/go.sum index bb418b2e7..dd71e196c 100644 --- a/go.sum +++ b/go.sum @@ -829,8 +829,8 @@ github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE= github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI= -github.com/nats-io/nats.go v1.39.1 h1:oTkfKBmz7W047vRxV762M67ZdXeOtUgvbBaNoQ+3PPk= -github.com/nats-io/nats.go v1.39.1/go.mod h1:MgRb8oOdigA6cYpEPhXJuRVH6UE/V4jblJ2jQ27IXYM= +github.com/nats-io/nats.go v1.40.0 h1:qC3rnVZy15vJ15GSbB+pQtOmqo9q+65wnGVpvmcVv0Q= +github.com/nats-io/nats.go v1.40.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo= github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 6961b5812..b00eafa1c 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -23,7 +23,7 @@ A [Go](http://golang.org) client for the [NATS messaging system](https://nats.io go get github.com/nats-io/nats.go@latest # To get a specific version: -go get github.com/nats-io/nats.go@v1.39.1 +go get github.com/nats-io/nats.go@v1.40.0 # Note that the latest major version for NATS Server is v2: go get github.com/nats-io/nats-server/v2@latest diff --git a/vendor/github.com/nats-io/nats.go/go_test.mod b/vendor/github.com/nats-io/nats.go/go_test.mod index 32c1003a5..56732596a 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.mod +++ b/vendor/github.com/nats-io/nats.go/go_test.mod @@ -1,23 +1,24 @@ module github.com/nats-io/nats.go -go 1.22.0 +go 1.23.0 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.17.11 + github.com/klauspost/compress v1.18.0 github.com/nats-io/jwt v1.2.2 - github.com/nats-io/nats-server/v2 v2.10.24 - github.com/nats-io/nkeys v0.4.9 + github.com/nats-io/nats-server/v2 v2.11.0 + github.com/nats-io/nkeys v0.4.10 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.3.0 - golang.org/x/text v0.21.0 + golang.org/x/text v0.23.0 google.golang.org/protobuf v1.23.0 ) require ( + github.com/google/go-tpm v0.9.3 // indirect github.com/minio/highwayhash v1.0.3 // indirect github.com/nats-io/jwt/v2 v2.7.3 // indirect - golang.org/x/crypto v0.31.0 // indirect - golang.org/x/sys v0.28.0 // indirect - golang.org/x/time v0.8.0 // indirect + golang.org/x/crypto v0.36.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/time v0.11.0 // indirect ) diff --git a/vendor/github.com/nats-io/nats.go/go_test.sum b/vendor/github.com/nats-io/nats.go/go_test.sum index f6223f25b..7f6017f23 100644 --- a/vendor/github.com/nats-io/nats.go/go_test.sum +++ b/vendor/github.com/nats-io/nats.go/go_test.sum @@ -1,3 +1,5 @@ +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op h1:+OSa/t11TFhqfrX0EOSqQBDJ0YlpmK0rDSiB19dg9M0= +github.com/antithesishq/antithesis-sdk-go v0.4.3-default-no-op/go.mod h1:IUpT2DPAKh6i/YhSbt6Gl3v2yvUZjmKncl7U91fup7E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= @@ -9,21 +11,24 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= -github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc= +github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/nats-io/jwt v1.2.2 h1:w3GMTO969dFg+UOKTmmyuu7IGdusK+7Ytlt//OYH/uU= github.com/nats-io/jwt v1.2.2/go.mod h1:/xX356yQA6LuXI9xWW7mZNpxgF2mBmGecH+Fj34sP5Q= github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= -github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= -github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= +github.com/nats-io/nats-server/v2 v2.11.0 h1:fdwAT1d6DZW/4LUz5rkvQUe5leGEwjjOQYntzVRKvjE= +github.com/nats-io/nats-server/v2 v2.11.0/go.mod h1:leXySghbdtXSUmWem8K9McnJ6xbJOb0t9+NQ5HTRZjI= github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s= -github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= -github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc= +github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -34,20 +39,19 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= -golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= +golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= -golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= -golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0= +golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= diff --git a/vendor/github.com/nats-io/nats.go/js.go b/vendor/github.com/nats-io/nats.go/js.go index fe246aaea..705932ee1 100644 --- a/vendor/github.com/nats-io/nats.go/js.go +++ b/vendor/github.com/nats-io/nats.go/js.go @@ -1,4 +1,4 @@ -// Copyright 2020-2024 The NATS Authors +// Copyright 2020-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -273,6 +273,8 @@ type jsOpts struct { aecb MsgErrHandler // Max async pub ack in flight maxpa int + // ackTimeout is the max time to wait for an ack in async publish. + ackTimeout time.Duration // the domain that produced the pre domain string // enables protocol tracing @@ -466,13 +468,14 @@ func (opt pubOptFn) configurePublish(opts *pubOpts) error { } type pubOpts struct { - ctx context.Context - ttl time.Duration - id string - lid string // Expected last msgId - str string // Expected stream name - seq *uint64 // Expected last sequence - lss *uint64 // Expected last sequence per subject + ctx context.Context + ttl time.Duration + id string + lid string // Expected last msgId + str string // Expected stream name + seq *uint64 // Expected last sequence + lss *uint64 // Expected last sequence per subject + msgTTL time.Duration // Message TTL // Publish retries for NoResponders err. rwait time.Duration // Retry wait between attempts @@ -507,6 +510,7 @@ const ( ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence" ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id" MsgRollup = "Nats-Rollup" + MsgTTLHdr = "Nats-TTL" ) // Headers for republished messages and direct gets. @@ -566,6 +570,9 @@ func (js *js) PublishMsg(m *Msg, opts ...PubOpt) (*PubAck, error) { if o.lss != nil { m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) } + if o.msgTTL > 0 { + m.Header.Set(MsgTTLHdr, o.msgTTL.String()) + } var resp *Msg var err error @@ -648,6 +655,7 @@ type pubAckFuture struct { maxRetries int retryWait time.Duration reply string + timeout *time.Timer } func (paf *pubAckFuture) Ok() <-chan *PubAck { @@ -712,13 +720,19 @@ func (js *js) newAsyncReply() string { } var sb strings.Builder sb.WriteString(js.rpre) - rn := js.rr.Int63() - var b [aReplyTokensize]byte - for i, l := 0, rn; i < len(b); i++ { - b[i] = rdigits[l%base] - l /= base + for { + rn := js.rr.Int63() + var b [aReplyTokensize]byte + for i, l := 0, rn; i < len(b); i++ { + b[i] = rdigits[l%base] + l /= base + } + if _, ok := js.pafs[string(b[:])]; ok { + continue + } + sb.Write(b[:]) + break } - sb.Write(b[:]) js.mu.Unlock() return sb.String() } @@ -894,6 +908,10 @@ func (js *js) handleAsyncReply(m *Msg) { } } + if paf.timeout != nil { + paf.timeout.Stop() + } + // Process no responders etc. if len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders { if paf.retries < paf.maxRetries { @@ -975,6 +993,15 @@ func PublishAsyncMaxPending(max int) JSOpt { }) } +// PublishAsyncTimeout sets the timeout for async message publish. +// If not provided, timeout is disabled. +func PublishAsyncTimeout(dur time.Duration) JSOpt { + return jsOptFn(func(opts *jsOpts) error { + opts.ackTimeout = dur + return nil + }) +} + // PublishAsync publishes a message to JetStream and returns a PubAckFuture func (js *js) PublishAsync(subj string, data []byte, opts ...PubOpt) (PubAckFuture, error) { return js.PublishMsgAsync(&Msg{Subject: subj, Data: data}, opts...) @@ -1024,6 +1051,9 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { if o.lss != nil { m.Header.Set(ExpectedLastSubjSeqHdr, strconv.FormatUint(*o.lss, 10)) } + if o.msgTTL > 0 { + m.Header.Set(MsgTTLHdr, o.msgTTL.String()) + } // Reply paf := o.pafRetry @@ -1050,11 +1080,52 @@ func (js *js) PublishMsgAsync(m *Msg, opts ...PubOpt) (PubAckFuture, error) { case <-js.asyncStall(): case <-time.After(stallWait): js.clearPAF(id) - return nil, errors.New("nats: stalled with too many outstanding async published messages") + return nil, ErrTooManyStalledMsgs } } + if js.opts.ackTimeout > 0 { + paf.timeout = time.AfterFunc(js.opts.ackTimeout, func() { + js.mu.Lock() + defer js.mu.Unlock() + + if _, ok := js.pafs[id]; !ok { + // paf has already been resolved + // while waiting for the lock + return + } + + // ack timed out, remove from pending acks + delete(js.pafs, id) + + // check on anyone stalled and waiting. + if js.stc != nil && len(js.pafs) < js.opts.maxpa { + close(js.stc) + js.stc = nil + } + + // send error to user + paf.err = ErrAsyncPublishTimeout + if paf.errCh != nil { + paf.errCh <- paf.err + } + + // call error callback if set + if js.opts.aecb != nil { + js.opts.aecb(js, paf.msg, ErrAsyncPublishTimeout) + } + + // check on anyone one waiting on done status. + if js.dch != nil && len(js.pafs) == 0 { + close(js.dch) + js.dch = nil + } + }) + } } else { reply = paf.reply + if paf.timeout != nil { + paf.timeout.Reset(js.opts.ackTimeout) + } id = reply[js.replyPrefixLen:] } hdr, err := m.headerBytes() @@ -1151,6 +1222,15 @@ func StallWait(ttl time.Duration) PubOpt { }) } +// MsgTTL sets per msg TTL. +// Requires [StreamConfig.AllowMsgTTL] to be enabled. +func MsgTTL(dur time.Duration) PubOpt { + return pubOptFn(func(opts *pubOpts) error { + opts.msgTTL = dur + return nil + }) +} + type ackOpts struct { ttl time.Duration ctx context.Context @@ -2206,6 +2286,9 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { // retry for insufficient resources, as it may mean that client is connected to a running // server in cluster while the server hosting R1 JetStream resources is restarting return + } else if errors.As(err, &apiErr) && apiErr.ErrorCode == JSErrCodeJetStreamNotAvailable { + // retry if JetStream meta leader is temporarily unavailable + return } pushErr(err) return diff --git a/vendor/github.com/nats-io/nats.go/jserrors.go b/vendor/github.com/nats-io/nats.go/jserrors.go index 1c22d812b..65e78cf04 100644 --- a/vendor/github.com/nats-io/nats.go/jserrors.go +++ b/vendor/github.com/nats-io/nats.go/jserrors.go @@ -1,4 +1,4 @@ -// Copyright 2020-2023 The NATS Authors +// Copyright 2020-2025 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -157,6 +157,13 @@ var ( // Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") + + // ErrAsyncPublishTimeout is returned when waiting for ack on async publish + ErrAsyncPublishTimeout JetStreamError = &jsError{message: "timeout waiting for ack"} + + // ErrTooManyStalledMsgs is returned when too many outstanding async + // messages are waiting for ack. + ErrTooManyStalledMsgs JetStreamError = &jsError{message: "stalled with too many outstanding async published messages"} ) // Error code represents JetStream error codes returned by the API @@ -166,6 +173,7 @@ const ( JSErrCodeJetStreamNotEnabledForAccount ErrorCode = 10039 JSErrCodeJetStreamNotEnabled ErrorCode = 10076 JSErrCodeInsufficientResourcesErr ErrorCode = 10023 + JSErrCodeJetStreamNotAvailable ErrorCode = 10008 JSErrCodeStreamNotFound ErrorCode = 10059 JSErrCodeStreamNameInUse ErrorCode = 10058 diff --git a/vendor/github.com/nats-io/nats.go/jsm.go b/vendor/github.com/nats-io/nats.go/jsm.go index 2ae19c7a3..18ee255f2 100644 --- a/vendor/github.com/nats-io/nats.go/jsm.go +++ b/vendor/github.com/nats-io/nats.go/jsm.go @@ -243,6 +243,14 @@ type StreamConfig struct { // Template identifies the template that manages the Stream. Deprecated: // This feature is no longer supported. Template string `json:"template_owner,omitempty"` + + // AllowMsgTTL allows header initiated per-message TTLs. + // This feature requires nats-server v2.11.0 or later. + AllowMsgTTL bool `json:"allow_msg_ttl"` + + // Enables and sets a duration for adding server markers for delete, purge and max age limits. + // This feature requires nats-server v2.11.0 or later. + SubjectDeleteMarkerTTL time.Duration `json:"subject_delete_marker_ttl,omitempty"` } // SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received. diff --git a/vendor/github.com/nats-io/nats.go/kv.go b/vendor/github.com/nats-io/nats.go/kv.go index bcb283ff8..0d75bdf87 100644 --- a/vendor/github.com/nats-io/nats.go/kv.go +++ b/vendor/github.com/nats-io/nats.go/kv.go @@ -929,13 +929,14 @@ func (kv *kvs) History(key string, opts ...WatchOpt) ([]KeyValueEntry, error) { // Implementation for Watch type watcher struct { - mu sync.Mutex - updates chan KeyValueEntry - sub *Subscription - initDone bool - initPending uint64 - received uint64 - ctx context.Context + mu sync.Mutex + updates chan KeyValueEntry + sub *Subscription + initDone bool + initPending uint64 + received uint64 + ctx context.Context + initDoneTimer *time.Timer } // Context returns the context for the watcher if set. @@ -1044,8 +1045,11 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error w.initPending = delta } if w.received > w.initPending || delta == 0 { + w.initDoneTimer.Stop() w.initDone = true w.updates <- nil + } else if w.initDoneTimer != nil { + w.initDoneTimer.Reset(kv.js.opts.wait) } } } @@ -1088,6 +1092,16 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error if sub.jsi != nil && sub.jsi.pending == 0 { w.initDone = true w.updates <- nil + } else { + // Set a timer to send the marker if we do not get any messages. + w.initDoneTimer = time.AfterFunc(kv.js.opts.wait, func() { + w.mu.Lock() + defer w.mu.Unlock() + if !w.initDone { + w.initDone = true + w.updates <- nil + } + }) } } else { // if UpdatesOnly was used, mark initialization as complete @@ -1097,6 +1111,7 @@ func (kv *kvs) WatchFiltered(keys []string, opts ...WatchOpt) (KeyWatcher, error sub.pDone = func(_ string) { close(w.updates) } + sub.mu.Unlock() w.sub = sub diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 0d1358122..683505662 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.39.1" + Version = "1.40.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -274,7 +274,6 @@ type InProcessConnProvider interface { // Options can be used to create a customized connection. type Options struct { - // Url represents a single NATS server url to which the client // will be connecting. If the Servers option is also set, it // then becomes the first server in the Servers array. @@ -422,6 +421,10 @@ type Options struct { // AsyncErrorCB sets the async error handler (e.g. slow consumer errors) AsyncErrorCB ErrHandler + // ReconnectErrCB sets the callback that is invoked whenever a + // reconnect attempt failed + ReconnectErrCB ConnErrHandler + // ReconnectBufSize is the size of the backing bufio during reconnect. // Once this has been exhausted publish operations will return an error. // Defaults to 8388608 bytes (8MB). @@ -1151,6 +1154,14 @@ func ReconnectHandler(cb ConnHandler) Option { } } +// ReconnectErrHandler is an Option to set the reconnect error handler. +func ReconnectErrHandler(cb ConnErrHandler) Option { + return func(o *Options) error { + o.ReconnectErrCB = cb + return nil + } +} + // ClosedHandler is an Option to set the closed handler. func ClosedHandler(cb ConnHandler) Option { return func(o *Options) error { @@ -2386,7 +2397,6 @@ func (nc *Conn) setup() { // Process a connected connection and initialize properly. func (nc *Conn) processConnectInit() error { - // Set our deadline for the whole connect process nc.conn.SetDeadline(time.Now().Add(nc.Opts.Timeout)) defer nc.conn.SetDeadline(time.Time{}) @@ -2535,7 +2545,6 @@ func (nc *Conn) checkForSecure() error { // processExpectedInfo will look for the expected first INFO message // sent when a connection is established. The lock should be held entering. func (nc *Conn) processExpectedInfo() error { - c := &control{} // Read the protocol @@ -2640,8 +2649,10 @@ func (nc *Conn) connectProto() (string, error) { // If our server does not support headers then we can't do them or no responders. hdrs := nc.info.Headers - cinfo := connectInfo{o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token, - o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs} + cinfo := connectInfo{ + o.Verbose, o.Pedantic, ujwt, nkey, sig, user, pass, token, + o.Secure, o.Name, LangString, Version, clientProtoInfo, !o.NoEcho, hdrs, hdrs, + } b, err := json.Marshal(cinfo) if err != nil { @@ -2911,10 +2922,13 @@ func (nc *Conn) doReconnect(err error, forceReconnect bool) { // Try to create a new connection err = nc.createConn() - // Not yet connected, retry... // Continue to hold the lock if err != nil { + // Perform appropriate callback for a failed connection attempt. + if nc.Opts.ReconnectErrCB != nil { + nc.ach.push(func() { nc.Opts.ReconnectErrCB(nc, err) }) + } nc.err = nil continue } @@ -3259,7 +3273,7 @@ func (nc *Conn) processMsg(data []byte) { // It's possible that we end-up not using the message, but that's ok. // FIXME(dlc): Need to copy, should/can do COW? - var msgPayload = data + msgPayload := data if !nc.ps.msgCopied { msgPayload = make([]byte, len(data)) copy(msgPayload, data) @@ -3450,8 +3464,10 @@ slowConsumer: } } -var permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`) -var permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`) +var ( + permissionsRe = regexp.MustCompile(`Subscription to "(\S+)"`) + permissionsQueueRe = regexp.MustCompile(`using queue "(\S+)"`) +) // processTransientError is called when the server signals a non terminal error // which does not close the connection or trigger a reconnect. @@ -3976,7 +3992,7 @@ func (nc *Conn) publish(subj, reply string, hdr, data []byte) error { // go 1.14 some values strconv faster, may be able to switch over. var b [12]byte - var i = len(b) + i := len(b) if hdr != nil { if len(hdr) > 0 { @@ -5677,7 +5693,7 @@ func (nc *Conn) IsDraining() bool { // caller must lock func (nc *Conn) getServers(implicitOnly bool) []string { poolSize := len(nc.srvPool) - var servers = make([]string, 0) + servers := make([]string, 0) for i := 0; i < poolSize; i++ { if implicitOnly && !nc.srvPool[i].isImplicit { continue diff --git a/vendor/modules.txt b/vendor/modules.txt index 2ede1e7f0..d2b6ae0bf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1009,8 +1009,8 @@ github.com/nats-io/nats-server/v2/server/stree github.com/nats-io/nats-server/v2/server/sysmem github.com/nats-io/nats-server/v2/server/thw github.com/nats-io/nats-server/v2/server/tpm -# github.com/nats-io/nats.go v1.39.1 -## explicit; go 1.22.0 +# github.com/nats-io/nats.go v1.40.0 +## explicit; go 1.23.0 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin github.com/nats-io/nats.go/internal/parser