Bump github.com/nats-io/nats-server/v2 from 2.9.21 to 2.9.22

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.9.21 to 2.9.22.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.9.21...v2.9.22)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Author: dependabot[bot]
Date: 2023-09-08 06:31:16 +00:00
Committed by: Ralf Haferkamp
Parent: 92ae593b4b
Commit: c1dfe240d8
22 changed files with 321 additions and 169 deletions
+2 -2
@@ -55,7 +55,7 @@ require (
github.com/libregraph/lico v0.60.1-0.20230811070109-1d4140be554d
github.com/mitchellh/mapstructure v1.5.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.9.21
github.com/nats-io/nats-server/v2 v2.9.22
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5
@@ -265,7 +265,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
github.com/nats-io/jwt/v2 v2.5.0 // indirect
github.com/nats-io/nats.go v1.28.0 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
+4 -4
@@ -1711,10 +1711,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/jwt/v2 v2.5.0 h1:WQQ40AAlqqfx+f6ku+i0pOVm+ASirD4fUh+oQsiE9Ak=
github.com/nats-io/jwt/v2 v2.5.0/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.22 h1:rzl88pqWFFrU4G00ed+JnY+uGHSLZ+3jrxDnJxzKwGA=
github.com/nats-io/nats-server/v2 v2.9.22/go.mod h1:wEjrEy9vnqIGE4Pqz4/c75v9Pmaq7My2IgFmnykc4C0=
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
-4
@@ -154,10 +154,6 @@ func (m *Mapping) Validate(vr *ValidationResults) {
total := uint8(0)
for _, wm := range wm {
wm.Subject.Validate(vr)
if wm.Subject.HasWildCards() {
vr.AddError("Subject %q in weighted mapping %q is not allowed to contains wildcard",
string(wm.Subject), ubFrom)
}
total += wm.GetWeight()
}
if total > 100 {
+1
@@ -78,6 +78,7 @@ const (
topOptTerm = '}'
blockStart = '('
blockEnd = ')'
mapEndString = string(mapEnd)
)
type stateFn func(lx *lexer) stateFn
+7 -3
@@ -137,18 +137,22 @@ func parse(data, fp string, pedantic bool) (p *parser, err error) {
}
p.pushContext(p.mapping)
var prevItem item
for {
it := p.next()
if it.typ == itemEOF {
// Here we allow the final character to be a bracket '}'
// in order to support JSON like configurations.
if prevItem.typ == itemKey && prevItem.val != mapEndString {
return nil, fmt.Errorf("config is invalid (%s:%d:%d)", fp, it.line, it.pos)
}
break
}
prevItem = it
if err := p.processItem(it, fp); err != nil {
return nil, err
}
}
if len(p.mapping) == 0 {
return nil, fmt.Errorf("config has no values or is empty")
}
return p, nil
}
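As a side note, here is a hedged standalone sketch of what this parser change permits, assuming the conf package's exported Parse helper (the inputs below are illustrative, not from the diff): a JSON-style configuration whose final token is '}' parses, while a key left dangling at end of input is reported as invalid.

package main

import (
	"fmt"

	"github.com/nats-io/nats-server/v2/conf"
)

func main() {
	// JSON-like configuration ending in '}' is accepted.
	m, err := conf.Parse(`{ "port": 4222, "max_payload": "1MB" }`)
	fmt.Println(m, err)

	// A key with no value before EOF should now surface a parse error.
	_, err = conf.Parse("port: 4222\ndangling_key")
	fmt.Println(err)
}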
+5 -1
@@ -18,6 +18,7 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
@@ -5304,7 +5305,10 @@ func (c *client) doTLSHandshake(typ string, solicit bool, url *url.URL, tlsConfi
if solicit {
// Based on type of error, possibly clear the saved tlsName
// See: https://github.com/nats-io/nats-server/issues/1256
if _, ok := err.(x509.HostnameError); ok {
// NOTE: As of Go 1.20, the HostnameError is wrapped so cannot
// type assert to check directly.
var hostnameErr x509.HostnameError
if errors.As(err, &hostnameErr) {
if host == tlsName {
resetTLSName = true
}
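For background, a minimal self-contained sketch (not part of the diff) of the errors.As pattern adopted here; as of Go 1.20 the x509.HostnameError may arrive wrapped, so a direct type assertion no longer matches:

package main

import (
	"crypto/x509"
	"errors"
	"fmt"
)

// isHostnameMismatch reports whether err is, or wraps, an x509.HostnameError.
func isHostnameMismatch(err error) bool {
	var hostnameErr x509.HostnameError
	return errors.As(err, &hostnameErr)
}

func main() {
	base := x509.HostnameError{Certificate: &x509.Certificate{}, Host: "10.0.0.1"}
	wrapped := fmt.Errorf("tls handshake failed: %w", base)

	// The plain type assertion misses the wrapped error; errors.As still matches.
	_, direct := wrapped.(x509.HostnameError)
	fmt.Println(direct, isHostnameMismatch(wrapped)) // false true
}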
+1 -1
@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.9.21"
VERSION = "2.9.22"
// PROTO is the currently supported protocol.
// 0 was the original
+65 -19
@@ -213,14 +213,14 @@ var (
// Calculate accurate replicas for the consumer config with the parent stream config.
func (consCfg ConsumerConfig) replicas(strCfg *StreamConfig) int {
if consCfg.Replicas == 0 {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy {
if consCfg.Replicas == 0 || consCfg.Replicas > strCfg.Replicas {
if !isDurableConsumer(&consCfg) && strCfg.Retention == LimitsPolicy && consCfg.Replicas == 0 {
// Matches old-school ephemerals only, where the replica count is 0.
return 1
}
return strCfg.Replicas
} else {
return consCfg.Replicas
}
return consCfg.Replicas
}
// Consumer is a jetstream consumer.
@@ -1091,7 +1091,7 @@ func (o *consumer) setLeader(isLeader bool) {
if o.dthresh > 0 && (o.isPullMode() || !o.active) {
// Pull consumer. We run the dtmr all the time for this one.
stopAndClearTimer(&o.dtmr)
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
// If we are not in ReplayInstant mode mark us as in replay state until resolved.
@@ -1121,7 +1121,6 @@ func (o *consumer) setLeader(isLeader bool) {
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
}
// If we are R>1 spin up our proposal loop.
@@ -1140,7 +1139,10 @@ func (o *consumer) setLeader(isLeader bool) {
close(o.qch)
o.qch = nil
}
// Make sure to clear out any re delivery queues
// Stop any inactivity timers. Should only be running on leaders.
stopAndClearTimer(&o.dtmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.rdq, o.rdqi = nil, nil
o.pending = nil
@@ -1156,9 +1158,6 @@ func (o *consumer) setLeader(isLeader bool) {
// Reset waiting if we are in pull mode.
if o.isPullMode() {
o.waiting = newWaitQueue(o.cfg.MaxWaiting)
if !o.isDurable() {
stopAndClearTimer(&o.dtmr)
}
o.nextMsgReqs.drain()
} else if o.srv.gateway.enabled {
stopAndClearTimer(&o.gwdtmr)
@@ -1349,7 +1348,7 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
// If we do not have interest anymore and have a delete threshold set, then set
// a timer to delete us. We wait for a bit in case of server reconnect.
if !interest && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
return true
}
return false
@@ -1376,7 +1375,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh - elapsed)
} else {
o.dtmr = time.AfterFunc(o.dthresh-elapsed, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh-elapsed, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1386,7 +1385,7 @@ func (o *consumer) deleteNotActive() {
if o.dtmr != nil {
o.dtmr.Reset(o.dthresh)
} else {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
o.mu.Unlock()
return
@@ -1640,7 +1639,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
stopAndClearTimer(&o.dtmr)
// Restart timer only if we are the leader.
if o.isLeader() && o.dthresh > 0 {
o.dtmr = time.AfterFunc(o.dthresh, func() { o.deleteNotActive() })
o.dtmr = time.AfterFunc(o.dthresh, o.deleteNotActive)
}
}
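For reference, a tiny sketch (not from the diff) of the simplification repeated in the hunks above and below: a method value such as o.deleteNotActive can be handed to time.AfterFunc directly instead of wrapping it in a closure, with identical behavior; the consumer type here is purely illustrative.

package main

import (
	"fmt"
	"time"
)

type consumer struct{ name string }

func (o *consumer) deleteNotActive() {
	fmt.Println("removing inactive consumer:", o.name)
}

func main() {
	o := &consumer{name: "demo"}
	// Equivalent to time.AfterFunc(d, func() { o.deleteNotActive() }).
	t := time.AfterFunc(10*time.Millisecond, o.deleteNotActive)
	defer t.Stop()
	time.Sleep(20 * time.Millisecond)
}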
@@ -3031,6 +3030,22 @@ func (o *consumer) incDeliveryCount(sseq uint64) uint64 {
return o.rdc[sseq] + 1
}
// Used if we have to adjust on failed delivery or bad lookups.
// Those failed attempts should not increase deliver count.
// Lock should be held.
func (o *consumer) decDeliveryCount(sseq uint64) {
if o.rdc == nil {
return
}
if dc, ok := o.rdc[sseq]; ok {
if dc == 1 {
delete(o.rdc, sseq)
} else {
o.rdc[sseq] -= 1
}
}
}
// send a delivery exceeded advisory.
func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
e := JSConsumerDeliveryExceededAdvisory{
@@ -3093,7 +3108,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
o.notifyDeliveryExceeded(seq, dc-1)
}
// Make sure to remove from pending.
delete(o.pending, seq)
if p, ok := o.pending[seq]; ok && p != nil {
delete(o.pending, seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
}
continue
}
if seq > 0 {
@@ -3102,6 +3120,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
}
@@ -3203,6 +3223,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
interest = true
}
}
// If interest, update batch pending requests counter and update fexp timer.
if interest {
brp += wr.n
@@ -3289,7 +3310,7 @@ func (o *consumer) checkAckFloor() {
}
}
} else if numPending > 0 {
// here it shorter to walk pending.
// here it is shorter to walk pending.
// toTerm is seq, dseq, rcd for each entry.
toTerm := make([]uint64, 0, numPending*3)
o.mu.RLock()
@@ -3506,10 +3527,16 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
if err == ErrStoreEOF {
o.checkNumPendingOnEOF()
}
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending || err == errPartialCache {
if err == ErrStoreMsgNotFound || err == errDeletedMsg || err == ErrStoreEOF || err == errMaxAckPending {
goto waitForMsgs
} else if err == errPartialCache {
s.Warnf("Unexpected partial cache error looking up message for consumer '%s > %s > %s'",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name)
goto waitForMsgs
} else {
s.Errorf("Received an error looking up message for consumer: %v", err)
s.Errorf("Received an error looking up message for consumer '%s > %s > %s': %v",
o.mset.acc, o.mset.cfg.Name, o.cfg.Name, err)
goto waitForMsgs
}
}
@@ -3904,20 +3931,39 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
}
}
// Credit back a failed delivery.
// lock should be held.
func (o *consumer) creditWaitingRequest(reply string) {
for i, rp := 0, o.waiting.rp; i < o.waiting.n; i++ {
if wr := o.waiting.reqs[rp]; wr != nil {
if wr.reply == reply {
wr.n++
wr.d--
return
}
}
rp = (rp + 1) % cap(o.waiting.reqs)
}
}
// didNotDeliver is called when a delivery for a consumer message failed.
// Depending on our state, we will process the failure.
func (o *consumer) didNotDeliver(seq uint64) {
func (o *consumer) didNotDeliver(seq uint64, subj string) {
o.mu.Lock()
mset := o.mset
if mset == nil {
o.mu.Unlock()
return
}
// Adjust back deliver count.
o.decDeliveryCount(seq)
var checkDeliveryInterest bool
if o.isPushMode() {
o.active = false
checkDeliveryInterest = true
} else if o.pending != nil {
o.creditWaitingRequest(subj)
// pull mode and we have pending.
if _, ok := o.pending[seq]; ok {
// We found this messsage on pending, we need
+21
@@ -56,6 +56,7 @@ const (
connsRespSubj = "$SYS._INBOX_.%s"
accConnsEventSubjNew = "$SYS.ACCOUNT.%s.SERVER.CONNS"
accConnsEventSubjOld = "$SYS.SERVER.ACCOUNT.%s.CONNS" // kept for backward compatibility
lameDuckEventSubj = "$SYS.SERVER.%s.LAMEDUCK"
shutdownEventSubj = "$SYS.SERVER.%s.SHUTDOWN"
authErrorEventSubj = "$SYS.SERVER.%s.CLIENT.AUTH.ERR"
serverStatsSubj = "$SYS.SERVER.%s.STATSZ"
@@ -533,6 +534,19 @@ RESET:
}
}
// Will send a shutdown message for lame-duck. Unlike sendShutdownEvent, this will
// not close off the send queue or reply handler, as we may still have a workload
// that needs migrating off.
// Lock should be held.
func (s *Server) sendLDMShutdownEventLocked() {
if s.sys == nil || s.sys.sendq == nil {
return
}
subj := fmt.Sprintf(lameDuckEventSubj, s.info.ID)
si := &ServerInfo{}
s.sys.sendq.push(newPubMsg(nil, subj, _EMPTY_, si, nil, si, noCompression, false, true))
}
// Will send a shutdown message.
func (s *Server) sendShutdownEvent() {
s.mu.Lock()
@@ -944,6 +958,13 @@ func (s *Server) initEventTracking() {
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for servers entering lame-duck mode.
// NOTE: This currently is handled in the same way as a server shutdown, but has
// a different subject in case we need to handle differently in future.
subject = fmt.Sprintf(lameDuckEventSubj, "*")
if _, err := s.sysSubscribe(subject, s.noInlineCallback(s.remoteServerShutdown)); err != nil {
s.Errorf("Error setting up internal tracking: %v", err)
}
// Listen for account claims updates.
subscribeToUpdate := true
if s.accResolver != nil {
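As a hypothetical illustration (not part of the diff), a system-account client could watch for servers entering lame-duck mode on the new subject; the URL and credentials file below are assumptions:

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Connection details and the sys.creds path are assumptions for illustration.
	nc, err := nats.Connect("nats://127.0.0.1:4222", nats.UserCredentials("sys.creds"))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// Matches the lameDuckEventSubj pattern $SYS.SERVER.%s.LAMEDUCK above.
	if _, err := nc.Subscribe("$SYS.SERVER.*.LAMEDUCK", func(m *nats.Msg) {
		fmt.Println("server entering lame duck mode:", m.Subject)
	}); err != nil {
		log.Fatal(err)
	}
	select {} // block and keep listening
}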
+131 -92
@@ -292,7 +292,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
}
// Default values.
if fcfg.BlockSize == 0 {
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, fcfg.Cipher)
fcfg.BlockSize = dynBlkSize(cfg.Retention, cfg.MaxBytes, prf != nil)
}
if fcfg.BlockSize > maxBlockSize {
return nil, fmt.Errorf("filestore max block size is %s", friendlyBytes(maxBlockSize))
@@ -451,7 +451,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
fs.ageChk = nil
}
if cfg.MaxMsgsPer > 0 && cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
if fs.cfg.MaxMsgsPer > 0 && fs.cfg.MaxMsgsPer < old_cfg.MaxMsgsPer {
fs.enforceMsgPerSubjectLimit()
}
fs.mu.Unlock()
@@ -462,7 +462,7 @@ func (fs *fileStore) UpdateConfig(cfg *StreamConfig) error {
return nil
}
func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) uint64 {
func dynBlkSize(retention RetentionPolicy, maxBytes int64, encrypted bool) uint64 {
if maxBytes > 0 {
blkSize := (maxBytes / 4) + 1 // (25% overhead)
// Round up to nearest 100
@@ -476,7 +476,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
} else {
blkSize = defaultMediumBlockSize
}
if cipher != NoCipher && blkSize > maximumEncryptedBlockSize {
if encrypted && blkSize > maximumEncryptedBlockSize {
// Notes on this below.
blkSize = maximumEncryptedBlockSize
}
@@ -484,7 +484,7 @@ func dynBlkSize(retention RetentionPolicy, maxBytes int64, cipher StoreCipher) u
}
switch {
case cipher != NoCipher:
case encrypted:
// In the case of encrypted stores, large blocks can result in worsened perf
// since many writes on disk involve re-encrypting the entire block. For now,
// we will enforce a cap on the block size when encryption is enabled to avoid
@@ -1032,7 +1032,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
fd = mb.mfd
} else {
fd, err = os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
if err == nil {
defer fd.Close()
}
}
@@ -1078,9 +1078,35 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
return gatherLost(lbuf - index), errBadMsg
}
// Check for checksum failures before additional processing.
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}
// Grab our sequence and timestamp.
seq := le.Uint64(hdr[4:])
ts := int64(le.Uint64(hdr[12:]))
// Check if this is a delete tombstone.
if seq&tbit != 0 {
index += rl
continue
}
// This is an old erased message, or a new one that we can track.
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
seq = seq &^ ebit
@@ -1108,29 +1134,7 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
_, deleted = mb.dmap[seq]
}
// Always set last.
mb.last.seq = seq
mb.last.ts = ts
if !deleted {
data := buf[index+msgHdrSize : index+rl]
if hh := mb.hh; hh != nil {
hh.Reset()
hh.Write(hdr[4:20])
hh.Write(data[:slen])
if hasHeaders {
hh.Write(data[slen+4 : dlen-recordHashSize])
} else {
hh.Write(data[slen : dlen-recordHashSize])
}
checksum := hh.Sum(nil)
if !bytes.Equal(checksum, data[len(data)-recordHashSize:]) {
truncate(index)
return gatherLost(lbuf - index), errBadMsg
}
copy(mb.lchk[0:], checksum)
}
if firstNeedsSet {
firstNeedsSet, mb.first.seq, mb.first.ts = false, seq, ts
}
@@ -1156,6 +1160,11 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
mb.fssNeedsWrite = true
}
}
// Always set last
mb.last.seq = seq
mb.last.ts = ts
// Advance to next record.
index += rl
}
@@ -1315,6 +1324,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
fs.psim = make(map[string]*psi)
return false
}
// Make sure we do subject cleanup as well.
mb.ensurePerSubjectInfoLoaded()
for subj := range mb.fss {
fs.removePerSubject(subj)
}
mb.dirtyCloseWithRemove(true)
deleted++
return true
@@ -2387,6 +2401,12 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
}
}
}
} else if mb := fs.selectMsgBlock(fseq); mb != nil {
// If we are here we could not remove fseq from above, so rebuild.
var ld *LostStreamData
if ld, _ = mb.rebuildState(); ld != nil {
fs.rebuildStateLocked(ld)
}
}
}
@@ -2514,7 +2534,14 @@ func (fs *fileStore) rebuildFirst() {
fmb.removeIndexFile()
ld, _ := fmb.rebuildState()
fmb.writeIndexInfo()
fmb.mu.RLock()
isEmpty := fmb.msgs == 0
fmb.mu.RUnlock()
if isEmpty {
fs.removeMsgBlock(fmb)
} else {
fmb.writeIndexInfo()
}
fs.selectNextFirst()
fs.rebuildStateLocked(ld)
}
@@ -3036,7 +3063,8 @@ func (mb *msgBlock) compact() {
if !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
// Do not set based on tombstone.
if !firstSet && seq&tbit == 0 {
firstSet = true
mb.first.seq = seq
}
@@ -3799,20 +3827,25 @@ func (mb *msgBlock) writeMsgRecord(rl, seq uint64, subj string, mhdr, msg []byte
// Update write through cache.
// Write to msg record.
mb.cache.buf = append(mb.cache.buf, checksum...)
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
mb.cache.lrl = uint32(rl)
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}
// Set cache timestamp for last store.
mb.lwts = ts
// Decide if we write index info if flushing in place.
writeIndex := ts-mb.lwits > wiThresh
// Accounting
mb.updateAccounting(seq, ts, rl)
// Only update index and do accounting if not a delete tombstone.
if seq&tbit == 0 {
// Accounting, do this before stripping ebit, it is ebit aware.
mb.updateAccounting(seq, ts, rl)
// Strip ebit if set.
seq = seq &^ ebit
if mb.cache.fseq == 0 {
mb.cache.fseq = seq
}
// Write index
mb.cache.idx = append(mb.cache.idx, uint32(index)|hbit)
}
fch, werr := mb.fch, mb.werr
@@ -3915,7 +3948,7 @@ func (mb *msgBlock) updateAccounting(seq uint64, ts int64, rl uint64) {
seq = seq &^ ebit
}
if mb.first.seq == 0 || mb.first.ts == 0 {
if (mb.first.seq == 0 || mb.first.ts == 0) && seq >= mb.first.seq {
mb.first.seq = seq
mb.first.ts = ts
}
@@ -4008,28 +4041,30 @@ func (fs *fileStore) selectMsgBlockWithIndex(seq uint64) (int, *msgBlock) {
return -1, nil
}
// Starting index, defaults to beginning.
si := 0
const linearThresh = 32
nb := len(fs.blks) - 1
// TODO(dlc) - Use new AVL and make this real for anything beyond certain size.
// Max threshold before we probe for a starting block to start our linear search.
const maxl = 256
if nb := len(fs.blks); nb > maxl {
d := nb / 8
for _, i := range []int{d, 2 * d, 3 * d, 4 * d, 5 * d, 6 * d, 7 * d} {
mb := fs.blks[i]
if nb < linearThresh {
for i, mb := range fs.blks {
if seq <= atomic.LoadUint64(&mb.last.seq) {
break
return i, mb
}
si = i
}
return -1, nil
}
// blks are sorted in ascending order.
for i := si; i < len(fs.blks); i++ {
mb := fs.blks[i]
if seq <= atomic.LoadUint64(&mb.last.seq) {
return i, mb
// Do traditional binary search here since we know the blocks are sorted by sequence first and last.
for low, high, mid := 0, nb, nb/2; low <= high; mid = (low + high) / 2 {
mb := fs.blks[mid]
// Right now these atomic loads do not factor in, so fine to leave. Was considering
// uplifting these to fs scope to avoid atomic load but not needed.
first, last := atomic.LoadUint64(&mb.first.seq), atomic.LoadUint64(&mb.last.seq)
if seq > last {
low = mid + 1
} else if seq < first {
high = mid - 1
} else {
return mid, mb
}
}
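Standalone sketch (not part of the diff) of the block-selection strategy introduced just above: a linear scan for small block counts, otherwise a classic binary search over blocks sorted by their first/last sequences; the block type and threshold here are illustrative.

package main

import "fmt"

type block struct{ first, last uint64 }

// selectBlock returns the index of the block whose range covers seq, or -1.
func selectBlock(blks []block, seq uint64) int {
	const linearThresh = 32
	if len(blks) < linearThresh {
		for i, b := range blks {
			if seq <= b.last {
				return i
			}
		}
		return -1
	}
	low, high := 0, len(blks)-1
	for low <= high {
		mid := (low + high) / 2
		switch b := blks[mid]; {
		case seq > b.last:
			low = mid + 1
		case seq < b.first:
			high = mid - 1
		default:
			return mid
		}
	}
	return -1
}

func main() {
	blks := []block{{1, 100}, {101, 200}, {201, 300}}
	fmt.Println(selectBlock(blks, 150)) // 1
}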
@@ -4098,6 +4133,12 @@ func (mb *msgBlock) indexCacheBuf(buf []byte) error {
return errCorruptState
}
// Check for tombstones which we can skip in terms of indexing.
if seq&tbit != 0 {
index += rl
continue
}
// Clear erase bit.
seq = seq &^ ebit
@@ -4453,15 +4494,17 @@ var (
errNoMainKey = errors.New("encrypted store encountered with no main key")
)
// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
const hbit = 1 << 31
// Used for marking erased messages sequences.
const ebit = 1 << 63
// Used to mark a bad index as deleted.
const dbit = 1 << 30
const (
// Used for marking messages that have had their checksums checked.
// Used to signal a message record with headers.
hbit = 1 << 31
// Used for marking erased messages sequences.
ebit = 1 << 63
// Used for marking tombstone sequences.
tbit = 1 << 62
// Used to mark a bad index as deleted.
dbit = 1 << 30
)
// Will do a lookup from cache.
// Lock should be held.
@@ -4606,7 +4649,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > dlen || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
@@ -4799,20 +4842,17 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
start = fs.state.FirstSeq
}
// TODO(dlc) - If num blocks gets large maybe use selectMsgBlock but have it return index b/c
// we need to keep walking if no match found in first mb.
for _, mb := range fs.blks {
// Skip blocks that are less than our starting sequence.
if start > atomic.LoadUint64(&mb.last.seq) {
continue
}
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
for i := bi; i < len(fs.blks); i++ {
mb := fs.blks[i]
if sm, expireOk, err := mb.firstMatching(filter, wc, start, sm); err == nil {
if expireOk && mb != fs.lmb {
mb.tryForceExpireCache()
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
return sm, sm.seq, nil
} else if err != ErrStoreMsgNotFound {
return nil, 0, err
}
}
@@ -5205,16 +5245,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
}
if sequence > 1 {
return fs.Compact(sequence)
} else if keep > 0 {
fs.mu.RLock()
msgs, lseq := fs.state.Msgs, fs.state.LastSeq
fs.mu.RUnlock()
if keep >= msgs {
return 0, nil
}
return fs.Compact(lseq - keep + 1)
}
return 0, nil
}
eq, wc := compareFn(subject), subjectHasWildcard(subject)
@@ -5950,7 +5981,7 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
slen := int(le.Uint16(hdr[20:]))
if subj == string(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq&ebit != 0 {
if seq < mb.first.seq || seq&ebit != 0 {
continue
}
if len(mb.dmap) > 0 {
@@ -6946,20 +6977,28 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
}
if dc > 1 {
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
// Make sure to remove from pending.
delete(o.state.Pending, sseq)
}
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
// Only update if greater then what we already have.
if o.state.Redelivered[sseq] < dc {
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
o.state.AckFloor.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
}
}
// Make sure we flush to disk.
o.kickFlusher()
+13 -1
@@ -148,6 +148,9 @@ type jsAccount struct {
// From server
sendq *ipQueue[*pubMsg]
// For limiting only running one checkAndSync at a time.
sync atomic.Bool
// Usage/limits related fields that will be protected by usageMu
usageMu sync.RWMutex
limits map[string]JetStreamAccountLimits // indexed by tierName
@@ -1811,6 +1814,12 @@ func (jsa *jsAccount) remoteUpdateUsage(sub *subscription, c *client, _ *Account
// When we detect a skew of some sort this will verify the usage reporting is correct.
// No locks should be held.
func (jsa *jsAccount) checkAndSyncUsage(tierName string, storeType StorageType) {
// This will run in a separate go routine, so check that we are only running once.
if !jsa.sync.CompareAndSwap(false, true) {
return
}
defer jsa.sync.Store(false)
// Hold the account read lock and the usage lock while we calculate.
// We scope by tier and storage type, but if R3 File has 200 streams etc. could
// show a pause. I did test with > 100 non-active streams and was 80-200ns or so.
@@ -1916,7 +1925,10 @@ func (jsa *jsAccount) updateUsage(tierName string, storeType StorageType, delta
jsa.usageMu.Unlock()
if needsCheck {
jsa.checkAndSyncUsage(tierName, storeType)
// We could be holding the stream lock from up in the stack, and this
// will want the jsa lock, which would violate locking order.
// So do this in a Go routine. The function will check if it is already running.
go jsa.checkAndSyncUsage(tierName, storeType)
}
}
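A minimal self-contained sketch (not from the diff) of the single-flight guard added above: a sync/atomic Bool CompareAndSwap lets the usage check run in its own goroutine while guaranteeing only one instance is active at a time; names here are illustrative.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type account struct{ syncing atomic.Bool }

func (a *account) checkAndSyncUsage() {
	// Only one run at a time; concurrent callers simply return.
	if !a.syncing.CompareAndSwap(false, true) {
		return
	}
	defer a.syncing.Store(false)
	fmt.Println("running usage sync")
}

func main() {
	var a account
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.checkAndSyncUsage()
		}()
	}
	wg.Wait()
}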
+8 -16
@@ -2161,21 +2161,11 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
startMigrationMonitoring := func() {
if mmt == nil {
mmt = time.NewTicker(10 * time.Millisecond)
mmt = time.NewTicker(500 * time.Millisecond)
mmtc = mmt.C
}
}
adjustMigrationMonitoring := func() {
const delay = 500 * time.Millisecond
if mmt == nil {
mmt = time.NewTicker(delay)
mmtc = mmt.C
} else {
mmt.Reset(delay)
}
}
stopMigrationMonitoring := func() {
if mmt != nil {
mmt.Stop()
@@ -2407,9 +2397,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
continue
}
// Adjust to our normal time delay.
adjustMigrationMonitoring()
// Make sure we have correct cluster information on the other peers.
ci := js.clusterInfo(rg)
mset.checkClusterInfo(ci)
@@ -4001,7 +3988,7 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
var didCreate, isConfigUpdate, needsLocalResponse bool
if o == nil {
// Add in the consumer if needed.
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, false); err == nil {
if o, err = mset.addConsumerWithAssignment(ca.Config, ca.Name, ca, wasExisting); err == nil {
didCreate = true
}
} else {
@@ -5197,7 +5184,12 @@ func (cc *jetStreamCluster) remapStreamAssignment(sa *streamAssignment, removePe
return true
}
// If we are here let's remove the peer at least.
// If R1 just return to avoid bricking the stream.
if sa.Group.node == nil || len(sa.Group.Peers) == 1 {
return false
}
// If we are here let's remove the peer at least, as long as we are R>1
for i, peer := range sa.Group.Peers {
if peer == removePeer {
sa.Group.Peers[i] = sa.Group.Peers[len(sa.Group.Peers)-1]
+4 -2
@@ -1643,6 +1643,10 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
subs := _subs[:0]
ims := []string{}
// Hold the client lock otherwise there can be a race and miss some subs.
c.mu.Lock()
defer c.mu.Unlock()
acc.mu.RLock()
accName := acc.Name
accNTag := acc.nameTag
@@ -1718,7 +1722,6 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
}
// Now walk the results and add them to our smap
c.mu.Lock()
rc := c.leaf.remoteCluster
c.leaf.smap = make(map[string]int32)
for _, sub := range subs {
@@ -1784,7 +1787,6 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
c.mu.Unlock()
})
}
c.mu.Unlock()
}
// updateInterestForAccountOnGateway called from gateway code when processing RS+ and RS-.
+16 -5
@@ -1307,17 +1307,28 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro
}
if dc > 1 {
if maxdc := uint64(o.cfg.MaxDeliver); maxdc > 0 && dc > maxdc {
// Make sure to remove from pending.
delete(o.state.Pending, sseq)
}
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
o.state.Redelivered[sseq] = dc - 1
// Only update if greater then what we already have.
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
}
} else {
// For AckNone just update delivered and ackfloor at the same time.
o.state.Delivered.Consumer = dseq
o.state.Delivered.Stream = sseq
o.state.AckFloor.Consumer = dseq
o.state.AckFloor.Stream = sseq
if dseq > o.state.Delivered.Consumer {
o.state.Delivered.Consumer = dseq
o.state.AckFloor.Consumer = dseq
}
if sseq > o.state.Delivered.Stream {
o.state.Delivered.Stream = sseq
o.state.AckFloor.Stream = sseq
}
}
return nil
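For clarity, a tiny standalone sketch (not from the diff) of the monotonic-update rule applied here and in the matching filestore hunk: delivered and ack-floor sequences are only ever advanced, never moved backwards by a stale update; the types are illustrative.

package main

import "fmt"

type sequencePair struct{ Consumer, Stream uint64 }

// advance moves the pair forward but never backwards.
func (s *sequencePair) advance(dseq, sseq uint64) {
	if dseq > s.Consumer {
		s.Consumer = dseq
	}
	if sseq > s.Stream {
		s.Stream = sseq
	}
}

func main() {
	var delivered sequencePair
	delivered.advance(5, 10)
	delivered.advance(3, 8) // stale update, ignored
	fmt.Println(delivered)  // {5 10}
}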
+24 -10
@@ -492,7 +492,7 @@ func (s *Server) Connz(opts *ConnzOptions) (*Connz, error) {
case ByLast:
sort.Sort(sort.Reverse(byLast{pconns}))
case ByIdle:
sort.Sort(sort.Reverse(byIdle{pconns}))
sort.Sort(sort.Reverse(byIdle{pconns, c.Now}))
case ByUptime:
sort.Sort(byUptime{pconns, time.Now()})
case ByStop:
@@ -1120,14 +1120,17 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
queues := map[string]monitorIPQueue{}
s.ipQueues.Range(func(k, v interface{}) bool {
s.ipQueues.Range(func(k, v any) bool {
var pending, inProgress int
name := k.(string)
queue := v.(interface {
queue, ok := v.(interface {
len() int
inProgress() uint64
inProgress() int64
})
pending := queue.len()
inProgress := int(queue.inProgress())
if ok {
pending = queue.len()
inProgress = int(queue.inProgress())
}
if !all && (pending == 0 && inProgress == 0) {
return true
} else if qfilter != _EMPTY_ && !strings.Contains(name, qfilter) {
@@ -2301,18 +2304,26 @@ func (s *Server) HandleAccountStatz(w http.ResponseWriter, r *http.Request) {
ResponseHandler(w, r, b)
}
// ResponseHandler handles responses for monitoring routes
// ResponseHandler handles responses for monitoring routes.
func ResponseHandler(w http.ResponseWriter, r *http.Request, data []byte) {
handleResponse(http.StatusOK, w, r, data)
}
// handleResponse handles responses for monitoring routes with a specific HTTP status code.
func handleResponse(code int, w http.ResponseWriter, r *http.Request, data []byte) {
// Get callback from request
callback := r.URL.Query().Get("callback")
// If callback is not empty then
if callback != "" {
// Response for JSONP
w.Header().Set("Content-Type", "application/javascript")
w.WriteHeader(code)
fmt.Fprintf(w, "%s(%s)", callback, data)
} else {
// Otherwise JSON
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
w.WriteHeader(code)
w.Write(data)
}
}
@@ -3047,7 +3058,7 @@ type HealthStatus struct {
Error string `json:"error,omitempty"`
}
// https://tools.ietf.org/id/draft-inadarei-api-health-check-05.html
// https://datatracker.ietf.org/doc/html/draft-inadarei-api-health-check
func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
s.mu.Lock()
s.httpReqStats[HealthzPath]++
@@ -3074,16 +3085,19 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) {
JSEnabledOnly: jsEnabledOnly,
JSServerOnly: jsServerOnly,
})
code := http.StatusOK
if hs.Error != _EMPTY_ {
s.Warnf("Healthcheck failed: %q", hs.Error)
w.WriteHeader(http.StatusServiceUnavailable)
code = http.StatusServiceUnavailable
}
b, err := json.Marshal(hs)
if err != nil {
s.Errorf("Error marshaling response to /healthz request: %v", err)
}
ResponseHandler(w, r, b)
handleResponse(code, w, r, b)
}
// Generate health status.
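Hedged sketch (not the server's actual handler) of the pattern introduced above: the monitoring endpoint now selects the HTTP status code, 200 or 503, before writing the JSON body rather than always replying 200; handler and type names are illustrative.

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

type healthStatus struct {
	Status string `json:"status"`
	Error  string `json:"error,omitempty"`
}

func healthz(w http.ResponseWriter, r *http.Request) {
	hs := healthStatus{Status: "ok"}
	code := http.StatusOK
	if hs.Error != "" {
		code = http.StatusServiceUnavailable
	}
	b, err := json.Marshal(hs)
	if err != nil {
		log.Printf("error marshaling healthz response: %v", err)
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	w.Write(b)
}

func main() {
	http.HandleFunc("/healthz", healthz)
	log.Fatal(http.ListenAndServe(":8222", nil))
}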
+5 -4
@@ -92,12 +92,13 @@ func (l byLast) Less(i, j int) bool {
}
// Idle time
type byIdle struct{ ConnInfos }
type byIdle struct {
ConnInfos
now time.Time
}
func (l byIdle) Less(i, j int) bool {
ii := l.ConnInfos[i].LastActivity.Sub(l.ConnInfos[i].Start)
ij := l.ConnInfos[j].LastActivity.Sub(l.ConnInfos[j].Start)
return ii < ij
return l.now.Sub(l.ConnInfos[i].LastActivity) < l.now.Sub(l.ConnInfos[j].LastActivity)
}
// Uptime
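Standalone sketch (not from the diff) of the corrected byIdle ordering above: idle time is now measured from a single captured now back to each connection's last activity, rather than the previous LastActivity minus Start span; connInfo here is illustrative.

package main

import (
	"fmt"
	"sort"
	"time"
)

type connInfo struct {
	Name         string
	LastActivity time.Time
}

// sortByIdle orders connections from least idle to most idle relative to now.
func sortByIdle(conns []connInfo, now time.Time) {
	sort.Slice(conns, func(i, j int) bool {
		return now.Sub(conns[i].LastActivity) < now.Sub(conns[j].LastActivity)
	})
}

func main() {
	now := time.Now()
	conns := []connInfo{
		{"a", now.Add(-5 * time.Minute)},
		{"b", now.Add(-1 * time.Minute)},
	}
	sortByIdle(conns, now)
	fmt.Println(conns[0].Name) // "b" is least idle
}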
+1 -1
@@ -139,13 +139,13 @@ func (s *Server) plugTLSOCSPPeer(config *tlsConfigKind) (*tls.Config, bool, erro
if config == nil || config.tlsConfig == nil {
return nil, false, errors.New(certidp.ErrUnableToPlugTLSEmptyConfig)
}
s.Debugf(certidp.DbgPlugTLSForKind, config.kind)
kind := config.kind
isSpoke := config.isLeafSpoke
tcOpts := config.tlsOpts
if tcOpts == nil || tcOpts.OCSPPeerConfig == nil || !tcOpts.OCSPPeerConfig.Verify {
return nil, false, nil
}
s.Debugf(certidp.DbgPlugTLSForKind, config.kind)
// peer is a tls client
if kind == kindStringMap[CLIENT] || (kind == kindStringMap[LEAF] && !isSpoke) {
if !tcOpts.Verify {
+3
@@ -744,6 +744,9 @@ func (o *Options) ProcessConfigFile(configFile string) error {
// Collect all errors and warnings and report them all together.
errors := make([]error, 0)
warnings := make([]error, 0)
if len(m) == 0 {
warnings = append(warnings, fmt.Errorf("%s: config has no values or is empty", configFile))
}
// First check whether a system account has been defined,
// as that is a condition for other features to be enabled.
+5
@@ -1814,6 +1814,7 @@ func (n *raft) runAsFollower() {
if n.catchupStalled() {
n.cancelCatchup()
}
n.resetElectionTimeout()
n.Unlock()
} else {
n.switchToCandidate()
@@ -2508,6 +2509,10 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) {
}
if err != nil || ae == nil {
n.warn("Could not find a starting entry for catchup request: %v", err)
// If we are here we are seeing a request for an item we do not have, meaning we should stepdown.
// This is possible on a reset of our WAL but the other side has a snapshot already.
// If we do not stepdown this can cycle.
n.stepdown.push(noLeader)
n.Unlock()
arPool.Put(ar)
return
+1
@@ -3557,6 +3557,7 @@ func (s *Server) lameDuckMode() {
}
s.Noticef("Entering lame duck mode, stop accepting new clients")
s.ldm = true
s.sendLDMShutdownEventLocked()
expected := 1
s.listener.Close()
s.listener = nil
+2 -2
@@ -2983,7 +2983,7 @@ func streamAndSeq(shdr string) (string, uint64) {
}
// New version which is stream index name <SPC> sequence
fields := strings.Fields(shdr)
if len(fields) != 2 {
if len(fields) < 2 {
return _EMPTY_, 0
}
return fields[0], uint64(parseAckReplyNum(fields[1]))
@@ -4455,7 +4455,7 @@ func (mset *stream) internalLoop() {
// Check to see if this is a delivery for a consumer and
// we failed to deliver the message. If so alert the consumer.
if pm.o != nil && pm.seq > 0 && !didDeliver {
pm.o.didNotDeliver(pm.seq)
pm.o.didNotDeliver(pm.seq, pm.dsubj)
}
pm.returnToPool()
}
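Illustrative sketch (not from the diff) of the relaxed length check in the streamAndSeq hunk above: the reply header carries "stream index name <SPC> sequence", and accepting at least two fields instead of exactly two presumably tolerates extra trailing fields; the parsing helper below is a simplified stand-in.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// streamAndSeq is a simplified stand-in for the server helper of the same name.
func streamAndSeq(shdr string) (string, uint64) {
	fields := strings.Fields(shdr)
	if len(fields) < 2 {
		return "", 0
	}
	seq, _ := strconv.ParseUint(fields[1], 10, 64)
	return fields[0], seq
}

func main() {
	fmt.Println(streamAndSeq("ORDERS 1234"))
	fmt.Println(streamAndSeq("ORDERS 1234 extra"))
}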
+2 -2
@@ -1333,10 +1333,10 @@ github.com/mohae/deepcopy
# github.com/mschoch/smat v0.2.0
## explicit; go 1.13
github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.4.1
# github.com/nats-io/jwt/v2 v2.5.0
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.9.21
# github.com/nats-io/nats-server/v2 v2.9.22
## explicit; go 1.19
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/ldap