mirror of
https://github.com/opencloud-eu/opencloud.git
synced 2026-05-07 03:50:30 -05:00
build(deps): bump github.com/nats-io/nats.go from 1.34.1 to 1.35.0
Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from 1.34.1 to 1.35.0. - [Release notes](https://github.com/nats-io/nats.go/releases) - [Commits](https://github.com/nats-io/nats.go/compare/v1.34.1...v1.35.0) --- updated-dependencies: - dependency-name: github.com/nats-io/nats.go dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
Ralf Haferkamp
parent
fa70142cdb
commit
60e88860c0
@@ -64,7 +64,7 @@ require (
|
||||
github.com/mna/pigeon v1.2.1
|
||||
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
|
||||
github.com/nats-io/nats-server/v2 v2.10.15
|
||||
github.com/nats-io/nats.go v1.34.1
|
||||
github.com/nats-io/nats.go v1.35.0
|
||||
github.com/oklog/run v1.1.0
|
||||
github.com/olekukonko/tablewriter v0.0.5
|
||||
github.com/onsi/ginkgo v1.16.5
|
||||
|
||||
@@ -1759,8 +1759,8 @@ github.com/nats-io/jwt/v2 v2.5.6 h1:Cp618+z4q042sWqHiSoIHFT08OZtAskui0hTmRfmGGQ=
|
||||
github.com/nats-io/jwt/v2 v2.5.6/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
|
||||
github.com/nats-io/nats-server/v2 v2.10.15 h1:O/l+ZT91ltMiiRJKjWLQJcGg7ypzjlb/bC5bFIRVw3M=
|
||||
github.com/nats-io/nats-server/v2 v2.10.15/go.mod h1:ul+pGt5I7e4U+nI09ZFDG4vqM+6Ce2Tou7UbVSnLiIw=
|
||||
github.com/nats-io/nats.go v1.34.1 h1:syWey5xaNHZgicYBemv0nohUPPmaLteiBEUT6Q5+F/4=
|
||||
github.com/nats-io/nats.go v1.34.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nats.go v1.35.0 h1:XFNqNM7v5B+MQMKqVGAyHwYhyKb48jrenXNxIU20ULk=
|
||||
github.com/nats-io/nats.go v1.35.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
|
||||
+14
-1
@@ -31,7 +31,7 @@ When using or transitioning to Go modules support:
|
||||
```bash
|
||||
# Go client latest or explicit version
|
||||
go get github.com/nats-io/nats.go/@latest
|
||||
go get github.com/nats-io/nats.go/@v1.34.1
|
||||
go get github.com/nats-io/nats.go/@v1.35.0
|
||||
|
||||
# For latest NATS Server, add /v2 at the end
|
||||
go get github.com/nats-io/nats-server/v2
|
||||
@@ -474,6 +474,19 @@ resp := &response{}
|
||||
err := c.RequestWithContext(ctx, "foo", req, resp)
|
||||
```
|
||||
|
||||
## Backwards compatibility
|
||||
|
||||
In the development of nats.go, we are committed to maintaining backward compatibility and ensuring a stable and reliable experience for all users. In general, we follow the standard go compatibility guidelines.
|
||||
However, it's important to clarify our stance on certain types of changes:
|
||||
|
||||
- **Expanding structures:**
|
||||
Adding new fields to structs is not considered a breaking change.
|
||||
|
||||
- **Adding methods to exported interfaces:**
|
||||
Extending public interfaces with new methods is also not viewed as a breaking change within the context of this project. It is important to note that no unexported methods will be added to interfaces allowing users to implement them.
|
||||
|
||||
Additionally, this library always supports at least 2 latest minor Go versions. For example, if the latest Go version is 1.22, the library will support Go 1.21 and 1.22.
|
||||
|
||||
## License
|
||||
|
||||
Unless otherwise noted, the NATS source files are distributed
|
||||
|
||||
+7
@@ -22,6 +22,10 @@ var (
|
||||
// API errors
|
||||
|
||||
// ErrJetStreamNotEnabled is an error returned when JetStream is not enabled for an account.
|
||||
//
|
||||
// Note: This error will not be returned in clustered mode, even if each
|
||||
// server in the cluster does not have JetStream enabled. In clustered mode,
|
||||
// requests will time out instead.
|
||||
ErrJetStreamNotEnabled JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeJetStreamNotEnabled, Description: "jetstream not enabled", Code: 503}}
|
||||
|
||||
// ErrJetStreamNotEnabledForAccount is an error returned when JetStream is not enabled for an account.
|
||||
@@ -120,6 +124,9 @@ var (
|
||||
// ErrInvalidConsumerName is returned when the provided consumer name is invalid (contains '.' or ' ').
|
||||
ErrInvalidConsumerName JetStreamError = &jsError{message: "invalid consumer name"}
|
||||
|
||||
// ErrInvalidFilterSubject is returned when the provided filter subject is invalid.
|
||||
ErrInvalidFilterSubject JetStreamError = &jsError{message: "invalid filter subject"}
|
||||
|
||||
// ErrNoMatchingStream is returned when stream lookup by subject is unsuccessful.
|
||||
ErrNoMatchingStream JetStreamError = &jsError{message: "no stream matches subject"}
|
||||
|
||||
|
||||
+137
-37
@@ -106,51 +106,143 @@ type JetStreamManager interface {
|
||||
// There are sensible defaults for most. If no subjects are
|
||||
// given the name will be used as the only subject.
|
||||
type StreamConfig struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description,omitempty"`
|
||||
Subjects []string `json:"subjects,omitempty"`
|
||||
Retention RetentionPolicy `json:"retention"`
|
||||
MaxConsumers int `json:"max_consumers"`
|
||||
MaxMsgs int64 `json:"max_msgs"`
|
||||
MaxBytes int64 `json:"max_bytes"`
|
||||
Discard DiscardPolicy `json:"discard"`
|
||||
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
|
||||
MaxAge time.Duration `json:"max_age"`
|
||||
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
|
||||
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
|
||||
Storage StorageType `json:"storage"`
|
||||
Replicas int `json:"num_replicas"`
|
||||
NoAck bool `json:"no_ack,omitempty"`
|
||||
Template string `json:"template_owner,omitempty"`
|
||||
Duplicates time.Duration `json:"duplicate_window,omitempty"`
|
||||
Placement *Placement `json:"placement,omitempty"`
|
||||
Mirror *StreamSource `json:"mirror,omitempty"`
|
||||
Sources []*StreamSource `json:"sources,omitempty"`
|
||||
Sealed bool `json:"sealed,omitempty"`
|
||||
DenyDelete bool `json:"deny_delete,omitempty"`
|
||||
DenyPurge bool `json:"deny_purge,omitempty"`
|
||||
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
|
||||
Compression StoreCompression `json:"compression"`
|
||||
FirstSeq uint64 `json:"first_seq,omitempty"`
|
||||
// Name is the name of the stream. It is required and must be unique
|
||||
// across the JetStream account.
|
||||
//
|
||||
// Name Names cannot contain whitespace, ., *, >, path separators
|
||||
// (forward or backwards slash), and non-printable characters.
|
||||
Name string `json:"name"`
|
||||
|
||||
// Allow applying a subject transform to incoming messages before doing anything else.
|
||||
// Description is an optional description of the stream.
|
||||
Description string `json:"description,omitempty"`
|
||||
|
||||
// Subjects is a list of subjects that the stream is listening on.
|
||||
// Wildcards are supported. Subjects cannot be set if the stream is
|
||||
// created as a mirror.
|
||||
Subjects []string `json:"subjects,omitempty"`
|
||||
|
||||
// Retention defines the message retention policy for the stream.
|
||||
// Defaults to LimitsPolicy.
|
||||
Retention RetentionPolicy `json:"retention"`
|
||||
|
||||
// MaxConsumers specifies the maximum number of consumers allowed for
|
||||
// the stream.
|
||||
MaxConsumers int `json:"max_consumers"`
|
||||
|
||||
// MaxMsgs is the maximum number of messages the stream will store.
|
||||
// After reaching the limit, stream adheres to the discard policy.
|
||||
// If not set, server default is -1 (unlimited).
|
||||
MaxMsgs int64 `json:"max_msgs"`
|
||||
|
||||
// MaxBytes is the maximum total size of messages the stream will store.
|
||||
// After reaching the limit, stream adheres to the discard policy.
|
||||
// If not set, server default is -1 (unlimited).
|
||||
MaxBytes int64 `json:"max_bytes"`
|
||||
|
||||
// Discard defines the policy for handling messages when the stream
|
||||
// reaches its limits in terms of number of messages or total bytes.
|
||||
Discard DiscardPolicy `json:"discard"`
|
||||
|
||||
// DiscardNewPerSubject is a flag to enable discarding new messages per
|
||||
// subject when limits are reached. Requires DiscardPolicy to be
|
||||
// DiscardNew and the MaxMsgsPerSubject to be set.
|
||||
DiscardNewPerSubject bool `json:"discard_new_per_subject,omitempty"`
|
||||
|
||||
// MaxAge is the maximum age of messages that the stream will retain.
|
||||
MaxAge time.Duration `json:"max_age"`
|
||||
|
||||
// MaxMsgsPerSubject is the maximum number of messages per subject that
|
||||
// the stream will retain.
|
||||
MaxMsgsPerSubject int64 `json:"max_msgs_per_subject"`
|
||||
|
||||
// MaxMsgSize is the maximum size of any single message in the stream.
|
||||
MaxMsgSize int32 `json:"max_msg_size,omitempty"`
|
||||
|
||||
// Storage specifies the type of storage backend used for the stream
|
||||
// (file or memory).
|
||||
Storage StorageType `json:"storage"`
|
||||
|
||||
// Replicas is the number of stream replicas in clustered JetStream.
|
||||
// Defaults to 1, maximum is 5.
|
||||
Replicas int `json:"num_replicas"`
|
||||
|
||||
// NoAck is a flag to disable acknowledging messages received by this
|
||||
// stream.
|
||||
//
|
||||
// If set to true, publish methods from the JetStream client will not
|
||||
// work as expected, since they rely on acknowledgements. Core NATS
|
||||
// publish methods should be used instead. Note that this will make
|
||||
// message delivery less reliable.
|
||||
NoAck bool `json:"no_ack,omitempty"`
|
||||
|
||||
// Duplicates is the window within which to track duplicate messages.
|
||||
// If not set, server default is 2 minutes.
|
||||
Duplicates time.Duration `json:"duplicate_window,omitempty"`
|
||||
|
||||
// Placement is used to declare where the stream should be placed via
|
||||
// tags and/or an explicit cluster name.
|
||||
Placement *Placement `json:"placement,omitempty"`
|
||||
|
||||
// Mirror defines the configuration for mirroring another stream.
|
||||
Mirror *StreamSource `json:"mirror,omitempty"`
|
||||
|
||||
// Sources is a list of other streams this stream sources messages from.
|
||||
Sources []*StreamSource `json:"sources,omitempty"`
|
||||
|
||||
// Sealed streams do not allow messages to be published or deleted via limits or API,
|
||||
// sealed streams can not be unsealed via configuration update. Can only
|
||||
// be set on already created streams via the Update API.
|
||||
Sealed bool `json:"sealed,omitempty"`
|
||||
|
||||
// DenyDelete restricts the ability to delete messages from a stream via
|
||||
// the API. Defaults to false.
|
||||
DenyDelete bool `json:"deny_delete,omitempty"`
|
||||
|
||||
// DenyPurge restricts the ability to purge messages from a stream via
|
||||
// the API. Defaults to false.
|
||||
DenyPurge bool `json:"deny_purge,omitempty"`
|
||||
|
||||
// AllowRollup allows the use of the Nats-Rollup header to replace all
|
||||
// contents of a stream, or subject in a stream, with a single new
|
||||
// message.
|
||||
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`
|
||||
|
||||
// Compression specifies the message storage compression algorithm.
|
||||
// Defaults to NoCompression.
|
||||
Compression StoreCompression `json:"compression"`
|
||||
|
||||
// FirstSeq is the initial sequence number of the first message in the
|
||||
// stream.
|
||||
FirstSeq uint64 `json:"first_seq,omitempty"`
|
||||
|
||||
// SubjectTransform allows applying a transformation to matching
|
||||
// messages' subjects.
|
||||
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`
|
||||
|
||||
// Allow republish of the message after being sequenced and stored.
|
||||
// RePublish allows immediate republishing a message to the configured
|
||||
// subject after it's stored.
|
||||
RePublish *RePublish `json:"republish,omitempty"`
|
||||
|
||||
// Allow higher performance, direct access to get individual messages. E.g. KeyValue
|
||||
// AllowDirect enables direct access to individual messages using direct
|
||||
// get API. Defaults to false.
|
||||
AllowDirect bool `json:"allow_direct"`
|
||||
// Allow higher performance and unified direct access for mirrors as well.
|
||||
|
||||
// MirrorDirect enables direct access to individual messages from the
|
||||
// origin stream using direct get API. Defaults to false.
|
||||
MirrorDirect bool `json:"mirror_direct"`
|
||||
|
||||
// Limits for consumers on this stream.
|
||||
// ConsumerLimits defines limits of certain values that consumers can
|
||||
// set, defaults for those who don't set these settings
|
||||
ConsumerLimits StreamConsumerLimits `json:"consumer_limits,omitempty"`
|
||||
|
||||
// Metadata is additional metadata for the Stream.
|
||||
// Keys starting with `_nats` are reserved.
|
||||
// NOTE: Metadata requires nats-server v2.10.0+
|
||||
// Metadata is a set of application-defined key-value pairs for
|
||||
// associating metadata on the stream. This feature requires nats-server
|
||||
// v2.10.0 or later.
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
|
||||
// Template identifies the template that manages the Stream. DEPRECATED:
|
||||
// This feature is no longer supported.
|
||||
Template string `json:"template_owner,omitempty"`
|
||||
}
|
||||
|
||||
// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received.
|
||||
@@ -288,9 +380,13 @@ type accountInfoResponse struct {
|
||||
AccountInfo
|
||||
}
|
||||
|
||||
// AccountInfo retrieves info about the JetStream usage from the current account.
|
||||
// If JetStream is not enabled, this will return ErrJetStreamNotEnabled
|
||||
// Other errors can happen but are generally considered retryable
|
||||
// AccountInfo fetches account information from the server, containing details
|
||||
// about the account associated with this JetStream connection. If account is
|
||||
// not enabled for JetStream, ErrJetStreamNotEnabledForAccount is returned.
|
||||
//
|
||||
// If the server does not have JetStream enabled, ErrJetStreamNotEnabled is
|
||||
// returned (for a single server setup). For clustered topologies, AccountInfo
|
||||
// will time out.
|
||||
func (js *js) AccountInfo(opts ...JSOpt) (*AccountInfo, error) {
|
||||
o, cancel, err := getJSContextOpts(js.opts, opts...)
|
||||
if err != nil {
|
||||
@@ -410,6 +506,10 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o
|
||||
// if filter subject is empty or ">", use the endpoint without filter subject
|
||||
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
|
||||
} else {
|
||||
// safeguard against passing invalid filter subject in request subject
|
||||
if cfg.FilterSubject[0] == '.' || cfg.FilterSubject[len(cfg.FilterSubject)-1] == '.' {
|
||||
return nil, fmt.Errorf("%w: %q", ErrInvalidFilterSubject, cfg.FilterSubject)
|
||||
}
|
||||
// if filter subject is not empty, use the endpoint with filter subject
|
||||
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
|
||||
}
|
||||
|
||||
+36
-18
@@ -249,22 +249,22 @@ func purge() DeleteOpt {
|
||||
|
||||
// KeyValueConfig is for configuring a KeyValue store.
|
||||
type KeyValueConfig struct {
|
||||
Bucket string
|
||||
Description string
|
||||
MaxValueSize int32
|
||||
History uint8
|
||||
TTL time.Duration
|
||||
MaxBytes int64
|
||||
Storage StorageType
|
||||
Replicas int
|
||||
Placement *Placement
|
||||
RePublish *RePublish
|
||||
Mirror *StreamSource
|
||||
Sources []*StreamSource
|
||||
Bucket string `json:"bucket"`
|
||||
Description string `json:"description,omitempty"`
|
||||
MaxValueSize int32 `json:"max_value_size,omitempty"`
|
||||
History uint8 `json:"history,omitempty"`
|
||||
TTL time.Duration `json:"ttl,omitempty"`
|
||||
MaxBytes int64 `json:"max_bytes,omitempty"`
|
||||
Storage StorageType `json:"storage,omitempty"`
|
||||
Replicas int `json:"num_replicas,omitempty"`
|
||||
Placement *Placement `json:"placement,omitempty"`
|
||||
RePublish *RePublish `json:"republish,omitempty"`
|
||||
Mirror *StreamSource `json:"mirror,omitempty"`
|
||||
Sources []*StreamSource `json:"sources,omitempty"`
|
||||
|
||||
// Enable underlying stream compression.
|
||||
// NOTE: Compression is supported for nats-server 2.10.0+
|
||||
Compression bool
|
||||
Compression bool `json:"compression,omitempty"`
|
||||
}
|
||||
|
||||
// Used to watch all keys.
|
||||
@@ -344,8 +344,9 @@ const (
|
||||
|
||||
// Regex for valid keys and buckets.
|
||||
var (
|
||||
validBucketRe = regexp.MustCompile(`\A[a-zA-Z0-9_-]+\z`)
|
||||
validKeyRe = regexp.MustCompile(`\A[-/_=\.a-zA-Z0-9]+\z`)
|
||||
validBucketRe = regexp.MustCompile(`^[a-zA-Z0-9_-]+$`)
|
||||
validKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9]+$`)
|
||||
validSearchKeyRe = regexp.MustCompile(`^[-/_=\.a-zA-Z0-9*]*[>]?$`)
|
||||
)
|
||||
|
||||
// KeyValue will lookup and bind to an existing KeyValue store.
|
||||
@@ -353,7 +354,7 @@ func (js *js) KeyValue(bucket string) (KeyValue, error) {
|
||||
if !js.nc.serverMinVersion(2, 6, 2) {
|
||||
return nil, errors.New("nats: key-value requires at least server version 2.6.2")
|
||||
}
|
||||
if !validBucketRe.MatchString(bucket) {
|
||||
if !bucketValid(bucket) {
|
||||
return nil, ErrInvalidBucketName
|
||||
}
|
||||
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
|
||||
@@ -381,7 +382,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
|
||||
if cfg == nil {
|
||||
return nil, ErrKeyValueConfigRequired
|
||||
}
|
||||
if !validBucketRe.MatchString(cfg.Bucket) {
|
||||
if !bucketValid(cfg.Bucket) {
|
||||
return nil, ErrInvalidBucketName
|
||||
}
|
||||
if _, err := js.AccountInfo(); err != nil {
|
||||
@@ -507,7 +508,7 @@ func (js *js) CreateKeyValue(cfg *KeyValueConfig) (KeyValue, error) {
|
||||
|
||||
// DeleteKeyValue will delete this KeyValue store (JetStream stream).
|
||||
func (js *js) DeleteKeyValue(bucket string) error {
|
||||
if !validBucketRe.MatchString(bucket) {
|
||||
if !bucketValid(bucket) {
|
||||
return ErrInvalidBucketName
|
||||
}
|
||||
stream := fmt.Sprintf(kvBucketNameTmpl, bucket)
|
||||
@@ -547,6 +548,13 @@ func (e *kve) Created() time.Time { return e.created }
|
||||
func (e *kve) Delta() uint64 { return e.delta }
|
||||
func (e *kve) Operation() KeyValueOp { return e.op }
|
||||
|
||||
func bucketValid(bucket string) bool {
|
||||
if len(bucket) == 0 {
|
||||
return false
|
||||
}
|
||||
return validBucketRe.MatchString(bucket)
|
||||
}
|
||||
|
||||
func keyValid(key string) bool {
|
||||
if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
|
||||
return false
|
||||
@@ -554,6 +562,13 @@ func keyValid(key string) bool {
|
||||
return validKeyRe.MatchString(key)
|
||||
}
|
||||
|
||||
func searchKeyValid(key string) bool {
|
||||
if len(key) == 0 || key[0] == '.' || key[len(key)-1] == '.' {
|
||||
return false
|
||||
}
|
||||
return validSearchKeyRe.MatchString(key)
|
||||
}
|
||||
|
||||
// Get returns the latest value for the key.
|
||||
func (kv *kvs) Get(key string) (KeyValueEntry, error) {
|
||||
e, err := kv.get(key, kvLatestRevision)
|
||||
@@ -951,6 +966,9 @@ func (kv *kvs) WatchAll(opts ...WatchOpt) (KeyWatcher, error) {
|
||||
// Watch will fire the callback when a key that matches the keys pattern is updated.
|
||||
// keys needs to be a valid NATS subject.
|
||||
func (kv *kvs) Watch(keys string, opts ...WatchOpt) (KeyWatcher, error) {
|
||||
if !searchKeyValid(keys) {
|
||||
return nil, fmt.Errorf("%w: %s", ErrInvalidKey, "keys cannot be empty and must be a valid NATS subject")
|
||||
}
|
||||
var o watchOpts
|
||||
for _, opt := range opts {
|
||||
if opt != nil {
|
||||
|
||||
+63
-15
@@ -47,7 +47,7 @@ import (
|
||||
|
||||
// Default Constants
|
||||
const (
|
||||
Version = "1.34.1"
|
||||
Version = "1.35.0"
|
||||
DefaultURL = "nats://127.0.0.1:4222"
|
||||
DefaultPort = 4222
|
||||
DefaultMaxReconnect = 60
|
||||
@@ -2161,6 +2161,47 @@ func (nc *Conn) waitForExits() {
|
||||
nc.wg.Wait()
|
||||
}
|
||||
|
||||
// ForceReconnect forces a reconnect attempt to the server.
|
||||
// This is a non-blocking call and will start the reconnect
|
||||
// process without waiting for it to complete.
|
||||
//
|
||||
// If the connection is already in the process of reconnecting,
|
||||
// this call will force an immediate reconnect attempt (bypassing
|
||||
// the current reconnect delay).
|
||||
func (nc *Conn) ForceReconnect() error {
|
||||
nc.mu.Lock()
|
||||
defer nc.mu.Unlock()
|
||||
|
||||
if nc.isClosed() {
|
||||
return ErrConnectionClosed
|
||||
}
|
||||
if nc.isReconnecting() {
|
||||
// if we're already reconnecting, force a reconnect attempt
|
||||
// even if we're in the middle of a backoff
|
||||
if nc.rqch != nil {
|
||||
close(nc.rqch)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Clear any queued pongs
|
||||
nc.clearPendingFlushCalls()
|
||||
|
||||
// Clear any queued and blocking requests.
|
||||
nc.clearPendingRequestCalls()
|
||||
|
||||
// Stop ping timer if set.
|
||||
nc.stopPingTimer()
|
||||
|
||||
// Go ahead and make sure we have flushed the outbound
|
||||
nc.bw.flush()
|
||||
nc.conn.Close()
|
||||
|
||||
nc.changeConnStatus(RECONNECTING)
|
||||
go nc.doReconnect(nil, true)
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConnectedUrl reports the connected server's URL
|
||||
func (nc *Conn) ConnectedUrl() string {
|
||||
if nc == nil {
|
||||
@@ -2420,7 +2461,7 @@ func (nc *Conn) connect() (bool, error) {
|
||||
nc.setup()
|
||||
nc.changeConnStatus(RECONNECTING)
|
||||
nc.bw.switchToPending()
|
||||
go nc.doReconnect(ErrNoServers)
|
||||
go nc.doReconnect(ErrNoServers, false)
|
||||
err = nil
|
||||
} else {
|
||||
nc.current = nil
|
||||
@@ -2720,7 +2761,7 @@ func (nc *Conn) stopPingTimer() {
|
||||
|
||||
// Try to reconnect using the option parameters.
|
||||
// This function assumes we are allowed to reconnect.
|
||||
func (nc *Conn) doReconnect(err error) {
|
||||
func (nc *Conn) doReconnect(err error, forceReconnect bool) {
|
||||
// We want to make sure we have the other watchers shutdown properly
|
||||
// here before we proceed past this point.
|
||||
nc.waitForExits()
|
||||
@@ -2776,7 +2817,8 @@ func (nc *Conn) doReconnect(err error) {
|
||||
break
|
||||
}
|
||||
|
||||
doSleep := i+1 >= len(nc.srvPool)
|
||||
doSleep := i+1 >= len(nc.srvPool) && !forceReconnect
|
||||
forceReconnect = false
|
||||
nc.mu.Unlock()
|
||||
|
||||
if !doSleep {
|
||||
@@ -2803,6 +2845,12 @@ func (nc *Conn) doReconnect(err error) {
|
||||
select {
|
||||
case <-rqch:
|
||||
rt.Stop()
|
||||
|
||||
// we need to reset the rqch channel to avoid
|
||||
// closing a closed channel in the next iteration
|
||||
nc.mu.Lock()
|
||||
nc.rqch = make(chan struct{})
|
||||
nc.mu.Unlock()
|
||||
case <-rt.C:
|
||||
}
|
||||
}
|
||||
@@ -2872,18 +2920,19 @@ func (nc *Conn) doReconnect(err error) {
|
||||
// Done with the pending buffer
|
||||
nc.bw.doneWithPending()
|
||||
|
||||
// This is where we are truly connected.
|
||||
nc.status = CONNECTED
|
||||
// Queue up the correct callback. If we are in initial connect state
|
||||
// (using retry on failed connect), we will call the ConnectedCB,
|
||||
// otherwise the ReconnectedCB.
|
||||
if nc.Opts.ReconnectedCB != nil && !nc.initc {
|
||||
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
|
||||
} else if nc.Opts.ConnectedCB != nil && nc.initc {
|
||||
nc.ach.push(func() { nc.Opts.ConnectedCB(nc) })
|
||||
}
|
||||
|
||||
// If we are here with a retry on failed connect, indicate that the
|
||||
// initial connect is now complete.
|
||||
nc.initc = false
|
||||
|
||||
// Queue up the reconnect callback.
|
||||
if nc.Opts.ReconnectedCB != nil {
|
||||
nc.ach.push(func() { nc.Opts.ReconnectedCB(nc) })
|
||||
}
|
||||
|
||||
// Release lock here, we will return below.
|
||||
nc.mu.Unlock()
|
||||
|
||||
@@ -2926,7 +2975,7 @@ func (nc *Conn) processOpErr(err error) {
|
||||
// Clear any queued pongs, e.g. pending flush calls.
|
||||
nc.clearPendingFlushCalls()
|
||||
|
||||
go nc.doReconnect(err)
|
||||
go nc.doReconnect(err, false)
|
||||
nc.mu.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -3753,7 +3802,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
|
||||
}
|
||||
|
||||
// Process key fetching original case.
|
||||
i := bytes.IndexByte([]byte(kv), ':')
|
||||
i := strings.IndexByte(kv, ':')
|
||||
if i < 0 {
|
||||
return nil, ErrBadHeaderMsg
|
||||
}
|
||||
@@ -3766,8 +3815,7 @@ func readMIMEHeader(tp *textproto.Reader) (textproto.MIMEHeader, error) {
|
||||
for i < len(kv) && (kv[i] == ' ' || kv[i] == '\t') {
|
||||
i++
|
||||
}
|
||||
value := string(kv[i:])
|
||||
m[key] = append(m[key], value)
|
||||
m[key] = append(m[key], kv[i:])
|
||||
if err != nil {
|
||||
return m, err
|
||||
}
|
||||
|
||||
Vendored
+1
-1
@@ -1422,7 +1422,7 @@ github.com/nats-io/nats-server/v2/server/certstore
|
||||
github.com/nats-io/nats-server/v2/server/pse
|
||||
github.com/nats-io/nats-server/v2/server/stree
|
||||
github.com/nats-io/nats-server/v2/server/sysmem
|
||||
# github.com/nats-io/nats.go v1.34.1
|
||||
# github.com/nats-io/nats.go v1.35.0
|
||||
## explicit; go 1.20
|
||||
github.com/nats-io/nats.go
|
||||
github.com/nats-io/nats.go/encoders/builtin
|
||||
|
||||
Reference in New Issue
Block a user