Bump github.com/nats-io/nats-server/v2 from 2.10.1 to 2.10.2

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.1 to 2.10.2.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.1...v2.10.2)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-10-09 06:23:26 +00:00
committed by Ralf Haferkamp
parent 140f756584
commit caad4709f8
35 changed files with 1064 additions and 552 deletions
+2 -2
View File
@@ -57,7 +57,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.1.0
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.1
github.com/nats-io/nats-server/v2 v2.10.2
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.5
@@ -268,7 +268,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/nats-io/nats.go v1.29.0 // indirect
github.com/nats-io/nats.go v1.30.2 // indirect
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
+4 -4
View File
@@ -1717,10 +1717,10 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.1 h1:MIJ614dhOIdo71iSzY8ln78miXwrYvlvXHUyS+XdKZQ=
github.com/nats-io/nats-server/v2 v2.10.1/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nats.go v1.29.0 h1:dSXZ+SZeGyTdHVYeXimeq12FsIpb9dM8CJ2IZFiHcyE=
github.com/nats-io/nats.go v1.29.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
github.com/nats-io/nats-server/v2 v2.10.2 h1:2o/OOyc/dxeMCQtrF1V/9er0SU0A3LKhDlv/+rqreBM=
github.com/nats-io/nats-server/v2 v2.10.2/go.mod h1:lzrskZ/4gyMAh+/66cCd+q74c6v7muBypzfWhP/MAaM=
github.com/nats-io/nats.go v1.30.2 h1:aloM0TGpPorZKQhbAkdCzYDj+ZmsJDyeo3Gkbr72NuY=
github.com/nats-io/nats.go v1.30.2/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
+28 -24
View File
@@ -30,6 +30,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/nats-io/jwt/v2"
@@ -72,6 +73,7 @@ type Account struct {
lqws map[string]int32
usersRevoked map[string]int64
mappings []*mapping
hasMapped atomic.Bool
lmu sync.RWMutex
lleafs []*client
leafClusters map[string]uint64
@@ -291,6 +293,8 @@ func (a *Account) shallowCopy(na *Account) {
if len(na.mappings) > 0 && na.prand == nil {
na.prand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
na.hasMapped.Store(len(na.mappings) > 0)
// JetStream
na.jsLimits = a.jsLimits
// Server config account limits.
@@ -703,6 +707,7 @@ func (a *Account) AddWeightedMappings(src string, dests ...*MapDest) error {
}
// If we did not replace add to the end.
a.mappings = append(a.mappings, m)
a.hasMapped.Store(len(a.mappings) > 0)
// If we have connected leafnodes make sure to update.
if a.nleafs > 0 {
@@ -729,6 +734,7 @@ func (a *Account) RemoveMapping(src string) bool {
a.mappings[i] = a.mappings[len(a.mappings)-1]
a.mappings[len(a.mappings)-1] = nil // gc
a.mappings = a.mappings[:len(a.mappings)-1]
a.hasMapped.Store(len(a.mappings) > 0)
return true
}
}
@@ -740,28 +746,17 @@ func (a *Account) hasMappings() bool {
if a == nil {
return false
}
a.mu.RLock()
hm := a.hasMappingsLocked()
a.mu.RUnlock()
return hm
}
// Indicates we have mapping entries.
// The account has been verified to be non-nil.
// Read or Write lock held on entry.
func (a *Account) hasMappingsLocked() bool {
return len(a.mappings) > 0
return a.hasMapped.Load()
}
// This performs the logic to map to a new dest subject based on mappings.
// Should only be called from processInboundClientMsg or service import processing.
func (a *Account) selectMappedSubject(dest string) (string, bool) {
a.mu.RLock()
if len(a.mappings) == 0 {
a.mu.RUnlock()
if !a.hasMappings() {
return dest, false
}
a.mu.RLock()
// In case we have to tokenize for subset matching.
tsa := [32]string{}
tts := tsa[:0]
@@ -1707,29 +1702,38 @@ func (a *Account) addReverseRespMapEntry(acc *Account, reply, from string) {
// This will be called from checkForReverseEntry when the reply arg is a wildcard subject.
// This will usually be called in a go routine since we need to walk all the entries.
func (a *Account) checkForReverseEntries(reply string, checkInterest, recursed bool) {
if subjectIsLiteral(reply) {
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}
a.mu.RLock()
if len(a.imports.rrMap) == 0 {
a.mu.RUnlock()
return
}
if subjectIsLiteral(reply) {
a.mu.RUnlock()
a._checkForReverseEntry(reply, nil, checkInterest, recursed)
return
}
var _rs [64]string
rs := _rs[:0]
if n := len(a.imports.rrMap); n > cap(rs) {
rs = make([]string, 0, n)
}
for k := range a.imports.rrMap {
if subjectIsSubsetMatch(k, reply) {
rs = append(rs, k)
}
rs = append(rs, k)
}
a.mu.RUnlock()
tsa := [32]string{}
tts := tokenizeSubjectIntoSlice(tsa[:0], reply)
rsa := [32]string{}
for _, r := range rs {
a._checkForReverseEntry(r, nil, checkInterest, recursed)
rts := tokenizeSubjectIntoSlice(rsa[:0], r)
// isSubsetMatchTokenized is heavy so make sure we do this without the lock.
if isSubsetMatchTokenized(rts, tts) {
a._checkForReverseEntry(r, nil, checkInterest, recursed)
}
}
}
+1 -1
View File
@@ -983,7 +983,7 @@ func (s *Server) processClientOrLeafAuthentication(c *client, opts *Options) (au
acc.mu.RLock()
c.Debugf("Authenticated JWT: %s %q (claim-name: %q, claim-tags: %q) "+
"signed with %q by Account %q (claim-name: %q, claim-tags: %q) signed with %q has mappings %t accused %p",
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappingsLocked(), acc)
c.kindString(), juc.Subject, juc.Name, juc.Tags, juc.Issuer, issuer, acc.nameTag, acc.tags, acc.Issuer, acc.hasMappings(), acc)
acc.mu.RUnlock()
return true
}
+31 -9
View File
@@ -3014,6 +3014,10 @@ func queueMatches(queue string, qsubs [][]*subscription) bool {
// Low level unsubscribe for a given client.
func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool) {
if s := c.srv; s != nil && s.isShuttingDown() {
return
}
c.mu.Lock()
if !force && sub.max > 0 && sub.nm < sub.max {
c.Debugf(
@@ -3067,7 +3071,8 @@ func (c *client) unsubscribe(acc *Account, sub *subscription, force, remove bool
}
// Now check to see if this was part of a respMap entry for service imports.
if acc != nil {
// We can skip subscriptions on reserved replies.
if acc != nil && !isReservedReply(sub.subject) {
acc.checkForReverseEntry(string(sub.subject), nil, true)
}
}
@@ -3735,7 +3740,7 @@ func (c *client) processInboundMsg(msg []byte) {
}
}
// selectMappedSubject will chose the mapped subject based on the client's inbound subject.
// selectMappedSubject will choose the mapped subject based on the client's inbound subject.
func (c *client) selectMappedSubject() bool {
nsubj, changed := c.acc.selectMappedSubject(string(c.pa.subject))
if changed {
@@ -4558,6 +4563,12 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
continue
}
// If we are a spoke leaf node make sure to not forward across routes.
// This mimics same behavior for normal subs above.
if c.kind == LEAF && c.isSpokeLeafNode() && sub.client.kind == ROUTER {
continue
}
// We have taken care of preferring local subs for a message from a route above.
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
@@ -5071,6 +5082,23 @@ func (c *client) closeConnection(reason ClosedState) {
c.out.stc = nil
}
// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}
// If we are shutting down, no need to do all the accounting on subs, etc.
if reason == ServerShutdown {
s := c.srv
c.mu.Unlock()
if s != nil {
// Unregister
s.removeClient(c)
}
return
}
var (
kind = c.kind
srv = c.srv
@@ -5095,12 +5123,6 @@ func (c *client) closeConnection(reason ClosedState) {
spoke = c.isSpokeLeafNode()
}
// If we have remote latency tracking running shut that down.
if c.rrTracking != nil {
c.rrTracking.ptmr.Stop()
c.rrTracking = nil
}
c.mu.Unlock()
// Remove client's or leaf node or jetstream subscriptions.
@@ -5219,7 +5241,7 @@ func (c *client) reconnect() {
// It is possible that the server is being shutdown.
// If so, don't try to reconnect
if !srv.running {
if !srv.isRunning() {
return
}
+1 -1
View File
@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.10.1"
VERSION = "2.10.2"
// PROTO is the currently supported protocol.
// 0 was the original
+31 -7
View File
@@ -1255,15 +1255,32 @@ func (o *consumer) setLeader(isLeader bool) {
// Snapshot initial info.
o.infoWithSnap(true)
// These are the labels we will use to annotate our goroutines.
labels := pprofLabels{
"type": "consumer",
"account": mset.accName(),
"stream": mset.name(),
"consumer": o.name,
}
// Now start up Go routine to deliver msgs.
go o.loopAndGatherMsgs(qch)
go func() {
setGoRoutineLabels(labels)
o.loopAndGatherMsgs(qch)
}()
// Now start up Go routine to process acks.
go o.processInboundAcks(qch)
go func() {
setGoRoutineLabels(labels)
o.processInboundAcks(qch)
}()
if pullMode {
// Now start up Go routine to process inbound next message requests.
go o.processInboundNextMsgReqs(qch)
go func() {
setGoRoutineLabels(labels)
o.processInboundNextMsgReqs(qch)
}()
}
// If we are R>1 spin up our proposal loop.
@@ -1272,7 +1289,10 @@ func (o *consumer) setLeader(isLeader bool) {
// They must be on server versions >= 2.7.1
o.checkAndSetPendingRequestsOk()
o.checkPendingRequests()
go o.loopAndForwardProposals(qch)
go func() {
setGoRoutineLabels(labels)
o.loopAndForwardProposals(qch)
}()
}
} else {
@@ -1536,7 +1556,7 @@ func (o *consumer) deleteNotActive() {
}
}
s, js := o.mset.srv, o.mset.srv.js
s, js := o.mset.srv, o.srv.js.Load()
acc, stream, name, isDirect := o.acc.Name, o.stream, o.name, o.cfg.Direct
o.mu.Unlock()
@@ -1564,7 +1584,7 @@ func (o *consumer) deleteNotActive() {
// Don't think this needs to be a monitored go routine.
go func() {
const (
startInterval = 5 * time.Second
startInterval = 30 * time.Second
maxInterval = 5 * time.Minute
)
jitter := time.Duration(rand.Int63n(int64(startInterval)))
@@ -1573,6 +1593,10 @@ func (o *consumer) deleteNotActive() {
defer ticker.Stop()
for range ticker.C {
js.mu.RLock()
if js.shuttingDown {
js.mu.RUnlock()
return
}
nca := js.consumerAssignment(acc, stream, name)
js.mu.RUnlock()
// Make sure this is not a new consumer with the same name.
@@ -2473,7 +2497,7 @@ func (o *consumer) infoWithSnap(snap bool) *ConsumerInfo {
func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
o.mu.Lock()
mset := o.mset
if mset == nil || mset.srv == nil {
if o.closed || mset == nil || mset.srv == nil {
o.mu.Unlock()
return nil
}
+5 -5
View File
@@ -715,7 +715,7 @@ func (s *Server) eventsRunning() bool {
return false
}
s.mu.RLock()
er := s.running && s.eventsEnabled()
er := s.isRunning() && s.eventsEnabled()
s.mu.RUnlock()
return er
}
@@ -739,7 +739,7 @@ func (s *Server) eventsEnabled() bool {
func (s *Server) TrackedRemoteServers() int {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1
}
return len(s.sys.servers)
@@ -875,7 +875,7 @@ func (s *Server) sendStatsz(subj string) {
m.Stats.ActiveServers = len(s.sys.servers) + 1
// JetStream
if js := s.js; js != nil {
if js := s.js.Load(); js != nil {
jStat := &JetStreamVarz{}
s.mu.RUnlock()
js.mu.RLock()
@@ -1484,7 +1484,7 @@ func (s *Server) remoteServerUpdate(sub *subscription, c *client, _ *Account, su
// Should do normal updates before bailing if wrong domain.
s.mu.Lock()
if s.running && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
if s.isRunning() && s.eventsEnabled() && ssm.Server.ID != s.info.ID {
s.updateRemoteServer(&si)
}
s.mu.Unlock()
@@ -1943,7 +1943,7 @@ func (s *Server) remoteConnsUpdate(sub *subscription, c *client, _ *Account, sub
s.mu.Lock()
// check again here if we have been shutdown.
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
s.mu.Unlock()
return
}
+101 -72
View File
@@ -60,6 +60,9 @@ type FileStoreConfig struct {
Cipher StoreCipher
// Compression is the algorithm to use when compressing.
Compression StoreCompression
// Internal reference to our server.
srv *Server
}
// FileStreamInfo allows us to remember created time.
@@ -387,6 +390,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
qch: make(chan struct{}),
fch: make(chan struct{}, 1),
fsld: make(chan struct{}),
srv: fcfg.srv,
}
// Set flush in place to AsyncFlush which by default is false.
@@ -527,12 +531,6 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
return fs, nil
}
func (fs *fileStore) registerServer(s *Server) {
fs.mu.Lock()
defer fs.mu.Unlock()
fs.srv = s
}
// Lock all existing message blocks.
// Lock held on entry.
func (fs *fileStore) lockAllMsgBlocks() {
@@ -1436,6 +1434,16 @@ func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, []uint64, error) {
return nil, tombstones, nil
}
// For doing warn logging.
// Lock should be held.
func (fs *fileStore) warn(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}
// recoverFullState will attempt to receover our last full state and re-process any state changes
// that happened afterwards.
func (fs *fileStore) recoverFullState() (rerr error) {
@@ -1455,12 +1463,16 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dios <- struct{}{}
if err != nil {
if !os.IsNotExist(err) {
fs.warn("Could not read stream state file: %v", err)
}
return err
}
const minLen = 32
if len(buf) < minLen {
os.Remove(fn)
fs.warn("Stream state too short (%d bytes)", len(buf))
return errCorruptState
}
@@ -1471,6 +1483,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
fs.hh.Write(buf)
if !bytes.Equal(h, fs.hh.Sum(nil)) {
os.Remove(fn)
fs.warn("Stream state checksum did not match")
return errCorruptState
}
@@ -1482,6 +1495,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
ns := fs.aek.NonceSize()
buf, err = fs.aek.Open(nil, buf[:ns], buf[ns:], nil)
if err != nil {
fs.warn("Stream state error reading encryption key: %v", err)
return err
}
}
@@ -1489,6 +1503,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if buf[0] != fullStateMagic || buf[1] != fullStateVersion {
os.Remove(fn)
fs.warn("Stream state magic and version mismatch")
return errCorruptState
}
@@ -1543,6 +1558,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
if lsubj := int(readU64()); lsubj > 0 {
if bi+lsubj > len(buf) {
os.Remove(fn)
fs.warn("Stream state bad subject len (%d)", lsubj)
return errCorruptState
}
subj := fs.subjString(buf[bi : bi+lsubj])
@@ -1573,10 +1589,15 @@ func (fs *fileStore) recoverFullState() (rerr error) {
dmap, n, err := avl.Decode(buf[bi:])
if err != nil {
os.Remove(fn)
fs.warn("Stream state error decoding avl dmap: %v", err)
return errCorruptState
}
mb.dmap = *dmap
mb.msgs -= numDeleted
if mb.msgs > numDeleted {
mb.msgs -= numDeleted
} else {
mb.msgs = 0
}
bi += n
}
// Only add in if not empty or the lmb.
@@ -1601,6 +1622,7 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// Check if we had any errors.
if bi < 0 {
os.Remove(fn)
fs.warn("Stream state has no checksum present")
return errCorruptState
}
@@ -1610,20 +1632,22 @@ func (fs *fileStore) recoverFullState() (rerr error) {
// First let's check the happy path, open the blk file that was the lmb when we created the full state.
// See if we have the last block available.
var matched bool
var mb *msgBlock
if mb = fs.bim[blkIndex]; mb != nil {
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
mb := fs.lmb
if mb == nil || mb.index != blkIndex {
fs.warn("Stream state block does not exist or index mismatch")
return errCorruptState
}
if _, err := os.Stat(mb.mfn); err != nil && os.IsNotExist(err) {
// If our saved state is past what we see on disk, fallback and rebuild.
if ld, _, _ := mb.rebuildState(); ld != nil {
fs.addLostData(ld)
}
fs.warn("Stream state detected prior state, could not locate msg block %d", blkIndex)
return errPriorState
}
if matched = bytes.Equal(mb.lastChecksum(), lchk[:]); !matched {
// Remove the last message block since we will re-process below.
fs.removeMsgBlockFromList(mb)
}
// We may need to check other blocks. Even if we matched last checksum we will see if there is another block.
@@ -1640,12 +1664,14 @@ func (fs *fileStore) recoverFullState() (rerr error) {
return nil
}
os.Remove(fn)
fs.warn("Stream state could not recover msg block %d", bi)
return err
}
if nmb != nil {
// Check if we have to account for a partial message block.
if !matched && mb != nil && mb.index == nmb.index {
if err := fs.adjustAccounting(mb, nmb); err != nil {
fs.warn("Stream state could not adjust accounting")
return err
}
}
@@ -1677,8 +1703,13 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
}
nmb.ensurePerSubjectInfoLoaded()
lookupAndAdjust := func(seq uint64) error {
var smv StoreMsg
// Walk all the original mb's sequences that were included in the stream state.
var smv StoreMsg
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
sm, err := nmb.cacheLookup(seq, &smv)
if err != nil {
@@ -1690,29 +1721,10 @@ func (fs *fileStore) adjustAccounting(mb, nmb *msgBlock) error {
if len(sm.subj) > 0 && fs.psim != nil {
fs.removePerSubject(sm.subj)
}
return nil
}
// Walk all the original mb's sequences that were included in the stream state.
for seq := mb.first.seq; seq <= mb.last.seq; seq++ {
// If we had already declared it deleted we can move on since you can not undelete.
if mb.dmap.Exists(seq) {
continue
}
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now check to see if we had a higher first for the recovered state mb vs nmb.
if nmb.first.seq < mb.first.seq {
for seq := nmb.first.seq; seq < mb.first.seq; seq++ {
// Lookup the message.
if err := lookupAndAdjust(seq); err != nil {
return err
}
}
// Now set first for nmb.
nmb.first = mb.first
}
@@ -1837,7 +1849,10 @@ func (fs *fileStore) recoverMsgs() error {
}
}
for _, mb := range emptyBlks {
// Need the mb lock here.
mb.mu.Lock()
fs.removeMsgBlock(mb)
mb.mu.Unlock()
}
}
@@ -2031,6 +2046,12 @@ func (fs *fileStore) expireMsgsOnRecover() {
// Clear any global subject state.
fs.psim = make(map[string]*psi)
}
// If we purged anything, make sure we kick flush state loop.
if purged > 0 {
fs.dirty++
fs.kickFlushStateLoop()
}
}
func copyMsgBlocks(src []*msgBlock) []*msgBlock {
@@ -3086,7 +3107,9 @@ func (fs *fileStore) rebuildFirst() {
isEmpty := fmb.msgs == 0
fmb.mu.RUnlock()
if isEmpty {
fmb.mu.Lock()
fs.removeMsgBlock(fmb)
fmb.mu.Unlock()
}
fs.selectNextFirst()
fs.rebuildStateLocked(ld)
@@ -3190,6 +3213,7 @@ func (fs *fileStore) enforceMsgPerSubjectLimit() {
// We had an issue with a use case where psim (and hence fss) were correct but idx was not and was not properly being caught.
// So do a quick sanity check here. If we detect a skew do a rebuild then re-check.
if numMsgs != fs.state.Msgs {
fs.warn("Detected skew in subject-based total (%d) vs raw total (%d), rebuilding", numMsgs, fs.state.Msgs)
// Clear any global subject state.
fs.psim = make(map[string]*psi)
for _, mb := range fs.blks {
@@ -3293,7 +3317,9 @@ func (fs *fileStore) removePerSubject(subj string) {
// We do not update sense of fblk here but will do so when we resolve during lookup.
if info, ok := fs.psim[subj]; ok {
info.total--
if info.total == 0 {
if info.total == 1 {
info.fblk = info.lblk
} else if info.total == 0 {
delete(fs.psim, subj)
}
}
@@ -3463,10 +3489,6 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
}
}
} else if !isEmpty {
if mb.dmap.IsEmpty() {
// Mark initial base for delete set.
mb.dmap.SetInitialMin(mb.first.seq)
}
// Out of order delete.
mb.dmap.Insert(seq)
// Check if <25% utilization and minimum size met.
@@ -3475,6 +3497,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, viaLimits, needFSLock bool) (
rbytes := mb.rbytes - uint64(mb.dmap.Size()*emptyRecordLen)
if rbytes>>2 > mb.bytes {
mb.compact()
fs.kickFlushStateLoop()
}
}
}
@@ -3555,10 +3578,7 @@ func (mb *msgBlock) compact() {
var firstSet bool
isDeleted := func(seq uint64) bool {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
return mb.dmap.Exists(seq)
return seq == 0 || seq&ebit != 0 || seq < mb.first.seq || mb.dmap.Exists(seq)
}
for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
@@ -3601,6 +3621,12 @@ func (mb *msgBlock) compact() {
index += rl
}
// Handle compression
var err error
if nbuf, err = mb.cmp.Compress(nbuf); err != nil {
return
}
// Check for encryption.
if mb.bek != nil && len(nbuf) > 0 {
// Recreate to reset counter.
@@ -3615,7 +3641,7 @@ func (mb *msgBlock) compact() {
mb.closeFDsLocked()
// We will write to a new file and mv/rename it in case of failure.
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
mfn := filepath.Join(mb.fs.fcfg.StoreDir, msgDir, fmt.Sprintf(newScan, mb.index))
if err := os.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
os.Remove(mfn)
return
@@ -3625,8 +3651,8 @@ func (mb *msgBlock) compact() {
return
}
// Remove index file and wipe delete map, then rebuild.
mb.deleteDmap()
// Wipe dmap and rebuild here.
mb.dmap.Empty()
mb.rebuildStateLocked()
// If we entered with the msgs loaded make sure to reload them.
@@ -3635,11 +3661,6 @@ func (mb *msgBlock) compact() {
}
}
// Empty out our dmap.
func (mb *msgBlock) deleteDmap() {
mb.dmap.Empty()
}
// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
@@ -4697,29 +4718,30 @@ func (fs *fileStore) syncBlocks() {
mb.mu.Unlock()
continue
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
// Check if we need to sync. We will not hold lock during actual sync.
var fn string
if mb.needSync {
// Flush anything that may be pending.
if mb.pendingWriteSizeLocked() > 0 {
mb.flushPendingMsgsLocked()
}
if mb.mfd != nil {
mb.mfd.Sync()
} else {
fd, err := os.OpenFile(mb.mfn, os.O_RDWR, defaultFilePerms)
if err != nil {
mb.mu.Unlock()
continue
}
fn = mb.mfn
mb.needSync = false
}
mb.mu.Unlock()
// Check if we need to sync.
// This is done not holding any locks.
if fn != _EMPTY_ {
if fd, _ := os.OpenFile(fn, os.O_RDWR, defaultFilePerms); fd != nil {
fd.Sync()
fd.Close()
}
mb.needSync = false
}
// See if we can close FDs due to being idle.
if mb.mfd != nil && mb.sinceLastWriteActivity() > closeFDsIdle {
mb.dirtyCloseWithRemove(false)
}
mb.mu.Unlock()
}
fs.mu.Lock()
@@ -6350,6 +6372,12 @@ func (fs *fileStore) reset() error {
fs.psim = make(map[string]*psi)
fs.bim = make(map[uint32]*msgBlock)
// If we purged anything, make sure we kick flush state loop.
if purged > 0 {
fs.dirty++
fs.kickFlushStateLoop()
}
fs.mu.Unlock()
if cb != nil {
@@ -6473,6 +6501,7 @@ func (fs *fileStore) removeMsgBlockFromList(mb *msgBlock) {
// Remove from list.
for i, omb := range fs.blks {
if mb == omb {
fs.dirty++
blks := append(fs.blks[:i], fs.blks[i+1:]...)
fs.blks = copyMsgBlocks(blks)
if fs.bim != nil {
+6 -6
View File
@@ -473,6 +473,10 @@ func (s *Server) startGateways() {
// This starts the gateway accept loop in a go routine, unless it
// is detected that the server has already been shutdown.
func (s *Server) startGatewayAcceptLoop() {
if s.isShuttingDown() {
return
}
// Snapshot server options.
opts := s.getOpts()
@@ -482,10 +486,6 @@ func (s *Server) startGatewayAcceptLoop() {
}
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Gateway.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.gatewayListenerErr = e
@@ -1128,8 +1128,8 @@ func (c *client) processGatewayInfo(info *Info) {
// connect events to switch those accounts into interest only mode.
s.mu.Lock()
s.ensureGWsInterestOnlyForLeafNodes()
js := s.js
s.mu.Unlock()
js := s.js.Load()
// If running in some tests, maintain the original behavior.
if gwDoNotForceInterestOnlyMode && js != nil {
@@ -1575,7 +1575,7 @@ func (s *Server) addGatewayURL(urlStr string) bool {
// Returns true if the URL has been removed, false otherwise.
// Server lock held on entry
func (s *Server) removeGatewayURL(urlStr string) bool {
if s.shutdown {
if s.isShuttingDown() {
return false
}
s.gateway.Lock()
+25 -51
View File
@@ -117,9 +117,11 @@ type jetStream struct {
// Some bools regarding general state.
metaRecovering bool
standAlone bool
disabled bool
oos bool
shuttingDown bool
// Atomic versions
disabled atomic.Bool
}
type remoteUsage struct {
@@ -372,9 +374,7 @@ func (s *Server) enableJetStream(cfg JetStreamConfig) error {
}
s.gcbMu.Unlock()
s.mu.Lock()
s.js = js
s.mu.Unlock()
s.js.Store(js)
// FIXME(dlc) - Allow memory only operation?
if stat, err := os.Stat(cfg.StoreDir); os.IsNotExist(err) {
@@ -530,10 +530,7 @@ func (s *Server) setupJetStreamExports() {
}
func (s *Server) jetStreamOOSPending() (wasPending bool) {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
if js := s.getJetStream(); js != nil {
js.mu.Lock()
wasPending = js.oos
js.oos = true
@@ -543,13 +540,8 @@ func (s *Server) jetStreamOOSPending() (wasPending bool) {
}
func (s *Server) setJetStreamDisabled() {
s.mu.Lock()
js := s.js
s.mu.Unlock()
if js != nil {
js.mu.Lock()
js.disabled = true
js.mu.Unlock()
if js := s.getJetStream(); js != nil {
js.disabled.Store(true)
}
}
@@ -738,16 +730,15 @@ func (s *Server) configAllJetStreamAccounts() error {
// a non-default system account.
s.checkJetStreamExports()
// Snapshot into our own list. Might not be needed.
s.mu.Lock()
// Bail if server not enabled. If it was enabled and a reload turns it off
// that will be handled elsewhere.
js := s.js
js := s.getJetStream()
if js == nil {
s.mu.Unlock()
return nil
}
// Snapshot into our own list. Might not be needed.
s.mu.RLock()
if s.sys != nil {
// clustered stream removal will perform this cleanup as well
// this is mainly for initial cleanup
@@ -764,12 +755,12 @@ func (s *Server) configAllJetStreamAccounts() error {
}
var jsAccounts []*Account
s.accounts.Range(func(k, v interface{}) bool {
s.accounts.Range(func(k, v any) bool {
jsAccounts = append(jsAccounts, v.(*Account))
return true
})
accounts := &s.accounts
s.mu.Unlock()
s.mu.RUnlock()
// Process any jetstream enabled accounts here. These will be accounts we are
// already aware of at startup etc.
@@ -809,9 +800,7 @@ func (js *jetStream) isEnabled() bool {
if js == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return !js.disabled
return !js.disabled.Load()
}
// Mark that we will be in standlone mode.
@@ -821,9 +810,9 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
}
js.mu.Lock()
defer js.mu.Unlock()
js.standAlone = isStandAlone
if isStandAlone {
if js.standAlone = isStandAlone; js.standAlone {
// Update our server atomic.
js.srv.isMetaLeader.Store(true)
js.accountPurge, _ = js.srv.systemSubscribe(JSApiAccountPurge, _EMPTY_, false, nil, js.srv.jsLeaderAccountPurgeRequest)
} else if js.accountPurge != nil {
js.srv.sysUnsubscribe(js.accountPurge)
@@ -832,11 +821,7 @@ func (js *jetStream) setJetStreamStandAlone(isStandAlone bool) {
// JetStreamEnabled reports if jetstream is enabled for this server.
func (s *Server) JetStreamEnabled() bool {
var js *jetStream
s.mu.RLock()
js = s.js
s.mu.RUnlock()
return js.isEnabled()
return s.getJetStream().isEnabled()
}
// JetStreamEnabledForDomain will report if any servers have JetStream enabled within this domain.
@@ -909,10 +894,7 @@ func (js *jetStream) isShuttingDown() bool {
// Shutdown jetstream for this server.
func (s *Server) shutdownJetStream() {
s.mu.RLock()
js := s.js
s.mu.RUnlock()
js := s.getJetStream()
if js == nil {
return
}
@@ -951,9 +933,7 @@ func (s *Server) shutdownJetStream() {
a.removeJetStream()
}
s.mu.Lock()
s.js = nil
s.mu.Unlock()
s.js.Store(nil)
js.mu.Lock()
js.accounts = nil
@@ -994,23 +974,20 @@ func (s *Server) shutdownJetStream() {
// created a dynamic configuration. A copy is returned.
func (s *Server) JetStreamConfig() *JetStreamConfig {
var c *JetStreamConfig
s.mu.Lock()
if s.js != nil {
copy := s.js.config
if js := s.getJetStream(); js != nil {
copy := js.config
c = &(copy)
}
s.mu.Unlock()
return c
}
// StoreDir returns the current JetStream directory.
func (s *Server) StoreDir() string {
s.mu.Lock()
defer s.mu.Unlock()
if s.js == nil {
js := s.getJetStream()
if js == nil {
return _EMPTY_
}
return s.js.config.StoreDir
return js.config.StoreDir
}
// JetStreamNumAccounts returns the number of enabled accounts this server is tracking.
@@ -1036,10 +1013,7 @@ func (s *Server) JetStreamReservedResources() (int64, int64, error) {
}
func (s *Server) getJetStream() *jetStream {
s.mu.RLock()
js := s.js
s.mu.RUnlock()
return js
return s.js.Load()
}
func (a *Account) assignJetStreamLimits(limits map[string]JetStreamAccountLimits) {
+55 -32
View File
@@ -1731,14 +1731,13 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
var clusterWideConsCount int
js, cc := s.getJetStreamCluster()
if js == nil {
return
}
// If we are in clustered mode we need to be the stream leader to proceed.
if s.JetStreamIsClustered() {
if cc != nil {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
js.mu.RLock()
isLeader, sa := cc.isLeader(), js.streamAssignment(acc.Name, streamName)
var offline bool
@@ -1833,15 +1832,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}
mset, err := acc.lookupStream(streamName)
// Error is not to be expected at this point, but could happen if same stream trying to be created.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
if cc != nil {
// This could be inflight, pause for a short bit and try again.
// This will not be inline, so ok.
time.Sleep(10 * time.Millisecond)
mset, err = acc.lookupStream(streamName)
}
// Check again.
if err != nil {
resp.Error = NewJSStreamNotFoundError(Unless(err))
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
}
config := mset.config()
js, _ := s.getJetStreamCluster()
resp.StreamInfo = &StreamInfo{
Created: mset.createdTime(),
State: mset.stateWithDetail(details),
@@ -2307,14 +2314,15 @@ func (s *Server) peerSetToNames(ps []string) []string {
// looks up the peer id for a given server name. Cluster and domain name are optional filter criteria
func (s *Server) nameToPeer(js *jetStream, serverName, clusterName, domainName string) string {
js.mu.RLock()
cc := js.cluster
defer js.mu.RUnlock()
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
if cc := js.cluster; cc != nil {
for _, p := range cc.meta.Peers() {
si, ok := s.nodeToInfo.Load(p.ID)
if ok && si.(nodeInfo).name == serverName {
if clusterName == _EMPTY_ || clusterName == si.(nodeInfo).cluster {
if domainName == _EMPTY_ || domainName == si.(nodeInfo).domain {
return p.ID
}
}
}
}
@@ -4156,9 +4164,20 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
js.mu.RLock()
isLeader, sa, ca := cc.isLeader(), js.streamAssignment(acc.Name, streamName), js.consumerAssignment(acc.Name, streamName, consumerName)
ourID := cc.meta.ID()
var offline bool
var rg *raftGroup
var offline, isMember bool
if ca != nil {
offline = s.allPeersOffline(ca.Group)
if rg = ca.Group; rg != nil {
offline = s.allPeersOffline(rg)
isMember = rg.isMember(ourID)
}
}
// Capture consumer leader here.
isConsumerLeader := cc.isConsumerLeader(acc.Name, streamName, consumerName)
// Also capture if we think there is no meta leader.
var isLeaderLess bool
if !isLeader {
isLeaderLess = cc.meta.GroupLeader() == _EMPTY_ && time.Since(cc.meta.Created()) > lostQuorumIntervalDefault
}
js.mu.RUnlock()
@@ -4181,7 +4200,7 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
} else if ca == nil {
if js.isLeaderless() {
if isLeaderLess {
resp.Error = NewJSClusterNotAvailError()
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), nil)
@@ -4194,38 +4213,36 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
}
// Check to see if we are a member of the group and if the group has no leader.
if js.isGroupLeaderless(ca.Group) {
if isMember && js.isGroupLeaderless(ca.Group) {
resp.Error = NewJSClusterNotAvailError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
// We have the consumer assigned and a leader, so only the consumer leader should answer.
if !acc.JetStreamIsConsumerLeader(streamName, consumerName) {
if js.isLeaderless() {
if !isConsumerLeader {
if isLeaderLess {
resp.Error = NewJSClusterNotAvailError()
// Delaying an error response gives the leader a chance to respond before us
s.sendDelayedAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp), ca.Group)
return
}
// We have a consumer assignment.
js.mu.RLock()
var node RaftNode
var leaderNotPartOfGroup bool
var isMember bool
rg := ca.Group
if rg != nil && rg.isMember(ourID) {
isMember = true
// We have a consumer assignment.
if isMember {
js.mu.RLock()
if rg.node != nil {
node = rg.node
if gl := node.GroupLeader(); gl != _EMPTY_ && !rg.isMember(gl) {
leaderNotPartOfGroup = true
}
}
js.mu.RUnlock()
}
js.mu.RUnlock()
// Check if we should ignore all together.
if node == nil {
// We have been assigned but have not created a node yet. If we are a member return
@@ -4279,7 +4296,13 @@ func (s *Server) jsConsumerInfoRequest(sub *subscription, c *client, _ *Account,
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
resp.ConsumerInfo = obs.info()
if resp.ConsumerInfo = obs.info(); resp.ConsumerInfo == nil {
// This consumer returned nil which means it's closed. Respond with not found.
resp.Error = NewJSConsumerNotFoundError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
s.sendAPIResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(resp))
}
+71 -63
View File
@@ -183,7 +183,7 @@ const (
func (s *Server) trackedJetStreamServers() (js, total int) {
s.mu.RLock()
defer s.mu.RUnlock()
if !s.running || !s.eventsEnabled() {
if !s.isRunning() || !s.eventsEnabled() {
return -1, -1
}
s.nodeToInfo.Range(func(k, v interface{}) bool {
@@ -198,11 +198,12 @@ func (s *Server) trackedJetStreamServers() (js, total int) {
}
func (s *Server) getJetStreamCluster() (*jetStream, *jetStreamCluster) {
s.mu.RLock()
shutdown, js := s.shutdown, s.js
s.mu.RUnlock()
if s.isShuttingDown() {
return nil, nil
}
if shutdown || js == nil {
js := s.getJetStream()
if js == nil {
return nil, nil
}
@@ -219,13 +220,7 @@ func (s *Server) JetStreamIsClustered() bool {
}
func (s *Server) JetStreamIsLeader() bool {
js := s.getJetStream()
if js == nil {
return false
}
js.mu.RLock()
defer js.mu.RUnlock()
return js.cluster.isLeader()
return s.isMetaLeader.Load()
}
func (s *Server) JetStreamIsCurrent() bool {
@@ -233,9 +228,20 @@ func (s *Server) JetStreamIsCurrent() bool {
if js == nil {
return false
}
// Grab what we need and release js lock.
js.mu.RLock()
defer js.mu.RUnlock()
return js.cluster.isCurrent()
var meta RaftNode
cc := js.cluster
if cc != nil {
meta = cc.meta
}
js.mu.RUnlock()
if cc == nil {
// Non-clustered mode
return true
}
return meta.Current()
}
func (s *Server) JetStreamSnapshotMeta() error {
@@ -381,19 +387,6 @@ func (cc *jetStreamCluster) isLeader() bool {
return cc.meta != nil && cc.meta.Leader()
}
// isCurrent will determine if this node is a leader or an up to date follower.
// Read lock should be held.
func (cc *jetStreamCluster) isCurrent() bool {
if cc == nil {
// Non-clustered mode
return true
}
if cc.meta == nil {
return false
}
return cc.meta.Current()
}
// isStreamCurrent will determine if the stream is up to date.
// For R1 it will make sure the stream is present on this server.
// Read lock should be held.
@@ -643,9 +636,8 @@ func (a *Account) getJetStreamFromAccount() (*Server, *jetStream, *jsAccount) {
if js == nil {
return nil, nil, nil
}
js.mu.RLock()
// Lock not needed, set on creation.
s := js.srv
js.mu.RUnlock()
return s, js, jsa
}
@@ -751,7 +743,7 @@ func (js *jetStream) setupMetaGroup() error {
storeDir := filepath.Join(js.config.StoreDir, sysAcc.Name, defaultStoreDirName, defaultMetaGroupName)
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMetaFSBlkSize, AsyncFlush: false, srv: s},
StreamConfig{Name: defaultMetaGroupName, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, defaultMetaGroupName),
@@ -762,9 +754,6 @@ func (js *jetStream) setupMetaGroup() error {
return err
}
// Register our server.
fs.registerServer(s)
cfg := &RaftConfig{Name: defaultMetaGroupName, Store: storeDir, Log: fs}
// If we are soliciting leafnode connections and we are sharing a system account and do not disable it with a hint,
@@ -859,10 +848,8 @@ func (js *jetStream) getMetaGroup() RaftNode {
}
func (js *jetStream) server() *Server {
js.mu.RLock()
s := js.srv
js.mu.RUnlock()
return s
// Lock not needed, only set once on creation.
return js.srv
}
// Will respond if we do not think we have a metacontroller leader.
@@ -1240,6 +1227,7 @@ func (js *jetStream) monitorCluster() {
// Make sure to stop the raft group on exit to prevent accidental memory bloat.
defer n.Stop()
defer s.isMetaLeader.Store(false)
const compactInterval = time.Minute
t := time.NewTicker(compactInterval)
@@ -1727,6 +1715,11 @@ func (js *jetStream) processAddPeer(peer string) {
}
func (js *jetStream) processRemovePeer(peer string) {
// We may be already disabled.
if js == nil || js.disabled.Load() {
return
}
js.mu.Lock()
s, cc := js.srv, js.cluster
if cc == nil || cc.meta == nil {
@@ -1736,14 +1729,8 @@ func (js *jetStream) processRemovePeer(peer string) {
isLeader := cc.isLeader()
// All nodes will check if this is them.
isUs := cc.meta.ID() == peer
disabled := js.disabled
js.mu.Unlock()
// We may be already disabled.
if disabled {
return
}
if isUs {
s.Errorf("JetStream being DISABLED, our server was removed from the cluster")
adv := &JSServerRemovedAdvisory{
@@ -2028,7 +2015,7 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
var store StreamStore
if storage == FileStorage {
fs, err := newFileStoreWithCreated(
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute, srv: s},
StreamConfig{Name: rg.Name, Storage: FileStorage},
time.Now().UTC(),
s.jsKeyGen(s.getOpts().JetStreamKey, rg.Name),
@@ -2038,8 +2025,6 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
s.Errorf("Error creating filestore WAL: %v", err)
return err
}
// Register our server.
fs.registerServer(s)
store = fs
} else {
ms, err := newMemStore(&StreamConfig{Name: rg.Name, Storage: MemoryStorage})
@@ -2229,7 +2214,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
if err := n.InstallSnapshot(mset.stateSnapshot()); err == nil {
lastState, lastSnapTime = curState, time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
s.RateLimitWarnf("Failed to install snapshot for '%s > %s' [%s]: %v", mset.acc.Name, mset.name(), n.Group(), err)
}
}
@@ -2423,6 +2408,10 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// We are not current, but current means exactly caught up. Under heavy publish
// loads we may never reach this, so check if we are within 90% caught up.
_, c, a := mset.node.Progress()
if c == 0 {
mset.mu.Unlock()
continue
}
if p := float64(a) / float64(c) * 100.0; p < syncThreshold {
mset.mu.Unlock()
continue
@@ -2716,6 +2705,11 @@ func (mset *stream) resetClusteredState(err error) bool {
if sa != nil {
js.mu.Lock()
if js.shuttingDown {
js.mu.Unlock()
return
}
s.Warnf("Resetting stream cluster state for '%s > %s'", sa.Client.serviceAccount(), sa.Config.Name)
// Now wipe groups from assignments.
sa.Group.node = nil
@@ -2929,6 +2923,10 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
// If we are the leader or recovering, meaning we own the snapshot,
// we should stepdown and clear our raft state since our snapshot is bad.
if isRecovering || mset.IsLeader() {
mset.mu.RLock()
s, accName, streamName := mset.srv, mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()
s.Warnf("Detected bad stream state, resetting '%s > %s'", accName, streamName)
mset.resetClusteredState(err)
}
}
@@ -3900,6 +3898,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
s, cc := js.srv, js.cluster
accName, stream, consumerName := ca.Client.serviceAccount(), ca.Stream, ca.Name
noMeta := cc == nil || cc.meta == nil
shuttingDown := js.shuttingDown
var ourID string
if !noMeta {
ourID = cc.meta.ID()
@@ -3910,7 +3909,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
}
js.mu.RUnlock()
if s == nil || noMeta {
if s == nil || noMeta || shuttingDown {
return
}
@@ -5275,21 +5274,31 @@ func (js *jetStream) stopUpdatesSub() {
}
func (js *jetStream) processLeaderChange(isLeader bool) {
if js == nil {
return
}
s := js.srv
if s == nil {
return
}
// Update our server atomic.
s.isMetaLeader.Store(isLeader)
if isLeader {
js.srv.Noticef("Self is new JetStream cluster metadata leader")
s.Noticef("Self is new JetStream cluster metadata leader")
} else {
var node string
if meta := js.getMetaGroup(); meta != nil {
node = meta.GroupLeader()
}
if node == _EMPTY_ {
js.srv.Noticef("JetStream cluster no metadata leader")
s.Noticef("JetStream cluster no metadata leader")
} else if srv := js.srv.serverNameForNode(node); srv == _EMPTY_ {
js.srv.Noticef("JetStream cluster new remote metadata leader")
s.Noticef("JetStream cluster new remote metadata leader")
} else if clst := js.srv.clusterNameForNode(node); clst == _EMPTY_ {
js.srv.Noticef("JetStream cluster new metadata leader: %s", srv)
s.Noticef("JetStream cluster new metadata leader: %s", srv)
} else {
js.srv.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst)
s.Noticef("JetStream cluster new metadata leader: %s/%s", srv, clst)
}
}
@@ -5310,7 +5319,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
for acc, asa := range cc.streams {
for _, sa := range asa {
if sa.Sync == _EMPTY_ {
js.srv.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name)
s.Warnf("Stream assigment corrupt for stream '%s > %s'", acc, sa.Config.Name)
nsa := &streamAssignment{Group: sa.Group, Config: sa.Config, Subject: sa.Subject, Reply: sa.Reply, Client: sa.Client}
nsa.Sync = syncSubjForStream()
cc.meta.Propose(encodeUpdateStreamAssignment(nsa))
@@ -7719,8 +7728,8 @@ func (mset *stream) isCurrent() bool {
return mset.node.Current() && !mset.catchup
}
// Maximum requests for the whole server that can be in flight.
const maxConcurrentSyncRequests = 8
// Maximum requests for the whole server that can be in flight at the same time.
const maxConcurrentSyncRequests = 16
var (
errCatchupCorruptSnapshot = errors.New("corrupt stream snapshot detected")
@@ -7897,11 +7906,11 @@ RETRY:
// Grab sync request again on failures.
if sreq == nil {
mset.mu.Lock()
mset.mu.RLock()
var state StreamState
mset.store.FastState(&state)
sreq = mset.calculateSyncRequest(&state, snap)
mset.mu.Unlock()
mset.mu.RUnlock()
if sreq == nil {
return nil
}
@@ -8134,19 +8143,18 @@ func (js *jetStream) clusterInfo(rg *raftGroup) *ClusterInfo {
s := js.srv
if rg == nil || rg.node == nil {
return &ClusterInfo{
Name: s.ClusterName(),
Name: s.cachedClusterName(),
Leader: s.Name(),
}
}
n := rg.node
n := rg.node
ci := &ClusterInfo{
Name: s.ClusterName(),
Name: s.cachedClusterName(),
Leader: s.serverNameForNode(n.GroupLeader()),
}
now := time.Now()
id, peers := n.ID(), n.Peers()
// If we are leaderless, do not suppress putting us in the peer list.
@@ -8267,7 +8275,7 @@ func (mset *stream) handleClusterStreamInfoRequest(_ *subscription, c *client, _
func (mset *stream) processClusterStreamInfoRequest(reply string) {
mset.mu.RLock()
sysc, js, sa, config := mset.sysc, mset.srv.js, mset.sa, mset.cfg
sysc, js, sa, config := mset.sysc, mset.srv.js.Load(), mset.sa, mset.cfg
isLeader := mset.isLeader()
mset.mu.RUnlock()
+4 -4
View File
@@ -681,11 +681,11 @@ func (s *Server) startLeafNodeAcceptLoop() {
port = 0
}
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
if s.isShuttingDown() {
return
}
s.mu.Lock()
hp := net.JoinHostPort(opts.LeafNode.Host, strconv.Itoa(port))
l, e := natsListen("tcp", hp)
s.leafNodeListenerErr = e
@@ -878,7 +878,7 @@ func (s *Server) addLeafNodeURL(urlStr string) bool {
func (s *Server) removeLeafNodeURL(urlStr string) bool {
// Don't need to do this if we are removing the route connection because
// we are shuting down...
if s.shutdown {
if s.isShuttingDown() {
return false
}
if s.leafURLsMap.removeUrl(urlStr) {
+16 -15
View File
@@ -1465,14 +1465,14 @@ func (s *Server) Varz(varzOpts *VarzOptions) (*Varz, error) {
// We want to do that outside of the lock.
pse.ProcUsage(&pcpu, &rss, &vss)
s.mu.Lock()
js := s.js
s.mu.RLock()
// We need to create a new instance of Varz (with no reference
// whatsoever to anything stored in the server) since the user
// has access to the returned value.
v := s.createVarz(pcpu, rss)
s.mu.Unlock()
if js != nil {
s.mu.RUnlock()
if js := s.getJetStream(); js != nil {
s.updateJszVarz(js, &v.JetStream, true)
}
@@ -1798,7 +1798,6 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
// Use server lock to create/update the server's varz object.
s.mu.Lock()
var created bool
js := s.js
s.httpReqStats[VarzPath]++
if s.varz == nil {
s.varz = s.createVarz(pcpu, rss)
@@ -1809,19 +1808,20 @@ func (s *Server) HandleVarz(w http.ResponseWriter, r *http.Request) {
s.mu.Unlock()
// Since locking is jetStream -> Server, need to update jetstream
// varz outside of server lock.
if js != nil {
if js := s.getJetStream(); js != nil {
var v JetStreamVarz
// Work on stack variable
s.updateJszVarz(js, &v, created)
// Now update server's varz
s.mu.Lock()
s.mu.RLock()
sv := &s.varz.JetStream
if created {
sv.Config = v.Config
}
sv.Stats = v.Stats
sv.Meta = v.Meta
s.mu.Unlock()
s.mu.RUnlock()
}
// Do the marshaling outside of server lock, but under varzMu lock.
@@ -2835,10 +2835,10 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg,
}
jsa.mu.RUnlock()
if optStreams {
if js := s.getJetStream(); js != nil && optStreams {
for _, stream := range streams {
rgroup := stream.raftGroup()
ci := s.js.clusterInfo(rgroup)
ci := js.clusterInfo(rgroup)
var cfg *StreamConfig
if optCfg {
c := stream.config()
@@ -2884,7 +2884,8 @@ func (s *Server) accountDetail(jsa *jsAccount, optStreams, optConsumers, optCfg,
}
func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
if s.js == nil {
js := s.getJetStream()
if js == nil {
return nil, fmt.Errorf("jetstream not enabled")
}
acc := opts.Account
@@ -2892,9 +2893,9 @@ func (s *Server) JszAccount(opts *JSzOptions) (*AccountDetail, error) {
if !ok {
return nil, fmt.Errorf("account %q not found", acc)
}
s.js.mu.RLock()
jsa, ok := s.js.accounts[account.(*Account).Name]
s.js.mu.RUnlock()
js.mu.RLock()
jsa, ok := js.accounts[account.(*Account).Name]
js.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("account %q not jetstream enabled", acc)
}
@@ -2916,7 +2917,7 @@ func (s *Server) raftNodeToClusterInfo(node RaftNode) *ClusterInfo {
Peers: peerList,
node: node,
}
return s.js.clusterInfo(group)
return s.getJetStream().clusterInfo(group)
}
// Jsz returns a Jsz structure containing information about JetStream.
+14 -6
View File
@@ -425,6 +425,10 @@ type mqttParsedPublishNATSHeader struct {
}
func (s *Server) startMQTT() {
if s.isShuttingDown() {
return
}
sopts := s.getOpts()
o := &sopts.MQTT
@@ -437,10 +441,6 @@ func (s *Server) startMQTT() {
}
hp := net.JoinHostPort(o.Host, strconv.Itoa(port))
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.mqtt.sessmgr.sessions = make(map[string]*mqttAccountSessionManager)
hl, err = net.Listen("tcp", hp)
s.mqtt.listenerErr = err
@@ -499,8 +499,8 @@ func (s *Server) createMQTTClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()
s.mu.Lock()
if !s.running || s.ldm {
if s.shutdown {
if !s.isRunning() || s.ldm {
if s.isShuttingDown() {
conn.Close()
}
s.mu.Unlock()
@@ -3915,6 +3915,14 @@ func (c *client) mqttEnqueuePubResponse(packetType byte, pi uint16, trace bool)
proto := [4]byte{packetType, 0x2, 0, 0}
proto[2] = byte(pi >> 8)
proto[3] = byte(pi)
// Bits 3,2,1 and 0 of the fixed header in the PUBREL Control Packet are
// reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat
// any other value as malformed and close the Network Connection [MQTT-3.6.1-1].
if packetType == mqttPacketPubRel {
proto[0] |= 0x2
}
c.mu.Lock()
c.enqueueProto(proto[:4])
c.mu.Unlock()
+7 -1
View File
@@ -307,6 +307,7 @@ type Options struct {
Websocket WebsocketOpts `json:"-"`
MQTT MQTTOpts `json:"-"`
ProfPort int `json:"-"`
ProfBlockRate int `json:"-"`
PidFile string `json:"-"`
PortsFileDir string `json:"-"`
LogFile string `json:"-"`
@@ -394,6 +395,9 @@ type Options struct {
// OCSP Cache config enables next-gen cache for OCSP features
OCSPCacheConfig *OCSPResponseCacheConfig
// Used to mark that we had a top level authorization block.
authBlockDefined bool
}
// WebsocketOpts are options for websocket
@@ -884,7 +888,7 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
*errors = append(*errors, err)
return
}
o.authBlockDefined = true
o.Username = auth.user
o.Password = auth.pass
o.Authorization = auth.token
@@ -1013,6 +1017,8 @@ func (o *Options) processConfigFileLine(k string, v interface{}, errors *[]error
o.PortsFileDir = v.(string)
case "prof_port":
o.ProfPort = int(v.(int64))
case "prof_block_rate":
o.ProfBlockRate = int(v.(int64))
case "max_control_line":
if v.(int64) > 1<<31-1 {
err := &configErr{tk, fmt.Sprintf("%s value is too big", k)}
+14 -12
View File
@@ -134,6 +134,7 @@ type raft struct {
track bool
werr error
state RaftState
isLeader atomic.Bool
hh hash.Hash64
snapfile string
csz int
@@ -588,14 +589,15 @@ func (s *Server) stepdownRaftNodes() {
s.Debugf("Stepping down all leader raft nodes")
}
for _, n := range s.raftNodes {
if n.Leader() {
nodes = append(nodes, n)
}
nodes = append(nodes, n)
}
s.rnMu.RUnlock()
for _, node := range nodes {
node.StepDown()
if node.Leader() {
node.StepDown()
}
node.SetObserver(true)
}
}
@@ -652,9 +654,9 @@ func (s *Server) transferRaftLeaders() bool {
// This should only be called on the leader.
func (n *raft) Propose(data []byte) error {
n.RLock()
if n.state != Leader {
if state := n.state; state != Leader {
n.RUnlock()
n.debug("Proposal ignored, not leader (state: %v)", n.state)
n.debug("Proposal ignored, not leader (state: %v)", state)
return errNotLeader
}
// Error if we had a previous write error.
@@ -1157,14 +1159,12 @@ func (n *raft) loadLastSnapshot() (*snapshot, error) {
}
// Leader returns if we are the leader for our group.
// We use an atomic here now vs acquiring the read lock.
func (n *raft) Leader() bool {
if n == nil {
return false
}
n.RLock()
isLeader := n.state == Leader
n.RUnlock()
return isLeader
return n.isLeader.Load()
}
func (n *raft) isCatchingUp() bool {
@@ -1687,8 +1687,7 @@ func (n *raft) run() {
// We want to wait for some routing to be enabled, so we will wait for
// at least a route, leaf or gateway connection to be established before
// starting the run loop.
gw := s.gateway
for {
for gw := s.gateway; ; {
s.mu.Lock()
ready := s.numRemotes()+len(s.leafs) > 0
if !ready && gw.enabled {
@@ -3830,6 +3829,9 @@ func (n *raft) quorumNeeded() int {
// Lock should be held.
func (n *raft) updateLeadChange(isLeader bool) {
// Update our atomic about being the leader.
n.isLeader.Store(isLeader)
// We don't care about values that have not been consumed (transitory states),
// so we dequeue any state that is pending and push the new one.
for {
+17 -1
View File
@@ -805,6 +805,16 @@ func (o *mqttInactiveThresholdReload) Apply(s *Server) {
s.Noticef("Reloaded: MQTT consumer_inactive_threshold = %v", o.newValue)
}
type profBlockRateReload struct {
noopOption
newValue int
}
func (o *profBlockRateReload) Apply(s *Server) {
s.setBlockProfileRate(o.newValue)
s.Noticef("Reloaded: block_prof_rate = %v", o.newValue)
}
type leafNodeOption struct {
noopOption
tlsFirstChanged bool
@@ -1589,6 +1599,12 @@ func (s *Server) diffOptions(newOpts *Options) ([]option, error) {
diffOpts = append(diffOpts, &ocspOption{newValue: newValue.(*OCSPConfig)})
case "ocspcacheconfig":
diffOpts = append(diffOpts, &ocspResponseCacheOption{newValue: newValue.(*OCSPResponseCacheConfig)})
case "profblockrate":
new := newValue.(int)
old := oldValue.(int)
if new != old {
diffOpts = append(diffOpts, &profBlockRateReload{newValue: new})
}
default:
// TODO(ik): Implement String() on those options to have a nice print.
// %v is difficult to figure what's what, %+v print private fields and
@@ -1857,7 +1873,7 @@ func (s *Server) reloadAuthorization() {
awcsti, _ = s.configureAccounts(true)
s.configureAuthorization()
// Double check any JetStream configs.
checkJetStream = s.js != nil
checkJetStream = s.getJetStream() != nil
} else if opts.AccountResolver != nil {
s.configureResolver()
if _, ok := s.accResolver.(*MemAccResolver); ok {
+59 -20
View File
@@ -653,11 +653,14 @@ func (c *client) processRouteInfo(info *Info) {
// We receive an INFO from a server that informs us about another server,
// so the info.ID in the INFO protocol does not match the ID of this route.
if remoteID != _EMPTY_ && remoteID != info.ID {
// We want to know if the existing route supports pooling/pinned-account
// or not when processing the implicit route.
noPool := c.route.noPool
c.mu.Unlock()
// Process this implicit route. We will check that it is not an explicit
// route and/or that it has not been connected already.
s.processImplicitRoute(info)
s.processImplicitRoute(info, noPool)
return
}
@@ -812,10 +815,14 @@ func (c *client) processRouteInfo(info *Info) {
}
}
// For accounts that are configured to have their own route:
// If this is a solicit route, we already have c.route.accName set in createRoute.
// If this is a solicited route, we already have c.route.accName set in createRoute.
// For non solicited route (the accept side), we will set the account name that
// is present in the INFO protocol.
if !didSolicit {
if didSolicit && len(c.route.accName) > 0 {
// Set it in the info.RouteAccount so that addRoute can use that
// and we properly gossip that this is a route for an account.
info.RouteAccount = string(c.route.accName)
} else if !didSolicit && info.RouteAccount != _EMPTY_ {
c.route.accName = []byte(info.RouteAccount)
}
accName := string(c.route.accName)
@@ -977,7 +984,7 @@ func (s *Server) updateRemoteRoutePerms(c *client, info *Info) {
func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
// If there are no clients supporting async INFO protocols, we are done.
// Also don't send if we are shutting down...
if s.cproto == 0 || s.shutdown {
if s.cproto == 0 || s.isShuttingDown() {
return
}
info := s.copyInfo()
@@ -1002,7 +1009,7 @@ func (s *Server) sendAsyncInfoToClients(regCli, wsCli bool) {
// This will process implicit route information received from another server.
// We will check to see if we have configured or are already connected,
// and if so we will ignore. Otherwise we will attempt to connect.
func (s *Server) processImplicitRoute(info *Info) {
func (s *Server) processImplicitRoute(info *Info, routeNoPool bool) {
remoteID := info.ID
s.mu.Lock()
@@ -1012,8 +1019,16 @@ func (s *Server) processImplicitRoute(info *Info) {
if remoteID == s.info.ID {
return
}
// Snapshot server options.
opts := s.getOpts()
// Check if this route already exists
if accName := info.RouteAccount; accName != _EMPTY_ {
// If we don't support pooling/pinned account, bail.
if opts.Cluster.PoolSize <= 0 {
return
}
if remotes, ok := s.accRoutes[accName]; ok {
if r := remotes[remoteID]; r != nil {
return
@@ -1034,13 +1049,22 @@ func (s *Server) processImplicitRoute(info *Info) {
return
}
// Snapshot server options.
opts := s.getOpts()
if info.AuthRequired {
r.User = url.UserPassword(opts.Cluster.Username, opts.Cluster.Password)
}
s.startGoRoutine(func() { s.connectToRoute(r, false, true, info.RouteAccount) })
// If we are processing an implicit route from a route that does not
// support pooling/pinned-accounts, we won't receive an INFO for each of
// the pinned-accounts that we would normally receive. In that case, just
// initiate routes for all our configured pinned accounts.
if routeNoPool && info.RouteAccount == _EMPTY_ && len(opts.Cluster.PinnedAccounts) > 0 {
// Copy since we are going to pass as closure to a go routine.
rURL := r
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, false, true, accName) })
}
}
}
// hasThisRouteConfigured returns true if info.Host:info.Port is present
@@ -1071,7 +1095,10 @@ func (s *Server) forwardNewRouteInfoToKnownServers(info *Info) {
s.forEachRemote(func(r *client) {
r.mu.Lock()
if r.route.remoteID != info.ID {
// If this is a new route for a given account, do not send to a server
// that does not support pooling/pinned-accounts.
if r.route.remoteID != info.ID &&
(info.RouteAccount == _EMPTY_ || (info.RouteAccount != _EMPTY_ && !r.route.noPool)) {
r.enqueueProto(infoJSON)
}
r.mu.Unlock()
@@ -1834,7 +1861,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
id := info.ID
s.mu.Lock()
if !s.running || s.routesReject {
if !s.isRunning() || s.routesReject {
s.mu.Unlock()
return false
}
@@ -1855,7 +1882,7 @@ func (s *Server) addRoute(c *client, didSolicit bool, info *Info, accName string
// server and need to handle things differently.
if info.RoutePoolSize <= 0 || opts.Cluster.PoolSize < 0 {
if accName != _EMPTY_ {
invProtoErr = fmt.Sprintf("Not possible to have a dedicate route for account %q between those servers", accName)
invProtoErr = fmt.Sprintf("Not possible to have a dedicated route for account %q between those servers", accName)
// In this case, make sure this route does not attempt to reconnect
c.setNoReconnect()
} else {
@@ -2302,6 +2329,10 @@ func (s *Server) updateRouteSubscriptionMap(acc *Account, sub *subscription, del
// is detected that the server has already been shutdown.
// It will also start soliciting explicit routes.
func (s *Server) startRouteAcceptLoop() {
if s.isShuttingDown() {
return
}
// Snapshot server options.
opts := s.getOpts()
@@ -2316,10 +2347,6 @@ func (s *Server) startRouteAcceptLoop() {
clusterName := s.ClusterName()
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
s.Noticef("Cluster name is %s", clusterName)
if s.isClusterNameDynamic() {
s.Warnf("Cluster name was dynamically generated, consider setting one")
@@ -2654,7 +2681,9 @@ func (c *client) processRouteConnect(srv *Server, arg []byte, lang string) error
// We will take on their name since theirs is configured or higher then ours.
srv.setClusterName(proto.Cluster)
if !proto.Dynamic {
srv.getOpts().Cluster.Name = proto.Cluster
srv.optsMu.Lock()
srv.opts.Cluster.Name = proto.Cluster
srv.optsMu.Unlock()
}
c.mu.Lock()
remoteID := c.opts.Name
@@ -2729,6 +2758,7 @@ func (s *Server) removeRoute(c *client) {
opts = s.getOpts()
rURL *url.URL
noPool bool
didSolicit bool
)
c.mu.Lock()
cid := c.cid
@@ -2747,6 +2777,7 @@ func (s *Server) removeRoute(c *client) {
connectURLs = r.connectURLs
wsConnectURLs = r.wsConnURLs
rURL = r.url
didSolicit = r.didSolicit
}
c.mu.Unlock()
if accName != _EMPTY_ {
@@ -2805,10 +2836,18 @@ func (s *Server) removeRoute(c *client) {
if lnURL != _EMPTY_ && s.removeLeafNodeURL(lnURL) {
s.sendAsyncLeafNodeInfo()
}
// If this server has pooling and the route for this remote
// was a "no pool" route, attempt to reconnect.
if s.routesPoolSize > 1 && noPool {
s.startGoRoutine(func() { s.connectToRoute(rURL, true, true, _EMPTY_) })
// If this server has pooling/pinned accounts and the route for
// this remote was a "no pool" route, attempt to reconnect.
if noPool {
if s.routesPoolSize > 1 {
s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, _EMPTY_) })
}
if len(opts.Cluster.PinnedAccounts) > 0 {
for _, an := range opts.Cluster.PinnedAccounts {
accName := an
s.startGoRoutine(func() { s.connectToRoute(rURL, didSolicit, true, accName) })
}
}
}
}
// This is for gateway code. Remove this route from a map that uses
+90 -58
View File
@@ -131,13 +131,14 @@ type Server struct {
configFile string
optsMu sync.RWMutex
opts *Options
running bool
shutdown bool
running atomic.Bool
shutdown atomic.Bool
listener net.Listener
listenerErr error
gacc *Account
sys *internal
js *jetStream
js atomic.Pointer[jetStream]
isMetaLeader atomic.Bool
accounts sync.Map
tmpAccounts sync.Map // Temporarily stores accounts that are being built
activeAccounts int32
@@ -572,13 +573,18 @@ func selectS2AutoModeBasedOnRTT(rtt time.Duration, rttThresholds []time.Duration
// with a nil []s2.WriterOption, but not with a nil s2.WriterOption, so
// this is more versatile.
func s2WriterOptions(cm string) []s2.WriterOption {
_opts := [2]s2.WriterOption{}
opts := append(
_opts[:0],
s2.WriterConcurrency(1), // Stop asynchronous flushing in separate goroutines
)
switch cm {
case CompressionS2Uncompressed:
return []s2.WriterOption{s2.WriterUncompressed()}
return append(opts, s2.WriterUncompressed())
case CompressionS2Best:
return []s2.WriterOption{s2.WriterBestCompression()}
return append(opts, s2.WriterBestCompression())
case CompressionS2Better:
return []s2.WriterOption{s2.WriterBetterCompression()}
return append(opts, s2.WriterBetterCompression())
default:
return nil
}
@@ -1234,8 +1240,8 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
// If we have defined a system account here check to see if its just us and the $G account.
// We would do this to add user/pass to the system account. If this is the case add in
// no-auth-user for $G.
// Only do this if non-operator mode.
if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ {
// Only do this if non-operator mode and we did not have an authorization block defined.
if len(opts.TrustedOperators) == 0 && numAccounts == 2 && opts.NoAuthUser == _EMPTY_ && !opts.authBlockDefined {
// If we come here from config reload, let's not recreate the fake user name otherwise
// it will cause currently clients to be disconnected.
uname := s.sysAccOnlyNoAuthUser
@@ -1267,6 +1273,7 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
// Setup the account resolver. For memory resolver, make sure the JWTs are
// properly formed but do not enforce expiration etc.
// Lock is held on entry, but may be released/reacquired during this call.
func (s *Server) configureResolver() error {
opts := s.getOpts()
s.accResolver = opts.AccountResolver
@@ -1281,7 +1288,12 @@ func (s *Server) configureResolver() error {
}
}
if len(opts.resolverPreloads) > 0 {
if s.accResolver.IsReadOnly() {
// Lock ordering is account resolver -> server, so we need to release
// the lock and reacquire it when done with account resolver's calls.
ar := s.accResolver
s.mu.Unlock()
defer s.mu.Lock()
if ar.IsReadOnly() {
return fmt.Errorf("resolver preloads only available for writeable resolver types MEM/DIR/CACHE_DIR")
}
for k, v := range opts.resolverPreloads {
@@ -1289,7 +1301,7 @@ func (s *Server) configureResolver() error {
if err != nil {
return fmt.Errorf("preload account error for %q: %v", k, err)
}
s.accResolver.Store(k, v)
ar.Store(k, v)
}
}
}
@@ -1477,10 +1489,7 @@ func (s *Server) Running() bool {
// Protected check on running state
func (s *Server) isRunning() bool {
s.mu.RLock()
running := s.running
s.mu.RUnlock()
return running
return s.running.Load()
}
func (s *Server) logPid() error {
@@ -2078,8 +2087,8 @@ func (s *Server) Start() {
s.checkAuthforWarnings()
// Avoid RACE between Start() and Shutdown()
s.running.Store(true)
s.mu.Lock()
s.running = true
// Update leafNodeEnabled in case options have changed post NewServer()
// and before Start() (we should not be able to allow that, but server has
// direct reference to user-provided options - at least before a Reload() is
@@ -2096,6 +2105,10 @@ func (s *Server) Start() {
// Pprof http endpoint for the profiler.
if opts.ProfPort != 0 {
s.StartProfiler()
} else {
// It's still possible to access this profile via a SYS endpoint, so set
// this anyway. (Otherwise StartProfiler would have called it.)
s.setBlockProfileRate(opts.ProfBlockRate)
}
if opts.ConfigFile != _EMPTY_ {
@@ -2338,6 +2351,10 @@ func (s *Server) Start() {
s.startOCSPResponseCache()
}
func (s *Server) isShuttingDown() bool {
return s.shutdown.Load()
}
// Shutdown will shutdown the server instance by kicking out the AcceptLoop
// and closing all associated clients.
func (s *Server) Shutdown() {
@@ -2357,20 +2374,20 @@ func (s *Server) Shutdown() {
// eventing items associated with accounts.
s.shutdownEventing()
s.mu.Lock()
// Prevent issues with multiple calls.
if s.shutdown {
s.mu.Unlock()
if s.isShuttingDown() {
return
}
s.mu.Lock()
s.Noticef("Initiating Shutdown...")
accRes := s.accResolver
opts := s.getOpts()
s.shutdown = true
s.running = false
s.shutdown.Store(true)
s.running.Store(false)
s.grMu.Lock()
s.grRunning = false
s.grMu.Unlock()
@@ -2380,7 +2397,7 @@ func (s *Server) Shutdown() {
accRes.Close()
}
// Now check jetstream.
// Now check and shutdown jetstream.
s.shutdownJetStream()
// Now shutdown the nodes
@@ -2533,16 +2550,15 @@ func (s *Server) AcceptLoop(clr chan struct{}) {
}
}()
if s.isShuttingDown() {
return
}
// Snapshot server options.
opts := s.getOpts()
// Setup state that can enable shutdown
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port))
l, e := natsListen("tcp", hp)
s.listenerErr = e
@@ -2665,6 +2681,10 @@ func (s *Server) setInfoHostPort() error {
// StartProfiler is called to enable dynamic profiling.
func (s *Server) StartProfiler() {
if s.isShuttingDown() {
return
}
// Snapshot server options.
opts := s.getOpts()
@@ -2676,12 +2696,7 @@ func (s *Server) StartProfiler() {
}
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
hp := net.JoinHostPort(opts.Host, strconv.Itoa(port))
l, err := net.Listen("tcp", hp)
if err != nil {
@@ -2699,14 +2714,13 @@ func (s *Server) StartProfiler() {
s.profiler = l
s.profilingServer = srv
s.setBlockProfileRate(opts.ProfBlockRate)
go func() {
// if this errors out, it's probably because the server is being shutdown
err := srv.Serve(l)
if err != nil {
s.mu.Lock()
shutdown := s.shutdown
s.mu.Unlock()
if !shutdown {
if !s.isShuttingDown() {
s.Fatalf("error starting profiler: %s", err)
}
}
@@ -2716,6 +2730,15 @@ func (s *Server) StartProfiler() {
s.mu.Unlock()
}
func (s *Server) setBlockProfileRate(rate int) {
// Passing i ProfBlockRate <= 0 here will disable or > 0 will enable.
runtime.SetBlockProfileRate(rate)
if rate > 0 {
s.Warnf("Block profiling is enabled (rate %d), this may have a performance impact", rate)
}
}
// StartHTTPMonitoring will enable the HTTP monitoring port.
// DEPRECATED: Should use StartMonitoring.
func (s *Server) StartHTTPMonitoring() {
@@ -2804,6 +2827,10 @@ func (s *Server) getMonitoringTLSConfig(_ *tls.ClientHelloInfo) (*tls.Config, er
// Start the monitoring server
func (s *Server) startMonitoring(secure bool) error {
if s.isShuttingDown() {
return nil
}
// Snapshot server options.
opts := s.getOpts()
@@ -2885,11 +2912,6 @@ func (s *Server) startMonitoring(secure bool) error {
ErrorLog: log.New(&captureHTTPServerLog{s, "monitoring: "}, _EMPTY_, 0),
}
s.mu.Lock()
if s.shutdown {
httpListener.Close()
s.mu.Unlock()
return nil
}
s.http = httpListener
s.httpHandler = mux
s.monitoringServer = srv
@@ -2897,10 +2919,7 @@ func (s *Server) startMonitoring(secure bool) error {
go func() {
if err := srv.Serve(httpListener); err != nil {
s.mu.Lock()
shutdown := s.shutdown
s.mu.Unlock()
if !shutdown {
if !s.isShuttingDown() {
s.Fatalf("Error starting monitor on %q: %v", hp, err)
}
}
@@ -3036,13 +3055,13 @@ func (s *Server) createClientEx(conn net.Conn, inProcess bool) *client {
// list of connections to close. It won't contain this one, so we need
// to bail out now otherwise the readLoop started down there would not
// be interrupted. Skip also if in lame duck mode.
if !s.running || s.ldm {
if !s.isRunning() || s.ldm {
// There are some tests that create a server but don't start it,
// and use "async" clients and perform the parsing manually. Such
// clients would branch here (since server is not running). However,
// when a server was really running and has been shutdown, we must
// close this connection.
if s.shutdown {
if s.isShuttingDown() {
conn.Close()
}
s.mu.Unlock()
@@ -3590,22 +3609,28 @@ func (s *Server) String() string {
type pprofLabels map[string]string
func setGoRoutineLabels(tags ...pprofLabels) {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
if len(labels) > 0 {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
}
}
func (s *Server) startGoRoutine(f func(), tags ...pprofLabels) bool {
var started bool
s.grMu.Lock()
defer s.grMu.Unlock()
if s.grRunning {
var labels []string
for _, m := range tags {
for k, v := range m {
labels = append(labels, k, v)
}
}
s.grWG.Add(1)
go func() {
pprof.SetGoroutineLabels(
pprof.WithLabels(context.Background(), pprof.Labels(labels...)),
)
setGoRoutineLabels(tags...)
f()
}()
started = true
@@ -3944,11 +3969,12 @@ func (s *Server) isLameDuckMode() bool {
}
// This function will close the client listener then close the clients
// at some interval to avoid a reconnecting storm.
// at some interval to avoid a reconnect storm.
// We will also transfer any raft leaders and shutdown JetStream.
func (s *Server) lameDuckMode() {
s.mu.Lock()
// Check if there is actually anything to do
if s.shutdown || s.ldm || s.listener == nil {
if s.isShuttingDown() || s.ldm || s.listener == nil {
s.mu.Unlock()
return
}
@@ -3985,6 +4011,12 @@ func (s *Server) lameDuckMode() {
}
}
// Now check and shutdown jetstream.
s.shutdownJetStream()
// Now shutdown the nodes
s.shutdownRaftNodes()
// Wait for accept loops to be done to make sure that no new
// client can connect
for i := 0; i < expected; i++ {
@@ -3993,7 +4025,7 @@ func (s *Server) lameDuckMode() {
s.mu.Lock()
// Need to recheck few things
if s.shutdown || len(s.clients) == 0 {
if s.isShuttingDown() || len(s.clients) == 0 {
s.mu.Unlock()
// If there is no client, we need to call Shutdown() to complete
// the LDMode. If server has been shutdown while lock was released,
+81 -30
View File
@@ -246,8 +246,9 @@ type stream struct {
mirror *sourceInfo
// Sources
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sources map[string]*sourceInfo
sourceRetries map[string]*time.Timer
sourcesConsumerSetup *time.Timer
// Indicates we have direct consumers.
directs int
@@ -404,12 +405,21 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
}
// Make sure we are ok when these are done in parallel.
v, loaded := jsa.inflight.LoadOrStore(cfg.Name, &sync.WaitGroup{})
// We used to call Add(1) in the "else" clause of the "if loaded"
// statement. This caused a data race because it was possible
// that one go routine stores (with count==0) and another routine
// gets "loaded==true" and calls wg.Wait() while the other routine
// then calls wg.Add(1). It also could mean that two routines execute
// the rest of the code concurrently.
swg := &sync.WaitGroup{}
swg.Add(1)
v, loaded := jsa.inflight.LoadOrStore(cfg.Name, swg)
wg := v.(*sync.WaitGroup)
if loaded {
wg.Wait()
// This waitgroup is "thrown away" (since there was an existing one).
swg.Done()
} else {
wg.Add(1)
defer func() {
jsa.inflight.Delete(cfg.Name)
wg.Done()
@@ -812,6 +822,11 @@ func (mset *stream) setLeader(isLeader bool) error {
return err
}
} else {
// cancel timer to create the source consumers if not fired yet
if mset.sourcesConsumerSetup != nil {
mset.sourcesConsumerSetup.Stop()
mset.sourcesConsumerSetup = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
@@ -2380,7 +2395,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() {
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
time.AfterFunc(next, func() {
mset.mu.Lock()
mset.setupMirrorConsumer()
@@ -2620,9 +2635,10 @@ func (mset *stream) setupMirrorConsumer() error {
if !mset.srv.startGoRoutine(
func() { mset.processMirrorMsgs(mirror, &ready) },
pprofLabels{
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "mirror",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": mirror.cname,
},
) {
ready.Done()
@@ -2727,7 +2743,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime)
}
@@ -2950,9 +2966,10 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if !mset.srv.startGoRoutine(
func() { mset.processSourceMsgs(si, &ready) },
pprofLabels{
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"type": "source",
"account": mset.acc.Name,
"stream": mset.cfg.Name,
"consumer": si.cname,
},
) {
ready.Done()
@@ -3286,16 +3303,9 @@ func (mset *stream) setStartingSequenceForSource(iName string, external *Externa
}
}
// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
// lock should be held.
// Resets the SourceInfo for all the sources
func (mset *stream) resetSourceInfo() {
mset.sources = make(map[string]*sourceInfo)
for _, ssi := range mset.cfg.Sources {
@@ -3322,6 +3332,20 @@ func (mset *stream) startingSequenceForSources() {
}
mset.sources[ssi.iname] = si
}
}
// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
mset.resetSourceInfo()
var state StreamState
mset.store.FastState(&state)
@@ -3405,6 +3429,11 @@ func (mset *stream) setupSourceConsumers() error {
}
}
// If we are no longer the leader, give up
if !mset.isLeader() {
return nil
}
mset.startingSequenceForSources()
// Setup our consumers at the proper starting position.
@@ -3430,13 +3459,35 @@ func (mset *stream) subscribeToStream() error {
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
// setup the initial mirror sourceInfo
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name}
sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
trs := make([]*subjectTransform, len(mset.cfg.Mirror.SubjectTransforms))
for i, tr := range mset.cfg.Mirror.SubjectTransforms {
// will not fail as already checked before that the transform will work
subjectTransform, err := NewSubjectTransform(tr.Source, tr.Destination)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
sfs[i] = tr.Source
trs[i] = subjectTransform
}
mset.mirror.sfs = sfs
mset.mirror.trs = trs
// delay the actual mirror consumer creation for after a delay
mset.scheduleSetupMirrorConsumerRetryAsap()
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
// Setup the initial source infos for the sources
mset.resetSourceInfo()
// Delay the actual source consumer(s) creation(s) for after a delay
mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() {
mset.mu.Lock()
mset.setupSourceConsumers()
mset.mu.Unlock()
})
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
@@ -3656,14 +3707,14 @@ func (mset *stream) setupStore(fsCfg *FileStoreConfig) error {
fsCfg.Cipher = s.getOpts().JetStreamCipher
}
oldprf := s.jsKeyGen(s.getOpts().JetStreamOldKey, mset.acc.Name)
fs, err := newFileStoreWithCreated(*fsCfg, mset.cfg, mset.created, prf, oldprf)
cfg := *fsCfg
cfg.srv = s
fs, err := newFileStoreWithCreated(cfg, mset.cfg, mset.created, prf, oldprf)
if err != nil {
mset.mu.Unlock()
return err
}
mset.store = fs
// Register our server.
fs.registerServer(s)
}
// This will fire the callback but we do not require the lock since md will be 0 here.
mset.store.RegisterStorageUpdates(mset.storeUpdates)
+13 -4
View File
@@ -540,6 +540,7 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
if doLock {
s.RLock()
}
cacheEnabled := s.cache != nil
r, ok := s.cache[subject]
if doLock {
s.RUnlock()
@@ -574,7 +575,11 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
var n int
if doLock {
s.Lock()
if cacheEnabled {
s.Lock()
} else {
s.RLock()
}
}
matchLevel(s.root, tokens, result)
@@ -582,16 +587,20 @@ func (s *Sublist) match(subject string, doLock bool) *SublistResult {
if len(result.psubs) == 0 && len(result.qsubs) == 0 {
result = emptyResult
}
if s.cache != nil {
if cacheEnabled {
s.cache[subject] = result
n = len(s.cache)
}
if doLock {
s.Unlock()
if cacheEnabled {
s.Unlock()
} else {
s.RUnlock()
}
}
// Reduce the cache count if we have exceeded our set maximum.
if n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
if cacheEnabled && n > slCacheMax && atomic.CompareAndSwapInt32(&s.ccSweep, 0, 1) {
go s.reduceCacheCount()
}
+20 -23
View File
@@ -1050,6 +1050,10 @@ func (s *Server) wsConfigAuth(opts *WebsocketOpts) {
}
func (s *Server) startWebsocketServer() {
if s.isShuttingDown() {
return
}
sopts := s.getOpts()
o := &sopts.Websocket
@@ -1071,10 +1075,6 @@ func (s *Server) startWebsocketServer() {
// avoid the possibility of it being "intercepted".
s.mu.Lock()
if s.shutdown {
s.mu.Unlock()
return
}
// Do not check o.NoTLS here. If a TLS configuration is available, use it,
// regardless of NoTLS. If we don't have a TLS config, it means that the
// user has configured NoTLS because otherwise the server would have failed
@@ -1220,8 +1220,8 @@ func (s *Server) createWSClient(conn net.Conn, ws *websocket) *client {
c.mu.Unlock()
s.mu.Lock()
if !s.running || s.ldm {
if s.shutdown {
if !s.isRunning() || s.ldm {
if s.isShuttingDown() {
conn.Close()
}
s.mu.Unlock()
@@ -1295,8 +1295,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mfs > 0 && c.ws.nocompfrag {
mfs = 0
}
buf := &bytes.Buffer{}
buf := bytes.NewBuffer(nbPoolGet(usz))
cp := c.ws.compressor
if cp == nil {
c.ws.compressor, _ = flate.NewWriter(buf, flate.BestSpeed)
@@ -1331,9 +1330,7 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p[:lp])
}
new := nbPoolGet(wsFrameSizeForBrowsers)
lp = copy(new[:wsFrameSizeForBrowsers], p[:lp])
bufs = append(bufs, fh[:n], new[:lp])
bufs = append(bufs, fh[:n], p[:lp])
csz += n + lp
p = p[lp:]
}
@@ -1343,15 +1340,16 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mask {
wsMaskBuf(key, p)
}
bufs = append(bufs, h)
for len(p) > 0 {
new := nbPoolGet(len(p))
n := copy(new[:cap(new)], p)
bufs = append(bufs, new[:n])
p = p[n:]
if ol > 0 {
bufs = append(bufs, h, p)
}
csz = len(h) + ol
}
// Make sure that the compressor no longer holds a reference to
// the bytes.Buffer, so that the underlying memory gets cleaned
// up after flushOutbound/flushAndClose. For this to be safe, we
// always cp.Reset(...) before reusing the compressor again.
cp.Reset(nil)
// Add to pb the compressed data size (including headers), but
// remove the original uncompressed data size that was added
// during the queueing.
@@ -1362,14 +1360,15 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if mfs > 0 {
// We are limiting the frame size.
startFrame := func() int {
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize)[:wsMaxFrameHeaderSize])
bufs = append(bufs, nbPoolGet(wsMaxFrameHeaderSize))
return len(bufs) - 1
}
endFrame := func(idx, size int) {
bufs[idx] = bufs[idx][:wsMaxFrameHeaderSize]
n, key := wsFillFrameHeader(bufs[idx], mask, wsFirstFrame, wsFinalFrame, wsUncompressedFrame, wsBinaryMessage, size)
bufs[idx] = bufs[idx][:n]
c.out.pb += int64(n)
c.ws.fs += int64(n + size)
bufs[idx] = bufs[idx][:n]
if mask {
wsMaskBufs(key, bufs[idx+1:])
}
@@ -1395,10 +1394,8 @@ func (c *client) wsCollapsePtoNB() (net.Buffers, int64) {
if endStart {
fhIdx = startFrame()
}
new := nbPoolGet(total)
n := copy(new[:cap(new)], b[:total])
bufs = append(bufs, new[:n])
b = b[n:]
bufs = append(bufs, b[:total])
b = b[total:]
}
}
if total > 0 {
+8 -13
View File
@@ -1,12 +1,12 @@
language: go
go:
- "1.21.x"
- "1.20.x"
- "1.19.x"
go_import_path: github.com/nats-io/nats.go
install:
- go get -t ./...
- curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
go install github.com/mattn/goveralls@latest;
go install github.com/wadey/gocovmerge@latest;
go install honnef.co/go/tools/cmd/staticcheck@latest;
@@ -15,27 +15,22 @@ install:
before_script:
- $(exit $(go fmt ./... | wc -l))
- go vet -modfile=go_test.mod ./...
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then
find . -type f -name "*.go" | xargs misspell -error -locale US;
GOFLAGS="-mod=mod -modfile=go_test.mod" staticcheck ./...;
fi
- golangci-lint run ./jetstream/...
script:
- go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi
after_success:
- if [[ "$TRAVIS_GO_VERSION" =~ 1.20 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi
jobs:
include:
- name: "Go: 1.20.x (nats-server@dev)"
go: "1.20.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@dev
- name: "Go: 1.20.x (nats-server@main)"
go: "1.20.x"
- name: "Go: 1.21.x (nats-server@main)"
go: "1.21.x"
before_script:
- go get -modfile go_test.mod github.com/nats-io/nats-server/v2@main
allow_failures:
- name: "Go: 1.20.x (nats-server@dev)"
- name: "Go: 1.20.x (nats-server@main)"
- name: "Go: 1.21.x (nats-server@main)"
+1 -1
View File
@@ -29,7 +29,7 @@ When using or transitioning to Go modules support:
```bash
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.29.0
go get github.com/nats-io/nats.go/@v1.30.2
# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2
+7 -7
View File
@@ -4,19 +4,19 @@ go 1.19
require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.16.5
github.com/nats-io/nats-server/v2 v2.9.19
github.com/nats-io/nkeys v0.4.4
github.com/klauspost/compress v1.17.0
github.com/nats-io/nats-server/v2 v2.10.0
github.com/nats-io/nkeys v0.4.5
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.9.0
golang.org/x/text v0.13.0
google.golang.org/protobuf v1.23.0
)
require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.13.0 // indirect
golang.org/x/sys v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
)
+15 -14
View File
@@ -10,29 +10,30 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA=
github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk=
github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/crypto v0.13.0 h1:mvySKfSWJ+UKUii46M40LOvyWfN0s2U+46/jDd0e6Ck=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+158 -15
View File
@@ -1118,6 +1118,7 @@ type ConsumerConfig struct {
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
@@ -1143,6 +1144,11 @@ type ConsumerConfig struct {
Replicas int `json:"num_replicas"`
// Force memory storage.
MemoryStorage bool `json:"mem_storage,omitempty"`
// Metadata is additional metadata for the Consumer.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
// ConsumerInfo is the info from a JetStream consumer.
@@ -1176,10 +1182,11 @@ type SequencePair struct {
// nextRequest is for getting next messages for pull based consumers from JetStream.
type nextRequest struct {
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
Expires time.Duration `json:"expires,omitempty"`
Batch int `json:"batch,omitempty"`
NoWait bool `json:"no_wait,omitempty"`
MaxBytes int `json:"max_bytes,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}
// jsSub includes JetStream subscription info.
@@ -2469,6 +2476,7 @@ func EnableFlowControl() SubOpt {
}
// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
// For pull consumers, idle heartbeat has to be set on each [Fetch] call.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
@@ -2568,6 +2576,16 @@ func ConsumerName(name string) SubOpt {
})
}
// ConsumerFilterSubjects can be used to set multiple subject filters on the consumer.
// It has to be used in conjunction with [nats.BindStream] and
// with empty 'subject' parameter.
func ConsumerFilterSubjects(subjects ...string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FilterSubjects = subjects
return nil
})
}
func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
@@ -2588,6 +2606,7 @@ type pullOpts struct {
maxBytes int
ttl time.Duration
ctx context.Context
hb time.Duration
}
// PullOpt are the options that can be passed when pulling a batch of messages.
@@ -2603,6 +2622,16 @@ func PullMaxWaiting(n int) SubOpt {
})
}
type PullHeartbeat time.Duration
func (h PullHeartbeat) configurePull(opts *pullOpts) error {
if h <= 0 {
return fmt.Errorf("%w: idle heartbeat has to be greater than 0", ErrInvalidArg)
}
opts.hb = time.Duration(h)
return nil
}
// PullMaxBytes defines the max bytes allowed for a fetch request.
type PullMaxBytes int
@@ -2646,6 +2675,11 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
if !checkSts {
return
}
// if it's a heartbeat message, report as not user msg
if isHb, _ := isJSControlMessage(msg); isHb {
return
}
switch val {
case noResponders:
err = ErrNoResponders
@@ -2732,7 +2766,6 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
)
if ctx == nil {
ctx, cancel = context.WithTimeout(context.Background(), ttl)
defer cancel()
} else if _, hasDeadline := ctx.Deadline(); !hasDeadline {
// Prevent from passing the background context which will just block
// and cannot be canceled either.
@@ -2743,7 +2776,17 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
defer cancel()
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer cancel()
// if heartbeat is set, validate it against the context timeout
if o.hb > 0 {
deadline, _ := ctx.Deadline()
if 2*o.hb >= time.Until(deadline) {
return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
}
}
// Check if context not done already before making the request.
@@ -2783,6 +2826,8 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
msgs = append(msgs, msg)
}
}
var hbTimer *time.Timer
var hbErr error
if err == nil && len(msgs) < batch {
// For batch real size of 1, it does not make sense to set no_wait in
// the request.
@@ -2813,8 +2858,26 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
nr.Expires = expires
nr.NoWait = noWait
nr.MaxBytes = o.maxBytes
if 2*o.hb < expires {
nr.Heartbeat = o.hb
} else {
nr.Heartbeat = 0
}
req, _ := json.Marshal(nr)
return nc.PublishRequest(nms, rply, req)
if err := nc.PublishRequest(nms, rply, req); err != nil {
return err
}
if o.hb > 0 {
if hbTimer == nil {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbErr = ErrNoHeartbeat
cancel()
})
} else {
hbTimer.Reset(2 * o.hb)
}
}
return nil
}
err = sendReq()
@@ -2822,6 +2885,9 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
// Ask for next message and wait if there are no messages
msg, err = sub.nextMsgWithContext(ctx, true, true)
if err == nil {
if hbTimer != nil {
hbTimer.Reset(2 * o.hb)
}
var usrMsg bool
usrMsg, err = checkMsg(msg, true, noWait)
@@ -2840,9 +2906,15 @@ func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error) {
}
}
}
if hbTimer != nil {
hbTimer.Stop()
}
}
// If there is at least a message added to msgs, then need to return OK and no error
if err != nil && len(msgs) == 0 {
if hbErr != nil {
return nil, hbErr
}
return nil, o.checkCtxErr(err)
}
return msgs, nil
@@ -2970,14 +3042,24 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
// If the context did not have a deadline, then create a new child context
// that will use the default timeout from the JS context.
ctx, cancel = context.WithTimeout(ctx, ttl)
} else {
ctx, cancel = context.WithCancel(ctx)
}
defer func() {
// only cancel the context here if we are sure the fetching goroutine has not been started yet
if cancel != nil && cancelContext {
if cancelContext {
cancel()
}
}()
// if heartbeat is set, validate it against the context timeout
if o.hb > 0 {
deadline, _ := ctx.Deadline()
if 2*o.hb >= time.Until(deadline) {
return nil, fmt.Errorf("%w: idle heartbeat value too large", ErrInvalidArg)
}
}
// Check if context not done already before making the request.
select {
case <-ctx.Done():
@@ -3031,9 +3113,10 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
requestBatch := batch - len(result.msgs)
req := nextRequest{
Expires: expires,
Batch: requestBatch,
MaxBytes: o.maxBytes,
Expires: expires,
Batch: requestBatch,
MaxBytes: o.maxBytes,
Heartbeat: o.hb,
}
reqJSON, err := json.Marshal(req)
if err != nil {
@@ -3051,11 +3134,17 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
result.err = err
return result, nil
}
var hbTimer *time.Timer
var hbErr error
if o.hb > 0 {
hbTimer = time.AfterFunc(2*o.hb, func() {
hbErr = ErrNoHeartbeat
cancel()
})
}
cancelContext = false
go func() {
if cancel != nil {
defer cancel()
}
defer cancel()
var requestMsgs int
for requestMsgs < requestBatch {
// Ask for next message and wait if there are no messages
@@ -3063,6 +3152,9 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
if err != nil {
break
}
if hbTimer != nil {
hbTimer.Reset(2 * o.hb)
}
var usrMsg bool
usrMsg, err = checkMsg(msg, true, false)
@@ -3082,7 +3174,11 @@ func (sub *Subscription) FetchBatch(batch int, opts ...PullOpt) (MessageBatch, e
}
}
if err != nil {
result.err = o.checkCtxErr(err)
if hbErr != nil {
result.err = hbErr
} else {
result.err = o.checkCtxErr(err)
}
}
close(result.msgs)
result.done <- struct{}{}
@@ -3654,6 +3750,53 @@ func (st *StorageType) UnmarshalJSON(data []byte) error {
return nil
}
type StoreCompression uint8
const (
NoCompression StoreCompression = iota
S2Compression
)
func (alg StoreCompression) String() string {
switch alg {
case NoCompression:
return "None"
case S2Compression:
return "S2"
default:
return "Unknown StoreCompression"
}
}
func (alg StoreCompression) MarshalJSON() ([]byte, error) {
var str string
switch alg {
case S2Compression:
str = "s2"
case NoCompression:
str = "none"
default:
return nil, fmt.Errorf("unknown compression algorithm")
}
return json.Marshal(str)
}
func (alg *StoreCompression) UnmarshalJSON(b []byte) error {
var str string
if err := json.Unmarshal(b, &str); err != nil {
return err
}
switch str {
case "s2":
*alg = S2Compression
case "none":
*alg = NoCompression
default:
return fmt.Errorf("unknown compression algorithm")
}
return nil
}
// Length of our hash used for named consumers.
const nameHashLen = 8
+45 -4
View File
@@ -33,6 +33,26 @@ var (
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}
// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}
// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}
// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}
// ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"}
// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}
@@ -42,6 +62,15 @@ var (
// ErrBadRequest is returned when invalid request is sent to JetStream API.
ErrBadRequest JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeBadRequest, Description: "bad request", Code: 400}}
// ErrDuplicateFilterSubjects is returned when both FilterSubject and FilterSubjects are specified when creating consumer.
ErrDuplicateFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeDuplicateFilterSubjects, Description: "consumer cannot have both FilterSubject and FilterSubjects specified", Code: 500}}
// ErrDuplicateFilterSubjects is returned when filter subjects overlap when creating consumer.
ErrOverlappingFilterSubjects JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeOverlappingFilterSubjects, Description: "consumer subject filters cannot overlap", Code: 500}}
// ErrEmptyFilter is returned when a filter in FilterSubjects is empty.
ErrEmptyFilter JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerEmptyFilter, Description: "consumer filter in FilterSubjects cannot be empty", Code: 500}}
// Client errors
// ErrConsumerNameAlreadyInUse is an error returned when consumer with given name already exists.
@@ -62,6 +91,11 @@ var (
// ErrConsumerNameRequired is returned when the provided consumer durable name is empty.
ErrConsumerNameRequired JetStreamError = &jsError{message: "consumer name is required"}
// ErrConsumerMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// multiple filter subjects with filter_subjects field. If this error is returned when executing AddConsumer(), the consumer with invalid
// configuration was already created in the server.
ErrConsumerMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "multiple consumer filter subjects not supported by nats-server"}
// ErrConsumerConfigRequired is returned when empty consumer consuguration is supplied to add/update consumer.
ErrConsumerConfigRequired JetStreamError = &jsError{message: "consumer configuration is required"}
@@ -104,6 +138,9 @@ var (
// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}
// ErrNoHeartbeat is returned when no heartbeat is received from server when sending requests with pull consumer.
ErrNoHeartbeat JetStreamError = &jsError{message: "no heartbeat received"}
// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
@@ -120,13 +157,17 @@ const (
JSErrCodeStreamNotFound ErrorCode = 10059
JSErrCodeStreamNameInUse ErrorCode = 10058
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeConsumerNotFound ErrorCode = 10014
JSErrCodeConsumerNameExists ErrorCode = 10013
JSErrCodeConsumerAlreadyExists ErrorCode = 10105
JSErrCodeDuplicateFilterSubjects ErrorCode = 10136
JSErrCodeOverlappingFilterSubjects ErrorCode = 10138
JSErrCodeConsumerEmptyFilter ErrorCode = 10139
JSErrCodeMessageNotFound ErrorCode = 10037
JSErrCodeBadRequest ErrorCode = 10003
JSErrCodeBadRequest ErrorCode = 10003
JSStreamInvalidConfig ErrorCode = 10052
JSErrCodeStreamWrongLastSequence ErrorCode = 10071
)
+101 -35
View File
@@ -102,30 +102,35 @@ type JetStreamManager interface {
// There are sensible defaults for most. If no subjects are
// given the name will be used as the only subject.
type StreamConfig struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Subjects []string `json:"subjects,omitempty"`
Retention RetentionPolicy `json:"retention"`
MaxConsumers int `json:"max_consumers"`
MaxMsgs int64 `json:"max_msgs"`
MaxBytes int64 `json:"max_bytes"`
Discard DiscardPolicy `json:"discard"`
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
MaxAge time.Duration `json:"max_age"`
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
Storage StorageType `json:"storage"`
Replicas int `json:"num_replicas"`
NoAck bool `json:"no_ack,omitempty"`
Template string `json:"template_owner,omitempty"`
Duplicates time.Duration `json:"duplicate_window,omitempty"`
Placement *Placement `json:"placement,omitempty"`
Mirror *StreamSource `json:"mirror,omitempty"`
Sources []*StreamSource `json:"sources,omitempty"`
Sealed bool `json:"sealed,omitempty"`
DenyDelete bool `json:"deny_delete,omitempty"`
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
Compression StoreCompression `json:"compression"`
FirstSeq uint64 `json:"first_seq,omitempty"`
// Allow applying a subject transform to incoming messages before doing anything else.
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`
@@ -134,6 +139,20 @@ type StreamConfig struct {
AllowDirect bool `json:"allow_direct"`
// Allow higher performance and unified direct access for mirrors as well.
MirrorDirect bool `json:"mirror_direct"`
// Limits for consumers on this stream.
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
// Metadata is additional metadata for the Stream.
// Keys starting with `_nats` are reserved.
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
type SubjectTransformConfig struct {
Source string `json:"src,omitempty"`
Destination string `json:"dest"`
}
// RePublish is for republishing messages once committed to a stream. The original
@@ -152,12 +171,13 @@ type Placement struct {
// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}
// ExternalStream allows you to qualify access to a stream source in another
@@ -167,6 +187,13 @@ type ExternalStream struct {
DeliverPrefix string `json:"deliver,omitempty"`
}
// StreamConsumerLimits are the limits for a consumer on a stream.
// These can be overridden on a per consumer basis.
type StreamConsumerLimits struct {
InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
}
// Helper for copying when we do not want to change user's version.
func (ss *StreamSource) copy() *StreamSource {
nss := *ss
@@ -407,6 +434,11 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
}
return nil, info.Error
}
// check whether multiple filter subjects (if used) are reflected in the returned ConsumerInfo
if len(cfg.FilterSubjects) != 0 && len(info.Config.FilterSubjects) == 0 {
return nil, ErrConsumerMultipleFilterSubjectsNotSupported
}
return info.ConsumerInfo, nil
}
@@ -780,6 +812,21 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, resp.Error
}
// check that input subject transform (if used) is reflected in the returned ConsumerInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}
return resp.StreamInfo, nil
}
@@ -897,11 +944,13 @@ type StreamAlternate struct {
// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}
// StreamState is information about the given stream.
@@ -973,6 +1022,23 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
}
return nil, resp.Error
}
// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Config.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}
return resp.StreamInfo, nil
}
+10 -3
View File
@@ -432,14 +432,21 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
scfg.Mirror = m
scfg.MirrorDirect = true
} else if len(cfg.Sources) > 0 {
// For now we do not allow direct subjects for sources. If that is desired a user could use stream API directly.
for _, ss := range cfg.Sources {
if !strings.HasPrefix(ss.Name, kvBucketNamePre) {
ss = ss.copy()
var sourceBucketName string
if strings.HasPrefix(ss.Name, kvBucketNamePre) {
sourceBucketName = ss.Name[len(kvBucketNamePre):]
} else {
sourceBucketName = ss.Name
ss.Name = fmt.Sprintf(kvBucketNameTmpl, ss.Name)
}
if ss.External == nil || sourceBucketName != cfg.Bucket {
ss.SubjectTransforms = []SubjectTransformConfig{{Source: fmt.Sprintf(kvSubjectsTmpl, sourceBucketName), Destination: fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}}
}
scfg.Sources = append(scfg.Sources, ss)
}
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
} else {
scfg.Subjects = []string{fmt.Sprintf(kvSubjectsTmpl, cfg.Bucket)}
}
+5 -3
View File
@@ -47,7 +47,7 @@ import (
// Default Constants
const (
Version = "1.29.0"
Version = "1.30.2"
DefaultURL = "nats://127.0.0.1:4222"
DefaultPort = 4222
DefaultMaxReconnect = 60
@@ -3168,8 +3168,10 @@ func (nc *Conn) processMsg(data []byte) {
}
}
// Skip processing if this is a control message.
if !ctrlMsg {
// Skip processing if this is a control message and
// if not a pull consumer heartbeat. For pull consumers,
// heartbeats have to be handled on per request basis.
if !ctrlMsg || (jsi != nil && jsi.pull) {
var chanSubCheckFC bool
// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
+15 -3
View File
@@ -149,6 +149,10 @@ type ObjectStoreConfig struct {
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
// Bucket-specific metadata
// NOTE: Metadata requires nats-server v2.10.0+
Metadata map[string]string `json:"metadata,omitempty"`
}
type ObjectStoreStatus interface {
@@ -168,6 +172,8 @@ type ObjectStoreStatus interface {
Size() uint64
// BackingStore provides details about the underlying storage
BackingStore() string
// Metadata is the user supplied metadata for the bucket
Metadata() map[string]string
}
// ObjectMetaOptions
@@ -178,9 +184,10 @@ type ObjectMetaOptions struct {
// ObjectMeta is high level information about an object.
type ObjectMeta struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
Headers Header `json:"headers,omitempty"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Headers Header `json:"headers,omitempty"`
Metadata map[string]string `json:"metadata,omitempty"`
// Optional options.
Opts *ObjectMetaOptions `json:"options,omitempty"`
@@ -272,6 +279,7 @@ func (js *js) CreateObjectStore(cfg *ObjectStoreConfig) (ObjectStore, error) {
Discard: DiscardNew,
AllowRollup: true,
AllowDirect: true,
Metadata: cfg.Metadata,
}
// Create our stream.
@@ -966,6 +974,7 @@ func (obs *obs) UpdateMeta(name string, meta *ObjectMeta) error {
info.Name = meta.Name
info.Description = meta.Description
info.Headers = meta.Headers
info.Metadata = meta.Metadata
// Prepare the meta message
if err = publishMeta(info, obs.js); err != nil {
@@ -1189,6 +1198,9 @@ func (s *ObjectBucketStatus) Size() uint64 { return s.nfo.State.Bytes }
// BackingStore indicates what technology is used for storage of the bucket
func (s *ObjectBucketStatus) BackingStore() string { return "JetStream" }
// Metadata is the metadata supplied when creating the bucket
func (s *ObjectBucketStatus) Metadata() map[string]string { return s.nfo.Config.Metadata }
// StreamInfo is the stream info retrieved to create the status
func (s *ObjectBucketStatus) StreamInfo() *StreamInfo { return s.nfo }
+3 -3
View File
@@ -1345,7 +1345,7 @@ github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.5.2
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.1
# github.com/nats-io/nats-server/v2 v2.10.2
## explicit; go 1.20
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/ldap
@@ -1356,8 +1356,8 @@ github.com/nats-io/nats-server/v2/server/certidp
github.com/nats-io/nats-server/v2/server/certstore
github.com/nats-io/nats-server/v2/server/pse
github.com/nats-io/nats-server/v2/server/sysmem
# github.com/nats-io/nats.go v1.29.0
## explicit; go 1.19
# github.com/nats-io/nats.go v1.30.2
## explicit; go 1.20
github.com/nats-io/nats.go
github.com/nats-io/nats.go/encoders/builtin
github.com/nats-io/nats.go/internal/parser