Bump github.com/nats-io/nats-server/v2 from 2.10.24 to 2.10.25

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.24 to 2.10.25.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.24...v2.10.25)
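
To apply the same bump by hand, the standard Go module tooling would look roughly like this (a sketch; only the module path and version come from this update):

    go get github.com/nats-io/nats-server/v2@v2.10.25
    go mod tidy

`go mod tidy` also picks up indirect requirement changes such as golang.org/x/time, which is why go.sum changes alongside go.mod below. Since the commit also carries the updated upstream server sources, the repository appears to vendor its dependencies, in which case `go mod vendor` would refresh the vendored copy as well.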

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Author: dependabot[bot]
Date: 2025-02-05 14:51:18 +00:00
Committer: GitHub
Parent: 46df1a041e
Commit: ee26fb3c6f
18 changed files with 570 additions and 454 deletions
+2 -2
@@ -57,7 +57,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.3.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.24
github.com/nats-io/nats-server/v2 v2.10.25
github.com/nats-io/nats.go v1.38.0
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
@@ -323,7 +323,7 @@ require (
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.22.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/time v0.9.0 // indirect
golang.org/x/tools v0.28.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect
+4 -4
@@ -823,8 +823,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE=
github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4=
github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4=
github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s=
github.com/nats-io/nats-server/v2 v2.10.25 h1:J0GWLDDXo5HId7ti/lTmBfs+lzhmu8RPkoKl0eSCqwc=
github.com/nats-io/nats-server/v2 v2.10.25/go.mod h1:/YYYQO7cuoOBt+A7/8cVjuhWTaTUEAlZbJT+3sMAfFU=
github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA=
github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw=
github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0=
@@ -1464,8 +1464,8 @@ golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY=
golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+1 -1
@@ -55,7 +55,7 @@ func init() {
const (
// VERSION is the current version for the server.
VERSION = "2.10.24"
VERSION = "2.10.25"
// PROTO is the currently supported protocol.
// 0 was the original
+94 -79
@@ -1373,6 +1373,8 @@ func (o *consumer) setLeader(isLeader bool) {
// If we were the leader make sure to drain queued up acks.
if wasLeader {
o.ackMsgs.drain()
// Reset amount of acks that need to be processed.
atomic.StoreInt64(&o.awl, 0)
// Also remove any pending replies since we should not be the one to respond at this point.
o.replies = nil
}
@@ -1415,8 +1417,23 @@ func (o *consumer) unsubscribe(sub *subscription) {
// We need to make sure we protect access to the outq.
// Do all advisory sends here.
func (o *consumer) sendAdvisory(subj string, msg []byte) {
o.outq.sendMsg(subj, msg)
func (o *consumer) sendAdvisory(subject string, e any) {
if o.acc == nil {
return
}
// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := o.acc.sl; (sl != nil && !sl.HasInterest(subject)) && !o.srv.hasGatewayInterest(o.acc.Name, subject) {
return
}
j, err := json.Marshal(e)
if err != nil {
return
}
o.outq.sendMsg(subject, j)
}
func (o *consumer) sendDeleteAdvisoryLocked() {
@@ -1432,13 +1449,8 @@ func (o *consumer) sendDeleteAdvisoryLocked() {
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
subj := JSAdvisoryConsumerDeletedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}
func (o *consumer) sendCreateAdvisory() {
@@ -1457,13 +1469,8 @@ func (o *consumer) sendCreateAdvisory() {
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
subj := JSAdvisoryConsumerCreatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
}
// Created returns created time.
@@ -1573,6 +1580,8 @@ var (
consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
)
// deleteNotActive must only be called from time.AfterFunc or in its own
// goroutine, as it can block on clean-up.
func (o *consumer) deleteNotActive() {
o.mu.Lock()
if o.mset == nil {
@@ -1613,8 +1622,25 @@ func (o *consumer) deleteNotActive() {
s, js := o.mset.srv, o.srv.js.Load()
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
var qch, cqch chan struct{}
if o.srv != nil {
qch = o.srv.quitCh
}
if o.js != nil {
cqch = o.js.clusterQuitC()
}
o.mu.Unlock()
// Useful for pprof.
setGoRoutineLabels(pprofLabels{
"account": acc,
"stream": stream,
"consumer": name,
})
// We will delete locally regardless.
defer o.delete()
// If we are clustered, check if we still have this consumer assigned.
// If we do forward a proposal to delete ourselves to the metacontroller leader.
if !isDirect && s.JetStreamIsClustered() {
@@ -1637,38 +1663,40 @@ func (o *consumer) deleteNotActive() {
if ca != nil && cc != nil {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-qch:
return
case <-cqch:
return
}
}()
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
continue
}
// We saw that consumer has been removed, all done.
return
}
}
}
// We will delete here regardless.
o.delete()
}
func (o *consumer) watchGWinterest() {
@@ -2382,12 +2410,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
o.sendAdvisory(o.nakEventT, j)
o.sendAdvisory(o.nakEventT, e)
// Check to see if we have delays attached.
if len(nak) > len(AckNak) {
@@ -2462,15 +2485,8 @@ func (o *consumer) processTerm(sseq, dseq, dc uint64, reason, reply string) bool
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
// We had an error during the marshal, so we can't send the advisory,
// but we still need to tell the caller that the ack was processed.
return ackedInPlace
}
subj := JSAdvisoryConsumerMsgTerminatedPre + "." + o.stream + "." + o.name
o.sendAdvisory(subj, j)
o.sendAdvisory(subj, e)
return ackedInPlace
}
@@ -2765,12 +2781,7 @@ func (o *consumer) sampleAck(sseq, dseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
o.sendAdvisory(o.ackEventT, j)
o.sendAdvisory(o.ackEventT, e)
}
// Process an ACK.
@@ -2851,7 +2862,8 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
// no-op
if dseq <= o.adflr || sseq <= o.asflr {
o.mu.Unlock()
return ackInPlace
// Return true to let caller respond back to the client.
return true
}
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
@@ -3515,12 +3527,7 @@ func (o *consumer) notifyDeliveryExceeded(sseq, dc uint64) {
Domain: o.srv.getOpts().JetStreamDomain,
}
j, err := json.Marshal(e)
if err != nil {
return
}
o.sendAdvisory(o.deliveryExcEventT, j)
o.sendAdvisory(o.deliveryExcEventT, e)
}
// Check if the candidate subject matches a filter if its present.
@@ -3596,17 +3603,23 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
continue
}
if seq > 0 {
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
// Message was scheduled for redelivery but was removed in the meantime.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
// This is a race condition where the message is still in o.pending and
// scheduled for redelivery, but it has been removed from the stream.
// o.processTerm is called in a goroutine so could run after we get here.
// That will correct the pending state and delivery/ack floors, so just skip here.
continue
}
return pmsg, dc, err
}
}
@@ -5379,6 +5392,7 @@ func (o *consumer) requestNextMsgSubject() string {
func (o *consumer) decStreamPending(sseq uint64, subj string) {
o.mu.Lock()
// Update our cached num pending only if we think deliverMsg has not done so.
if sseq >= o.sseq && o.isFilteredMatch(subj) {
o.npc--
@@ -5390,6 +5404,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) {
if o.rdc != nil {
rdc = o.rdc[sseq]
}
o.mu.Unlock()
// If it was pending process it like an ack.
+22
@@ -324,6 +324,28 @@ func (ci *ClientInfo) forAssignmentSnap() *ClientInfo {
}
}
// forProposal returns the minimum amount of ClientInfo we need for assignment proposals.
func (ci *ClientInfo) forProposal() *ClientInfo {
if ci == nil {
return nil
}
cci := *ci
cci.Jwt = _EMPTY_
cci.IssuerKey = _EMPTY_
return &cci
}
// forAdvisory returns the minimum amount of ClientInfo we need for JS advisory events.
func (ci *ClientInfo) forAdvisory() *ClientInfo {
if ci == nil {
return nil
}
cci := *ci
cci.Jwt = _EMPTY_
cci.Alternates = nil
return &cci
}
// ServerStats hold various statistics that we will periodically send out.
type ServerStats struct {
Start time.Time `json:"start"`
+160 -81
@@ -495,7 +495,10 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
// Do age checks too, make sure to call in place.
if fs.cfg.MaxAge != 0 {
fs.expireMsgsOnRecover()
err := fs.expireMsgsOnRecover()
if isPermissionError(err) {
return nil, err
}
fs.startAgeChk()
}
@@ -1376,14 +1379,14 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
}
hdr := buf[index : index+msgHdrSize]
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:]))
hasHeaders := rl&hbit != 0
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
truncate(index)
return gatherLost(lbuf - index), tombstones, errBadMsg
}
@@ -1978,9 +1981,9 @@ func (fs *fileStore) recoverMsgs() error {
// We will treat this differently in case we have a recovery
// that will expire a lot of messages on startup.
// Should only be called on startup.
func (fs *fileStore) expireMsgsOnRecover() {
func (fs *fileStore) expireMsgsOnRecover() error {
if fs.state.Msgs == 0 {
return
return nil
}
var minAge = time.Now().UnixNano() - int64(fs.cfg.MaxAge)
@@ -1992,7 +1995,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
// usually taken care of by fs.removeMsgBlock() but we do not call that here.
var last msgId
deleteEmptyBlock := func(mb *msgBlock) {
deleteEmptyBlock := func(mb *msgBlock) error {
// If we are the last keep state to remember first/last sequence.
// Do this part by hand since not deleting one by one.
if mb == fs.lmb {
@@ -2008,8 +2011,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
}
return true
})
mb.dirtyCloseWithRemove(true)
err := mb.dirtyCloseWithRemove(true)
if isPermissionError(err) {
return err
}
deleted++
return nil
}
for _, mb := range fs.blks {
@@ -2023,8 +2030,11 @@ func (fs *fileStore) expireMsgsOnRecover() {
if mb.last.ts <= minAge {
purged += mb.msgs
bytes += mb.bytes
deleteEmptyBlock(mb)
err := deleteEmptyBlock(mb)
mb.mu.Unlock()
if isPermissionError(err) {
return err
}
continue
}
@@ -2148,6 +2158,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
if purged > 0 {
fs.dirty++
}
return nil
}
func copyMsgBlocks(src []*msgBlock) []*msgBlock {
@@ -2315,8 +2326,8 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq = lseq + 1
for _, subj := range subs {
ss, _ := mb.fss.Find(stringToBytes(subj))
if ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
if ss == nil || start > ss.Last || ss.First >= fseq {
continue
@@ -2445,8 +2456,8 @@ func (mb *msgBlock) filteredPendingLocked(filter string, wc bool, sseq uint64) (
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(bytesToString(bsubj), ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(bytesToString(bsubj), ss)
}
if sseq <= ss.First {
update(ss)
@@ -2616,10 +2627,6 @@ func (fs *fileStore) numFilteredPendingWithLast(filter string, last bool, ss *Si
// Always reset.
ss.First, ss.Last, ss.Msgs = 0, 0, 0
if filter == _EMPTY_ {
filter = fwcs
}
// We do need to figure out the first and last sequences.
wc := subjectHasWildcard(filter)
start, stop := uint32(math.MaxUint32), uint32(0)
@@ -2749,8 +2756,8 @@ func (fs *fileStore) SubjectsState(subject string) map[string]SimpleState {
mb.lsts = time.Now().UnixNano()
mb.fss.Match(stringToBytes(subject), func(bsubj []byte, ss *SimpleState) {
subj := string(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
oss := fss[subj]
if oss.First == 0 { // New
@@ -2940,8 +2947,8 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
return
}
subj := bytesToString(bsubj)
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3228,8 +3235,8 @@ func (fs *fileStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject bo
// If we already found a partial then don't do anything else.
return
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
if sseq <= ss.First {
t += ss.Msgs
@@ -3467,6 +3474,9 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
dios <- struct{}{}
if err != nil {
if isPermissionError(err) {
return nil, err
}
mb.dirtyCloseWithRemove(true)
return nil, fmt.Errorf("Error creating msg block file: %v", err)
}
@@ -3902,8 +3912,8 @@ func (fs *fileStore) firstSeqForSubj(subj string) (uint64, error) {
info.fblk = i
}
}
if ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
// Re-acquire fs lock
@@ -4034,8 +4044,8 @@ func (fs *fileStore) enforceMsgPerSubjectLimit(fireCallback bool) {
mb.mu.Lock()
mb.ensurePerSubjectInfoLoaded()
ss, ok := mb.fss.Find(stringToBytes(subj))
if ok && ss != nil && ss.firstNeedsUpdate {
mb.recalculateFirstForSubj(subj, ss.First, ss)
if ok && ss != nil && (ss.firstNeedsUpdate || ss.lastNeedsUpdate) {
mb.recalculateForSubj(subj, ss)
}
mb.mu.Unlock()
if ss == nil {
@@ -4364,12 +4374,12 @@ func (mb *msgBlock) compactWithFloor(floor uint64) {
return
}
hdr := buf[index : index+msgHdrSize]
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
rl, slen := le.Uint32(hdr[0:]), int(le.Uint16(hdr[20:]))
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > rlBadThresh || index+rl > lbuf {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || index+rl > lbuf || rl > rlBadThresh {
return
}
// Only need to process non-deleted messages.
@@ -5473,16 +5483,23 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
<-dios
tmpFD, err := os.OpenFile(tmpFN, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, defaultFilePerms)
dios <- struct{}{}
if err != nil {
return fmt.Errorf("failed to create temporary file: %w", err)
}
errorCleanup := func(err error) error {
tmpFD.Close()
os.Remove(tmpFN)
return err
}
// The original buffer at this point is uncompressed, so we will now compress
// it if needed. Note that if the selected algorithm is NoCompression, the
// Compress function will just return the input buffer unmodified.
cmpBuf, err := alg.Compress(origBuf)
if err != nil {
return fmt.Errorf("failed to compress block: %w", err)
return errorCleanup(fmt.Errorf("failed to compress block: %w", err))
}
// We only need to write out the metadata header if compression is enabled.
@@ -5500,7 +5517,7 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
if mb.bek != nil && len(cmpBuf) > 0 {
bek, err := genBlockEncryptionKey(mb.fs.fcfg.Cipher, mb.seed, mb.nonce)
if err != nil {
return err
return errorCleanup(err)
}
mb.bek = bek
mb.bek.XORKeyStream(cmpBuf, cmpBuf)
@@ -5508,11 +5525,6 @@ func (mb *msgBlock) recompressOnDiskIfNeeded() error {
// Write the new block data (which might be compressed or encrypted) to the
// temporary file.
errorCleanup := func(err error) error {
tmpFD.Close()
os.Remove(tmpFN)
return err
}
if n, err := tmpFD.Write(cmpBuf); err != nil {
return errorCleanup(fmt.Errorf("failed to write to temporary file: %w", err))
} else if n != len(cmpBuf) {
@@ -6488,7 +6500,7 @@ func (mb *msgBlock) msgFromBuf(buf []byte, sm *StoreMsg, hh hash.Hash64) (*Store
dlen := int(rl) - msgHdrSize
slen := int(le.Uint16(hdr[20:]))
// Simple sanity check.
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) {
if dlen < 0 || slen > (dlen-recordHashSize) || dlen > int(rl) || int(rl) > len(buf) || rl > rlBadThresh {
return nil, errBadMsg
}
data := buf[msgHdrSize : msgHdrSize+dlen]
@@ -7783,9 +7795,9 @@ func (mb *msgBlock) dirtyClose() {
}
// Should be called with lock held.
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
func (mb *msgBlock) dirtyCloseWithRemove(remove bool) error {
if mb == nil {
return
return nil
}
// Stop cache expiration timer.
if mb.ctmr != nil {
@@ -7807,13 +7819,20 @@ func (mb *msgBlock) dirtyCloseWithRemove(remove bool) {
// Clear any tracking by subject if we are removing.
mb.fss = nil
if mb.mfn != _EMPTY_ {
os.Remove(mb.mfn)
err := os.Remove(mb.mfn)
if isPermissionError(err) {
return err
}
mb.mfn = _EMPTY_
}
if mb.kfn != _EMPTY_ {
os.Remove(mb.kfn)
err := os.Remove(mb.kfn)
if isPermissionError(err) {
return err
}
}
}
return nil
}
// Remove a seq from the fss and select new first.
@@ -7836,24 +7855,14 @@ func (mb *msgBlock) removeSeqPerSubject(subj string, seq uint64) {
ss.Msgs--
// Only one left.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
return
}
// We can lazily calculate the first sequence when needed.
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}
// Will recalulate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject in this block.
// Will avoid slower path message lookups and scan the cache directly instead.
func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
func (mb *msgBlock) recalculateForSubj(subj string, ss *SimpleState) {
// Need to make sure messages are loaded.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
@@ -7861,42 +7870,100 @@ func (mb *msgBlock) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
}
}
// Mark first as updated.
ss.firstNeedsUpdate = false
startSlot := int(startSeq - mb.cache.fseq)
startSlot := int(ss.First - mb.cache.fseq)
if startSlot < 0 {
startSlot = 0
}
if startSlot >= len(mb.cache.idx) {
ss.First = ss.Last
return
} else if startSlot < 0 {
startSlot = 0
}
endSlot := int(ss.Last - mb.cache.fseq)
if endSlot < 0 {
endSlot = 0
}
if endSlot >= len(mb.cache.idx) || startSlot > endSlot {
return
}
var le = binary.LittleEndian
for slot, fseq := startSlot, atomic.LoadUint64(&mb.first.seq); slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
if ss.firstNeedsUpdate {
// Mark first as updated.
ss.firstNeedsUpdate = false
fseq := ss.First + 1
if mbFseq := atomic.LoadUint64(&mb.first.seq); fseq < mbFseq {
fseq = mbFseq
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
for slot := startSlot; slot < len(mb.cache.idx); slot++ {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
ss.First = ss.Last
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq < fseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
ss.First = seq
if ss.Msgs == 1 {
ss.Last = seq
ss.lastNeedsUpdate = false
return
}
// Skip the start slot ahead, if we need to recalculate last we can stop early.
startSlot = slot
break
}
}
}
if ss.lastNeedsUpdate {
// Mark last as updated.
ss.lastNeedsUpdate = false
lseq := ss.Last - 1
if mbLseq := atomic.LoadUint64(&mb.last.seq); lseq > mbLseq {
lseq = mbLseq
}
for slot := endSlot; slot >= startSlot; slot-- {
bi := mb.cache.idx[slot] &^ hbit
if bi == dbit {
// delete marker so skip.
continue
}
li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
// Can't overwrite ss.Last, just skip.
return
}
buf := mb.cache.buf[li:]
hdr := buf[:msgHdrSize]
slen := int(le.Uint16(hdr[20:]))
if subj == bytesToString(buf[msgHdrSize:msgHdrSize+slen]) {
seq := le.Uint64(hdr[4:])
if seq > lseq || seq&ebit != 0 || mb.dmap.Exists(seq) {
continue
}
// Sequence should never be lower, but guard against it nonetheless.
if seq < ss.First {
seq = ss.First
}
ss.Last = seq
if ss.Msgs == 1 {
ss.First = seq
ss.firstNeedsUpdate = false
}
return
}
return
}
}
}
@@ -8178,7 +8245,15 @@ func (fs *fileStore) flushStreamStateLoop(qch, done chan struct{}) {
for {
select {
case <-t.C:
fs.writeFullState()
err := fs.writeFullState()
if isPermissionError(err) && fs.srv != nil {
fs.warn("File system permission denied when flushing stream state, disabling JetStream: %v", err)
// Messages in the block cache could be lost in the worst case.
// In clustered mode this is highly unlikely as a result of replication.
fs.srv.DisableJetStream()
return
}
case <-qch:
return
}
@@ -8386,7 +8461,11 @@ func (fs *fileStore) _writeFullState(force bool) error {
// Protect with dios.
<-dios
err := os.WriteFile(fn, buf, defaultFilePerms)
// If the file system is not writable, isPermissionError(err) will be true.
dios <- struct{}{}
if isPermissionError(err) {
return err
}
// Update dirty if successful.
if err == nil {
+5 -2
@@ -190,14 +190,16 @@ func (q *ipQueue[T]) len() int {
}
// Empty the queue and consumes the notification signal if present.
// Returns the number of items that were drained from the queue.
// Note that a reader goroutine that has been notified that there is
// something in the queue (reading from the queue's `ch`) may then get
// nothing if `drain()` is invoked before the `pop()` or `popOne()`.
func (q *ipQueue[T]) drain() {
func (q *ipQueue[T]) drain() int {
if q == nil {
return
return 0
}
q.Lock()
olen := len(q.elts)
if q.elts != nil {
q.resetAndReturnToPool(&q.elts)
q.elts, q.pos = nil, 0
@@ -209,6 +211,7 @@ func (q *ipQueue[T]) drain() {
default:
}
q.Unlock()
return olen
}
// Since the length of the queue goes to 0 after a pop(), it is good to
+11
@@ -2974,3 +2974,14 @@ func fixCfgMirrorWithDedupWindow(cfg *StreamConfig) {
cfg.Duplicates = 0
}
}
func (s *Server) handleWritePermissionError() {
//TODO Check if we should add s.jetStreamOOSPending in condition
if s.JetStreamEnabled() {
s.Errorf("File system permission denied while writing, disabling JetStream")
go s.DisableJetStream()
//TODO Send respective advisory if needed, same as in handleOutOfSpace
}
}
+22 -12
@@ -836,7 +836,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
limit := atomic.LoadInt64(&js.queueLimit)
if pending >= int(limit) {
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
s.jsAPIRoutedReqs.drain()
drained := int64(s.jsAPIRoutedReqs.drain())
atomic.AddInt64(&js.apiInflight, -drained)
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
TypedEvent: TypedEvent{
@@ -846,7 +847,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
},
Server: s.Name(),
Domain: js.config.Domain,
Dropped: int64(pending),
Dropped: drained,
})
}
}
@@ -864,8 +865,10 @@ func (s *Server) processJSAPIRoutedRequests() {
for {
select {
case <-queue.ch:
reqs := queue.pop()
for _, r := range reqs {
// Only pop one item at a time here, otherwise if the system is recovering
// from queue buildup, then one worker will pull off all the tasks and the
// others will be starved of work.
for r, ok := queue.popOne(); ok && r != nil; r, ok = queue.popOne() {
client.pa = r.pa
start := time.Now()
r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg)
@@ -874,7 +877,6 @@ func (s *Server) processJSAPIRoutedRequests() {
}
atomic.AddInt64(&js.apiInflight, -1)
}
queue.recycle(&reqs)
case <-s.quitCh:
return
}
@@ -3416,7 +3418,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
Time: start,
},
Stream: streamName,
Client: ci,
Client: ci.forAdvisory(),
Domain: domain,
})
@@ -3548,7 +3550,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
Start: start,
End: end,
Bytes: int64(total),
Client: ci,
Client: ci.forAdvisory(),
Domain: domain,
})
@@ -3681,7 +3683,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun
},
Stream: mset.name(),
State: sr.State,
Client: ci,
Client: ci.forAdvisory(),
Domain: s.getOpts().JetStreamDomain,
})
@@ -3699,7 +3701,7 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun
Stream: mset.name(),
Start: start,
End: end,
Client: ci,
Client: ci.forAdvisory(),
Domain: s.getOpts().JetStreamDomain,
})
@@ -4263,9 +4265,17 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
return
}
js.mu.RLock()
meta := cc.meta
js.mu.RUnlock()
// Since these could wait on the Raft group lock, don't do so under the JS lock.
ourID := meta.ID()
groupLeader := meta.GroupLeader()
groupCreated := meta.Created()
js.mu.RLock()
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
ourID := cc.meta.ID()
var rg *raftGroup
var offline, isMember bool
if ca != nil {
@@ -4279,7 +4289,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
// Also capture if we think there is no meta leader.
var isLeaderLess bool
if !isLeader {
isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault
isLeaderLess = groupLeader == _EMPTY_ && time.Since(groupCreated) > lostQuorumIntervalDefault
}
js.mu.RUnlock()
@@ -4489,7 +4499,7 @@ func (s *Server) sendJetStreamAPIAuditAdvisory(ci *ClientInfo, acc *Account, sub
Time: time.Now().UTC(),
},
Server: s.Name(),
Client: ci,
Client: ci.forAdvisory(),
Subject: subject,
Request: request,
Response: response,
+88 -181
@@ -267,7 +267,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
return errNotLeader
}
return meta.InstallSnapshot(js.metaSnapshot())
snap, err := js.metaSnapshot()
if err != nil {
return err
}
return meta.InstallSnapshot(snap)
}
func (s *Server) JetStreamStepdownStream(account, stream string) error {
@@ -437,73 +442,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}
// Restart the stream in question.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
}
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
asa := cc.streams[acc.Name]
if asa == nil {
js.mu.Unlock()
return
}
sa := asa[csa.Config.Name]
if sa == nil {
js.mu.Unlock()
return
}
// Make sure to clear out the raft node if still present in the meta layer.
if rg := sa.Group; rg != nil && rg.node != nil {
if rg.node.State() != Closed {
rg.node.Stop()
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()
// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}
js.processStreamAssignment(sa)
// If we had consumers assigned to this server they will be present in the copy, csa.
// They also need to be processed. The csa consumers is a copy of only our consumers,
// those assigned to us, but the consumer assignment's there are direct from the meta
// layer to make this part much easier and avoid excessive lookups.
for _, cca := range csa.consumers {
if cca.deleted {
continue
}
// Need to look up original as well here to make sure node is nil.
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
// Make sure node is wiped.
ca.Group.node = nil
}
js.mu.Unlock()
if ca != nil {
js.processConsumerAssignment(ca)
}
}
}
// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
@@ -529,7 +467,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
// First lookup stream and make sure its there.
mset, err := acc.lookupStream(streamName)
if err != nil {
js.restartStream(acc, sa)
return false
}
@@ -554,8 +491,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}
return false
@@ -585,37 +520,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
node := ca.Group.node
js.mu.RUnlock()
// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}
// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}
@@ -630,11 +537,12 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()
// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
}
}
return false
@@ -901,15 +809,17 @@ func (js *jetStream) server() *Server {
// Will respond if we do not think we have a metacontroller leader.
func (js *jetStream) isLeaderless() bool {
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
if cc == nil || cc.meta == nil {
js.mu.RUnlock()
return false
}
meta := cc.meta
js.mu.RUnlock()
// If we don't have a leader.
// Make sure we have been running for enough time.
if cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault {
if meta.GroupLeader() == _EMPTY_ && time.Since(meta.Created()) > lostQuorumIntervalDefault {
return true
}
return false
@@ -921,34 +831,38 @@ func (js *jetStream) isGroupLeaderless(rg *raftGroup) bool {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
cc := js.cluster
started := js.started
// If we are not a member we cannot say.
if cc.meta == nil {
js.mu.RUnlock()
return false
}
if !rg.isMember(cc.meta.ID()) {
js.mu.RUnlock()
return false
}
// Single peer groups always have a leader if we are here.
if rg.node == nil {
js.mu.RUnlock()
return false
}
node := rg.node
js.mu.RUnlock()
// If we don't have a leader.
if rg.node.GroupLeader() == _EMPTY_ {
if node.GroupLeader() == _EMPTY_ {
// Threshold for jetstream startup.
const startupThreshold = 10 * time.Second
if rg.node.HadPreviousLeader() {
if node.HadPreviousLeader() {
// Make sure we have been running long enough to intelligently determine this.
if time.Since(js.started) > startupThreshold {
if time.Since(started) > startupThreshold {
return true
}
}
// Make sure we have been running for enough time.
if time.Since(rg.node.Created()) > lostQuorumIntervalDefault {
if time.Since(node.Created()) > lostQuorumIntervalDefault {
return true
}
}
@@ -1334,7 +1248,10 @@ func (js *jetStream) monitorCluster() {
}
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
snap, err := js.metaSnapshot()
if err != nil {
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
} else if err = n.InstallSnapshot(snap); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
@@ -1528,7 +1445,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
return StreamConfig{}, false
}
func (js *jetStream) metaSnapshot() []byte {
func (js *jetStream) metaSnapshot() ([]byte, error) {
start := time.Now()
js.mu.RLock()
s := js.srv
@@ -1568,16 +1485,22 @@ func (js *jetStream) metaSnapshot() []byte {
if len(streams) == 0 {
js.mu.RUnlock()
return nil
return nil, nil
}
// Track how long it took to marshal the JSON
mstart := time.Now()
b, _ := json.Marshal(streams)
b, err := json.Marshal(streams)
mend := time.Since(mstart)
js.mu.RUnlock()
// Must not be possible for a JSON marshaling error to result
// in an empty snapshot.
if err != nil {
return nil, err
}
// Track how long it took to compress the JSON
cstart := time.Now()
snap := s2.Encode(nil, b)
@@ -1587,7 +1510,7 @@ func (js *jetStream) metaSnapshot() []byte {
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
}
return snap
return snap, nil
}
func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
@@ -2411,7 +2334,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
compactInterval = 2 * time.Minute
compactSizeMin = 8 * 1024 * 1024
compactNumMin = 65536
minSnapDelta = 10 * time.Second
)
// Spread these out for large numbers on server restart.
@@ -2435,16 +2357,15 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// a complete and detailed state which could be costly in terms of memory, cpu and GC.
// This only entails how many messages, and the first and last sequence of the stream.
// This is all that is needed to detect a change, and we can get this from FilteredState()
// with and empty filter.
// with an empty filter.
var lastState SimpleState
var lastSnapTime time.Time
// Don't allow the upper layer to install snapshots until we have
// fully recovered from disk.
isRecovering := true
doSnapshot := func() {
if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
if mset == nil || isRecovering || isRestore {
return
}
@@ -2462,7 +2383,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
}
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
lastState = curState
} else if err != errNoSnapAvailable && err != errNodeClosed && err != errCatchupsRunning {
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
@@ -2541,10 +2462,14 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
for {
select {
case <-s.quitCh:
// Server shutting down, but we might receive this before qch, so try to snapshot.
doSnapshot()
return
case <-mqch:
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot.
doSnapshot()
return
case <-aq.ch:
var ne, nb uint64
@@ -2603,12 +2528,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check about snapshotting
// If we have at least min entries to compact, go ahead and try to snapshot/compact.
if ne >= compactNumMin || nb > compactSizeMin || mset.getCLFS() > pclfs {
// We want to make sure we do not short circuit if transistioning from no clfs.
if pclfs == 0 {
// This is always false by default.
lastState.firstNeedsUpdate = true
lastSnapTime = time.Time{}
}
doSnapshot()
}
@@ -2711,8 +2630,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// keep stream assignment current
sa = mset.streamAssignment()
// keep peer list up to date with config
js.checkPeers(mset.raftGroup())
// We get this when we have a new stream assignment caused by an update.
// We want to know if we are migrating.
if migrating := mset.isMigrating(); migrating {
@@ -2800,7 +2717,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// Check if we have a quorum.
if current >= neededCurrent {
s.Noticef("Transfer of stream leader for '%s > %s' to '%s'", accName, sa.Config.Name, newLeader)
n.UpdateKnownPeers(newPeers)
n.ProposeKnownPeers(newPeers)
n.StepDown(newLeaderPeer)
}
}
@@ -3090,8 +3007,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
if subject == _EMPTY_ && ts == 0 && len(msg) == 0 && len(hdr) == 0 {
// Skip and update our lseq.
last := mset.store.SkipMsg()
mset.mu.Lock()
mset.setLastSeq(last)
mset.clearAllPreAcks(last)
mset.mu.Unlock()
continue
}
@@ -3318,22 +3237,6 @@ func (s *Server) replicas(node RaftNode) []*PeerInfo {
return replicas
}
// Will check our node peers and see if we should remove a peer.
func (js *jetStream) checkPeers(rg *raftGroup) {
js.mu.Lock()
defer js.mu.Unlock()
// FIXME(dlc) - Single replicas?
if rg == nil || rg.node == nil {
return
}
for _, peer := range rg.node.Peers() {
if !rg.isMember(peer.ID) {
rg.node.ProposeRemovePeer(peer.ID)
}
}
}
// Process a leader change for the clustered stream.
func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if mset == nil {
@@ -3362,8 +3265,6 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if isLeader {
s.Noticef("JetStream cluster new stream leader for '%s > %s'", account, streamName)
s.sendStreamLeaderElectAdvisory(mset)
// Check for peer removal and process here if needed.
js.checkPeers(sa.Group)
mset.checkAllowMsgCompress(peers)
} else {
// We are stepping down.
@@ -3579,7 +3480,7 @@ func (js *jetStream) processStreamAssignment(sa *streamAssignment) bool {
js.processClusterCreateStream(acc, sa)
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.removeStream(ourID, mset, sa)
s.removeStream(mset, sa)
}
// If this stream assignment does not have a sync subject (bug) set that the meta-leader should check when elected.
@@ -3667,13 +3568,13 @@ func (js *jetStream) processUpdateStreamAssignment(sa *streamAssignment) {
js.processClusterUpdateStream(acc, osa, sa)
} else if mset, _ := acc.lookupStream(sa.Config.Name); mset != nil {
// We have one here even though we are not a member. This can happen on re-assignment.
s.removeStream(ourID, mset, sa)
s.removeStream(mset, sa)
}
}
// Common function to remove ourself from this server.
// Common function to remove ourselves from this server.
// This can happen on re-assignment, move, etc
func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment) {
func (s *Server) removeStream(mset *stream, nsa *streamAssignment) {
if mset == nil {
return
}
@@ -3683,7 +3584,6 @@ func (s *Server) removeStream(ourID string, mset *stream, nsa *streamAssignment)
if node.Leader() {
node.StepDown(nsa.Group.Preferred)
}
node.ProposeRemovePeer(ourID)
// shutdown monitor by shutting down raft.
node.Delete()
}
@@ -4472,10 +4372,11 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
} else {
// If we are clustered update the known peers.
js.mu.RLock()
if node := rg.node; node != nil {
node := rg.node
js.mu.RUnlock()
if node != nil {
node.UpdateKnownPeers(ca.Group.Peers)
}
js.mu.RUnlock()
}
// Check if we already have this consumer running.
@@ -4943,8 +4844,12 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
for {
select {
case <-s.quitCh:
// Server shutting down, but we might receive this before qch, so try to snapshot.
doSnapshot(false)
return
case <-qch:
// Clean signal from shutdown routine so do best effort attempt to snapshot.
doSnapshot(false)
return
case <-aq.ch:
ces := aq.pop()
@@ -5009,8 +4914,6 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
// We get this when we have a new consumer assignment caused by an update.
// We want to know if we are migrating.
rg := o.raftGroup()
// keep peer list up to date with config
js.checkPeers(rg)
// If we are migrating, monitor for the new peers to be caught up.
replicas, err := o.replica()
if err != nil {
@@ -5327,8 +5230,6 @@ func (js *jetStream) processConsumerLeaderChange(o *consumer, isLeader bool) err
if isLeader {
s.Noticef("JetStream cluster new consumer leader for '%s > %s > %s'", ca.Client.serviceAccount(), streamName, consumerName)
s.sendConsumerLeaderElectAdvisory(o)
// Check for peer removal and process here if needed.
js.checkPeers(ca.Group)
} else {
// We are stepping down.
// Make sure if we are doing so because we have lost quorum that we send the appropriate advisories.
@@ -7267,23 +7168,29 @@ func (s *Server) jsClusteredMsgDeleteRequest(ci *ClientInfo, acc *Account, mset
}
func encodeAddStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}
func encodeUpdateStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(updateStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}
func encodeDeleteStreamAssignment(sa *streamAssignment) []byte {
csa := *sa
csa.Client = csa.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(removeStreamOp))
json.NewEncoder(&bb).Encode(sa)
json.NewEncoder(&bb).Encode(csa)
return bb.Bytes()
}
@@ -7671,16 +7578,20 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignConsumerOp))
json.NewEncoder(&bb).Encode(ca)
json.NewEncoder(&bb).Encode(cca)
return bb.Bytes()
}
func encodeDeleteConsumerAssignment(ca *consumerAssignment) []byte {
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(removeConsumerOp))
json.NewEncoder(&bb).Encode(ca)
json.NewEncoder(&bb).Encode(cca)
return bb.Bytes()
}
@@ -7691,25 +7602,21 @@ func decodeConsumerAssignment(buf []byte) (*consumerAssignment, error) {
}
func encodeAddConsumerAssignmentCompressed(ca *consumerAssignment) []byte {
b, err := json.Marshal(ca)
if err != nil {
return nil
}
// TODO(dlc) - Streaming better approach here probably.
cca := *ca
cca.Client = cca.Client.forProposal()
var bb bytes.Buffer
bb.WriteByte(byte(assignCompressedConsumerOp))
bb.Write(s2.Encode(nil, b))
s2e := s2.NewWriter(&bb)
json.NewEncoder(s2e).Encode(cca)
s2e.Close()
return bb.Bytes()
}
func decodeConsumerAssignmentCompressed(buf []byte) (*consumerAssignment, error) {
var ca consumerAssignment
js, err := s2.Decode(nil, buf)
if err != nil {
return nil, err
}
err = json.Unmarshal(js, &ca)
return &ca, err
bb := bytes.NewBuffer(buf)
s2d := s2.NewReader(bb)
return &ca, json.NewDecoder(s2d).Decode(&ca)
}
var errBadStreamMsg = errors.New("jetstream cluster bad replicated stream msg")
@@ -8654,6 +8561,8 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
return 0, err
}
mset.mu.Lock()
defer mset.mu.Unlock()
// Update our lseq.
mset.setLastSeq(seq)
@@ -8661,11 +8570,9 @@ func (mset *stream) processCatchupMsg(msg []byte) (uint64, error) {
if len(hdr) > 0 {
if msgId := getMsgId(hdr); msgId != _EMPTY_ {
if !ddloaded {
mset.mu.Lock()
mset.rebuildDedupe()
mset.mu.Unlock()
}
mset.storeMsgId(&ddentry{msgId, seq, ts})
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
}
}
+12 -2
@@ -18,13 +18,22 @@ import (
"time"
)
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
// publishAdvisory sends the given advisory into the account. Returns true if
// it was sent, false if not (i.e. due to lack of interest or a marshal error).
func (s *Server) publishAdvisory(acc *Account, subject string, adv any) bool {
if acc == nil {
acc = s.SystemAccount()
if acc == nil {
return
return false
}
}
// If there is no one listening for this advisory then save ourselves the effort
// and don't bother encoding the JSON or sending it.
if sl := acc.sl; (sl != nil && !sl.HasInterest(subject)) && !s.hasGatewayInterest(acc.Name, subject) {
return false
}
ej, err := json.Marshal(adv)
if err == nil {
err = s.sendInternalAccountMsg(acc, subject, ej)
@@ -34,6 +43,7 @@ func (s *Server) publishAdvisory(acc *Account, subject string, adv any) {
} else {
s.Warnf("Advisory could not be serialized for account %q: %v", acc.Name, err)
}
return err == nil
}
// JSAPIAudit is an advisory about administrative actions taken on JetStream
+19 -13
@@ -2247,8 +2247,16 @@ func (c *client) sendLeafNodeSubUpdate(key string, n int32) {
checkPerms = false
}
}
if checkPerms && !c.canSubscribe(key) {
return
if checkPerms {
var subject string
if sep := strings.IndexByte(key, ' '); sep != -1 {
subject = key[:sep]
} else {
subject = key
}
if !c.canSubscribe(subject) {
return
}
}
}
// If we are here we can send over to the other side.
@@ -2435,7 +2443,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
}
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
@@ -2446,7 +2453,6 @@ func (c *client) processLeafSub(argo []byte) (err error) {
c.sendErr("Invalid Subscription")
return nil
}
updateGWs = srv.gateway.enabled
} else if sub.queue != nil {
// For a queue we need to update the weight.
delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -2469,7 +2475,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
if !spoke {
// If we are routing add to the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, delta)
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
}
}
@@ -2511,27 +2517,27 @@ func (c *client) processLeafUnsub(arg []byte) error {
return nil
}
updateGWs := false
spoke := c.isSpokeLeafNode()
// We store local subs by account and subject and optionally queue name.
// LS- will have the arg exactly as the key.
sub, ok := c.subs[string(arg)]
if !ok {
// If not found, don't try to update routes/gws/leaf nodes.
c.mu.Unlock()
return nil
}
delta := int32(1)
if ok && len(sub.queue) > 0 {
if len(sub.queue) > 0 {
delta = sub.qw
}
c.mu.Unlock()
if ok {
c.unsubscribe(acc, sub, true, true)
updateGWs = srv.gateway.enabled
}
c.unsubscribe(acc, sub, true, true)
if !spoke {
// If we are routing subtract from the route map for the associated account.
srv.updateRouteSubscriptionMap(acc, sub, -delta)
// Gateways
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, -delta)
}
}
+55 -39
@@ -143,8 +143,8 @@ func (ms *memStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts int
return ErrMaxBytes
}
// If we are here we are at a subject maximum, need to determine if dropping last message gives us enough room.
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
sm, ok := ms.msgs[ss.First]
if !ok || memStoreMsgSize(sm.subj, sm.hdr, sm.msg) < memStoreMsgSize(subj, hdr, msg) {
@@ -430,8 +430,8 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
var totalSkipped uint64
// We will track start and end sequences as we go.
ms.fss.Match(stringToBytes(filter), func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
@@ -585,8 +585,8 @@ func (ms *memStore) SubjectsState(subject string) map[string]SimpleState {
fss := make(map[string]SimpleState)
ms.fss.Match(stringToBytes(subject), func(subj []byte, ss *SimpleState) {
subjs := string(subj)
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subjs, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subjs, ss)
}
oss := fss[subjs]
if oss.First == 0 { // New
@@ -675,8 +675,8 @@ func (ms *memStore) NumPendingMulti(sseq uint64, sl *Sublist, lastPerSubject boo
var totalSkipped uint64
// We will track start and end sequences as we go.
IntersectStree[SimpleState](ms.fss, sl, func(subj []byte, fss *SimpleState) {
if fss.firstNeedsUpdate {
ms.recalculateFirstForSubj(bytesToString(subj), fss.First, fss)
if fss.firstNeedsUpdate || fss.lastNeedsUpdate {
ms.recalculateForSubj(bytesToString(subj), fss)
}
if sseq <= fss.First {
update(fss)
@@ -793,8 +793,8 @@ func (ms *memStore) enforcePerSubjectLimit(subj string, ss *SimpleState) {
return
}
for nmsgs := ss.Msgs; nmsgs > uint64(ms.maxp); nmsgs = ss.Msgs {
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if !ms.removeMsg(ss.First, false) {
break
@@ -1009,8 +1009,9 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
delete(ms.msgs, seq)
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
}
}
if purged > ms.state.Msgs {
@@ -1098,8 +1099,9 @@ func (ms *memStore) Truncate(seq uint64) error {
if sm := ms.msgs[i]; sm != nil {
purged++
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, i)
ms.removeSeqPerSubject(sm.subj, i)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, i)
}
}
// Reset last.
@@ -1265,8 +1267,8 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
if !ok {
continue
}
if ss.firstNeedsUpdate {
ms.recalculateFirstForSubj(subj, ss.First, ss)
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
ms.recalculateForSubj(subj, ss)
}
if ss.First < fseq {
fseq = ss.First
@@ -1360,34 +1362,47 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
ss.Msgs--
// If we know we only have 1 msg left don't need to search for next first.
if ss.Msgs == 1 {
if seq == ss.Last {
ss.Last = ss.First
} else {
ss.First = ss.Last
}
ss.firstNeedsUpdate = false
} else {
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
}
// We can lazily calculate the first/last sequence when needed.
ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}
// Will recalculate the first sequence for this subject in this block.
// Will recalculate the first and/or last sequence for this subject.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
tseq := startSeq + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
if ss.Msgs == 1 {
ss.Last = tseq
func (ms *memStore) recalculateForSubj(subj string, ss *SimpleState) {
if ss.firstNeedsUpdate {
tseq := ss.First + 1
if tseq < ms.state.FirstSeq {
tseq = ms.state.FirstSeq
}
for ; tseq <= ss.Last; tseq++ {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.First = tseq
ss.firstNeedsUpdate = false
if ss.Msgs == 1 {
ss.Last = tseq
ss.lastNeedsUpdate = false
return
}
break
}
}
}
if ss.lastNeedsUpdate {
tseq := ss.Last - 1
if tseq > ms.state.LastSeq {
tseq = ms.state.LastSeq
}
for ; tseq >= ss.First; tseq-- {
if sm := ms.msgs[tseq]; sm != nil && sm.subj == subj {
ss.Last = tseq
ss.lastNeedsUpdate = false
if ss.Msgs == 1 {
ss.First = tseq
ss.firstNeedsUpdate = false
}
return
}
ss.firstNeedsUpdate = false
return
}
}
}
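For context on the memstore hunks above: per-subject bookkeeping moves from eagerly recalculating only the first sequence to lazily recalculating both the first and the last sequence. Removals just set firstNeedsUpdate/lastNeedsUpdate, and recalculateForSubj resolves the flags on demand. Below is a minimal standalone Go sketch of that pattern; simpleState, store, and their fields are illustrative stand-ins, not the real memStore internals.

package main

import "fmt"

// simpleState mirrors the idea behind SimpleState: first/last sequence plus
// flags that defer recalculation until a reader actually needs the values.
type simpleState struct {
	First, Last, Msgs uint64
	firstNeedsUpdate  bool
	lastNeedsUpdate   bool
}

// store stands in for the message map keyed by sequence.
type store struct {
	msgs map[uint64]string // seq -> subject
}

// remove only marks the flags; no scan happens on the removal path.
func (ss *simpleState) remove(seq uint64) {
	ss.Msgs--
	ss.firstNeedsUpdate = seq == ss.First || ss.firstNeedsUpdate
	ss.lastNeedsUpdate = seq == ss.Last || ss.lastNeedsUpdate
}

// recalc resolves the deferred flags: scan forward for a new First and
// backward for a new Last, but only for the flags that are set.
func (st *store) recalc(subj string, ss *simpleState) {
	if ss.firstNeedsUpdate {
		for seq := ss.First + 1; seq <= ss.Last; seq++ {
			if st.msgs[seq] == subj {
				ss.First, ss.firstNeedsUpdate = seq, false
				break
			}
		}
	}
	if ss.lastNeedsUpdate {
		for seq := ss.Last - 1; seq >= ss.First && seq > 0; seq-- {
			if st.msgs[seq] == subj {
				ss.Last, ss.lastNeedsUpdate = seq, false
				break
			}
		}
	}
}

func main() {
	st := &store{msgs: map[uint64]string{1: "foo", 2: "foo", 3: "foo"}}
	ss := &simpleState{First: 1, Last: 3, Msgs: 3}
	delete(st.msgs, 3) // drop the last message for "foo"
	ss.remove(3)       // only sets lastNeedsUpdate
	st.recalc("foo", ss)
	fmt.Println(ss.First, ss.Last) // 1 2
}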
@@ -1403,7 +1418,6 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
ss = memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
delete(ms.msgs, seq)
if ms.state.Msgs > 0 {
ms.state.Msgs--
if ss > ms.state.Bytes {
@@ -1428,6 +1442,8 @@ func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
// Remove any per subject tracking.
ms.removeSeqPerSubject(sm.subj, seq)
// Must delete message after updating per-subject info, to be consistent with file store.
delete(ms.msgs, seq)
if ms.scb != nil {
// We do not want to hold any locks here.
+25 -13
View File
@@ -61,6 +61,7 @@ type RaftNode interface {
ID() string
Group() string
Peers() []*Peer
ProposeKnownPeers(knownPeers []string)
UpdateKnownPeers(knownPeers []string)
ProposeAddPeer(peer string) error
ProposeRemovePeer(peer string) error
@@ -1326,6 +1327,12 @@ func (n *raft) isCurrent(includeForwardProgress bool) bool {
return false
}
if n.paused && n.hcommit > n.commit {
// We're currently paused, waiting to be resumed to apply pending commits.
n.debug("Not current, waiting to resume applies commit=%d, hcommit=%d", n.commit, n.hcommit)
return false
}
if n.commit == n.applied {
// At this point if we are current, we can return saying so.
clearBehindState()
@@ -1556,14 +1563,12 @@ func (n *raft) ID() string {
if n == nil {
return _EMPTY_
}
n.RLock()
defer n.RUnlock()
// Lock not needed as n.id is never changed after creation.
return n.id
}
func (n *raft) Group() string {
n.RLock()
defer n.RUnlock()
// Lock not needed as n.group is never changed after creation.
return n.group
}
@@ -1588,19 +1593,23 @@ func (n *raft) Peers() []*Peer {
return peers
}
// Update and propose our known set of peers.
func (n *raft) ProposeKnownPeers(knownPeers []string) {
// If we are the leader update and send this update out.
if n.State() != Leader {
return
}
n.UpdateKnownPeers(knownPeers)
n.sendPeerState()
}
// Update our known set of peers.
func (n *raft) UpdateKnownPeers(knownPeers []string) {
n.Lock()
// Process like peer state update.
ps := &peerState{knownPeers, len(knownPeers), n.extSt}
n.processPeerState(ps)
isLeader := n.State() == Leader
n.Unlock()
// If we are the leader send this update out as well.
if isLeader {
n.sendPeerState()
}
}
// ApplyQ returns the apply queue that new commits will be sent to for the
@@ -1615,8 +1624,7 @@ func (n *raft) LeadChangeC() <-chan bool { return n.leadc }
func (n *raft) QuitC() <-chan struct{} { return n.quit }
func (n *raft) Created() time.Time {
n.RLock()
defer n.RUnlock()
// Lock not needed as n.created is never changed after creation.
return n.created
}
@@ -1840,7 +1848,7 @@ runner:
// just will remove them from the central monitoring map
queues := []interface {
unregister()
drain()
drain() int
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
for _, q := range queues {
q.drain()
@@ -3853,6 +3861,10 @@ func (n *raft) setWriteErrLocked(err error) {
n.error("Critical write error: %v", err)
n.werr = err
if isPermissionError(err) {
go n.s.handleWritePermissionError()
}
if isOutOfSpaceErr(err) {
// For now since this can be happening all under the covers, we will call up and disable JetStream.
go n.s.handleOutOfSpace(nil)
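For the raft hunks above: the interface gains ProposeKnownPeers, which is a no-op on non-leaders, while UpdateKnownPeers keeps applying the peer state unconditionally, and simple accessors for fields set once at creation drop their read locks. A toy sketch of the leader-gated propose versus the unconditional update follows; the node type and its fields are assumptions for illustration only, not the real raft implementation.

package main

import (
	"fmt"
	"sync"
)

type State int

const (
	Follower State = iota
	Leader
)

// node is a toy stand-in; only the pieces needed to show the
// ProposeKnownPeers / UpdateKnownPeers split are modeled here.
type node struct {
	mu    sync.Mutex
	id    string // set once at creation, safe to read without the lock
	state State
	peers []string
}

// UpdateKnownPeers applies the peer set unconditionally.
func (n *node) UpdateKnownPeers(known []string) {
	n.mu.Lock()
	n.peers = append(n.peers[:0], known...)
	n.mu.Unlock()
}

// ProposeKnownPeers updates and broadcasts the peer set, but only if this
// node is currently the leader; followers simply ignore the call.
func (n *node) ProposeKnownPeers(known []string) {
	n.mu.Lock()
	isLeader := n.state == Leader
	n.mu.Unlock()
	if !isLeader {
		return
	}
	n.UpdateKnownPeers(known)
	n.sendPeerState()
}

func (n *node) sendPeerState() {
	n.mu.Lock()
	defer n.mu.Unlock()
	fmt.Println("leader", n.id, "broadcasting peers:", n.peers)
}

func main() {
	follower := &node{id: "n1", state: Follower}
	leader := &node{id: "n2", state: Leader}
	peers := []string{"n1", "n2", "n3"}
	follower.ProposeKnownPeers(peers) // no-op on a follower
	leader.ProposeKnownPeers(peers)   // updates and broadcasts
}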
+9 -11
View File
@@ -1348,8 +1348,6 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
return nil
}
updateGWs := false
_keya := [128]byte{}
_key := _keya[:0]
@@ -1373,19 +1371,21 @@ func (c *client) processRemoteUnsub(arg []byte, leafUnsub bool) (err error) {
if ok {
delete(c.subs, key)
acc.sl.Remove(sub)
updateGWs = srv.gateway.enabled
if len(sub.queue) > 0 {
delta = sub.qw
}
}
c.mu.Unlock()
if updateGWs {
srv.gatewayUpdateSubInterest(accountName, sub, -delta)
}
// Update gateways and leaf nodes only if the subscription was found.
if ok {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(accountName, sub, -delta)
}
// Now check on leafnode updates.
acc.updateLeafNodes(sub, -delta)
// Now check on leafnode updates.
acc.updateLeafNodes(sub, -delta)
}
if c.opts.Verbose {
c.sendOK()
@@ -1600,7 +1600,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
// We use the sub.sid for the key of the c.subs map.
key := bytesToString(sub.sid)
osub := c.subs[key]
updateGWs := false
if osub == nil {
c.subs[key] = sub
// Now place into the account sl.
@@ -1611,7 +1610,6 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
c.sendErr("Invalid Subscription")
return nil
}
updateGWs = srv.gateway.enabled
} else if sub.queue != nil {
// For a queue we need to update the weight.
delta = sub.qw - atomic.LoadInt32(&osub.qw)
@@ -1620,7 +1618,7 @@ func (c *client) processRemoteSub(argo []byte, hasOrigin bool) (err error) {
}
c.mu.Unlock()
if updateGWs {
if srv.gateway.enabled {
srv.gatewayUpdateSubInterest(acc.Name, sub, delta)
}
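The processRemoteUnsub/processRemoteSub hunks above drop the separate updateGWs flag and instead propagate gateway and leaf node interest only when the subscription lookup actually succeeded, after the client lock is released. A small sketch of that look-up-under-lock, propagate-after-unlock-only-if-found pattern; registry and its fields are made-up stand-ins.

package main

import (
	"fmt"
	"sync"
)

// registry is a toy subscription table: look up and delete under the lock,
// then perform interest propagation only when the entry actually existed.
type registry struct {
	mu   sync.Mutex
	subs map[string]int32 // subscription key -> queue weight
}

func (r *registry) remoteUnsub(key string, gatewaysEnabled bool) {
	r.mu.Lock()
	weight, ok := r.subs[key]
	if ok {
		delete(r.subs, key)
	}
	r.mu.Unlock()

	// Nothing was removed, so there is no interest change to propagate.
	if !ok {
		return
	}
	delta := int32(1)
	if weight > 0 {
		delta = weight
	}
	if gatewaysEnabled {
		fmt.Println("gateway interest update:", key, -delta)
	}
	fmt.Println("leafnode interest update:", key, -delta)
}

func main() {
	r := &registry{subs: map[string]int32{"sid1": 0, "sid2": 3}}
	r.remoteUnsub("sid1", true)
	r.remoteUnsub("missing", true) // no updates sent
	r.remoteUnsub("sid2", false)   // leafnode update only
}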
+7
View File
@@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"io"
"os"
"strings"
"time"
"unsafe"
@@ -166,6 +167,8 @@ type SimpleState struct {
// Internal usage for when the first needs to be updated before use.
firstNeedsUpdate bool
// Internal usage for when the last needs to be updated before use.
lastNeedsUpdate bool
}
// LostStreamData indicates msgs that have been lost.
@@ -778,3 +781,7 @@ func copyString(s string) string {
copy(b, s)
return bytesToString(b)
}
func isPermissionError(err error) bool {
return err != nil && os.IsPermission(err)
}
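The new isPermissionError helper above is a nil-safe wrapper around os.IsPermission. A brief usage sketch follows; the path in the constructed error is purely illustrative, and a real caller would be checking an error returned from a filesystem write.

package main

import (
	"fmt"
	"os"
)

// isPermissionError mirrors the helper added above.
func isPermissionError(err error) bool {
	return err != nil && os.IsPermission(err)
}

func main() {
	// A typical permission failure surfaces as an *os.PathError wrapping
	// os.ErrPermission (the path here is made up for the example).
	err := &os.PathError{Op: "write", Path: "/var/lib/nats/demo.blk", Err: os.ErrPermission}
	fmt.Println(isPermissionError(err))            // true
	fmt.Println(isPermissionError(nil))            // false
	fmt.Println(isPermissionError(os.ErrNotExist)) // false
}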
+32 -12
View File
@@ -1039,10 +1039,10 @@ func (mset *stream) lastSeq() uint64 {
return mset.lseq
}
// Set last seq.
// Write lock should be held.
func (mset *stream) setLastSeq(lseq uint64) {
mset.mu.Lock()
mset.lseq = lseq
mset.mu.Unlock()
}
func (mset *stream) sendCreateAdvisory() {
@@ -2051,11 +2051,16 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
store.FastState(&state)
fseq, lseq := state.FirstSeq, state.LastSeq
mset.mu.Lock()
// Check if our last has moved past what our original last sequence was, if so reset.
if lseq > mlseq {
mset.setLastSeq(lseq)
}
// Clear any pending acks below first seq.
mset.clearAllPreAcksBelowFloor(fseq)
mset.mu.Unlock()
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
@@ -2102,7 +2107,14 @@ func (mset *stream) deleteMsg(seq uint64) (bool, error) {
if mset.closed.Load() {
return false, errStreamClosed
}
return mset.store.RemoveMsg(seq)
removed, err := mset.store.RemoveMsg(seq)
if err != nil {
return removed, err
}
mset.mu.Lock()
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
return removed, err
}
// EraseMsg will securely remove a message and rewrite the data with random data.
@@ -2110,7 +2122,14 @@ func (mset *stream) eraseMsg(seq uint64) (bool, error) {
if mset.closed.Load() {
return false, errStreamClosed
}
return mset.store.EraseMsg(seq)
removed, err := mset.store.EraseMsg(seq)
if err != nil {
return removed, err
}
mset.mu.Lock()
mset.clearAllPreAcks(seq)
mset.mu.Unlock()
return removed, err
}
// Are we a mirror?
@@ -4000,15 +4019,8 @@ func (mset *stream) purgeMsgIds() {
}
}
// storeMsgId will store the message id for duplicate detection.
func (mset *stream) storeMsgId(dde *ddentry) {
mset.mu.Lock()
defer mset.mu.Unlock()
mset.storeMsgIdLocked(dde)
}
// storeMsgIdLocked will store the message id for duplicate detection.
// Lock should he held.
// Lock should be held.
func (mset *stream) storeMsgIdLocked(dde *ddentry) {
if mset.ddmap == nil {
mset.ddmap = make(map[string]*ddentry)
@@ -4628,6 +4640,14 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
if err != nil {
if isPermissionError(err) {
mset.mu.Unlock()
// Messages in the block cache could be lost in the worst case.
// In clustered mode this is highly unlikely because of replication.
mset.srv.DisableJetStream()
mset.srv.Warnf("Filesystem permission denied while writing msg, disabling JetStream: %v", err)
return err
}
// If we did not succeed put those values back and increment clfs in case we are clustered.
var state StreamState
mset.store.FastState(&state)
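In the stream.go hunks above, setLastSeq now expects the caller to hold the write lock, and deleteMsg/eraseMsg clear pending pre-acks only after the store removal succeeds. A simplified sketch of that remove-then-clear ordering; toyStream and its fields are invented for illustration (the real code calls clearAllPreAcks under mset.mu).

package main

import (
	"errors"
	"fmt"
	"sync"
)

// toyStream illustrates the deleteMsg change: remove from the store first,
// and only clear the pre-ack bookkeeping for that sequence on success.
type toyStream struct {
	mu      sync.Mutex // protects preAcks, as mset.mu does in the real code
	store   map[uint64]string
	preAcks map[uint64]struct{}
}

var errNotFound = errors.New("no message for sequence")

func (s *toyStream) removeFromStore(seq uint64) (bool, error) {
	if _, ok := s.store[seq]; !ok {
		return false, errNotFound
	}
	delete(s.store, seq)
	return true, nil
}

func (s *toyStream) deleteMsg(seq uint64) (bool, error) {
	removed, err := s.removeFromStore(seq)
	if err != nil {
		return removed, err
	}
	// Only touch pre-acks once the store removal went through.
	s.mu.Lock()
	delete(s.preAcks, seq)
	s.mu.Unlock()
	return removed, err
}

func main() {
	s := &toyStream{
		store:   map[uint64]string{7: "orders.new"},
		preAcks: map[uint64]struct{}{7: {}},
	}
	fmt.Println(s.deleteMsg(7)) // true <nil>
	fmt.Println(s.deleteMsg(7)) // false, error; pre-acks left untouched
	fmt.Println(len(s.preAcks)) // 0
}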
+2 -2
View File
@@ -984,7 +984,7 @@ github.com/munnerz/goautoneg
# github.com/nats-io/jwt/v2 v2.7.3
## explicit; go 1.22
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.24
# github.com/nats-io/nats-server/v2 v2.10.25
## explicit; go 1.22
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -2208,7 +2208,7 @@ golang.org/x/text/transform
golang.org/x/text/unicode/bidi
golang.org/x/text/unicode/norm
golang.org/x/text/width
# golang.org/x/time v0.8.0
# golang.org/x/time v0.9.0
## explicit; go 1.18
golang.org/x/time/rate
# golang.org/x/tools v0.28.0