diff --git a/go.mod b/go.mod index 091f874c02..8ad08c3687 100644 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( github.com/libregraph/lico v0.60.1-0.20230811070109-1d4140be554d github.com/mitchellh/mapstructure v1.5.0 github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 - github.com/nats-io/nats-server/v2 v2.9.21 + github.com/nats-io/nats-server/v2 v2.9.22 github.com/oklog/run v1.1.0 github.com/olekukonko/tablewriter v0.0.5 github.com/onsi/ginkgo v1.16.5 @@ -265,7 +265,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/nats-io/jwt/v2 v2.4.1 // indirect + github.com/nats-io/jwt/v2 v2.5.0 // indirect github.com/nats-io/nats.go v1.28.0 // indirect github.com/nats-io/nkeys v0.4.4 // indirect github.com/nats-io/nuid v1.0.1 // indirect diff --git a/go.sum b/go.sum index 8e874fc1c7..2072b298e4 100644 --- a/go.sum +++ b/go.sum @@ -1711,10 +1711,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8= -github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= -github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk= -github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= +github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak= +github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= +github.com/nats-io/nats-server/v2 v2.9.22 h1:rzl88pqWFFrU4G00ed+JnY+uGHSLZ+3jrxDnJxzKwGA= +github.com/nats-io/nats-server/v2 v2.9.22/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0= github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c= github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= diff --git a/vendor/github.com/nats-io/jwt/v2/account_claims.go b/vendor/github.com/nats-io/jwt/v2/account_claims.go index 95240096ec..a62d488534 100644 --- a/vendor/github.com/nats-io/jwt/v2/account_claims.go +++ b/vendor/github.com/nats-io/jwt/v2/account_claims.go @@ -154,10 +154,6 @@ func (m *Mapping) Validate(vr *ValidationResults) { total := uint8(0) for _, wm := range wm { wm.Subject.Validate(vr) - if wm.Subject.HasWildCards() { - vr.AddError("Subject %q in weighted mapping %q is not allowed to contains wildcard", - string(wm.Subject), ubFrom) - } total += wm.GetWeight() } if total > 100 { diff --git a/vendor/github.com/nats-io/nats-server/v2/conf/lex.go b/vendor/github.com/nats-io/nats-server/v2/conf/lex.go index caee90782a..b18447df86 100644 --- a/vendor/github.com/nats-io/nats-server/v2/conf/lex.go +++ b/vendor/github.com/nats-io/nats-server/v2/conf/lex.go @@ -78,6 +78,7 @@ const ( topOptTerm = '}' blockStart = '(' blockEnd = ')' + mapEndString = string(mapEnd) ) type stateFn func(lx *lexer) stateFn diff --git a/vendor/github.com/nats-io/nats-server/v2/conf/parse.go b/vendor/github.com/nats-io/nats-server/v2/conf/parse.go index 4d2a20adee..c104c307f1 100644 --- a/vendor/github.com/nats-io/nats-server/v2/conf/parse.go +++ b/vendor/github.com/nats-io/nats-server/v2/conf/parse.go @@ -137,18 +137,22 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) { } p.pushContext(p.mapping) + var prevItem item for { it := p.next() if it.typ == itemEOF { + // Here we allow the final character to be a bracket '}' + // in order to support JSON like configurations. + if prevItem.typ == itemKey && prevItem.val != mapEndString { + return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos) + } break } + prevItem = it if err := p.processItem(it, fp); err != nil { return nil, err } } - if len(p.mapping) == 0 { - return nil, fmt.Errorf("config has no values or is empty") - } return p, nil } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/client.go b/vendor/github.com/nats-io/nats-server/v2/server/client.go index b6f73442d9..e98c605470 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/client.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/client.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" "fmt" "io" "math/rand" @@ -5304,7 +5305,10 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi if solicit { // Based on type of error, possibly clear the saved tlsName // See: https://github.com/nats-io/nats-server/issues/1256 - if _, ok := err.(x509.HostnameError); ok { + // NOTE: As of Go 1.20, the HostnameError is wrapped so cannot + // type assert to check directly. + var hostnameErr x509.HostnameError + if errors.As(err, &hostnameErr) { if host == tlsName { resetTLSName = true } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/const.go b/vendor/github.com/nats-io/nats-server/v2/server/const.go index d5789cbaca..a76bcada54 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/const.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/const.go @@ -41,7 +41,7 @@ var ( const ( // VERSION is the current version for the server. - VERSION = "2.9.21" + VERSION = "2.9.22" // PROTO is the currently supported protocol. // 0 was the original diff --git a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go index b229b9fdbc..a4dab3759d 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/consumer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/consumer.go @@ -213,14 +213,14 @@ var ( // Calculate accurate replicas for the consumer config with the parent stream config. func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int { - if consCfg.Replicas == 0 { - if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy { + if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas { + if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 { + // Matches old-school ephemerals only, where the replica count is 0. return 1 } return strCfg.Replicas - } else { - return consCfg.Replicas } + return consCfg.Replicas } // Consumer is a jetstream consumer. @@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) { if o.dthresh > 0 && (o.isPullMode() || !o.active) { // Pull consumer. We run the dtmr all the time for this one. stopAndClearTimer(&o.dtmr) - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } // If we are not in ReplayInstant mode mark us as in replay state until resolved. @@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) { if pullMode { // Now start up Go routine to process inbound next message requests. go o.processInboundNextMsgReqs(qch) - } // If we are R>1 spin up our proposal loop. @@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) { close(o.qch) o.qch = nil } - // Make sure to clear out any re delivery queues + // Stop any inactivity timers. Should only be running on leaders. + stopAndClearTimer(&o.dtmr) + + // Make sure to clear out any re-deliver queues stopAndClearTimer(&o.ptmr) o.rdq, o.rdqi = nil, nil o.pending = nil @@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) { // Reset waiting if we are in pull mode. if o.isPullMode() { o.waiting = newWaitQueue(o.cfg.MaxWaiting) - if !o.isDurable() { - stopAndClearTimer(&o.dtmr) - } o.nextMsgReqs.drain() } else if o.srv.gateway.enabled { stopAndClearTimer(&o.gwdtmr) @@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool { // If we do not have interest anymore and have a delete threshold set, then set // a timer to delete us. We wait for a bit in case of server reconnect. if !interest && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) return true } return false @@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh - elapsed) } else { - o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive) } o.mu.Unlock() return @@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() { if o.dtmr != nil { o.dtmr.Reset(o.dthresh) } else { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } o.mu.Unlock() return @@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error { stopAndClearTimer(&o.dtmr) // Restart timer only if we are the leader. if o.isLeader() && o.dthresh > 0 { - o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() }) + o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive) } } @@ -3031,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 { return o.rdc[sseq] + 1 } +// Used if we have to adjust on failed delivery or bad lookups. +// Those failed attempts should not increase deliver count. +// Lock should be held. +func (o *consumer) decDeliveryCount(sseq uint64) { + if o.rdc == nil { + return + } + if dc, ok := o.rdc[sseq]; ok { + if dc == 1 { + delete(o.rdc, sseq) + } else { + o.rdc[sseq] -= 1 + } + } +} + // send a delivery exceeded advisory. func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) { e := JSConsumerDeliveryExceededAdvisory{ @@ -3093,7 +3108,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { o.notifyDeliveryExceeded(seq, dc-1) } // Make sure to remove from pending. - delete(o.pending, seq) + if p, ok := o.pending[seq]; ok && p != nil { + delete(o.pending, seq) + o.updateDelivered(p.Sequence, seq, dc, p.Timestamp) + } continue } if seq > 0 { @@ -3102,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { if sm == nil || err != nil { pmsg.returnToPool() pmsg, dc = nil, 0 + // Adjust back deliver count. + o.decDeliveryCount(seq) } return pmsg, dc, err } @@ -3203,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) { interest = true } } + // If interest, update batch pending requests counter and update fexp timer. if interest { brp += wr.n @@ -3289,7 +3310,7 @@ func (o *consumer) checkAckFloor() { } } } else if numPending > 0 { - // here it shorter to walk pending. + // here it is shorter to walk pending. // toTerm is seq, dseq, rcd for each entry. toTerm := make([]uint64, 0, numPending*3) o.mu.RLock() @@ -3506,10 +3527,16 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { if err == ErrStoreEOF { o.checkNumPendingOnEOF() } - if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache { + if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending { goto waitForMsgs + } else if err == errPartialCache { + s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'", + o.mset.acc, o.mset.cfg.Name, o.cfg.Name) + goto waitForMsgs + } else { - s.Errorf("Received an error looking up message for consumer: %v", err) + s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v", + o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err) goto waitForMsgs } } @@ -3904,20 +3931,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) { } } +// Credit back a failed delivery. +// lock should be held. +func (o *consumer) creditWaitingRequest(reply string) { + for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ { + if wr := o.waiting.reqs[rp]; wr != nil { + if wr.reply == reply { + wr.n++ + wr.d-- + return + } + } + rp = (rp + 1) % cap(o.waiting.reqs) + } +} + // didNotDeliver is called when a delivery for a consumer message failed. // Depending on our state, we will process the failure. -func (o *consumer) didNotDeliver(seq uint64) { +func (o *consumer) didNotDeliver(seq uint64, subj string) { o.mu.Lock() mset := o.mset if mset == nil { o.mu.Unlock() return } + // Adjust back deliver count. + o.decDeliveryCount(seq) + var checkDeliveryInterest bool if o.isPushMode() { o.active = false checkDeliveryInterest = true } else if o.pending != nil { + o.creditWaitingRequest(subj) // pull mode and we have pending. if _, ok := o.pending[seq]; ok { // We found this messsage on pending, we need diff --git a/vendor/github.com/nats-io/nats-server/v2/server/events.go b/vendor/github.com/nats-io/nats-server/v2/server/events.go index 1b58d912d7..2d8283e89e 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/events.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/events.go @@ -56,6 +56,7 @@ const ( connsRespSubj = "$SYS._INBOX_.%s" accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS" accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility + lameDuckEventSubj = "$SYS.SERVER.%s.LAMEDUCK" shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN" authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR" serverStatsSubj = "$SYS.SERVER.%s.STATSZ" @@ -533,6 +534,19 @@ RESET: } } +// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will +// not close off the send queue or reply handler, as we may still have a workload +// that needs migrating off. +// Lock should be held. +func (s *Server) sendLDMShutdownEventLocked() { + if s.sys == nil || s.sys.sendq == nil { + return + } + subj := fmt.Sprintf(lameDuckEventSubj, s.info.ID) + si := &ServerInfo{} + s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true)) +} + // Will send a shutdown message. func (s *Server) sendShutdownEvent() { s.mu.Lock() @@ -944,6 +958,13 @@ func (s *Server) initEventTracking() { if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { s.Errorf("Error setting up internal tracking: %v", err) } + // Listen for servers entering lame-duck mode. + // NOTE: This currently is handled in the same way as a server shutdown, but has + // a different subject in case we need to handle differently in future. + subject = fmt.Sprintf(lameDuckEventSubj, "*") + if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil { + s.Errorf("Error setting up internal tracking: %v", err) + } // Listen for account claims updates. subscribeToUpdate := true if s.accResolver != nil { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go index 7b167aefb9..34ef6ad82f 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/filestore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/filestore.go @@ -292,7 +292,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim } // Default values. if fcfg.BlockSize == 0 { - fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, fcfg.Cipher) + fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, prf != nil) } if fcfg.BlockSize > maxBlockSize { return nil, fmt.Errorf("filestore max block size is %s", friendlyBytes(maxBlockSize)) @@ -451,7 +451,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { fs.ageChk = nil } - if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { + if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer { fs.enforceMsgPerSubjectLimit() } fs.mu.Unlock() @@ -462,7 +462,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error { return nil } -func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) uint64 { +func dynBlkSize(retention RetentionPolicy, maxBytes int64, encrypted bool) uint64 { if maxBytes > 0 { blkSize := (maxBytes / 4) + 1 // (25% overhead) // Round up to nearest 100 @@ -476,7 +476,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u } else { blkSize = defaultMediumBlockSize } - if cipher != NoCipher && blkSize > maximumEncryptedBlockSize { + if encrypted && blkSize > maximumEncryptedBlockSize { // Notes on this below. blkSize = maximumEncryptedBlockSize } @@ -484,7 +484,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u } switch { - case cipher != NoCipher: + case encrypted: // In the case of encrypted stores, large blocks can result in worsened perf // since many writes on disk involve re-encrypting the entire block. For now, // we will enforce a cap on the block size when encryption is enabled to avoid @@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { fd = mb.mfd } else { fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms) - if err != nil { + if err == nil { defer fd.Close() } } @@ -1078,9 +1078,35 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { return gatherLost(lbuf - index), errBadMsg } + // Check for checksum failures before additional processing. + data := buf[index+msgHdrSize : index+rl] + if hh := mb.hh; hh != nil { + hh.Reset() + hh.Write(hdr[4:20]) + hh.Write(data[:slen]) + if hasHeaders { + hh.Write(data[slen+4 : dlen-recordHashSize]) + } else { + hh.Write(data[slen : dlen-recordHashSize]) + } + checksum := hh.Sum(nil) + if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { + truncate(index) + return gatherLost(lbuf - index), errBadMsg + } + copy(mb.lchk[0:], checksum) + } + + // Grab our sequence and timestamp. seq := le.Uint64(hdr[4:]) ts := int64(le.Uint64(hdr[12:])) + // Check if this is a delete tombstone. + if seq&tbit != 0 { + index += rl + continue + } + // This is an old erased message, or a new one that we can track. if seq == 0 || seq&ebit != 0 || seq < mb.first.seq { seq = seq &^ ebit @@ -1108,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { _, deleted = mb.dmap[seq] } - // Always set last. - mb.last.seq = seq - mb.last.ts = ts - if !deleted { - data := buf[index+msgHdrSize : index+rl] - if hh := mb.hh; hh != nil { - hh.Reset() - hh.Write(hdr[4:20]) - hh.Write(data[:slen]) - if hasHeaders { - hh.Write(data[slen+4 : dlen-recordHashSize]) - } else { - hh.Write(data[slen : dlen-recordHashSize]) - } - checksum := hh.Sum(nil) - if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) { - truncate(index) - return gatherLost(lbuf - index), errBadMsg - } - copy(mb.lchk[0:], checksum) - } - if firstNeedsSet { firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts } @@ -1156,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) { mb.fssNeedsWrite = true } } + + // Always set last + mb.last.seq = seq + mb.last.ts = ts + // Advance to next record. index += rl } @@ -1315,6 +1324,11 @@ func (fs *fileStore) expireMsgsOnRecover() { fs.psim = make(map[string]*psi) return false } + // Make sure we do subject cleanup as well. + mb.ensurePerSubjectInfoLoaded() + for subj := range mb.fss { + fs.removePerSubject(subj) + } mb.dirtyCloseWithRemove(true) deleted++ return true @@ -2387,6 +2401,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in } } } + } else if mb := fs.selectMsgBlock(fseq); mb != nil { + // If we are here we could not remove fseq from above, so rebuild. + var ld *LostStreamData + if ld, _ = mb.rebuildState(); ld != nil { + fs.rebuildStateLocked(ld) + } } } @@ -2514,7 +2534,14 @@ func (fs *fileStore) rebuildFirst() { fmb.removeIndexFile() ld, _ := fmb.rebuildState() - fmb.writeIndexInfo() + fmb.mu.RLock() + isEmpty := fmb.msgs == 0 + fmb.mu.RUnlock() + if isEmpty { + fs.removeMsgBlock(fmb) + } else { + fmb.writeIndexInfo() + } fs.selectNextFirst() fs.rebuildStateLocked(ld) } @@ -3036,7 +3063,8 @@ func (mb *msgBlock) compact() { if !isDeleted(seq) { // Normal message here. nbuf = append(nbuf, buf[index:index+rl]...) - if !firstSet { + // Do not set based on tombstone. + if !firstSet && seq&tbit == 0 { firstSet = true mb.first.seq = seq } @@ -3799,20 +3827,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte // Update write through cache. // Write to msg record. mb.cache.buf = append(mb.cache.buf, checksum...) - // Write index - mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) mb.cache.lrl = uint32(rl) - if mb.cache.fseq == 0 { - mb.cache.fseq = seq - } // Set cache timestamp for last store. mb.lwts = ts // Decide if we write index info if flushing in place. writeIndex := ts-mb.lwits > wiThresh - // Accounting - mb.updateAccounting(seq, ts, rl) + // Only update index and do accounting if not a delete tombstone. + if seq&tbit == 0 { + // Accounting, do this before stripping ebit, it is ebit aware. + mb.updateAccounting(seq, ts, rl) + // Strip ebit if set. + seq = seq &^ ebit + if mb.cache.fseq == 0 { + mb.cache.fseq = seq + } + // Write index + mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit) + } fch, werr := mb.fch, mb.werr @@ -3915,7 +3948,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) { seq = seq &^ ebit } - if mb.first.seq == 0 || mb.first.ts == 0 { + if (mb.first.seq == 0 || mb.first.ts == 0) && seq >= mb.first.seq { mb.first.seq = seq mb.first.ts = ts } @@ -4008,28 +4041,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) { return -1, nil } - // Starting index, defaults to beginning. - si := 0 + const linearThresh = 32 + nb := len(fs.blks) - 1 - // TODO(dlc) - Use new AVL and make this real for anything beyond certain size. - // Max threshold before we probe for a starting block to start our linear search. - const maxl = 256 - if nb := len(fs.blks); nb > maxl { - d := nb / 8 - for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} { - mb := fs.blks[i] + if nb < linearThresh { + for i, mb := range fs.blks { if seq <= atomic.LoadUint64(&mb.last.seq) { - break + return i, mb } - si = i } + return -1, nil } - // blks are sorted in ascending order. - for i := si; i < len(fs.blks); i++ { - mb := fs.blks[i] - if seq <= atomic.LoadUint64(&mb.last.seq) { - return i, mb + // Do traditional binary search here since we know the blocks are sorted by sequence first and last. + for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 { + mb := fs.blks[mid] + // Right now these atomic loads do not factor in, so fine to leave. Was considering + // uplifting these to fs scope to avoid atomic load but not needed. + first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq) + if seq > last { + low = mid + 1 + } else if seq < first { + high = mid - 1 + } else { + return mid, mb } } @@ -4098,6 +4133,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error { return errCorruptState } + // Check for tombstones which we can skip in terms of indexing. + if seq&tbit != 0 { + index += rl + continue + } + // Clear erase bit. seq = seq &^ ebit @@ -4453,15 +4494,17 @@ var ( errNoMainKey = errors.New("encrypted store encountered with no main key") ) -// Used for marking messages that have had their checksums checked. -// Used to signal a message record with headers. -const hbit = 1 << 31 - -// Used for marking erased messages sequences. -const ebit = 1 << 63 - -// Used to mark a bad index as deleted. -const dbit = 1 << 30 +const ( + // Used for marking messages that have had their checksums checked. + // Used to signal a message record with headers. + hbit = 1 << 31 + // Used for marking erased messages sequences. + ebit = 1 << 63 + // Used for marking tombstone sequences. + tbit = 1 << 62 + // Used to mark a bad index as deleted. + dbit = 1 << 30 +) // Will do a lookup from cache. // Lock should be held. @@ -4606,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store dlen := int(rl) - msgHdrSize slen := int(le.Uint16(hdr[20:])) // Simple sanity check. - if dlen < 0 || slen > dlen || int(rl) > len(buf) { + if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) { return nil, errBadMsg } data := buf[msgHdrSize : msgHdrSize+dlen] @@ -4799,20 +4842,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store start = fs.state.FirstSeq } - // TODO(dlc) - If num blocks gets large maybe use selectMsgBlock but have it return index b/c - // we need to keep walking if no match found in first mb. - for _, mb := range fs.blks { - // Skip blocks that are less than our starting sequence. - if start > atomic.LoadUint64(&mb.last.seq) { - continue - } - if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil { - if expireOk && mb != fs.lmb { - mb.tryForceExpireCache() + if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 { + for i := bi; i < len(fs.blks); i++ { + mb := fs.blks[i] + if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil { + if expireOk && mb != fs.lmb { + mb.tryForceExpireCache() + } + return sm, sm.seq, nil + } else if err != ErrStoreMsgNotFound { + return nil, 0, err } - return sm, sm.seq, nil - } else if err != ErrStoreMsgNotFound { - return nil, 0, err } } @@ -5205,16 +5245,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint } if sequence > 1 { return fs.Compact(sequence) - } else if keep > 0 { - fs.mu.RLock() - msgs, lseq := fs.state.Msgs, fs.state.LastSeq - fs.mu.RUnlock() - if keep >= msgs { - return 0, nil - } - return fs.Compact(lseq - keep + 1) } - return 0, nil } eq, wc := compareFn(subject), subjectHasWildcard(subject) @@ -5950,7 +5981,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si slen := int(le.Uint16(hdr[20:])) if subj == string(buf[msgHdrSize:msgHdrSize+slen]) { seq := le.Uint64(hdr[4:]) - if seq&ebit != 0 { + if seq < mb.first.seq || seq&ebit != 0 { continue } if len(mb.dmap) > 0 { @@ -6946,20 +6977,28 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err } if dc > 1 { + if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc { + // Make sure to remove from pending. + delete(o.state.Pending, sseq) + } if o.state.Redelivered == nil { o.state.Redelivered = make(map[uint64]uint64) } // Only update if greater then what we already have. - if o.state.Redelivered[sseq] < dc { + if o.state.Redelivered[sseq] < dc-1 { o.state.Redelivered[sseq] = dc - 1 } } } else { // For AckNone just update delivered and ackfloor at the same time. - o.state.Delivered.Consumer = dseq - o.state.Delivered.Stream = sseq - o.state.AckFloor.Consumer = dseq - o.state.AckFloor.Stream = sseq + if dseq > o.state.Delivered.Consumer { + o.state.Delivered.Consumer = dseq + o.state.AckFloor.Consumer = dseq + } + if sseq > o.state.Delivered.Stream { + o.state.Delivered.Stream = sseq + o.state.AckFloor.Stream = sseq + } } // Make sure we flush to disk. o.kickFlusher() diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go index d4f642ed3d..de79b9d4ea 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream.go @@ -148,6 +148,9 @@ type jsAccount struct { // From server sendq *ipQueue[*pubMsg] + // For limiting only running one checkAndSync at a time. + sync atomic.Bool + // Usage/limits related fields that will be protected by usageMu usageMu sync.RWMutex limits map[string]JetStreamAccountLimits // indexed by tierName @@ -1811,6 +1814,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account // When we detect a skew of some sort this will verify the usage reporting is correct. // No locks should be held. func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) { + // This will run in a separate go routine, so check that we are only running once. + if !jsa.sync.CompareAndSwap(false, true) { + return + } + defer jsa.sync.Store(false) + // Hold the account read lock and the usage lock while we calculate. // We scope by tier and storage type, but if R3 File has 200 streams etc. could // show a pause. I did test with > 100 non-active streams and was 80-200ns or so. @@ -1916,7 +1925,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta jsa.usageMu.Unlock() if needsCheck { - jsa.checkAndSyncUsage(tierName, storeType) + // We could be holding the stream lock from up in the stack, and this + // will want the jsa lock, which would violate locking order. + // So do this in a Go routine. The function will check if it is already running. + go jsa.checkAndSyncUsage(tierName, storeType) } } diff --git a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go index 6af4fa42ba..52ccf5bddf 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go @@ -2161,21 +2161,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps startMigrationMonitoring := func() { if mmt == nil { - mmt = time.NewTicker(10 * time.Millisecond) + mmt = time.NewTicker(500 * time.Millisecond) mmtc = mmt.C } } - adjustMigrationMonitoring := func() { - const delay = 500 * time.Millisecond - if mmt == nil { - mmt = time.NewTicker(delay) - mmtc = mmt.C - } else { - mmt.Reset(delay) - } - } - stopMigrationMonitoring := func() { if mmt != nil { mmt.Stop() @@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps continue } - // Adjust to our normal time delay. - adjustMigrationMonitoring() - // Make sure we have correct cluster information on the other peers. ci := js.clusterInfo(rg) mset.checkClusterInfo(ci) @@ -4001,7 +3988,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state var didCreate, isConfigUpdate, needsLocalResponse bool if o == nil { // Add in the consumer if needed. - if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil { + if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil { didCreate = true } } else { @@ -5197,7 +5184,12 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe return true } - // If we are here let's remove the peer at least. + // If R1 just return to avoid bricking the stream. + if sa.Group.node == nil || len(sa.Group.Peers) == 1 { + return false + } + + // If we are here let's remove the peer at least, as long as we are R>1 for i, peer := range sa.Group.Peers { if peer == removePeer { sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1] diff --git a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go index 39170e3c17..9b78676a79 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/leafnode.go @@ -1643,6 +1643,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { subs := _subs[:0] ims := []string{} + // Hold the client lock otherwise there can be a race and miss some subs. + c.mu.Lock() + defer c.mu.Unlock() + acc.mu.RLock() accName := acc.Name accNTag := acc.nameTag @@ -1718,7 +1722,6 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { } // Now walk the results and add them to our smap - c.mu.Lock() rc := c.leaf.remoteCluster c.leaf.smap = make(map[string]int32) for _, sub := range subs { @@ -1784,7 +1787,6 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) { c.mu.Unlock() }) } - c.mu.Unlock() } // updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go index b35e3d294d..b2ed14aac8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/memstore.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/memstore.go @@ -1307,17 +1307,28 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro } if dc > 1 { + if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc { + // Make sure to remove from pending. + delete(o.state.Pending, sseq) + } if o.state.Redelivered == nil { o.state.Redelivered = make(map[uint64]uint64) } - o.state.Redelivered[sseq] = dc - 1 + // Only update if greater then what we already have. + if o.state.Redelivered[sseq] < dc-1 { + o.state.Redelivered[sseq] = dc - 1 + } } } else { // For AckNone just update delivered and ackfloor at the same time. - o.state.Delivered.Consumer = dseq - o.state.Delivered.Stream = sseq - o.state.AckFloor.Consumer = dseq - o.state.AckFloor.Stream = sseq + if dseq > o.state.Delivered.Consumer { + o.state.Delivered.Consumer = dseq + o.state.AckFloor.Consumer = dseq + } + if sseq > o.state.Delivered.Stream { + o.state.Delivered.Stream = sseq + o.state.AckFloor.Stream = sseq + } } return nil diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go index b7b26b48c5..d279abb77b 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor.go @@ -492,7 +492,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) { case ByLast: sort.Sort(sort.Reverse(byLast{pconns})) case ByIdle: - sort.Sort(sort.Reverse(byIdle{pconns})) + sort.Sort(sort.Reverse(byIdle{pconns, c.Now})) case ByUptime: sort.Sort(byUptime{pconns, time.Now()}) case ByStop: @@ -1120,14 +1120,17 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) { queues := map[string]monitorIPQueue{} - s.ipQueues.Range(func(k, v interface{}) bool { + s.ipQueues.Range(func(k, v any) bool { + var pending, inProgress int name := k.(string) - queue := v.(interface { + queue, ok := v.(interface { len() int - inProgress() uint64 + inProgress() int64 }) - pending := queue.len() - inProgress := int(queue.inProgress()) + if ok { + pending = queue.len() + inProgress = int(queue.inProgress()) + } if !all && (pending == 0 && inProgress == 0) { return true } else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) { @@ -2301,18 +2304,26 @@ func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) { ResponseHandler(w, r, b) } -// ResponseHandler handles responses for monitoring routes +// ResponseHandler handles responses for monitoring routes. func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) { + handleResponse(http.StatusOK, w, r, data) +} + +// handleResponse handles responses for monitoring routes with a specific HTTP status code. +func handleResponse(code int, w http.ResponseWriter, r *http.Request, data []byte) { // Get callback from request callback := r.URL.Query().Get("callback") // If callback is not empty then if callback != "" { // Response for JSONP w.Header().Set("Content-Type", "application/javascript") + w.WriteHeader(code) fmt.Fprintf(w, "%s(%s)", callback, data) } else { // Otherwise JSON w.Header().Set("Content-Type", "application/json") + w.Header().Set("Access-Control-Allow-Origin", "*") + w.WriteHeader(code) w.Write(data) } } @@ -3047,7 +3058,7 @@ type HealthStatus struct { Error string `json:"error,omitempty"` } -// https://tools.ietf.org/id/draft-inadarei-api-health-check-05.html +// https://datatracker.ietf.org/doc/html/draft-inadarei-api-health-check func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { s.mu.Lock() s.httpReqStats[HealthzPath]++ @@ -3074,16 +3085,19 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { JSEnabledOnly: jsEnabledOnly, JSServerOnly: jsServerOnly, }) + + code := http.StatusOK + if hs.Error != _EMPTY_ { s.Warnf("Healthcheck failed: %q", hs.Error) - w.WriteHeader(http.StatusServiceUnavailable) + code = http.StatusServiceUnavailable } b, err := json.Marshal(hs) if err != nil { s.Errorf("Error marshaling response to /healthz request: %v", err) } - ResponseHandler(w, r, b) + handleResponse(code, w, r, b) } // Generate health status. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/monitor_sort_opts.go b/vendor/github.com/nats-io/nats-server/v2/server/monitor_sort_opts.go index 58cc3900bb..2fcaf2e9f3 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/monitor_sort_opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/monitor_sort_opts.go @@ -92,12 +92,13 @@ func (l byLast) Less(i, j int) bool { } // Idle time -type byIdle struct{ ConnInfos } +type byIdle struct { + ConnInfos + now time.Time +} func (l byIdle) Less(i, j int) bool { - ii := l.ConnInfos[i].LastActivity.Sub(l.ConnInfos[i].Start) - ij := l.ConnInfos[j].LastActivity.Sub(l.ConnInfos[j].Start) - return ii < ij + return l.now.Sub(l.ConnInfos[i].LastActivity) < l.now.Sub(l.ConnInfos[j].LastActivity) } // Uptime diff --git a/vendor/github.com/nats-io/nats-server/v2/server/ocsp_peer.go b/vendor/github.com/nats-io/nats-server/v2/server/ocsp_peer.go index 0ddcc0c80f..0e424a4a9c 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/ocsp_peer.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/ocsp_peer.go @@ -139,13 +139,13 @@ func (s *Server) plugTLSOCSPPeer(config *tlsConfigKind) (*tls.Config, bool, erro if config == nil || config.tlsConfig == nil { return nil, false, errors.New(certidp.ErrUnableToPlugTLSEmptyConfig) } - s.Debugf(certidp.DbgPlugTLSForKind, config.kind) kind := config.kind isSpoke := config.isLeafSpoke tcOpts := config.tlsOpts if tcOpts == nil || tcOpts.OCSPPeerConfig == nil || !tcOpts.OCSPPeerConfig.Verify { return nil, false, nil } + s.Debugf(certidp.DbgPlugTLSForKind, config.kind) // peer is a tls client if kind == kindStringMap[CLIENT] || (kind == kindStringMap[LEAF] && !isSpoke) { if !tcOpts.Verify { diff --git a/vendor/github.com/nats-io/nats-server/v2/server/opts.go b/vendor/github.com/nats-io/nats-server/v2/server/opts.go index a2b231aa1f..1cb531dae0 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/opts.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/opts.go @@ -744,6 +744,9 @@ func (o *Options) ProcessConfigFile(configFile string) error { // Collect all errors and warnings and report them all together. errors := make([]error, 0) warnings := make([]error, 0) + if len(m) == 0 { + warnings = append(warnings, fmt.Errorf("%s: config has no values or is empty", configFile)) + } // First check whether a system account has been defined, // as that is a condition for other features to be enabled. diff --git a/vendor/github.com/nats-io/nats-server/v2/server/raft.go b/vendor/github.com/nats-io/nats-server/v2/server/raft.go index e625c47b2b..bee20a1fcd 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/raft.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/raft.go @@ -1814,6 +1814,7 @@ func (n *raft) runAsFollower() { if n.catchupStalled() { n.cancelCatchup() } + n.resetElectionTimeout() n.Unlock() } else { n.switchToCandidate() @@ -2508,6 +2509,10 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { } if err != nil || ae == nil { n.warn("Could not find a starting entry for catchup request: %v", err) + // If we are here we are seeing a request for an item we do not have, meaning we should stepdown. + // This is possible on a reset of our WAL but the other side has a snapshot already. + // If we do not stepdown this can cycle. + n.stepdown.push(noLeader) n.Unlock() arPool.Put(ar) return diff --git a/vendor/github.com/nats-io/nats-server/v2/server/server.go b/vendor/github.com/nats-io/nats-server/v2/server/server.go index 11e715b9b2..3ecb4e8750 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/server.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/server.go @@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() { } s.Noticef("Entering lame duck mode, stop accepting new clients") s.ldm = true + s.sendLDMShutdownEventLocked() expected := 1 s.listener.Close() s.listener = nil diff --git a/vendor/github.com/nats-io/nats-server/v2/server/stream.go b/vendor/github.com/nats-io/nats-server/v2/server/stream.go index d03306d742..fb792845e8 100644 --- a/vendor/github.com/nats-io/nats-server/v2/server/stream.go +++ b/vendor/github.com/nats-io/nats-server/v2/server/stream.go @@ -2983,7 +2983,7 @@ func streamAndSeq(shdr string) (string, uint64) { } // New version which is stream index name sequence fields := strings.Fields(shdr) - if len(fields) != 2 { + if len(fields) < 2 { return _EMPTY_, 0 } return fields[0], uint64(parseAckReplyNum(fields[1])) @@ -4455,7 +4455,7 @@ func (mset *stream) internalLoop() { // Check to see if this is a delivery for a consumer and // we failed to deliver the message. If so alert the consumer. if pm.o != nil && pm.seq > 0 && !didDeliver { - pm.o.didNotDeliver(pm.seq) + pm.o.didNotDeliver(pm.seq, pm.dsubj) } pm.returnToPool() } diff --git a/vendor/modules.txt b/vendor/modules.txt index fb61d89d91..b67a2f1f84 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1333,10 +1333,10 @@ github.com/mohae/deepcopy # github.com/mschoch/smat v0.2.0 ## explicit; go 1.13 github.com/mschoch/smat -# github.com/nats-io/jwt/v2 v2.4.1 +# github.com/nats-io/jwt/v2 v2.5.0 ## explicit; go 1.18 github.com/nats-io/jwt/v2 -# github.com/nats-io/nats-server/v2 v2.9.21 +# github.com/nats-io/nats-server/v2 v2.9.22 ## explicit; go 1.19 github.com/nats-io/nats-server/v2/conf github.com/nats-io/nats-server/v2/internal/ldap