mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-06 19:40:42 -05:00
build(deps): bump github.com/nats-io/nats-server/v2
Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.11.5 to 2.11.6. - [Release notes](https://github.com/nats-io/nats-server/releases) - [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml) - [Commits](https://github.com/nats-io/nats-server/compare/v2.11.5...v2.11.6) --- updated-dependencies: - dependency-name: github.com/nats-io/nats-server/v2 dependency-version: 2.11.6 dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
@@ -55,7 +55,7 @@ require (
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
github.com/mna/pigeon v1.3.0
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
|
||||
github.com/nats-io/nats-server/v2 v2.11.5
|
||||
github.com/nats-io/nats-server/v2 v2.11.6
|
||||
github.com/nats-io/nats.go v1.43.0
|
||||
github.com/oklog/run v1.2.0
|
||||
github.com/olekukonko/tablewriter v1.0.7
|
||||
|
||||
@@ -821,8 +821,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
|
||||
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
|
||||
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
|
||||
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
|
||||
github.com/nats-io/nats-server/v2 v2.11.5 h1:yxwFASM5VrbHky6bCCame6g6fXZaayLoh7WFPWU9EEg=
|
||||
github.com/nats-io/nats-server/v2 v2.11.5/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
|
||||
github.com/nats-io/nats-server/v2 v2.11.6 h1:4VXRjbTUFKEB+7UoaKL3F5Y83xC7MxPoIONOnGgpkHw=
|
||||
github.com/nats-io/nats-server/v2 v2.11.6/go.mod h1:2xoztlcb4lDL5Blh1/BiukkKELXvKQ5Vy29FPVRBUYs=
|
||||
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
|
||||
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
|
||||
+1
-1
@@ -58,7 +58,7 @@ func init() {
|
||||
|
||||
const (
|
||||
// VERSION is the current version for the server.
|
||||
VERSION = "2.11.5"
|
||||
VERSION = "2.11.6"
|
||||
|
||||
// PROTO is the currently supported protocol.
|
||||
// 0 was the original
|
||||
|
||||
+14
-13
@@ -30,6 +30,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/avl"
|
||||
"github.com/nats-io/nats-server/v2/server/gsl"
|
||||
"github.com/nats-io/nuid"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
@@ -402,15 +403,15 @@ type consumer struct {
|
||||
sid int
|
||||
name string
|
||||
stream string
|
||||
sseq uint64 // next stream sequence
|
||||
subjf subjectFilters // subject filters and their sequences
|
||||
filters *Sublist // When we have multiple filters we will use LoadNextMsgMulti and pass this in.
|
||||
dseq uint64 // delivered consumer sequence
|
||||
adflr uint64 // ack delivery floor
|
||||
asflr uint64 // ack store floor
|
||||
chkflr uint64 // our check floor, interest streams only.
|
||||
npc int64 // Num Pending Count
|
||||
npf uint64 // Num Pending Floor Sequence
|
||||
sseq uint64 // next stream sequence
|
||||
subjf subjectFilters // subject filters and their sequences
|
||||
filters *gsl.SimpleSublist // When we have multiple filters we will use LoadNextMsgMulti and pass this in.
|
||||
dseq uint64 // delivered consumer sequence
|
||||
adflr uint64 // ack delivery floor
|
||||
asflr uint64 // ack store floor
|
||||
chkflr uint64 // our check floor, interest streams only.
|
||||
npc int64 // Num Pending Count
|
||||
npf uint64 // Num Pending Floor Sequence
|
||||
dsubj string
|
||||
qgroup string
|
||||
lss *lastSeqSkipList
|
||||
@@ -1098,9 +1099,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
|
||||
// If we have multiple filter subjects, create a sublist which we will use
|
||||
// in calling store.LoadNextMsgMulti.
|
||||
if len(o.cfg.FilterSubjects) > 0 {
|
||||
o.filters = NewSublistNoCache()
|
||||
o.filters = gsl.NewSublist[struct{}]()
|
||||
for _, filter := range o.cfg.FilterSubjects {
|
||||
o.filters.Insert(&subscription{subject: []byte(filter)})
|
||||
o.filters.Insert(filter, struct{}{})
|
||||
}
|
||||
} else {
|
||||
// Make sure this is nil otherwise.
|
||||
@@ -2255,9 +2256,9 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
|
||||
if len(o.subjf) == 1 {
|
||||
o.filters = nil
|
||||
} else {
|
||||
o.filters = NewSublistNoCache()
|
||||
o.filters = gsl.NewSublist[struct{}]()
|
||||
for _, filter := range o.subjf {
|
||||
o.filters.Insert(&subscription{subject: []byte(filter.subject)})
|
||||
o.filters.Insert(filter.subject, struct{}{})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
+47
-48
@@ -45,6 +45,7 @@ import (
|
||||
"github.com/minio/highwayhash"
|
||||
"github.com/nats-io/nats-server/v2/server/ats"
|
||||
"github.com/nats-io/nats-server/v2/server/avl"
|
||||
"github.com/nats-io/nats-server/v2/server/gsl"
|
||||
"github.com/nats-io/nats-server/v2/server/stree"
|
||||
"github.com/nats-io/nats-server/v2/server/thw"
|
||||
"golang.org/x/crypto/chacha20"
|
||||
@@ -967,6 +968,17 @@ func (fs *fileStore) initMsgBlock(index uint32) *msgBlock {
|
||||
return mb
|
||||
}
|
||||
|
||||
// Check for encryption, we do not load keys on startup anymore so might need to load them here.
|
||||
// Lock for fs should be held.
|
||||
func (mb *msgBlock) checkAndLoadEncryption() error {
|
||||
if mb.fs != nil && mb.fs.prf != nil && (mb.aek == nil || mb.bek == nil) {
|
||||
if err := mb.fs.loadEncryptionForMsgBlock(mb); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock for fs should be held.
|
||||
func (fs *fileStore) loadEncryptionForMsgBlock(mb *msgBlock) error {
|
||||
if fs.prf == nil {
|
||||
@@ -1966,11 +1978,8 @@ func (mb *msgBlock) lastChecksum() []byte {
|
||||
return lchk[:]
|
||||
}
|
||||
// Encrypted?
|
||||
// Check for encryption, we do not load keys on startup anymore so might need to load them here.
|
||||
if mb.fs != nil && mb.fs.prf != nil && (mb.aek == nil || mb.bek == nil) {
|
||||
if err := mb.fs.loadEncryptionForMsgBlock(mb); err != nil {
|
||||
return nil
|
||||
}
|
||||
if err := mb.checkAndLoadEncryption(); err != nil {
|
||||
return nil
|
||||
}
|
||||
if mb.bek != nil {
|
||||
if buf, _ := mb.loadBlock(nil); len(buf) >= checksumSize {
|
||||
@@ -2369,7 +2378,7 @@ func (fs *fileStore) GetSeqFromTime(t time.Time) uint64 {
|
||||
}
|
||||
|
||||
// Find the first matching message against a sublist.
|
||||
func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
|
||||
func (mb *msgBlock) firstMatchingMulti(sl *gsl.SimpleSublist, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
|
||||
mb.mu.Lock()
|
||||
var didLoad bool
|
||||
var updateLLTS bool
|
||||
@@ -2405,15 +2414,12 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg)
|
||||
if uint64(mb.fss.Size()) < lseq-start {
|
||||
// If there are no subject matches then this is effectively no-op.
|
||||
hseq := uint64(math.MaxUint64)
|
||||
IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree(mb.fss, sl, func(subj []byte, ss *SimpleState) {
|
||||
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
|
||||
// mb is already loaded into the cache so should be fast-ish.
|
||||
mb.recalculateForSubj(bytesToString(subj), ss)
|
||||
}
|
||||
first := ss.First
|
||||
if start > first {
|
||||
first = start
|
||||
}
|
||||
first := max(start, ss.First)
|
||||
if first > ss.Last || first >= hseq {
|
||||
// The start cutoff is after the last sequence for this subject,
|
||||
// or we think we already know of a subject with an earlier msg
|
||||
@@ -2455,7 +2461,7 @@ func (mb *msgBlock) firstMatchingMulti(sl *Sublist, start uint64, sm *StoreMsg)
|
||||
}
|
||||
})
|
||||
if hseq < uint64(math.MaxUint64) && sm != nil {
|
||||
return sm, didLoad, nil
|
||||
return sm, didLoad && start == lseq, nil
|
||||
}
|
||||
} else {
|
||||
for seq := start; seq <= lseq; seq++ {
|
||||
@@ -3533,7 +3539,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
|
||||
// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence.
|
||||
// Optimized for stream num pending calculations for consumers with lots of filtered subjects.
|
||||
// Subjects should not overlap, this property is held when doing multi-filtered consumers.
|
||||
func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) {
|
||||
func (fs *fileStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerSubject bool) (total, validThrough uint64) {
|
||||
fs.mu.RLock()
|
||||
defer fs.mu.RUnlock()
|
||||
|
||||
@@ -3598,22 +3604,18 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
|
||||
mb := fs.blks[seqStart]
|
||||
bi := mb.index
|
||||
|
||||
subs := make([]*subscription, 0, sl.Count())
|
||||
sl.All(&subs)
|
||||
for _, sub := range subs {
|
||||
fs.psim.Match(sub.subject, func(subj []byte, psi *psi) {
|
||||
// If the select blk start is greater than entry's last blk skip.
|
||||
if bi > psi.lblk {
|
||||
return
|
||||
}
|
||||
total++
|
||||
// We will track the subjects that are an exact match to the last block.
|
||||
// This is needed for last block processing.
|
||||
if psi.lblk == bi {
|
||||
lbm[string(subj)] = true
|
||||
}
|
||||
})
|
||||
}
|
||||
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
// If the select blk start is greater than entry's last blk skip.
|
||||
if bi > psi.lblk {
|
||||
return
|
||||
}
|
||||
total++
|
||||
// We will track the subjects that are an exact match to the last block.
|
||||
// This is needed for last block processing.
|
||||
if psi.lblk == bi {
|
||||
lbm[string(subj)] = true
|
||||
}
|
||||
})
|
||||
|
||||
// Now check if we need to inspect the seqStart block.
|
||||
// Grab write lock in case we need to load in msgs.
|
||||
@@ -3693,7 +3695,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
|
||||
var t uint64
|
||||
var havePartial bool
|
||||
var updateLLTS bool
|
||||
IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree[SimpleState](mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
subj := bytesToString(bsubj)
|
||||
if havePartial {
|
||||
// If we already found a partial then don't do anything else.
|
||||
@@ -3751,17 +3753,14 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
|
||||
|
||||
// If we are here it's better to calculate totals from psim and adjust downward by scanning less blocks.
|
||||
start := uint32(math.MaxUint32)
|
||||
subs := make([]*subscription, 0, sl.Count())
|
||||
sl.All(&subs)
|
||||
for _, sub := range subs {
|
||||
fs.psim.Match(sub.subject, func(_ []byte, psi *psi) {
|
||||
total += psi.total
|
||||
// Keep track of start index for this subject.
|
||||
if psi.fblk < start {
|
||||
start = psi.fblk
|
||||
}
|
||||
})
|
||||
}
|
||||
gsl.IntersectStree(fs.psim, sl, func(subj []byte, psi *psi) {
|
||||
total += psi.total
|
||||
// Keep track of start index for this subject.
|
||||
if psi.fblk < start {
|
||||
start = psi.fblk
|
||||
}
|
||||
})
|
||||
|
||||
// See if we were asked for all, if so we are done.
|
||||
if sseq <= fs.state.FirstSeq {
|
||||
return total, validThrough
|
||||
@@ -3802,7 +3801,7 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
|
||||
}
|
||||
// Mark fss activity.
|
||||
mb.lsts = ats.AccessTime()
|
||||
IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
gsl.IntersectStree(mb.fss, sl, func(bsubj []byte, ss *SimpleState) {
|
||||
adjust += ss.Msgs
|
||||
})
|
||||
}
|
||||
@@ -6760,6 +6759,9 @@ func (mb *msgBlock) flushPendingMsgsLocked() (*LostStreamData, error) {
|
||||
// under heavy load.
|
||||
|
||||
// Check if we need to encrypt.
|
||||
if err := mb.checkAndLoadEncryption(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if mb.bek != nil && lob > 0 {
|
||||
// Need to leave original alone.
|
||||
var dst []byte
|
||||
@@ -6946,11 +6948,8 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
|
||||
|
||||
// Lock should be held.
|
||||
func (mb *msgBlock) loadMsgsWithLock() error {
|
||||
// Check for encryption, we do not load keys on startup anymore so might need to load them here.
|
||||
if mb.fs != nil && mb.fs.prf != nil && (mb.aek == nil || mb.bek == nil) {
|
||||
if err := mb.fs.loadEncryptionForMsgBlock(mb); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mb.checkAndLoadEncryption(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check to see if we are loading already.
|
||||
@@ -7518,7 +7517,7 @@ func (fs *fileStore) LoadLastMsg(subject string, smv *StoreMsg) (sm *StoreMsg, e
|
||||
}
|
||||
|
||||
// LoadNextMsgMulti will find the next message matching any entry in the sublist.
|
||||
func (fs *fileStore) LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
func (fs *fileStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
if sl == nil {
|
||||
return fs.LoadNextMsg(_EMPTY_, false, start, smp)
|
||||
}
|
||||
|
||||
+5
@@ -45,6 +45,11 @@ var (
|
||||
ErrAlreadyRegistered = errors.New("gsl: notification already registered")
|
||||
)
|
||||
|
||||
// SimpleSublist is an alias type for GenericSublist that takes
|
||||
// empty values, useful for tracking interest only without any
|
||||
// unnecessary allocations.
|
||||
type SimpleSublist = GenericSublist[struct{}]
|
||||
|
||||
// A GenericSublist stores and efficiently retrieves subscriptions.
|
||||
type GenericSublist[T comparable] struct {
|
||||
sync.RWMutex
|
||||
|
||||
+4
-4
@@ -1656,12 +1656,12 @@ func diffCheckedLimits(a, b map[string]JetStreamAccountLimits) map[string]JetStr
|
||||
return diff
|
||||
}
|
||||
|
||||
// Return reserved bytes for memory and store for this account on this server.
|
||||
// Return reserved bytes for memory and file store streams for this account on this server.
|
||||
// Lock should be held.
|
||||
func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
|
||||
for _, mset := range jsa.streams {
|
||||
cfg := &mset.cfg
|
||||
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
|
||||
if (tier == _EMPTY_ || tier == tierName(cfg.Replicas)) && cfg.MaxBytes > 0 {
|
||||
switch cfg.Storage {
|
||||
case FileStorage:
|
||||
store += uint64(cfg.MaxBytes)
|
||||
@@ -1673,12 +1673,12 @@ func (jsa *jsAccount) reservedStorage(tier string) (mem, store uint64) {
|
||||
return mem, store
|
||||
}
|
||||
|
||||
// Return reserved bytes for memory and store for this account in clustered mode.
|
||||
// Return reserved bytes for memory and file store streams for this account in clustered mode.
|
||||
// js lock should be held.
|
||||
func reservedStorage(sas map[string]*streamAssignment, tier string) (mem, store uint64) {
|
||||
for _, sa := range sas {
|
||||
cfg := sa.Config
|
||||
if tier == _EMPTY_ || tier == tierName(cfg.Replicas) && cfg.MaxBytes > 0 {
|
||||
if (tier == _EMPTY_ || tier == tierName(cfg.Replicas)) && cfg.MaxBytes > 0 {
|
||||
switch cfg.Storage {
|
||||
case FileStorage:
|
||||
store += uint64(cfg.MaxBytes)
|
||||
|
||||
+12
-3
@@ -2691,6 +2691,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
|
||||
if mset, err = acc.lookupStream(sa.Config.Name); mset != nil {
|
||||
mset.monitorWg.Add(1)
|
||||
defer mset.monitorWg.Done()
|
||||
mset.checkInMonitor()
|
||||
mset.setStreamAssignment(sa)
|
||||
// Make sure to update our updateC which would have been nil.
|
||||
uch = mset.updateC()
|
||||
@@ -2816,6 +2817,14 @@ func (mset *stream) resetClusteredState(err error) bool {
|
||||
stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
|
||||
mset.mu.RUnlock()
|
||||
|
||||
// The stream might already be deleted and not assigned to us anymore.
|
||||
// In any case, don't revive the stream if it's already closed.
|
||||
if mset.closed.Load() {
|
||||
s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
|
||||
// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
|
||||
return true
|
||||
}
|
||||
|
||||
// Stepdown regardless if we are the leader here.
|
||||
if node != nil {
|
||||
node.StepDown()
|
||||
@@ -2823,19 +2832,19 @@ func (mset *stream) resetClusteredState(err error) bool {
|
||||
|
||||
// If we detect we are shutting down just return.
|
||||
if js != nil && js.isShuttingDown() {
|
||||
s.Debugf("Will not reset stream, JetStream shutting down")
|
||||
s.Debugf("Will not reset stream '%s > %s', JetStream shutting down", acc, mset.name())
|
||||
return false
|
||||
}
|
||||
|
||||
// Server
|
||||
if js.limitsExceeded(stype) {
|
||||
s.Warnf("Will not reset stream, server resources exceeded")
|
||||
s.Warnf("Will not reset stream '%s > %s', server resources exceeded", acc, mset.name())
|
||||
return false
|
||||
}
|
||||
|
||||
// Account
|
||||
if exceeded, _ := jsa.limitsExceeded(stype, tierName, replicas); exceeded {
|
||||
s.Warnf("stream '%s > %s' errored, account resources exceeded", acc, mset.name())
|
||||
s.Warnf("Stream '%s > %s' errored, account resources exceeded", acc, mset.name())
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
+4
-3
@@ -24,6 +24,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/avl"
|
||||
"github.com/nats-io/nats-server/v2/server/gsl"
|
||||
"github.com/nats-io/nats-server/v2/server/stree"
|
||||
"github.com/nats-io/nats-server/v2/server/thw"
|
||||
)
|
||||
@@ -782,7 +783,7 @@ func (ms *memStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
|
||||
}
|
||||
|
||||
// NumPending will return the number of pending messages matching any subject in the sublist starting at sequence.
|
||||
func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64) {
|
||||
func (ms *memStore) NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerSubject bool) (total, validThrough uint64) {
|
||||
if sl == nil {
|
||||
return ms.NumPending(sseq, fwcs, lastPerSubject)
|
||||
}
|
||||
@@ -817,7 +818,7 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
|
||||
var havePartial bool
|
||||
var totalSkipped uint64
|
||||
// We will track start and end sequences as we go.
|
||||
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
|
||||
gsl.IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
|
||||
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
|
||||
ms.recalculateForSubj(bytesToString(subj), fss)
|
||||
}
|
||||
@@ -1546,7 +1547,7 @@ func (ms *memStore) LoadLastMsg(subject string, smp *StoreMsg) (*StoreMsg, error
|
||||
}
|
||||
|
||||
// LoadNextMsgMulti will find the next message matching any entry in the sublist.
|
||||
func (ms *memStore) LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
func (ms *memStore) LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
|
||||
// TODO(dlc) - for now simple linear walk to get started.
|
||||
ms.mu.RLock()
|
||||
defer ms.mu.RUnlock()
|
||||
|
||||
+3
-3
@@ -1018,7 +1018,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
|
||||
return true
|
||||
})
|
||||
|
||||
details := make([]SubDetail, len(subs))
|
||||
details := make([]SubDetail, 0, len(subs))
|
||||
i := 0
|
||||
// TODO(dlc) - may be inefficient and could just do normal match when total subs is large and filtering.
|
||||
for _, sub := range subs {
|
||||
@@ -1030,7 +1030,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
|
||||
continue
|
||||
}
|
||||
sub.client.mu.Lock()
|
||||
details[i] = newSubDetail(sub)
|
||||
details = append(details, newSubDetail(sub))
|
||||
sub.client.mu.Unlock()
|
||||
i++
|
||||
}
|
||||
@@ -1047,7 +1047,7 @@ func (s *Server) Subsz(opts *SubszOptions) (*Subsz, error) {
|
||||
maxoff = maxIndex
|
||||
}
|
||||
sz.Subs = details[minoff:maxoff]
|
||||
sz.Total = len(sz.Subs)
|
||||
sz.Total = len(details)
|
||||
} else {
|
||||
s.accounts.Range(func(k, v any) bool {
|
||||
acc := v.(*Account)
|
||||
|
||||
+3
-2
@@ -24,6 +24,7 @@ import (
|
||||
"unsafe"
|
||||
|
||||
"github.com/nats-io/nats-server/v2/server/avl"
|
||||
"github.com/nats-io/nats-server/v2/server/gsl"
|
||||
)
|
||||
|
||||
// StorageType determines how messages are stored for retention.
|
||||
@@ -97,7 +98,7 @@ type StreamStore interface {
|
||||
SkipMsgs(seq uint64, num uint64) error
|
||||
LoadMsg(seq uint64, sm *StoreMsg) (*StoreMsg, error)
|
||||
LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
|
||||
LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
|
||||
LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
|
||||
LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error)
|
||||
LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error)
|
||||
RemoveMsg(seq uint64) (bool, error)
|
||||
@@ -114,7 +115,7 @@ type StreamStore interface {
|
||||
MultiLastSeqs(filters []string, maxSeq uint64, maxAllowed int) ([]uint64, error)
|
||||
SubjectForSeq(seq uint64) (string, error)
|
||||
NumPending(sseq uint64, filter string, lastPerSubject bool) (total, validThrough uint64)
|
||||
NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bool) (total, validThrough uint64)
|
||||
NumPendingMulti(sseq uint64, sl *gsl.SimpleSublist, lastPerSubject bool) (total, validThrough uint64)
|
||||
State() StreamState
|
||||
FastState(*StreamState)
|
||||
EncodedStreamState(failed uint64) (enc []byte, err error)
|
||||
|
||||
-1
@@ -3485,7 +3485,6 @@ func (mset *stream) processAllSourceMsgs() {
|
||||
defer mset.mu.Unlock()
|
||||
for _, si := range stalled {
|
||||
mset.setupSourceConsumer(si.iname, si.sseq+1, time.Time{})
|
||||
si.last.Store(time.Now().UnixNano())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
Vendored
+1
-1
@@ -996,7 +996,7 @@ github.com/munnerz/goautoneg
|
||||
# github.com/nats-io/jwt/v2 v2.7.4
|
||||
## explicit; go 1.23.0
|
||||
github.com/nats-io/jwt/v2
|
||||
# github.com/nats-io/nats-server/v2 v2.11.5
|
||||
# github.com/nats-io/nats-server/v2 v2.11.6
|
||||
## explicit; go 1.23.0
|
||||
github.com/nats-io/nats-server/v2/conf
|
||||
github.com/nats-io/nats-server/v2/internal/fastrand
|
||||
|
||||
Reference in New Issue
Block a user