---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Committed by: Ralf Haferkamp
Parent: 3ea15e7f97
Commit: ffe811074e
go.mod (6 lines changed)

@@ -63,7 +63,7 @@ require (
 	github.com/mitchellh/mapstructure v1.5.0
 	github.com/mna/pigeon v1.2.1
 	github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
-	github.com/nats-io/nats-server/v2 v2.10.15
+	github.com/nats-io/nats-server/v2 v2.10.16
 	github.com/nats-io/nats.go v1.35.0
 	github.com/oklog/run v1.1.0
 	github.com/olekukonko/tablewriter v0.0.5
@@ -255,7 +255,7 @@ require (
 	github.com/json-iterator/go v1.1.12 // indirect
 	github.com/juliangruber/go-intersect v1.1.0 // indirect
 	github.com/kevinburke/ssh_config v1.2.0 // indirect
-	github.com/klauspost/compress v1.17.7 // indirect
+	github.com/klauspost/compress v1.17.8 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
 	github.com/leodido/go-urn v1.4.0 // indirect
 	github.com/libregraph/oidc-go v1.1.0 // indirect
@@ -282,7 +282,7 @@ require (
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/mschoch/smat v0.2.0 // indirect
-	github.com/nats-io/jwt/v2 v2.5.6 // indirect
+	github.com/nats-io/jwt/v2 v2.5.7 // indirect
 	github.com/nats-io/nkeys v0.4.7 // indirect
 	github.com/nats-io/nuid v1.0.1 // indirect
 	github.com/nxadm/tail v1.4.8 // indirect
go.sum (12 lines changed)

@@ -1605,8 +1605,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
 github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
-github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
-github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
+github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
+github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
 github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
 github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
@@ -1755,10 +1755,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
-github.com/nats-io/jwt/v2 v2.5.6 h1:Cp618+z4q042sWqHiSoIHFT08OZtAskui0hTmRfmGGQ=
-github.com/nats-io/jwt/v2 v2.5.6/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
-github.com/nats-io/nats-server/v2 v2.10.15 h1:O/l+ZT91ltMiiRJKjWLQJcGg7ypzjlb/bC5bFIRVw3M=
-github.com/nats-io/nats-server/v2 v2.10.15/go.mod h1:ul+pGt5I7e4U+nI09ZFDG4vqM+6Ce2Tou7UbVSnLiIw=
+github.com/nats-io/jwt/v2 v2.5.7 h1:j5lH1fUXCnJnY8SsQeB/a/z9Azgu2bYIDvtPVNdxe2c=
+github.com/nats-io/jwt/v2 v2.5.7/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
+github.com/nats-io/nats-server/v2 v2.10.16 h1:2jXaiydp5oB/nAx/Ytf9fdCi9QN6ItIc9eehX8kwVV0=
+github.com/nats-io/nats-server/v2 v2.10.16/go.mod h1:Pksi38H2+6xLe1vQx0/EA4bzetM0NqyIHcIbmgXSkIU=
 github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk=
 github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
 github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
vendor/github.com/klauspost/compress/s2/writer.go (generated, vendored; 2 lines changed)

@@ -937,7 +937,7 @@ func WriterUncompressed() WriterOption {

 // WriterBlockSize allows to override the default block size.
 // Blocks will be this size or smaller.
-// Minimum size is 4KB and and maximum size is 4MB.
+// Minimum size is 4KB and maximum size is 4MB.
 //
 // Bigger blocks may give bigger throughput on systems with many cores,
 // and will increase compression slightly, but it will limit the possible
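Note: the comment being fixed documents the WriterBlockSize option of the s2 package. A minimal usage sketch of that option (block size value chosen arbitrarily for illustration; error handling kept trivial):

    package main

    import (
    	"bytes"
    	"fmt"

    	"github.com/klauspost/compress/s2"
    )

    func main() {
    	var buf bytes.Buffer
    	// Block size must be between 4KB and 4MB per the option's documentation.
    	w := s2.NewWriter(&buf, s2.WriterBlockSize(1<<20)) // 1MB blocks
    	if _, err := w.Write([]byte("some payload")); err != nil {
    		panic(err)
    	}
    	if err := w.Close(); err != nil { // Close flushes the final block.
    		panic(err)
    	}
    	fmt.Println("compressed bytes:", buf.Len())
    }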
vendor/github.com/nats-io/jwt/v2/user_claims.go (generated, vendored; 1 line changed)

@@ -29,6 +29,7 @@ const (
 	ConnectionTypeLeafnodeWS = "LEAFNODE_WS"
 	ConnectionTypeMqtt       = "MQTT"
 	ConnectionTypeMqttWS     = "MQTT_WS"
+	ConnectionTypeInProcess  = "IN_PROCESS"
 )

 type UserPermissionLimits struct {
vendor/github.com/nats-io/nats-server/v2/conf/lex.go (generated, vendored; 4 lines changed)

@@ -332,12 +332,12 @@ func lexBlockStart(lx *lexer) stateFn {
 		lx.ignore()
 		return lx.pop()
 	case commentHashStart:
-		lx.push(lexBlockEnd)
+		lx.push(lexBlockStart)
 		return lexCommentStart
 	case commentSlashStart:
 		rn := lx.next()
 		if rn == commentSlashStart {
-			lx.push(lexBlockEnd)
+			lx.push(lexBlockStart)
 			return lexCommentStart
 		}
 		lx.backup()
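Note: this lexer is a push-down state machine: state functions return the next state, and a small stack lets a sub-state (comment lexing) hand control back to whatever was pushed before it. A hypothetical sketch of that pattern under assumed names; it mirrors the identifiers in the diff but is not the nats-server implementation:

    package main

    import "fmt"

    type lexer struct {
    	stack []stateFn
    }

    type stateFn func(*lexer) stateFn

    func (lx *lexer) push(fn stateFn) { lx.stack = append(lx.stack, fn) }

    func (lx *lexer) pop() stateFn {
    	if len(lx.stack) == 0 {
    		return nil
    	}
    	fn := lx.stack[len(lx.stack)-1]
    	lx.stack = lx.stack[:len(lx.stack)-1]
    	return fn
    }

    // lexComment consumes a comment, then resumes whatever state was pushed.
    func lexComment(lx *lexer) stateFn {
    	fmt.Println("lexing comment")
    	return lx.pop()
    }

    // lexBlockEnd finishes the block.
    func lexBlockEnd(lx *lexer) stateFn {
    	fmt.Println("end of block")
    	return nil
    }

    // lexBlockStart sees a comment inside a block: it pushes the state to
    // resume afterwards, then hands control to the comment lexer.
    func lexBlockStart(lx *lexer) stateFn {
    	lx.push(lexBlockEnd)
    	return lexComment
    }

    func main() {
    	lx := &lexer{}
    	for state := stateFn(lexBlockStart); state != nil; {
    		state = state(lx)
    	}
    }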
vendor/github.com/nats-io/nats-server/v2/server/const.go (generated, vendored; 2 lines changed)

@@ -41,7 +41,7 @@ var (

 const (
 	// VERSION is the current version for the server.
-	VERSION = "2.10.15"
+	VERSION = "2.10.16"

 	// PROTO is the currently supported protocol.
 	// 0 was the original
vendor/github.com/nats-io/nats-server/v2/server/consumer.go (generated, vendored; 32 lines changed)

@@ -2748,7 +2748,7 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
 		return
 	}

-	var sagap uint64
+	var sgap, floor uint64
 	var needSignal bool

 	switch o.cfg.AckPolicy {
@@ -2792,12 +2792,29 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
 		if o.maxp > 0 && len(o.pending) >= o.maxp {
 			needSignal = true
 		}
-		sagap = sseq - o.asflr
+		sgap = sseq - o.asflr
+		floor = sgap // start at same and set lower as we go.
 		o.adflr, o.asflr = dseq, sseq
-		for seq := sseq; seq > sseq-sagap; seq-- {
+
+		remove := func(seq uint64) {
 			delete(o.pending, seq)
 			delete(o.rdc, seq)
 			o.removeFromRedeliverQueue(seq)
+			if seq < floor {
+				floor = seq
+			}
+		}
+		// Determine if smarter to walk all of pending vs the sequence range.
+		if sgap > uint64(len(o.pending)) {
+			for seq := range o.pending {
+				if seq <= sseq {
+					remove(seq)
+				}
+			}
+		} else {
+			for seq := sseq; seq > sseq-sgap && len(o.pending) > 0; seq-- {
+				remove(seq)
+			}
 		}
 	case AckNone:
 		// FIXME(dlc) - This is error but do we care?
@@ -2808,20 +2825,19 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
 	// Update underlying store.
 	o.updateAcks(dseq, sseq, reply)

-	clustered := o.node != nil
-
 	// In case retention changes for a stream, this ought to have been updated
 	// using the consumer lock to avoid a race.
 	retention := o.retention
+	clustered := o.node != nil
 	o.mu.Unlock()

 	// Let the owning stream know if we are interest or workqueue retention based.
 	// If this consumer is clustered this will be handled by processReplicatedAck
 	// after the ack has propagated.
 	if !clustered && mset != nil && retention != LimitsPolicy {
-		if sagap > 1 {
-			// FIXME(dlc) - This is very inefficient, will need to fix.
-			for seq := sseq; seq > sseq-sagap; seq-- {
+		if sgap > 1 {
+			// FIXME(dlc) - This can very inefficient, will need to fix.
+			for seq := sseq; seq >= floor; seq-- {
 				mset.ackMsg(o, seq)
 			}
 		} else {
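Note: the change above avoids walking a potentially huge sequence range when only a few deliveries are actually pending: it walks whichever is smaller, the pending map or the acked range. A standalone sketch of that idea (names are illustrative, not the consumer's actual state):

    package main

    import "fmt"

    // ackAllUpTo drops every pending sequence <= sseq. When the acked range is
    // larger than the pending set, iterating the map is cheaper than iterating
    // the range; otherwise walk the range directly.
    func ackAllUpTo(pending map[uint64]struct{}, sseq, floor uint64) {
    	sgap := sseq - floor
    	if sgap > uint64(len(pending)) {
    		for seq := range pending {
    			if seq <= sseq {
    				delete(pending, seq)
    			}
    		}
    	} else {
    		for seq := sseq; seq > sseq-sgap && len(pending) > 0; seq-- {
    			delete(pending, seq)
    		}
    	}
    }

    func main() {
    	pending := map[uint64]struct{}{10: {}, 11: {}, 999999: {}}
    	ackAllUpTo(pending, 500000, 1)
    	fmt.Println(len(pending)) // only 999999 remains
    }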
vendor/github.com/nats-io/nats-server/v2/server/events.go (generated, vendored; 5 lines changed)

@@ -632,6 +632,11 @@ func (s *Server) sendInternalAccountMsgWithReply(a *Account, subject, reply stri
 	s.mu.RLock()
 	if s.sys == nil || s.sys.sendq == nil {
 		s.mu.RUnlock()
+		if s.isShuttingDown() {
+			// Skip in case this was called at the end phase during shut down
+			// to avoid too many entries in the logs.
+			return nil
+		}
 		return ErrNoSysAccount
 	}
 	c := s.sys.client
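Note: the guard avoids reporting a "no system account" error for messages sent during the final phase of shutdown, when the missing account is expected. A small sketch of the same idea, with hypothetical names:

    package main

    import (
    	"errors"
    	"fmt"
    	"sync/atomic"
    )

    var errNoSysAccount = errors.New("no system account available")

    type server struct {
    	shuttingDown atomic.Bool
    	sysReady     bool
    }

    // sendInternal reports an error only when the system account is genuinely
    // missing; during shutdown the missing account is expected, so stay quiet.
    func (s *server) sendInternal(msg string) error {
    	if !s.sysReady {
    		if s.shuttingDown.Load() {
    			return nil // expected during shutdown, skip the error
    		}
    		return errNoSysAccount
    	}
    	fmt.Println("sent:", msg)
    	return nil
    }

    func main() {
    	s := &server{}
    	s.shuttingDown.Store(true)
    	fmt.Println(s.sendInternal("hello")) // <nil>
    }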
vendor/github.com/nats-io/nats-server/v2/server/filestore.go (generated, vendored; 26 lines changed)

@@ -3430,7 +3430,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 	lseq := seq + num - 1

 	mb.mu.Lock()
-	var needsRecord bool
 	// If we are empty update meta directly.
 	if mb.msgs == 0 {
 		atomic.StoreUint64(&mb.last.seq, lseq)
@@ -3438,7 +3437,6 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 		atomic.StoreUint64(&mb.first.seq, lseq+1)
 		mb.first.ts = nowts
 	} else {
-		needsRecord = true
 		for ; seq <= lseq; seq++ {
 			mb.dmap.Insert(seq)
 		}
@@ -3446,9 +3444,7 @@ func (fs *fileStore) SkipMsgs(seq uint64, num uint64) error {
 	mb.mu.Unlock()

 	// Write out our placeholder.
-	if needsRecord {
-		mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)
-	}
+	mb.writeMsgRecord(emptyRecordLen, lseq|ebit, _EMPTY_, nil, nil, nowts, true)

 	// Now update FS accounting.
 	// Update fs state.
@@ -8169,6 +8165,7 @@
 }

 // SyncDeleted will make sure this stream has same deleted state as dbs.
+// This will only process deleted state within our current state.
 func (fs *fileStore) SyncDeleted(dbs DeleteBlocks) {
 	if len(dbs) == 0 {
 		return
@@ -8177,18 +8174,22 @@
 	fs.mu.Lock()
 	defer fs.mu.Unlock()

+	lseq := fs.state.LastSeq
 	var needsCheck DeleteBlocks

 	fs.readLockAllMsgBlocks()
 	mdbs := fs.deleteBlocks()
 	for i, db := range dbs {
+		first, last, num := db.State()
 		// If the block is same as what we have we can skip.
 		if i < len(mdbs) {
-			first, last, num := db.State()
 			eFirst, eLast, eNum := mdbs[i].State()
 			if first == eFirst && last == eLast && num == eNum {
 				continue
 			}
+		} else if first > lseq {
+			// Skip blocks not applicable to our current state.
+			continue
 		}
 		// Need to insert these.
 		needsCheck = append(needsCheck, db)
@@ -8616,9 +8617,16 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 	sgap := sseq - o.state.AckFloor.Stream
 	o.state.AckFloor.Consumer = dseq
 	o.state.AckFloor.Stream = sseq
-	for seq := sseq; seq > sseq-sgap; seq-- {
-		delete(o.state.Pending, seq)
-		if len(o.state.Redelivered) > 0 {
+	if sgap > uint64(len(o.state.Pending)) {
+		for seq := range o.state.Pending {
+			if seq <= sseq {
+				delete(o.state.Pending, seq)
+				delete(o.state.Redelivered, seq)
+			}
+		}
+	} else {
+		for seq := sseq; seq > sseq-sgap && len(o.state.Pending) > 0; seq-- {
+			delete(o.state.Pending, seq)
 			delete(o.state.Redelivered, seq)
 		}
 	}
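Note: SyncDeleted now skips incoming delete blocks that either match the local state exactly or start beyond the store's last sequence. A hedged sketch of that filtering step, using a simplified stand-in for the delete-block shape (the real DeleteBlocks interface differs):

    package main

    import "fmt"

    // deleteBlock is a simplified stand-in for a block of deleted sequences.
    type deleteBlock struct {
    	first, last, num uint64
    }

    // blocksNeedingCheck returns the incoming blocks we still have to process:
    // blocks identical to ours are skipped, as are blocks that start past lseq.
    func blocksNeedingCheck(incoming, local []deleteBlock, lseq uint64) []deleteBlock {
    	var needsCheck []deleteBlock
    	for i, db := range incoming {
    		if i < len(local) {
    			if local[i] == db {
    				continue // same deleted state as ours, nothing to do
    			}
    		} else if db.first > lseq {
    			continue // not applicable to our current state yet
    		}
    		needsCheck = append(needsCheck, db)
    	}
    	return needsCheck
    }

    func main() {
    	local := []deleteBlock{{first: 1, last: 10, num: 5}}
    	incoming := []deleteBlock{
    		{first: 1, last: 10, num: 5},    // identical, skipped
    		{first: 500, last: 600, num: 3}, // beyond lseq, skipped
    		{first: 20, last: 30, num: 2},   // needs processing
    	}
    	fmt.Println(blocksNeedingCheck(incoming, local, 100))
    }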
vendor/github.com/nats-io/nats-server/v2/server/jetstream_cluster.go (generated, vendored; 75 lines changed)

@@ -2405,9 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 			// If we are interest based make sure to check consumers if interest retention policy.
 			// This is to make sure we process any outstanding acks from all consumers.
 			mset.checkInterestState()
-			// Make sure we create a new snapshot in case things have changed such that any existing
-			// snapshot may no longer be valid.
-			doSnapshot()
 			// If we became leader during this time and we need to send a snapshot to our
 			// followers, i.e. as a result of a scale-up from R1, do it now.
 			if sendSnapshot && isLeader && mset != nil && n != nil {
@@ -2941,6 +2938,7 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco

 			if err != nil {
 				if err == errLastSeqMismatch {
+
 					var state StreamState
 					mset.store.FastState(&state)

@@ -2952,6 +2950,8 @@
 					// Retry
 					err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
 				}
+				// FIXME(dlc) - We could just run a catchup with a request defining the span between what we expected
+				// and what we got.
 			}

 			// Only return in place if we are going to reset our stream or we are out of space, or we are closed.
@@ -3568,9 +3568,14 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
 		js.mu.Unlock()
 	}

-	var needsSetLeader bool
 	if !alreadyRunning && numReplicas > 1 {
 		if needsNode {
+			// Since we are scaling up we want to make sure our sync subject
+			// is registered before we start our raft node.
+			mset.mu.Lock()
+			mset.startClusterSubs()
+			mset.mu.Unlock()
+
 			js.createRaftGroup(acc.GetName(), rg, storage, pprofLabels{
 				"type":    "stream",
 				"account": mset.accName(),
@@ -3602,16 +3607,13 @@
 			rg.node = nil
 			js.mu.Unlock()
 		}
+		// Set the new stream assignment.
+		mset.setStreamAssignment(sa)
+
 		// Call update.
 		if err = mset.updateWithAdvisory(cfg, !recovering); err != nil {
 			s.Warnf("JetStream cluster error updating stream %q for account %q: %v", cfg.Name, acc.Name, err)
 		}
-		// Set the new stream assignment.
-		mset.setStreamAssignment(sa)
-		// Make sure we are the leader now that we are R1.
-		if needsSetLeader {
-			mset.setLeader(true)
-		}
 	}

 	// If not found we must be expanding into this node since if we are here we know we are a member.
@@ -7582,7 +7584,8 @@ func (mset *stream) supportsBinarySnapshotLocked() bool {
 			// We know we support ourselves.
 			continue
 		}
-		if sir, ok := s.nodeToInfo.Load(p.ID); !ok || sir == nil || !sir.(nodeInfo).binarySnapshots {
+		// Since release 2.10.16 only deny if we know the other node does not support.
+		if sir, ok := s.nodeToInfo.Load(p.ID); ok && sir != nil && !sir.(nodeInfo).binarySnapshots {
 			return false
 		}
 	}
@@ -8681,7 +8684,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		return 0
 	}

-	nextBatchC := make(chan struct{}, 1)
+	nextBatchC := make(chan struct{}, 4)
 	nextBatchC <- struct{}{}
 	remoteQuitCh := make(chan struct{})

@@ -8706,19 +8709,18 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		// Kick ourselves and anyone else who might have stalled on global state.
 		select {
 		case nextBatchC <- struct{}{}:
+			// Reset our activity
+			notActive.Reset(activityInterval)
 		default:
 		}
-		// Reset our activity
-		notActive.Reset(activityInterval)
 	})
 	defer s.sysUnsubscribe(ackSub)
 	ackReplyT := strings.ReplaceAll(ackReply, ".*", ".%d")

 	// Grab our state.
 	var state StreamState
-	mset.mu.RLock()
+	// mset.store never changes after being set, don't need lock.
 	mset.store.FastState(&state)
-	mset.mu.RUnlock()

 	// Reset notion of first if this request wants sequences before our starting sequence
 	// and we would have nothing to send. If we have partial messages still need to send skips for those.
@@ -8756,7 +8758,7 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			// Wait til we can send at least 4k
 			const minBatchWait = int32(4 * 1024)
 			mw := time.NewTimer(minWait)
-			for done := false; !done; {
+			for done := maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait; !done; {
 				select {
 				case <-nextBatchC:
 					done = maxOutMsgs-atomic.LoadInt32(&outm) > minBatchWait
@@ -8811,9 +8813,33 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			dr.First, dr.Num = 0, 0
 		}

+		// See if we should use LoadNextMsg instead of walking sequence by sequence if we have an order magnitude more interior deletes.
+		// Only makes sense with delete range capabilities.
+		useLoadNext := drOk && (uint64(state.NumDeleted) > 10*state.Msgs)
+
 		var smv StoreMsg
 		for ; seq <= last && atomic.LoadInt64(&outb) <= maxOutBytes && atomic.LoadInt32(&outm) <= maxOutMsgs && s.gcbBelowMax(); seq++ {
-			sm, err := mset.store.LoadMsg(seq, &smv)
+			var sm *StoreMsg
+			var err error
+			// Is we should use load next do so here.
+			if useLoadNext {
+				var nseq uint64
+				sm, nseq, err = mset.store.LoadNextMsg(fwcs, true, seq, &smv)
+				if err == nil && nseq > seq {
+					dr.First, dr.Num = seq, nseq-seq
+					// Jump ahead
+					seq = nseq
+				} else if err == ErrStoreEOF {
+					dr.First, dr.Num = seq, state.LastSeq-seq
+					// Clear EOF here for normal processing.
+					err = nil
+					// Jump ahead
+					seq = state.LastSeq
+				}
+			} else {
+				sm, err = mset.store.LoadMsg(seq, &smv)
+			}
+
 			// if this is not a deleted msg, bail out.
 			if err != nil && err != ErrStoreMsgNotFound && err != errDeletedMsg {
 				if err == ErrStoreEOF {
@@ -8829,6 +8855,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 				if n := mset.raftNode(); n != nil {
 					n.InstallSnapshot(mset.stateSnapshot())
 				}
+				// If we allow gap markers check if we have one pending.
+				if drOk && dr.First > 0 {
+					sendDR()
+				}
 				// Signal EOF
 				s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
 				return false
@@ -8875,6 +8905,9 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 			}
 			// Recheck our exit condition.
 			if seq == last {
+				if drOk && dr.First > 0 {
+					sendDR()
+				}
 				s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
 				// EOF
 				s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
@@ -8890,7 +8923,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 		if drOk && dr.First > 0 {
 			sendDR()
 		}
-
 		return true
 	}

@@ -8930,6 +8962,11 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
 				mset.clearCatchupPeer(sreq.Peer)
 				return
 			}
+		case <-time.After(500 * time.Millisecond):
+			if !sendNextBatchAndContinue(qch) {
+				mset.clearCatchupPeer(sreq.Peer)
+				return
+			}
 		}
 	}
 }
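Note: the catchup loop above avoids touching every deleted sequence when interior deletes vastly outnumber live messages: instead of loading sequence by sequence it asks the store for the next live message and records the gap as a delete range. A simplified sketch of that strategy against a plain map-backed store (hypothetical types, not the server's store interface):

    package main

    import "fmt"

    // nextLive returns the first sequence >= seq that holds a message, or ok=false.
    func nextLive(msgs map[uint64]string, seq, last uint64) (uint64, bool) {
    	for ; seq <= last; seq++ {
    		if _, ok := msgs[seq]; ok {
    			return seq, true
    		}
    	}
    	return 0, false
    }

    func main() {
    	// Sparse stream: only a handful of live messages across a large range.
    	msgs := map[uint64]string{3: "a", 1000: "b", 5000: "c"}
    	first, last := uint64(1), uint64(6000)

    	for seq := first; seq <= last; seq++ {
    		nseq, ok := nextLive(msgs, seq, last)
    		if !ok {
    			// Everything up to last is deleted; send one gap marker and stop.
    			fmt.Printf("delete range: first=%d num=%d\n", seq, last-seq+1)
    			break
    		}
    		if nseq > seq {
    			// Record the gap instead of visiting every deleted sequence.
    			fmt.Printf("delete range: first=%d num=%d\n", seq, nseq-seq)
    			seq = nseq
    		}
    		fmt.Println("send message at seq", seq)
    	}
    }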
vendor/github.com/nats-io/nats-server/v2/server/memstore.go (generated, vendored; 14 lines changed)

@@ -1402,19 +1402,25 @@ func (ms *memStore) EncodedStreamState(failed uint64) ([]byte, error) {

 // SyncDeleted will make sure this stream has same deleted state as dbs.
 func (ms *memStore) SyncDeleted(dbs DeleteBlocks) {
+	ms.mu.Lock()
+	defer ms.mu.Unlock()
+
 	// For now we share one dmap, so if we have one entry here check if states are the same.
 	// Note this will work for any DeleteBlock type, but we expect this to be a dmap too.
 	if len(dbs) == 1 {
-		ms.mu.RLock()
 		min, max, num := ms.dmap.State()
-		ms.mu.RUnlock()
 		if pmin, pmax, pnum := dbs[0].State(); pmin == min && pmax == max && pnum == num {
 			return
 		}
 	}
+	lseq := ms.state.LastSeq
 	for _, db := range dbs {
-		db.Range(func(dseq uint64) bool {
-			ms.RemoveMsg(dseq)
+		// Skip if beyond our current state.
+		if first, _, _ := db.State(); first > lseq {
+			continue
+		}
+		db.Range(func(seq uint64) bool {
+			ms.removeMsg(seq, false)
 			return true
 		})
 	}
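Note: the rewrite also switches from the exported RemoveMsg, which takes the store lock on every call, to holding the lock once and calling the unexported removeMsg. That exported-locks/unexported-assumes-lock split is a common Go pattern; a minimal sketch with hypothetical names:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type store struct {
    	mu   sync.Mutex
    	msgs map[uint64]string
    }

    // RemoveMsg is the exported entry point: it acquires the lock itself.
    func (s *store) RemoveMsg(seq uint64) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	s.removeMsg(seq)
    }

    // removeMsg assumes s.mu is already held by the caller.
    func (s *store) removeMsg(seq uint64) {
    	delete(s.msgs, seq)
    }

    // RemoveRange deletes many sequences under a single lock acquisition
    // instead of re-locking for every sequence.
    func (s *store) RemoveRange(seqs []uint64) {
    	s.mu.Lock()
    	defer s.mu.Unlock()
    	for _, seq := range seqs {
    		s.removeMsg(seq)
    	}
    }

    func main() {
    	s := &store{msgs: map[uint64]string{1: "a", 2: "b", 3: "c"}}
    	s.RemoveRange([]uint64{1, 2})
    	fmt.Println(len(s.msgs)) // 1
    }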
vendor/github.com/nats-io/nats-server/v2/server/raft.go (generated, vendored; 16 lines changed)

@@ -887,8 +887,10 @@ func (n *raft) ResumeApply() {
 	}

 	n.debug("Resuming our apply channel")
-	n.observer, n.pobserver = n.pobserver, false
-	n.paused = false
+
+	// Reset before we start.
+	n.resetElectionTimeout()
+
 	// Run catchup..
 	if n.hcommit > n.commit {
 		n.debug("Resuming %d replays", n.hcommit+1-n.commit)
@@ -904,12 +906,16 @@
 			runtime.Gosched()
 			// Simply re-acquire
 			n.Lock()
-			// Need to check if we got closed or if we were paused again.
-			if n.State() == Closed || n.paused {
+			// Need to check if we got closed.
+			if n.State() == Closed {
 				return
 			}
 		}
 	}
+
+	// Clear our observer and paused state after we apply.
+	n.observer, n.pobserver = n.pobserver, false
+	n.paused = false
 	n.hcommit = 0

 	// If we had been selected to be the next leader campaign here now that we have resumed.
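Note: the reordering above clears the paused/observer flags only after the buffered commits have been replayed, so the node is never observed as resumed while replay is still in flight. A compact sketch of that pause/replay/clear sequencing (hypothetical structure, not the raft implementation):

    package main

    import "fmt"

    type node struct {
    	paused  bool
    	pending []string // entries buffered while paused
    }

    func (n *node) apply(e string) { fmt.Println("applied:", e) }

    func (n *node) PauseApply() { n.paused = true }

    func (n *node) Commit(e string) {
    	if n.paused {
    		n.pending = append(n.pending, e)
    		return
    	}
    	n.apply(e)
    }

    // ResumeApply replays everything buffered while paused and only then
    // clears the paused flag, so observers never see "resumed" mid-replay.
    func (n *node) ResumeApply() {
    	for _, e := range n.pending {
    		n.apply(e)
    	}
    	n.pending = nil
    	n.paused = false
    }

    func main() {
    	n := &node{}
    	n.PauseApply()
    	n.Commit("entry-1")
    	n.Commit("entry-2")
    	n.ResumeApply()
    	n.Commit("entry-3")
    }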
vendor/github.com/nats-io/nats-server/v2/server/server.go (generated, vendored; 8 lines changed)

@@ -932,7 +932,13 @@ func (s *Server) setClusterName(name string) {

 // Return whether the cluster name is dynamic.
 func (s *Server) isClusterNameDynamic() bool {
-	return s.getOpts().Cluster.Name == _EMPTY_
+	// We need to lock the whole "Cluster.Name" check and not use s.getOpts()
+	// because otherwise this could cause a data race with setting the name in
+	// route.go's processRouteConnect().
+	s.optsMu.RLock()
+	dynamic := s.opts.Cluster.Name == _EMPTY_
+	s.optsMu.RUnlock()
+	return dynamic
 }

 // Returns our configured serverName.
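Note: the fix reads Cluster.Name under the same lock that protects writes to it, instead of going through the option-snapshot helper. A minimal sketch of guarding one mutable field with an RWMutex, using hypothetical names:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type server struct {
    	optsMu      sync.RWMutex
    	clusterName string
    }

    // setClusterName is the writer; it takes the write lock.
    func (s *server) setClusterName(name string) {
    	s.optsMu.Lock()
    	s.clusterName = name
    	s.optsMu.Unlock()
    }

    // isClusterNameDynamic reads the same field under the read lock, so the
    // race detector stays quiet even if a route sets the name concurrently.
    func (s *server) isClusterNameDynamic() bool {
    	s.optsMu.RLock()
    	dynamic := s.clusterName == ""
    	s.optsMu.RUnlock()
    	return dynamic
    }

    func main() {
    	s := &server{}
    	fmt.Println(s.isClusterNameDynamic()) // true
    	s.setClusterName("C1")
    	fmt.Println(s.isClusterNameDynamic()) // false
    }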
vendor/github.com/nats-io/nats-server/v2/server/stream.go (generated, vendored; 10 lines changed)

@@ -840,7 +840,9 @@ func (mset *stream) setLeader(isLeader bool) error {
 	if isLeader {
 		// Make sure we are listening for sync requests.
 		// TODO(dlc) - Original design was that all in sync members of the group would do DQ.
-		mset.startClusterSubs()
+		if mset.isClustered() {
+			mset.startClusterSubs()
+		}

 		// Setup subscriptions if we were not already the leader.
 		if err := mset.subscribeToStream(); err != nil {
@@ -875,7 +877,7 @@

 // Lock should be held.
 func (mset *stream) startClusterSubs() {
-	if mset.isClustered() && mset.syncSub == nil {
+	if mset.syncSub == nil {
 		mset.syncSub, _ = mset.srv.systemSubscribe(mset.sa.Sync, _EMPTY_, false, mset.sysc, mset.handleClusterSyncRequest)
 	}
 }
@@ -4868,6 +4870,10 @@ func (mset *stream) name() string {

 func (mset *stream) internalLoop() {
 	mset.mu.RLock()
+	setGoRoutineLabels(pprofLabels{
+		"account": mset.acc.Name,
+		"stream":  mset.cfg.Name,
+	})
 	s := mset.srv
 	c := s.createInternalJetStreamClient()
 	c.registerWithAccount(mset.acc)
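Note: internalLoop now tags its goroutine with pprof labels so profiles can be attributed to an account and stream. The server uses an internal helper for this; the same effect comes from the standard runtime/pprof API, as in this hedged sketch:

    package main

    import (
    	"context"
    	"runtime/pprof"
    	"time"
    )

    func internalLoop(account, stream string) {
    	// Attach labels to this goroutine; they show up in CPU and goroutine profiles.
    	labels := pprof.Labels("account", account, "stream", stream)
    	pprof.SetGoroutineLabels(pprof.WithLabels(context.Background(), labels))

    	// ... the loop's real work would go here ...
    	time.Sleep(10 * time.Millisecond)
    }

    func main() {
    	go internalLoop("ACME", "ORDERS")
    	time.Sleep(50 * time.Millisecond)
    }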
vendor/modules.txt (vendored; 6 lines changed)

@@ -1250,7 +1250,7 @@ github.com/justinas/alice
 # github.com/kevinburke/ssh_config v1.2.0
 ## explicit
 github.com/kevinburke/ssh_config
-# github.com/klauspost/compress v1.17.7
+# github.com/klauspost/compress v1.17.8
 ## explicit; go 1.20
 github.com/klauspost/compress/flate
 github.com/klauspost/compress/internal/race
@@ -1406,10 +1406,10 @@ github.com/mohae/deepcopy
 # github.com/mschoch/smat v0.2.0
 ## explicit; go 1.13
 github.com/mschoch/smat
-# github.com/nats-io/jwt/v2 v2.5.6
+# github.com/nats-io/jwt/v2 v2.5.7
 ## explicit; go 1.18
 github.com/nats-io/jwt/v2
-# github.com/nats-io/nats-server/v2 v2.10.15
+# github.com/nats-io/nats-server/v2 v2.10.16
 ## explicit; go 1.20
 github.com/nats-io/nats-server/v2/conf
 github.com/nats-io/nats-server/v2/internal/fastrand