build(deps): bump github.com/nats-io/nats-server/v2

Bumps [github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server) from 2.10.10 to 2.10.12.
- [Release notes](https://github.com/nats-io/nats-server/releases)
- [Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
- [Commits](https://github.com/nats-io/nats-server/compare/v2.10.10...v2.10.12)

---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Authored by dependabot[bot] on 2024-03-13 06:25:53 +00:00
Committed by Ralf Haferkamp
parent ea347343be
commit 37cac4b26d
45 changed files with 1143 additions and 423 deletions
+6 -6
@@ -61,7 +61,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/mna/pigeon v1.2.1
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/nats-io/nats-server/v2 v2.10.10
github.com/nats-io/nats-server/v2 v2.10.12
github.com/nats-io/nats.go v1.33.1
github.com/oklog/run v1.1.0
github.com/olekukonko/tablewriter v0.0.5
@@ -97,13 +97,13 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0
go.opentelemetry.io/otel/sdk v1.24.0
go.opentelemetry.io/otel/trace v1.24.0
golang.org/x/crypto v0.19.0
golang.org/x/crypto v0.21.0
golang.org/x/exp v0.0.0-20240205201215-2c58cdc269a3
golang.org/x/image v0.15.0
golang.org/x/net v0.21.0
golang.org/x/oauth2 v0.17.0
golang.org/x/sync v0.6.0
golang.org/x/term v0.17.0
golang.org/x/term v0.18.0
golang.org/x/text v0.14.0
google.golang.org/genproto/googleapis/api v0.0.0-20240125205218-1f4bbc51befe
google.golang.org/grpc v1.62.0
@@ -250,7 +250,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/juliangruber/go-intersect v1.1.0 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.5 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/libregraph/oidc-go v1.0.0 // indirect
@@ -277,7 +277,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
github.com/nats-io/jwt/v2 v2.5.5 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/nxadm/tail v1.4.8 // indirect
@@ -336,7 +336,7 @@ require (
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/sys v0.17.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.17.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
+12 -12
@@ -1589,8 +1589,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.5 h1:d4vBd+7CHydUqpFBgUEKkSdtSugf9YFmSkvUYPquI5E=
github.com/klauspost/compress v1.17.5/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
@@ -1741,10 +1741,10 @@ github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOl
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/namedotcom/go v0.0.0-20180403034216-08470befbe04/go.mod h1:5sN+Lt1CaY4wsPvgQH/jsuJi4XO2ssZbdsIizr4CVC8=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.10 h1:g1Wd64J5SGsoqWSx1qoNu9/At7a2x+jE7Qtf2XpEx/I=
github.com/nats-io/nats-server/v2 v2.10.10/go.mod h1:/TE61Dos8NlwZnjzyE3ZlOnM6dgl7tf937dnf4VclrA=
github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4=
github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A=
github.com/nats-io/nats-server/v2 v2.10.12 h1:G6u+RDrHkw4bkwn7I911O5jqys7jJVRY6MwgndyUsnE=
github.com/nats-io/nats-server/v2 v2.10.12/go.mod h1:H1n6zXtYLFCgXcf/SF8QNTSIFuS8tyZQMN9NguUHdEs=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -2169,8 +2169,8 @@ golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIi
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.19.0 h1:ENy+Az/9Y1vSrlrvBSyna3PITt4tiZLf7sgCjZBX7Wo=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -2497,8 +2497,8 @@ golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
@@ -2515,8 +2515,8 @@ golang.org/x/term v0.10.0/go.mod h1:lpqdcUyK/oCiQxvxVrppt5ggO2KCZ5QblwqPnfZ6d5o=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+21 -1
@@ -117,6 +117,12 @@ func encodeBlockGo(dst, src []byte) (d int) {
i--
base--
}
// Bail if we exceed the maximum size.
if d+(base-nextEmit) > dstLimit {
return 0
}
d += emitLiteral(dst[d:], src[nextEmit:base])
// Extend forward
@@ -152,7 +158,6 @@ func encodeBlockGo(dst, src []byte) (d int) {
if s >= sLimit {
goto emitRemainder
}
cv = load64(src, s)
continue
}
@@ -325,6 +330,11 @@ func encodeBlockSnappyGo(dst, src []byte) (d int) {
i--
base--
}
// Bail if we exceed the maximum size.
if d+(base-nextEmit) > dstLimit {
return 0
}
d += emitLiteral(dst[d:], src[nextEmit:base])
// Extend forward
@@ -532,6 +542,11 @@ searchDict:
i--
base--
}
// Bail if we exceed the maximum size.
if d+(base-nextEmit) > dstLimit {
return 0
}
d += emitLiteral(dst[d:], src[nextEmit:base])
if debug && nextEmit != base {
fmt.Println("emitted ", base-nextEmit, "literals")
@@ -880,6 +895,11 @@ searchDict:
i--
base--
}
// Bail if we exceed the maximum size.
if d+(base-nextEmit) > dstLimit {
return 0
}
d += emitLiteral(dst[d:], src[nextEmit:base])
if debug && nextEmit != base {
fmt.Println("emitted ", base-nextEmit, "literals")
+108
@@ -100,6 +100,15 @@ repeat_extend_back_loop_encodeBlockAsm:
JNZ repeat_extend_back_loop_encodeBlockAsm
repeat_extend_back_end_encodeBlockAsm:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 5(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeBlockAsm
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeBlockAsm:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeBlockAsm
@@ -1513,6 +1522,15 @@ repeat_extend_back_loop_encodeBlockAsm4MB:
JNZ repeat_extend_back_loop_encodeBlockAsm4MB
repeat_extend_back_end_encodeBlockAsm4MB:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 4(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeBlockAsm4MB
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeBlockAsm4MB:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeBlockAsm4MB
@@ -2828,6 +2846,15 @@ repeat_extend_back_loop_encodeBlockAsm12B:
JNZ repeat_extend_back_loop_encodeBlockAsm12B
repeat_extend_back_end_encodeBlockAsm12B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeBlockAsm12B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeBlockAsm12B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeBlockAsm12B
@@ -3903,6 +3930,15 @@ repeat_extend_back_loop_encodeBlockAsm10B:
JNZ repeat_extend_back_loop_encodeBlockAsm10B
repeat_extend_back_end_encodeBlockAsm10B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeBlockAsm10B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeBlockAsm10B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeBlockAsm10B
@@ -4978,6 +5014,15 @@ repeat_extend_back_loop_encodeBlockAsm8B:
JNZ repeat_extend_back_loop_encodeBlockAsm8B
repeat_extend_back_end_encodeBlockAsm8B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeBlockAsm8B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeBlockAsm8B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeBlockAsm8B
@@ -10756,6 +10801,15 @@ repeat_extend_back_loop_encodeSnappyBlockAsm:
JNZ repeat_extend_back_loop_encodeSnappyBlockAsm
repeat_extend_back_end_encodeSnappyBlockAsm:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 5(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeSnappyBlockAsm
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeSnappyBlockAsm:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeSnappyBlockAsm
@@ -11678,6 +11732,15 @@ repeat_extend_back_loop_encodeSnappyBlockAsm64K:
JNZ repeat_extend_back_loop_encodeSnappyBlockAsm64K
repeat_extend_back_end_encodeSnappyBlockAsm64K:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeSnappyBlockAsm64K
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeSnappyBlockAsm64K:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeSnappyBlockAsm64K
@@ -12504,6 +12567,15 @@ repeat_extend_back_loop_encodeSnappyBlockAsm12B:
JNZ repeat_extend_back_loop_encodeSnappyBlockAsm12B
repeat_extend_back_end_encodeSnappyBlockAsm12B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeSnappyBlockAsm12B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeSnappyBlockAsm12B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeSnappyBlockAsm12B
@@ -13330,6 +13402,15 @@ repeat_extend_back_loop_encodeSnappyBlockAsm10B:
JNZ repeat_extend_back_loop_encodeSnappyBlockAsm10B
repeat_extend_back_end_encodeSnappyBlockAsm10B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeSnappyBlockAsm10B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeSnappyBlockAsm10B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeSnappyBlockAsm10B
@@ -14156,6 +14237,15 @@ repeat_extend_back_loop_encodeSnappyBlockAsm8B:
JNZ repeat_extend_back_loop_encodeSnappyBlockAsm8B
repeat_extend_back_end_encodeSnappyBlockAsm8B:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_encodeSnappyBlockAsm8B
MOVQ $0x00000000, ret+48(FP)
RET
repeat_dst_size_check_encodeSnappyBlockAsm8B:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_encodeSnappyBlockAsm8B
@@ -17949,6 +18039,15 @@ repeat_extend_back_loop_calcBlockSize:
JNZ repeat_extend_back_loop_calcBlockSize
repeat_extend_back_end_calcBlockSize:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 5(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_calcBlockSize
MOVQ $0x00000000, ret+24(FP)
RET
repeat_dst_size_check_calcBlockSize:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_calcBlockSize
@@ -18531,6 +18630,15 @@ repeat_extend_back_loop_calcBlockSizeSmall:
JNZ repeat_extend_back_loop_calcBlockSizeSmall
repeat_extend_back_end_calcBlockSizeSmall:
MOVL SI, BX
SUBL 12(SP), BX
LEAQ 3(AX)(BX*1), BX
CMPQ BX, (SP)
JB repeat_dst_size_check_calcBlockSizeSmall
MOVQ $0x00000000, ret+24(FP)
RET
repeat_dst_size_check_calcBlockSizeSmall:
MOVL 12(SP), BX
CMPL BX, SI
JEQ emit_literal_done_repeat_emit_calcBlockSizeSmall
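Each assembly insertion mirrors the Go-side guard above. A rough Go rendering under an assumed register mapping (SI ≈ current position s, 12(SP) ≈ nextEmit, AX ≈ output offset d, (SP) ≈ dstLimit; the 5/4/3-byte constant is the worst-case size of the repeat encoding for that block variant):

	lits := s - nextEmit      // SUBL 12(SP), BX
	if d+lits+5 >= dstLimit { // LEAQ 5(AX)(BX*1), BX; CMPQ BX, (SP); JB ...
		return 0              // MOVQ $0x00000000, ret+48(FP); RET
	}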
+12 -3
@@ -452,6 +452,12 @@ func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, e
for toWrite := range queue {
entry := <-toWrite
reUse <- toWrite
if hasErr() || entry == nil {
if entry != nil {
writtenBlocks <- entry
}
continue
}
if hasErr() {
writtenBlocks <- entry
continue
@@ -471,13 +477,13 @@ func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, e
}
}()
// Reader
defer func() {
close(queue)
if r.err != nil {
err = r.err
setErr(r.err)
} else if err != nil {
setErr(err)
}
close(queue)
wg.Wait()
if err == nil {
err = aErr
@@ -485,6 +491,7 @@ func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, e
written = aWritten
}()
// Reader
for !hasErr() {
if !r.readFull(r.buf[:4], true) {
if r.err == io.EOF {
@@ -553,11 +560,13 @@ func (r *Reader) DecodeConcurrent(w io.Writer, concurrent int) (written int64, e
if err != nil {
writtenBlocks <- decoded
setErr(err)
entry <- nil
return
}
if !r.ignoreCRC && crc(decoded) != checksum {
writtenBlocks <- decoded
setErr(ErrCRC)
entry <- nil
return
}
entry <- decoded
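DecodeConcurrent keeps output ordered by pushing one channel per block onto a queue; the writer drains the queue in order while blocks decode in parallel. The fix above guarantees every queued channel eventually receives something, nil on error, so the writer can never block forever. A minimal sketch of the pattern, with decodeBlock and w as hypothetical names:

	queue := make(chan chan []byte, concurrent)

	// Writer goroutine: consumes results strictly in submission order.
	go func() {
		for toWrite := range queue {
			if block := <-toWrite; block != nil {
				w.Write(block) // nil means the decoder failed; skip but keep draining
			}
		}
	}()

	// Per block: enqueue a channel first, then decode concurrently.
	entry := make(chan []byte, 1) // buffered so the sender never blocks
	queue <- entry
	go func() {
		decoded, err := decodeBlock(compressed)
		if err != nil {
			entry <- nil // never leave the writer waiting
			return
		}
		entry <- decoded
	}()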
+15 -6
@@ -215,7 +215,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
return 0, err
}
if len(w.ibuf) > 0 {
err := w.Flush()
err := w.AsyncFlush()
if err != nil {
return 0, err
}
@@ -225,7 +225,7 @@ func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
if err := w.EncodeBuffer(buf); err != nil {
return 0, err
}
return int64(len(buf)), w.Flush()
return int64(len(buf)), w.AsyncFlush()
}
for {
inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
@@ -354,7 +354,7 @@ func (w *Writer) EncodeBuffer(buf []byte) (err error) {
}
// Flush queued data first.
if len(w.ibuf) > 0 {
err := w.Flush()
err := w.AsyncFlush()
if err != nil {
return err
}
@@ -716,9 +716,9 @@ func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
return nRet, nil
}
// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
// AsyncFlush writes any buffered bytes to a block and starts compressing it.
// It does not wait for the output to be written, as Flush() does.
func (w *Writer) AsyncFlush() error {
if err := w.err(nil); err != nil {
return err
}
@@ -738,6 +738,15 @@ func (w *Writer) Flush() error {
}
}
}
return w.err(nil)
}
// Flush flushes the Writer to its underlying io.Writer.
// This does not apply padding.
func (w *Writer) Flush() error {
if err := w.AsyncFlush(); err != nil {
return err
}
if w.output == nil {
return w.err(nil)
}
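So Flush() is now AsyncFlush() plus a wait for the output side. A hedged usage sketch of the split (s2 Writer, names as in this diff; dst and chunk are assumed):

	w := s2.NewWriter(dst)
	if _, err := w.Write(chunk); err != nil {
		return err
	}
	// AsyncFlush hands buffered bytes to the compressor without waiting for
	// them to reach dst, so the caller can keep producing input.
	if err := w.AsyncFlush(); err != nil {
		return err
	}
	// Flush keeps its old semantics: it additionally waits until dst has the bytes.
	return w.Flush()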
+32 -1
@@ -17,6 +17,7 @@ package jwt
import (
"errors"
"fmt"
"sort"
"time"
@@ -174,7 +175,7 @@ func (a *Account) AddMapping(sub Subject, to ...WeightedMapping) {
// holder of the private key can decrypt. The auth service can also optionally encrypt the response back to the server using its
// public xkey which will be in the authorization request.
type ExternalAuthorization struct {
AuthUsers StringList `json:"auth_users"`
AuthUsers StringList `json:"auth_users,omitempty"`
AllowedAccounts StringList `json:"allowed_accounts,omitempty"`
XKey string `json:"xkey,omitempty"`
}
@@ -229,10 +230,25 @@ type Account struct {
DefaultPermissions Permissions `json:"default_permissions,omitempty"`
Mappings Mapping `json:"mappings,omitempty"`
Authorization ExternalAuthorization `json:"authorization,omitempty"`
Trace *MsgTrace `json:"trace,omitempty"`
Info
GenericFields
}
// MsgTrace holds distributed message tracing configuration
type MsgTrace struct {
// Destination is the subject the server will send message traces to
// if the inbound message contains the "traceparent" header and has
// its sampled field indicating that the trace should be triggered.
Destination Subject `json:"dest,omitempty"`
// Sampling is used to set the probability sampling, that is, the
// server will get a random number between 1 and 100 and trigger
// the trace if the number is lower than this Sampling value.
// The valid range is [1..100]. If the value is not set Validate()
// will set the value to 100.
Sampling int `json:"sampling,omitempty"`
}
// Validate checks if the account is valid, based on the wrapper
func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) {
a.Imports.Validate(acct.Subject, vr)
@@ -241,6 +257,21 @@ func (a *Account) Validate(acct *AccountClaims, vr *ValidationResults) {
a.DefaultPermissions.Validate(vr)
a.Mappings.Validate(vr)
a.Authorization.Validate(vr)
if a.Trace != nil {
tvr := CreateValidationResults()
a.Trace.Destination.Validate(tvr)
if !tvr.IsEmpty() {
vr.AddError(fmt.Sprintf("the account Trace.Destination %s", tvr.Issues[0].Description))
}
if a.Trace.Destination.HasWildCards() {
vr.AddError("the account Trace.Destination subject %q is not a valid publish subject", a.Trace.Destination)
}
if a.Trace.Sampling < 0 || a.Trace.Sampling > 100 {
vr.AddError("the account Trace.Sampling value '%d' is not valid, should be in the range [1..100]", a.Trace.Sampling)
} else if a.Trace.Sampling == 0 {
a.Trace.Sampling = 100
}
}
if !a.Limits.IsEmpty() && a.Limits.Imports >= 0 && int64(len(a.Imports)) > a.Limits.Imports {
vr.AddError("the account contains more imports than allowed by the operator")
+8 -2
@@ -119,6 +119,7 @@ type Export struct {
Latency *ServiceLatency `json:"service_latency,omitempty"`
AccountTokenPosition uint `json:"account_token_position,omitempty"`
Advertise bool `json:"advertise,omitempty"`
AllowTrace bool `json:"allow_trace,omitempty"`
Info
}
@@ -160,8 +161,13 @@ func (e *Export) Validate(vr *ValidationResults) {
if e.IsService() && !e.IsSingleResponse() && !e.IsChunkedResponse() && !e.IsStreamResponse() {
vr.AddError("invalid response type for service: %q", e.ResponseType)
}
if e.IsStream() && e.ResponseType != "" {
vr.AddError("invalid response type for stream: %q", e.ResponseType)
if e.IsStream() {
if e.ResponseType != "" {
vr.AddError("invalid response type for stream: %q", e.ResponseType)
}
if e.AllowTrace {
vr.AddError("AllowTrace only valid for service export")
}
}
if e.Latency != nil {
if !e.IsService() {
+4
@@ -40,6 +40,7 @@ type Import struct {
LocalSubject RenamingSubject `json:"local_subject,omitempty"`
Type ExportType `json:"type,omitempty"`
Share bool `json:"share,omitempty"`
AllowTrace bool `json:"allow_trace,omitempty"`
}
// IsService returns true if the import is of type service
@@ -66,6 +67,9 @@ func (i *Import) Validate(actPubKey string, vr *ValidationResults) {
if !i.IsService() && !i.IsStream() {
vr.AddError("invalid import type: %q", i.Type)
}
if i.IsService() && i.AllowTrace {
vr.AddError("AllowTrace only valid for stream import")
}
if i.Account == "" {
vr.AddError("account to import from is not specified")
+8
@@ -177,10 +177,18 @@ func (s Subject) Validate(vr *ValidationResults) {
v := string(s)
if v == "" {
vr.AddError("subject cannot be empty")
// No other checks after that make sense
return
}
if strings.Contains(v, " ") {
vr.AddError("subject %q cannot have spaces", v)
}
if v[0] == '.' || v[len(v)-1] == '.' {
vr.AddError("subject %q cannot start or end with a `.`", v)
}
if strings.Contains(v, "..") {
vr.AddError("subject %q cannot contain consecutive `.`", v)
}
}
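A few examples of what the strengthened Validate now accepts and rejects (illustrative only):

	// Rejected: "" (empty), "a b" (space), ".foo"/"foo." (leading/trailing dot), "a..b" (consecutive dots).
	// Accepted: "foo", "foo.bar", "foo.*", "foo.>".
	for _, s := range []jwt.Subject{".foo", "foo.", "a..b"} {
		vr := jwt.CreateValidationResults()
		s.Validate(vr) // vr.Issues now holds the new dot-related errors
	}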
func (s Subject) countTokenWildcards() int {
+1 -1
@@ -265,7 +265,7 @@ or the server receiving another `CONNECT` packet with the same client ID. See
`mqttHandleClosedClient()` and `mqttHandleWill()`. Steps:
1. Send out the Will Message if applicable (if not caused by a `DISCONNECT` packet)
2. Delete the the JetStream consumers for to QoS 1 and 2 packet delivery through
2. Delete the JetStream consumers for QoS 1 and 2 packet delivery through
JS API calls (if "clean" session flag is set)
3. Delete the session record from the “$MQTT_sess” stream, based on recorded
stream sequence. (if "clean" session flag is set)
+7
@@ -894,6 +894,13 @@ func (a *Account) registerLeafNodeCluster(cluster string) {
a.leafClusters[cluster]++
}
// Check to see if we already have this cluster registered.
func (a *Account) hasLeafNodeCluster(cluster string) bool {
a.mu.RLock()
defer a.mu.RUnlock()
return a.leafClusters[cluster] > 0
}
// Check to see if this cluster is isolated, meaning the only one.
// Read Lock should be held.
func (a *Account) isLeafNodeClusterIsolated(cluster string) bool {
+6
@@ -241,30 +241,35 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
arc, err := decodeResponse(rc, rmsg, racc)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}
vr := jwt.CreateValidationResults()
arc.Validate(vr)
if len(vr.Issues) > 0 {
c.authViolation()
respCh <- fmt.Sprintf("Error validating user JWT: %v", vr.Issues[0])
return
}
// Make sure that the user is what we requested.
if arc.Subject != pub {
c.authViolation()
respCh <- fmt.Sprintf("Expected authorized user of %q but got %q on account %q", pub, arc.Subject, racc.Name)
return
}
expiration, allowedConnTypes, err := getExpirationAndAllowedConnections(arc, racc.Name)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}
targetAcc, err := assignAccountAndPermissions(arc, racc.Name)
if err != nil {
c.authViolation()
respCh <- titleCase(err.Error())
return
}
@@ -280,6 +285,7 @@ func (s *Server) processClientOrLeafCallout(c *client, opts *Options) (authorize
// Build internal user and bind to the targeted account.
nkuser := buildInternalNkeyUser(arc, allowedConnTypes, targetAcc)
if err := c.RegisterNkeyUser(nkuser); err != nil {
c.authViolation()
respCh <- fmt.Sprintf("Could not register auth callout user: %v", err)
return
}
+31 -21
@@ -797,8 +797,14 @@ func (c *client) registerWithAccount(acc *Account) error {
// Check if we have a max connections violation
if kind == CLIENT && acc.MaxTotalConnectionsReached() {
return ErrTooManyAccountConnections
} else if kind == LEAF && acc.MaxTotalLeafNodesReached() {
return ErrTooManyAccountConnections
} else if kind == LEAF {
// Check if we are already connected to this cluster.
if rc := c.remoteCluster(); rc != _EMPTY_ && acc.hasLeafNodeCluster(rc) {
return ErrLeafNodeLoop
}
if acc.MaxTotalLeafNodesReached() {
return ErrTooManyAccountConnections
}
}
// Add in new one.
@@ -854,8 +860,12 @@ func (c *client) applyAccountLimits() {
}
}
}
c.acc.mu.RLock()
minLimit(&c.mpay, c.acc.mpay)
minLimit(&c.msubs, c.acc.msubs)
c.acc.mu.RUnlock()
s := c.srv
opts := s.getOpts()
mPay := opts.MaxPayload
@@ -4058,27 +4068,27 @@ func getHeader(key string, hdr []byte) []byte {
return nil
}
index := bytes.Index(hdr, []byte(key))
if index < 0 {
return nil
}
// Make sure this key does not have additional prefix.
if index < 2 || hdr[index-1] != '\n' || hdr[index-2] != '\r' {
return nil
}
index += len(key)
if index >= len(hdr) {
return nil
}
if hdr[index] != ':' {
return nil
}
index++
var value []byte
hdrLen := len(hdr)
for hdr[index] == ' ' && index < hdrLen {
// Check that we have enough characters; this handles the -1 case of the key not
// being found, and also not having enough characters for a trailing CRLF.
if index < 2 {
return nil
}
// There should be a terminating CRLF.
if index >= hdrLen-1 || hdr[index-1] != '\n' || hdr[index-2] != '\r' {
return nil
}
// The key should be immediately followed by a : separator.
index += len(key) + 1
if index >= hdrLen || hdr[index-1] != ':' {
return nil
}
// Skip over whitespace before the value.
for index < hdrLen && hdr[index] == ' ' {
index++
}
// Collect together the rest of the value until we hit a CRLF.
var value []byte
for index < hdrLen {
if hdr[index] == '\r' && index < hdrLen-1 && hdr[index+1] == '\n' {
break
@@ -4556,7 +4566,7 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
sindex := 0
lqs := len(qsubs)
if lqs > 1 {
sindex = int(fastrand.Uint32()) % lqs
sindex = int(fastrand.Uint32() % uint32(lqs))
}
// Find a subscription that is able to deliver this message starting at a random index.
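A few lookups illustrating the behavior the getHeader rewrite above pins down (hypothetical calls; the function is unexported):

	hdr := []byte("NATS/1.0\r\nNats-Msg-Id: abc\r\n\r\n")
	getHeader("Nats-Msg-Id", hdr)                   // -> []byte("abc")
	getHeader("Msg-Id", hdr)                        // -> nil: match is only a suffix of another key
	getHeader("Nats-Msg-Id", []byte("Nats-Msg-Id")) // -> nil: no ':' separator, no CRLF framing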
+1 -1
@@ -41,7 +41,7 @@ var (
const (
// VERSION is the current version for the server.
VERSION = "2.10.10"
VERSION = "2.10.12"
// PROTO is the currently supported protocol.
// 0 was the original
+99 -21
@@ -1758,7 +1758,13 @@ func (acc *Account) checkNewConsumerConfig(cfg, ncfg *ConsumerConfig) error {
if cfg.OptStartSeq != ncfg.OptStartSeq {
return errors.New("start sequence can not be updated")
}
if cfg.OptStartTime != ncfg.OptStartTime {
if cfg.OptStartTime != nil && ncfg.OptStartTime != nil {
// Both have start times set, compare them directly:
if !cfg.OptStartTime.Equal(*ncfg.OptStartTime) {
return errors.New("start time can not be updated")
}
} else if cfg.OptStartTime != nil || ncfg.OptStartTime != nil {
// At least one start time is set and the other is not
return errors.New("start time can not be updated")
}
if cfg.AckPolicy != ncfg.AckPolicy {
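The OptStartTime fields are *time.Time, so the old cfg.OptStartTime != ncfg.OptStartTime compared pointers, not instants; two separately allocated but equal times looked like an update. A minimal illustration:

	t := time.Now()
	a, b := &t, new(time.Time)
	*b = t
	fmt.Println(a != b)      // true: distinct pointers, even though ...
	fmt.Println(a.Equal(*b)) // true: ... they hold the same instant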
@@ -2247,16 +2253,21 @@ func (o *consumer) checkPendingRequests() {
// This will release any pending pull requests if applicable.
// Should be called only by the leader being deleted or stopped.
// Lock should be held.
func (o *consumer) releaseAnyPendingRequests() {
func (o *consumer) releaseAnyPendingRequests(isAssigned bool) {
if o.mset == nil || o.outq == nil || o.waiting.len() == 0 {
return
}
hdr := []byte("NATS/1.0 409 Consumer Deleted\r\n\r\n")
var hdr []byte
if !isAssigned {
hdr = []byte("NATS/1.0 409 Consumer Deleted\r\n\r\n")
}
wq := o.waiting
o.waiting = nil
for i, rp := 0, wq.rp; i < wq.n; i++ {
if wr := wq.reqs[rp]; wr != nil {
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
if hdr != nil {
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
wr.recycle()
}
rp = (rp + 1) % cap(wq.reqs)
@@ -3100,7 +3111,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
// Since we can't send that message to the requestor, we need to
// notify that we are closing the request.
const maxBytesT = "NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
// Remove the current one, no longer valid due to max bytes limit.
o.waiting.removeCurrent()
@@ -3123,7 +3134,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
o.waiting.removeCurrent()
if o.node != nil {
@@ -3135,7 +3146,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
if wr.interest != wr.reply {
const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// Remove the current one, no longer valid.
@@ -3208,7 +3219,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
}
sendErr := func(status int, description string) {
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
hdr := fmt.Appendf(nil, "NATS/1.0 %d %s\r\n\r\n", status, description)
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
@@ -3600,7 +3611,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
wr := wq.reqs[rp]
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
remove(wr, rp)
i++
@@ -4033,7 +4044,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0))
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, fmt.Appendf(nil, JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb), nil, nil, 0))
}
// Reset our idle heartbeat timer if set.
if hb != nil {
@@ -4096,10 +4107,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
func (o *consumer) sendIdleHeartbeat(subj string) {
const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n"
sseq, dseq := o.sseq-1, o.dseq-1
hdr := []byte(fmt.Sprintf(t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq))
hdr := fmt.Appendf(nil, t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq)
if fcp := o.fcid; fcp != _EMPTY_ {
// Add in that we are stalled on flow control here.
addOn := []byte(fmt.Sprintf("%s: %s\r\n\r\n", JSConsumerStalled, fcp))
addOn := fmt.Appendf(nil, "%s: %s\r\n\r\n", JSConsumerStalled, fcp)
hdr = append(hdr[:len(hdr)-LEN_CR_LF], []byte(addOn)...)
}
o.outq.send(newJSPubMsg(subj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
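All of these header constructions switch from []byte(fmt.Sprintf(...)) to fmt.Appendf (Go 1.19+), which formats directly into a byte slice and skips the intermediate string allocation:

	hdr := fmt.Appendf(nil, "NATS/1.0 %d %s\r\n\r\n", 408, "Request Timeout")
	// equivalent to, but cheaper than:
	hdr = []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", 408, "Request Timeout"))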
@@ -4783,6 +4794,23 @@ func (o *consumer) selectStartingSeqNo() {
// If we are here we are time based.
// TODO(dlc) - Once clustered can't rely on this.
o.sseq = o.mset.store.GetSeqFromTime(*o.cfg.OptStartTime)
// Here we want to see if we are filtered, and if so possibly close the gap
// to the nearest first given our starting sequence from time. This is so we do
// not force the system to do a linear walk between o.sseq and the real first.
if len(o.subjf) > 0 {
nseq := state.LastSeq
for _, filter := range o.subjf {
// Use first sequence since this is more optimized atm.
ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
if ss.First > o.sseq && ss.First < nseq {
nseq = ss.First
}
}
// Skip ahead if possible.
if nseq > o.sseq && nseq < state.LastSeq {
o.sseq = nseq
}
}
} else {
// DeliverNew
o.sseq = state.LastSeq + 1
@@ -4901,17 +4929,26 @@ func (o *consumer) hasNoLocalInterest() bool {
// This is when the underlying stream has been purged.
// sseq is the new first seq for the stream after purge.
// Lock should be held.
func (o *consumer) purge(sseq uint64, slseq uint64) {
// Lock should NOT be held.
func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
// Do not update our state unless we know we are the leader.
if !o.isLeader() {
return
}
// Signals all have been purged for this consumer.
if sseq == 0 {
if sseq == 0 && !isWider {
sseq = slseq + 1
}
var store StreamStore
if isWider {
o.mu.RLock()
if o.mset != nil {
store = o.mset.store
}
o.mu.RUnlock()
}
o.mu.Lock()
// Do not go backwards
if o.sseq < sseq {
@@ -4920,7 +4957,6 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
if o.asflr < sseq {
o.asflr = sseq - 1
// We need to remove those no longer relevant from pending.
for seq, p := range o.pending {
if seq <= o.asflr {
@@ -4934,8 +4970,24 @@ func (o *consumer) purge(sseq uint64, slseq uint64) {
delete(o.rdc, seq)
// rdq handled below.
}
if isWider && store != nil {
// Our filtered subject, which could be all, is wider than the underlying purge.
// We need to check if the pending items left are still valid.
var smv StoreMsg
if _, err := store.LoadMsg(seq, &smv); err == errDeletedMsg || err == ErrStoreMsgNotFound {
if p.Sequence > o.adflr {
o.adflr = p.Sequence
if o.adflr > o.dseq {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
delete(o.rdc, seq)
}
}
}
}
// This means we can reset everything at this point.
if len(o.pending) == 0 {
o.pending, o.rdc = nil, nil
@@ -4994,27 +5046,52 @@ func (o *consumer) isClosed() bool {
}
func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.mu.Lock()
js := o.js
// If dflag is true determine if we are still assigned.
var isAssigned bool
if dflag {
o.mu.RLock()
acc, stream, consumer := o.acc, o.stream, o.name
isClustered := o.js != nil && o.js.isClustered()
o.mu.RUnlock()
if isClustered {
// Grab jsa to check assignment.
var jsa *jsAccount
if acc != nil {
// Need lock here to avoid data race.
acc.mu.RLock()
jsa = acc.js
acc.mu.RUnlock()
}
if jsa != nil {
isAssigned = jsa.consumerAssigned(stream, consumer)
}
}
}
o.mu.Lock()
if o.closed {
o.mu.Unlock()
return nil
}
o.closed = true
// Check if we are the leader and are being deleted.
// Check if we are the leader and are being deleted (as a node).
if dflag && o.isLeader() {
// If we are clustered and node leader (probable from above), stepdown.
if node := o.node; node != nil && node.Leader() {
node.StepDown()
}
if advisory {
// dflag does not necessarily mean that the consumer is being deleted,
// just that the consumer node is being removed from this peer, so we
// send delete advisories only if we are no longer assigned at the meta layer,
// or we are not clustered.
if !isAssigned && advisory {
o.sendDeleteAdvisoryLocked()
}
if o.isPullMode() {
// Release any pending.
o.releaseAnyPendingRequests()
o.releaseAnyPendingRequests(isAssigned)
}
}
@@ -5064,6 +5141,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
ca = o.ca
}
sigSubs := o.sigSubs
js := o.js
o.mu.Unlock()
if c != nil {
+3
@@ -60,6 +60,9 @@ var (
// connections.
ErrTooManyAccountConnections = errors.New("maximum account active connections exceeded")
// ErrLeafNodeLoop signals a leafnode is trying to register for a cluster we already have registered.
ErrLeafNodeLoop = errors.New("leafnode loop detected")
// ErrTooManySubs signals a client that the maximum number of subscriptions per connection
// has been reached.
ErrTooManySubs = errors.New("maximum subscriptions exceeded")
+58 -16
@@ -1466,6 +1466,16 @@ func (fs *fileStore) warn(format string, args ...any) {
fs.srv.Warnf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}
// For doing debug logging.
// Lock should be held.
func (fs *fileStore) debug(format string, args ...any) {
// No-op if no server configured.
if fs.srv == nil {
return
}
fs.srv.Debugf(fmt.Sprintf("Filestore [%s] %s", fs.cfg.Name, format), args...)
}
// Track local state but ignore timestamps here.
func updateTrackingState(state *StreamState, mb *msgBlock) {
if state.FirstSeq == 0 {
@@ -2225,30 +2235,44 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
fseq, isAll, subs := start, filter == _EMPTY_ || filter == fwcs, []string{filter}
if err := mb.ensurePerSubjectInfoLoaded(); err != nil {
return nil, false, err
var didLoad bool
if mb.fssNotLoaded() {
// Make sure we have fss loaded.
mb.loadMsgsWithLock()
didLoad = true
}
// If we only have 1 subject currently and it matches our filter we can also set isAll.
if !isAll && len(mb.fss) == 1 {
_, isAll = mb.fss[filter]
}
// Skip scan of mb.fss if number of messages in the block are less than
// 1/2 the number of subjects in mb.fss. Or we have a wc and lots of fss entries.
const linearScanMaxFSS = 32
// Make sure to start at mb.first.seq if fseq < mb.first.seq
if seq := atomic.LoadUint64(&mb.first.seq); seq > fseq {
fseq = seq
}
lseq := atomic.LoadUint64(&mb.last.seq)
doLinearScan := isAll || 2*int(lseq-fseq) < len(mb.fss) || (wc && len(mb.fss) > linearScanMaxFSS)
// Optionally build the isMatch for wildcard filters.
tsa := [32]string{}
fsa := [32]string{}
var fts []string
var isMatch func(subj string) bool
// Decide to build.
if wc {
fts = tokenizeSubjectIntoSlice(fsa[:0], filter)
isMatch = func(subj string) bool {
tts := tokenizeSubjectIntoSlice(tsa[:0], subj)
return isSubsetMatchTokenized(tts, fts)
}
}
// Only do linear scan if isAll or we are wildcarded and have to traverse more fss than actual messages.
doLinearScan := isAll || (wc && len(mb.fss) > int(lseq-fseq))
if !doLinearScan {
// If we have a wildcard match against all tracked subjects we know about.
if wc {
subs = subs[:0]
for subj := range mb.fss {
if subjectIsSubsetMatch(subj, filter) {
if isMatch(subj) {
subs = append(subs, subj)
}
}
@@ -2270,11 +2294,17 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
}
}
if fseq > lseq {
return nil, false, ErrStoreMsgNotFound
// If we guessed not to do a linear scan, but the above resulted in a lot of subs that will
// need to be checked for every scanned message, revert.
// TODO(dlc) - we could memoize the subs across calls.
if len(subs) > int(lseq-fseq) {
doLinearScan = true
}
if fseq > lseq {
return nil, didLoad, ErrStoreMsgNotFound
}
var didLoad bool
// Need messages loaded from here on out.
if mb.cacheNotLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
@@ -2298,7 +2328,7 @@ func (mb *msgBlock) firstMatching(filter string, wc bool, start uint64, sm *Stor
return fsm, expireOk, nil
}
if doLinearScan {
if wc && subjectIsSubsetMatch(fsm.subj, filter) {
if wc && isMatch(sm.subj) {
return fsm, expireOk, nil
} else if !wc && fsm.subj == filter {
return fsm, expireOk, nil
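The tokenized matcher built above avoids re-splitting the filter for every candidate subject. A simplified sketch of NATS subset matching on pre-tokenized subjects ('*' matches exactly one token, '>' matches one or more trailing tokens):

	func isSubsetMatchTokenized(tokens, filterTokens []string) bool {
		for i, ft := range filterTokens {
			if i >= len(tokens) {
				return false // filter is longer than the subject
			}
			if ft == ">" {
				return true // full wildcard consumes the remaining tokens
			}
			if ft != "*" && ft != tokens[i] {
				return false
			}
		}
		return len(tokens) == len(filterTokens)
	}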
@@ -2824,7 +2854,7 @@ func (fs *fileStore) NumPending(sseq uint64, filter string, lastPerSubject bool)
} else {
// We need to adjust for all matches in this block.
// Make sure we have fss loaded. This loads whole block now.
if mb.cacheNotLoaded() {
if mb.fssNotLoaded() {
mb.loadMsgsWithLock()
shouldExpire = true
}
@@ -2877,7 +2907,10 @@ func (fs *fileStore) SubjectsTotals(filter string) map[string]uint64 {
if fs.psim.Size() == 0 {
return nil
}
// Match all if no filter given.
if filter == _EMPTY_ {
filter = fwcs
}
fst := make(map[string]uint64)
fs.psim.Match(stringToBytes(filter), func(subj []byte, psi *psi) {
fst[string(subj)] = psi.total
@@ -4476,7 +4509,11 @@ func (fs *fileStore) resetAgeChk(delta int64) {
fireIn := fs.cfg.MaxAge
if delta > 0 && time.Duration(delta) < fireIn {
fireIn = time.Duration(delta)
if fireIn = time.Duration(delta); fireIn < 250*time.Millisecond {
// Only fire at most once every 250ms.
// Excessive firing can affect ingest performance.
fireIn = time.Second
}
}
if fs.ageChk != nil {
fs.ageChk.Reset(fireIn)
@@ -5342,7 +5379,10 @@ func (mb *msgBlock) writeAt(buf []byte, woff int64) (int, error) {
mb.mockWriteErr = false
return 0, errors.New("mock write error")
}
return mb.mfd.WriteAt(buf, woff)
<-dios
n, err := mb.mfd.WriteAt(buf, woff)
dios <- struct{}{}
return n, err
}
// flushPendingMsgsLocked writes out any messages for this message block.
@@ -5553,7 +5593,9 @@ func (mb *msgBlock) loadBlock(buf []byte) ([]byte, error) {
buf = buf[:sz]
}
<-dios
n, err := io.ReadFull(f, buf)
dios <- struct{}{}
// On success capture raw bytes size.
if err == nil {
mb.rbytes = uint64(n)
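Both I/O paths now gate on dios, which acts as a counting semaphore bounding concurrent disk operations. A minimal sketch of the pattern, assuming a package-level buffered channel pre-filled with tokens (the capacity here is illustrative):

	var dios = make(chan struct{}, 4) // capacity = max concurrent disk ops (assumed)

	func init() {
		for i := 0; i < cap(dios); i++ {
			dios <- struct{}{} // pre-fill with tokens
		}
	}

	// Around each disk call: take a token, do the op, return the token.
	//   <-dios
	//   n, err := f.WriteAt(buf, off)
	//   dios <- struct{}{}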
@@ -7535,7 +7577,7 @@ func (fs *fileStore) writeFullState() error {
}
if cap(buf) > sz {
fs.warn("WriteFullState reallocated from %d to %d", sz, cap(buf))
fs.debug("WriteFullState reallocated from %d to %d", sz, cap(buf))
}
// Write to a tmp file and rename.
+1 -1
@@ -2555,7 +2555,7 @@ func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgr
mh = append(mh, subject...)
mh = append(mh, ' ')
if len(queues) > 0 {
if reply != nil {
if len(reply) > 0 {
mh = append(mh, "+ "...) // Signal that there is a reply.
mh = append(mh, mreply...)
mh = append(mh, ' ')
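The reply check tightens from nil to length, because a non-nil empty slice would wrongly signal a reply. The classic Go gotcha:

	var a []byte            // nil, len 0
	b := []byte{}           // non-nil, len 0
	fmt.Println(a != nil)   // false
	fmt.Println(b != nil)   // true  -- the old check would take the reply branch here
	fmt.Println(len(b) > 0) // false -- the new check does not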
+24 -7
@@ -23,6 +23,7 @@ import (
"math"
"os"
"path/filepath"
"runtime/debug"
"strconv"
"strings"
"sync"
@@ -2151,6 +2152,10 @@ func (jsa *jsAccount) storageTotals() (uint64, uint64) {
}
func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, replicas int) (bool, *ApiError) {
return jsa.wouldExceedLimits(storeType, tierName, replicas, _EMPTY_, nil, nil)
}
func (jsa *jsAccount) wouldExceedLimits(storeType StorageType, tierName string, replicas int, subj string, hdr, msg []byte) (bool, *ApiError) {
jsa.usageMu.RLock()
defer jsa.usageMu.RUnlock()
@@ -2164,24 +2169,31 @@ func (jsa *jsAccount) limitsExceeded(storeType StorageType, tierName string, rep
return false, nil
}
r := int64(replicas)
if r < 1 || tierName == _EMPTY_ {
// Make sure replicas is correct.
if r < 1 {
r = 1
}
// This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication.
lr := r
if tierName == _EMPTY_ {
lr = 1
}
// Since tiers are flat we need to scale limit up by replicas when checking.
if storeType == MemoryStorage {
totalMem := inUse.total.mem
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*r {
totalMem := inUse.total.mem + (int64(memStoreMsgSize(subj, hdr, msg)) * r)
if selectedLimits.MemoryMaxStreamBytes > 0 && totalMem > selectedLimits.MemoryMaxStreamBytes*lr {
return true, nil
}
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*r {
if selectedLimits.MaxMemory >= 0 && totalMem > selectedLimits.MaxMemory*lr {
return true, nil
}
} else {
totalStore := inUse.total.store
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*r {
totalStore := inUse.total.store + (int64(fileStoreMsgSize(subj, hdr, msg)) * r)
if selectedLimits.StoreMaxStreamBytes > 0 && totalStore > selectedLimits.StoreMaxStreamBytes*lr {
return true, nil
}
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*r {
if selectedLimits.MaxStore >= 0 && totalStore > selectedLimits.MaxStore*lr {
return true, nil
}
}
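A worked example of the scaling: with tier "R3" and MaxMemory = 100GB, an incoming 1MB message at R3 charges 3MB of usage (size × replicas) against a 300GB ceiling (limit × lr); with no tier name, lr stays 1, so the same 3MB of usage counts against a flat 100GB.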
@@ -2463,6 +2475,11 @@ func (s *Server) dynJetStreamConfig(storeDir string, maxStore, maxMem int64) *Je
} else {
// Estimate to 75% of total memory if we can determine system memory.
if sysMem := sysmem.Memory(); sysMem > 0 {
// Check if we have been limited with GOMEMLIMIT and if lower use that value.
if gml := debug.SetMemoryLimit(-1); gml != math.MaxInt64 && gml < sysMem {
s.Debugf("JetStream detected GOMEMLIMIT of %v", friendlyBytes(gml))
sysMem = gml
}
jsc.MaxMemory = sysMem / 4 * 3
} else {
jsc.MaxMemory = JetStreamMaxMemDefault
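debug.SetMemoryLimit doubles as a query: a negative argument leaves the limit unchanged and returns the current value, which is math.MaxInt64 when no GOMEMLIMIT is set. Sketch of the probe used above:

	if gml := debug.SetMemoryLimit(-1); gml != math.MaxInt64 && gml < sysMem {
		sysMem = gml // a GOMEMLIMIT is in effect and is the tighter bound
	}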
+22 -5
@@ -277,6 +277,9 @@ const (
// JSAdvisoryStreamRestoreCompletePre notification that a restore was completed.
JSAdvisoryStreamRestoreCompletePre = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE"
// JSAdvisoryDomainLeaderElectedPre notification that a jetstream domain has elected a leader.
JSAdvisoryDomainLeaderElected = "$JS.EVENT.ADVISORY.DOMAIN.LEADER_ELECTED"
// JSAdvisoryStreamLeaderElectedPre notification that a replicated stream has elected a leader.
JSAdvisoryStreamLeaderElectedPre = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED"
@@ -3709,7 +3712,13 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
// Create our ack flow handler.
// This is very simple for now.
acks := make(chan struct{}, 1)
ackSize := defaultSnapshotWindowSize / chunkSize
if ackSize < 8 {
ackSize = 8
} else if ackSize > 8*1024 {
ackSize = 8 * 1024
}
acks := make(chan struct{}, ackSize)
acks <- struct{}{}
// Track bytes outstanding.
@@ -3731,7 +3740,7 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
defer mset.unsubscribe(ackSub)
// TODO(dlc) - Add in NATS-Chunked-Sequence header
var hdr []byte
for index := 1; ; index++ {
chunk := make([]byte, chunkSize)
n, err := r.Read(chunk)
@@ -3748,19 +3757,27 @@ func (s *Server) streamSnapshot(ci *ClientInfo, acc *Account, mset *stream, sr *
if atomic.LoadInt32(&out) > defaultSnapshotWindowSize {
select {
case <-acks:
case <-inch: // Lost interest
// ok to proceed.
case <-inch:
// Lost interest
hdr = []byte("NATS/1.0 408 No Interest\r\n\r\n")
goto done
case <-time.After(2 * time.Second):
hdr = []byte("NATS/1.0 408 No Flow Response\r\n\r\n")
goto done
case <-time.After(10 * time.Millisecond):
}
}
ackReply := fmt.Sprintf("%s.%d.%d", ackSubj, len(chunk), index)
if hdr == nil {
hdr = []byte("NATS/1.0 204\r\n\r\n")
}
mset.outq.send(newJSPubMsg(reply, _EMPTY_, ackReply, nil, chunk, nil, 0))
atomic.AddInt32(&out, int32(len(chunk)))
}
done:
// Send last EOF
// TODO(dlc) - place hash in header
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, nil, nil, 0))
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// For determining consumer request type.
+177 -88
@@ -2357,6 +2357,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// No special processing needed for when we are caught up on restart.
if ce == nil {
isRecovering = false
// If we use an interest-based retention policy, make sure to check the consumers.
// This is to make sure we process any outstanding acks.
mset.checkInterestState()
// Make sure we create a new snapshot in case things have changed such that any existing
// snapshot may no longer be valid.
doSnapshot()
@@ -2873,10 +2876,22 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
}
// Process the actual message here.
if err := mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts); err != nil {
err = mset.processJetStreamMsg(subject, reply, hdr, msg, lseq, ts)
// If we have inflight make sure to clear after processing.
// TODO(dlc) - technically the check on inflight != nil could cause a data race,
// but we do not want to acquire the lock since tracking this will be rare.
if mset.inflight != nil {
mset.clMu.Lock()
delete(mset.inflight, lseq)
mset.clMu.Unlock()
}
if err != nil {
if err == errLastSeqMismatch {
var state StreamState
mset.store.FastState(&state)
// If we have no msgs and the other side is delivering us a sequence past where we
// should be, reset. This is possible if the other side has a stale snapshot and no longer
// has those messages. So compact and retry to reset.
@@ -3108,6 +3123,12 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if sa == nil {
return
}
// Clear inflight if we have it.
mset.clMu.Lock()
mset.inflight = nil
mset.clMu.Unlock()
js.mu.Lock()
s, account, err := js.srv, sa.Client.serviceAccount(), sa.err
client, subject, reply := sa.Client, sa.Subject, sa.Reply
@@ -3130,6 +3151,14 @@ func (js *jetStream) processStreamLeaderChange(mset *stream, isLeader bool) {
if node := mset.raftNode(); node != nil && !node.Quorum() && time.Since(node.Created()) > 5*time.Second {
s.sendStreamLostQuorumAdvisory(mset)
}
// Clear clseq. If we become leader again, it will be fixed up
// automatically on the next processClusteredInboundMsg call.
mset.mu.Lock()
if mset.clseq > 0 {
mset.clseq = 0
}
mset.mu.Unlock()
}
// Tell stream to switch leader status.
@@ -4689,7 +4718,7 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
if nb > 0 && ne >= compactNumMin || nb > compactSizeMin {
doSnapshot(false)
}
} else {
} else if err != errConsumerClosed {
s.Warnf("Error applying consumer entries to '%s > %s'", ca.Client.serviceAccount(), ca.Name)
}
}
@@ -4878,7 +4907,9 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
}
panic(err.Error())
}
o.processReplicatedAck(dseq, sseq)
if err := o.processReplicatedAck(dseq, sseq); err == errConsumerClosed {
return err
}
case updateSkipOp:
o.mu.Lock()
if !o.isLeader() {
@@ -4913,13 +4944,14 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
return nil
}
func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
o.mu.Lock()
var errConsumerClosed = errors.New("consumer closed")
func (o *consumer) processReplicatedAck(dseq, sseq uint64) error {
o.mu.Lock()
mset := o.mset
if o.closed || mset == nil {
o.mu.Unlock()
return
return errConsumerClosed
}
// Update activity.
@@ -4930,7 +4962,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
if o.retention == LimitsPolicy {
o.mu.Unlock()
return
return nil
}
var sagap uint64
@@ -4942,7 +4974,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
state, err := o.store.State()
if err != nil {
o.mu.Unlock()
return
return err
}
sagap = sseq - state.AckFloor.Stream
}
@@ -4957,6 +4989,7 @@ func (o *consumer) processReplicatedAck(dseq, sseq uint64) {
} else {
mset.ackMsg(o, sseq)
}
return nil
}
var errBadAckUpdate = errors.New("jetstream cluster bad replicated ack update")
@@ -5333,6 +5366,31 @@ func (js *jetStream) stopUpdatesSub() {
}
}
func (s *Server) sendDomainLeaderElectAdvisory() {
js, cc := s.getJetStreamCluster()
if js == nil || cc == nil {
return
}
js.mu.RLock()
node := cc.meta
js.mu.RUnlock()
adv := &JSDomainLeaderElectedAdvisory{
TypedEvent: TypedEvent{
Type: JSDomainLeaderElectedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Leader: node.GroupLeader(),
Replicas: s.replicas(node),
Cluster: s.cachedClusterName(),
Domain: s.getOpts().JetStreamDomain,
}
s.publishAdvisory(nil, JSAdvisoryDomainLeaderElected, adv)
}
func (js *jetStream) processLeaderChange(isLeader bool) {
if js == nil {
return
@@ -5346,6 +5404,7 @@ func (js *jetStream) processLeaderChange(isLeader bool) {
if isLeader {
s.Noticef("Self is new JetStream cluster metadata leader")
s.sendDomainLeaderElectAdvisory()
} else {
var node string
if meta := js.getMetaGroup(); meta != nil {
@@ -5500,6 +5559,7 @@ func (e *selectPeerError) accumulate(eAdd *selectPeerError) {
// selectPeerGroup will select a group of peers to start a raft group.
// when peers exist already the unique tag prefix check for the replaceFirstExisting will be skipped
// js lock should be held.
func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamConfig, existing []string, replaceFirstExisting int, ignore []string) ([]string, *selectPeerError) {
if cluster == _EMPTY_ || cfg == nil {
return nil, &selectPeerError{misc: true}
@@ -5521,6 +5581,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
id string
avail uint64
ha int
ns int
}
var nodes []wn
@@ -5585,6 +5646,22 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}
// Grab the number of streams and HA assets currently assigned to each peer.
// HAAssets under usage is async, so calculate here in real time based on assignments.
peerStreams := make(map[string]int, len(peers))
peerHA := make(map[string]int, len(peers))
for _, asa := range cc.streams {
for _, sa := range asa {
isHA := len(sa.Group.Peers) > 1
for _, peer := range sa.Group.Peers {
peerStreams[peer]++
if isHA {
peerHA[peer]++
}
}
}
}
maxHaAssets := s.getOpts().JetStreamLimits.MaxHAAssets
// An error is a result of multiple individual placement decisions.
@@ -5647,7 +5724,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
var available uint64
var ha int
if ni.stats != nil {
switch cfg.Storage {
case MemoryStorage:
@@ -5667,7 +5743,6 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
available = uint64(ni.cfg.MaxStore) - used
}
}
ha = ni.stats.HAAssets
}
// Otherwise check if we have enough room if maxBytes set.
@@ -5699,7 +5774,7 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
}
// Add to our list of potential nodes.
nodes = append(nodes, wn{p.ID, available, ha})
nodes = append(nodes, wn{p.ID, available, peerHA[p.ID], peerStreams[p.ID]})
}
// If we could not select enough peers, fail.
@@ -5711,10 +5786,14 @@ func (cc *jetStreamCluster) selectPeerGroup(r int, cluster string, cfg *StreamCo
}
return nil, &err
}
// Sort based on available from most to least.
sort.Slice(nodes, func(i, j int) bool { return nodes[i].avail > nodes[j].avail })
// If we are placing a replicated stream, let's sort based in haAssets, as that is more important to balance.
// Sort based on available from most to least, breaking ties by number of total streams assigned to the peer.
sort.Slice(nodes, func(i, j int) bool {
if nodes[i].avail == nodes[j].avail {
return nodes[i].ns < nodes[j].ns
}
return nodes[i].avail > nodes[j].avail
})
// If we are placing a replicated stream, let's sort based on HAAssets, as that is more important to balance.
if cfg.Replicas > 1 {
sort.SliceStable(nodes, func(i, j int) bool { return nodes[i].ha < nodes[j].ha })
}
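A small worked example of the two-pass ordering with hypothetical peers, showing why the second sort must be stable:

	nodes := []wn{
		{id: "A", avail: 100, ha: 1, ns: 3},
		{id: "B", avail: 100, ha: 1, ns: 1},
		{id: "C", avail: 200, ha: 0, ns: 5},
	}
	// Pass 1 (capacity desc, stream count asc on ties): C, B, A
	// Pass 2 (stable, HA assets asc, replicas > 1):     C, B, A
	// C leads with the fewest HA assets; B stays ahead of A because the stable
	// sort preserves the tie-break from pass 1.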
@@ -7468,9 +7547,16 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.mu.RLock()
canRespond := !mset.cfg.NoAck && len(reply) > 0
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, int64(mset.cfg.Replicas), mset.tier, mset.outq, mset.node
maxMsgSize, lseq, clfs := int(mset.cfg.MaxMsgSize), mset.lseq, mset.clfs
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
isLeader, isSealed := mset.isLeader(), mset.cfg.Sealed
// We need to track state to check limits if interest retention and discard new with max msgs or bytes.
var state StreamState
if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0) {
mset.store.FastState(&state)
}
mset.mu.RUnlock()
// This should not happen but possible now that we allow scale up, and scale down where this could trigger.
@@ -7506,50 +7592,14 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
}
// Check here pre-emptively if we have exceeded our account limits.
var exceeded bool
jsa.usageMu.Lock()
jsaLimits, ok := jsa.limits[tierName]
if !ok {
jsa.usageMu.Unlock()
err := fmt.Errorf("no JetStream resource limits found account: %q", jsa.acc().Name)
s.RateLimitWarnf(err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSNoLimitsError()
response, _ = json.Marshal(resp)
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
if exceeded, err := jsa.wouldExceedLimits(st, tierName, r, subject, hdr, msg); exceeded {
if err == nil {
err = NewJSAccountResourcesExceededError()
}
return err
}
t, ok := jsa.usage[tierName]
if !ok {
t = &jsaStorage{}
jsa.usage[tierName] = t
}
// Make sure replicas is correct.
if r < 1 {
r = 1
}
// This is for limits. If we have no tier, consider all to be flat, vs tiers like R3 where we want to scale limit by replication.
lr := r
if tierName == _EMPTY_ {
lr = 1
}
// Tiers are flat, meaning the limit for R3 will be 100GB, not 300GB, so compare to total but adjust limits.
if st == MemoryStorage && jsaLimits.MaxMemory > 0 {
exceeded = t.total.mem+(int64(memStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxMemory * lr)
} else if jsaLimits.MaxStore > 0 {
exceeded = t.total.store+(int64(fileStoreMsgSize(subject, hdr, msg))*r) > (jsaLimits.MaxStore * lr)
}
jsa.usageMu.Unlock()
// If we have exceeded our account limits go ahead and return.
if exceeded {
err := fmt.Errorf("JetStream resource limits exceeded for account: %q", jsa.acc().Name)
s.RateLimitWarnf(err.Error())
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSAccountResourcesExceededError()
resp.Error = err
response, _ = json.Marshal(resp)
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
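The inline tier arithmetic that jsa.wouldExceedLimits now encapsulates worked roughly as follows: usage already counts replicated bytes, so a flat (untiered) account compares against the raw limit, while a named tier such as R3 scales the limit by the replica count. A hedged restatement of that check, with illustrative names rather than the server's:
package sketch

// wouldExceed sketches the flat-vs-tiered account limit check: msgSize is
// charged once per replica, and only a named tier scales the limit to match.
func wouldExceed(total, msgSize, limit, replicas int64, tiered bool) bool {
	if replicas < 1 {
		replicas = 1
	}
	lr := int64(1)
	if tiered {
		lr = replicas // e.g. an R3 tier compares against 3x the limit
	}
	return total+msgSize*replicas > limit*lr
}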
@@ -7623,11 +7673,53 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// We only use mset.clseq for clustering and in case we run ahead of actual commits.
// Check if we need to set initial value here
mset.clMu.Lock()
if mset.clseq == 0 || mset.clseq < lseq {
if mset.clseq == 0 || mset.clseq < lseq+mset.clfs {
// Re-capture
lseq, clfs = mset.lseq, mset.clfs
mset.clseq = lseq + clfs
lseq = mset.lastSeq()
mset.clseq = lseq + mset.clfs
}
// Check if we have an interest policy and discard new with max msgs or bytes.
// We need to deny here, otherwise the publish could succeed on some peers and
// not others depending on consumer ack state. If we allow it, that means we
// know it would succeed on every peer.
if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0) {
// Track inflight.
if mset.inflight == nil {
mset.inflight = make(map[uint64]uint64)
}
if mset.cfg.Storage == FileStorage {
mset.inflight[mset.clseq] = fileStoreMsgSize(subject, hdr, msg)
} else {
mset.inflight[mset.clseq] = memStoreMsgSize(subject, hdr, msg)
}
var err error
if maxMsgs > 0 && state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) {
err = ErrMaxMsgs
} else if maxBytes > 0 {
// TODO(dlc) - Could track this rollup independently.
var bytesPending uint64
for _, nb := range mset.inflight {
bytesPending += nb
}
if state.Bytes+bytesPending > uint64(maxBytes) {
err = ErrMaxBytes
}
}
if err != nil {
delete(mset.inflight, mset.clseq)
mset.clMu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
resp.Error = NewJSStreamStoreFailedError(err, Unless(err))
response, _ = json.Marshal(resp)
outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
return err
}
}
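The inflight map gives DiscardNew a deterministic denominator under clustering: limits are checked against committed state plus every proposal not yet applied, so all peers reach the same accept or deny decision. A simplified sketch of that accounting, assuming the caller already holds the appropriate lock:
package sketch

import "errors"

var (
	errMaxMsgs  = errors.New("maximum messages exceeded")
	errMaxBytes = errors.New("maximum bytes exceeded")
)

type inflightLimiter struct {
	inflight map[uint64]uint64 // proposed seq -> size in bytes
	maxMsgs  int64
	maxBytes int64
}

// tryPropose records a proposal and rejects it if committed state plus all
// inflight proposals would exceed the configured limits.
func (l *inflightLimiter) tryPropose(seq, size, curMsgs, curBytes uint64) error {
	if l.inflight == nil {
		l.inflight = make(map[uint64]uint64)
	}
	l.inflight[seq] = size
	if l.maxMsgs > 0 && curMsgs+uint64(len(l.inflight)) > uint64(l.maxMsgs) {
		delete(l.inflight, seq)
		return errMaxMsgs
	}
	if l.maxBytes > 0 {
		var pending uint64
		for _, nb := range l.inflight {
			pending += nb
		}
		if curBytes+pending > uint64(l.maxBytes) {
			delete(l.inflight, seq)
			return errMaxBytes
		}
	}
	return nil
}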
esm := encodeStreamMsgAllowCompress(subject, reply, hdr, msg, mset.clseq, time.Now().UnixNano(), mset.compressOK)
mset.clseq++
@@ -7639,7 +7731,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
// Check to see if we are being overrun.
// TODO(dlc) - Make this a limit where we drop messages to protect ourselves, but allow to be configured.
if mset.clseq-(lseq+clfs) > streamLagWarnThreshold {
if mset.clseq-(lseq+mset.clfs) > streamLagWarnThreshold {
lerr := fmt.Errorf("JetStream stream '%s > %s' has high message lag", jsa.acc().Name, name)
s.RateLimitWarnf(lerr.Error())
}
@@ -7814,14 +7906,14 @@ var (
func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
// Update any deletes, etc.
mset.processSnapshotDeletes(snap)
mset.setCLFS(snap.Failed)
mset.mu.Lock()
var state StreamState
mset.store.FastState(&state)
mset.setCLFS(snap.Failed)
sreq := mset.calculateSyncRequest(&state, snap)
s, js, subject, n := mset.srv, mset.js, mset.sa.Sync, mset.node
s, js, subject, n, st := mset.srv, mset.js, mset.sa.Sync, mset.node, mset.cfg.Storage
qname := fmt.Sprintf("[ACC:%s] stream '%s' snapshot", mset.acc.Name, mset.cfg.Name)
mset.mu.Unlock()
@@ -7831,7 +7923,7 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
}
// Just return if up to date or already exceeded limits.
if sreq == nil || js.limitsExceeded(mset.cfg.Storage) {
if sreq == nil || js.limitsExceeded(st) {
return nil
}
@@ -7889,24 +7981,6 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState) (e error) {
// On exit, we will release our semaphore if we acquired it.
defer releaseSyncOutSem()
// Check our final state when we exit cleanly.
// This will make sure we have interest consumers updated.
checkFinalState := func() {
// Bail if no stream.
if mset == nil {
return
}
mset.mu.RLock()
consumers := make([]*consumer, 0, len(mset.consumers))
for _, o := range mset.consumers {
consumers = append(consumers, o)
}
mset.mu.RUnlock()
for _, o := range consumers {
o.checkStateForInterestStream()
}
}
// Do not let this go on forever.
const maxRetries = 3
var numRetries int
@@ -8011,7 +8085,7 @@ RETRY:
// Check for eof signaling.
if len(msg) == 0 {
msgsQ.recycle(&mrecs)
checkFinalState()
mset.checkInterestState()
return nil
}
if _, err := mset.processCatchupMsg(msg); err == nil {
@@ -8506,10 +8580,13 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
sreq.FirstSeq = state.FirstSeq
if state.LastSeq > sreq.LastSeq {
sreq.LastSeq = state.LastSeq
}
}
// Setup sequences to walk through.
@@ -8645,10 +8722,22 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
// Check for a condition where our state's first is now past the last that we could have sent.
// If so reset last and continue sending.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()
if last < state.FirstSeq {
last = state.LastSeq
}
// Recheck our exit condition.
if seq == last {
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
}
select {
case <-remoteQuitCh:
+15 -3
@@ -193,10 +193,22 @@ const JSRestoreCompleteAdvisoryType = "io.nats.jetstream.advisory.v1.restore_com
// Clustering specific.
// JSStreamLeaderElectedAdvisoryType is sent when the system elects a leader for a stream.
// JSDomainLeaderElectedAdvisoryType is sent when the system elects a new meta leader.
const JSDomainLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.domain_leader_elected"
// JSDomainLeaderElectedAdvisory indicates that a domain has elected a new leader.
type JSDomainLeaderElectedAdvisory struct {
TypedEvent
Leader string `json:"leader"`
Replicas []*PeerInfo `json:"replicas"`
Cluster string `json:"cluster"`
Domain string `json:"domain,omitempty"`
}
// JSStreamLeaderElectedAdvisoryType is sent when the system elects a new leader for a stream.
const JSStreamLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.stream_leader_elected"
// JSStreamLeaderElectedAdvisory indicates that a stream has lost quorum and is stalled.
// JSStreamLeaderElectedAdvisory indicates that a stream has elected a new leader.
type JSStreamLeaderElectedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
@@ -222,7 +234,7 @@ type JSStreamQuorumLostAdvisory struct {
// JSConsumerLeaderElectedAdvisoryType is sent when the system elects a leader for a consumer.
const JSConsumerLeaderElectedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_leader_elected"
// JSConsumerLeaderElectedAdvisory indicates that a stream has lost quorum and is stalled.
// JSConsumerLeaderElectedAdvisory indicates that a consumer has elected a new leader.
type JSConsumerLeaderElectedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
+13 -9
@@ -2018,8 +2018,8 @@ func (s *Server) initLeafNodeSmapAndSendSubs(c *client) {
c.leaf.smap[oldGWReplyPrefix+"*.>"]++
c.leaf.smap[gwReplyPrefix+">"]++
}
// Detect loop by subscribing to a specific subject and checking
// if this is coming back to us.
// Detect loops by subscribing to a specific subject and checking
// if this sub is coming back to us.
c.leaf.smap[lds]++
// Check if we need to add an existing siReply to our map.
@@ -2083,7 +2083,7 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
isLDS := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
// Capture the cluster even if it's empty.
cluster := _EMPTY_
var cluster string
if sub.origin != nil {
cluster = bytesToString(sub.origin)
}
@@ -2112,11 +2112,11 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
}
// Check to make sure this sub does not have an origin cluster that matches the leafnode.
ln.mu.Lock()
skip := (cluster != _EMPTY_ && cluster == ln.remoteCluster()) || (delta > 0 && !ln.canSubscribe(subject))
// If skipped, make sure that we still let through the "$LDS." subscription that allows
// the detection of a loop.
if isLDS || !skip {
ln.updateSmap(sub, delta)
// the detection of loops as long as the clusters differ.
clusterDifferent := cluster != ln.remoteCluster()
if (isLDS && clusterDifferent) || ((cluster == _EMPTY_ || clusterDifferent) && (delta <= 0 || ln.canSubscribe(subject))) {
ln.updateSmap(sub, delta, isLDS)
}
ln.mu.Unlock()
}
@@ -2125,7 +2125,7 @@ func (acc *Account) updateLeafNodes(sub *subscription, delta int32) {
// This will make an update to our internal smap and determine if we should send out
// an interest update to the remote side.
// Lock should be held.
func (c *client) updateSmap(sub *subscription, delta int32) {
func (c *client) updateSmap(sub *subscription, delta int32, isLDS bool) {
if c.leaf.smap == nil {
return
}
@@ -2133,7 +2133,7 @@ func (c *client) updateSmap(sub *subscription, delta int32) {
// If we are solicited make sure this is a local client or a non-solicited leaf node
skind := sub.client.kind
updateClient := skind == CLIENT || skind == SYSTEM || skind == JETSTREAM || skind == ACCOUNT
if c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
if !isLDS && c.isSpokeLeafNode() && !(updateClient || (skind == LEAF && !sub.client.isSpokeLeafNode())) {
return
}
@@ -2315,6 +2315,7 @@ func (c *client) processLeafSub(argo []byte) (err error) {
acc := c.acc
// Check if we have a loop.
ldsPrefix := bytes.HasPrefix(sub.subject, []byte(leafNodeLoopDetectionSubjectPrefix))
if ldsPrefix && bytesToString(sub.subject) == acc.getLDSubject() {
c.mu.Unlock()
c.handleLeafNodeLoop(true)
@@ -2983,6 +2984,9 @@ func (s *Server) leafNodeFinishConnectProcess(c *client) {
if err == ErrTooManyAccountConnections {
c.maxAccountConnExceeded()
return
} else if err == ErrLeafNodeLoop {
c.handleLeafNodeLoop(true)
return
}
c.Errorf("Registering leaf with account %s resulted in error: %v", acc.Name, err)
c.closeConnection(ProtocolViolation)
+22 -14
@@ -1,4 +1,4 @@
// Copyright 2019-2023 The NATS Authors
// Copyright 2019-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -392,7 +392,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
// 1. See if we match any subs from fss.
// 2. If we match and the sseq is past ss.Last then we can use meta only.
// 3. If we match and we need to do a partial, break and clear any totals and do a full scan like num pending.
// 3. If we match we need to do a partial, break and clear any totals and do a full scan like num pending.
isMatch := func(subj string) bool {
if isAll {
@@ -429,7 +429,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
if sseq <= fss.First {
update(fss)
} else if sseq <= fss.Last {
// We matched but its a partial.
// We matched but it is a partial.
havePartial = true
// Don't break here, we will update to keep tracking last.
update(fss)
@@ -444,7 +444,7 @@ func (ms *memStore) filteredStateLocked(sseq uint64, filter string, lastPerSubje
// If we are here we need to scan the msgs.
// Capture first and last sequences for scan and then clear what we had.
first, last := ss.First, ss.Last
// To track if we decide to exclude and we need to calculate first.
// Tracks whether, if we decide to exclude, we need to calculate the first sequence.
var needScanFirst bool
if first < sseq {
first = sseq
@@ -644,7 +644,11 @@ func (ms *memStore) resetAgeChk(delta int64) {
fireIn := ms.cfg.MaxAge
if delta > 0 && time.Duration(delta) < fireIn {
fireIn = time.Duration(delta)
if fireIn = time.Duration(delta); fireIn < 250*time.Millisecond {
// Clamp to a full second when the delta is under 250ms.
// Excessive timer firing can affect ingest performance.
fireIn = time.Second
}
}
if ms.ageChk != nil {
ms.ageChk.Reset(fireIn)
@@ -655,19 +659,21 @@ func (ms *memStore) resetAgeChk(delta int64) {
// Will expire msgs that are too old.
func (ms *memStore) expireMsgs() {
ms.mu.Lock()
defer ms.mu.Unlock()
ms.mu.RLock()
now := time.Now().UnixNano()
minAge := now - int64(ms.cfg.MaxAge)
ms.mu.RUnlock()
for {
ms.mu.Lock()
if sm, ok := ms.msgs[ms.state.FirstSeq]; ok && sm.ts <= minAge {
ms.deleteFirstMsgOrPanic()
// Recalculate in case we are expiring a bunch.
now = time.Now().UnixNano()
minAge = now - int64(ms.cfg.MaxAge)
ms.mu.Unlock()
} else {
// We will exit here
if len(ms.msgs) == 0 {
if ms.ageChk != nil {
ms.ageChk.Stop()
@@ -686,7 +692,8 @@ func (ms *memStore) expireMsgs() {
ms.ageChk = time.AfterFunc(fireIn, ms.expireMsgs)
}
}
return
ms.mu.Unlock()
break
}
}
}
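The rewritten expiry loop trades one long critical section for a lock acquisition per removal, so writers can interleave with a large sweep, and the age cutoff is recomputed each pass since the sweep itself takes time. The shape of the pattern, with assumed helpers standing in for the store internals:
package sketch

import (
	"sync"
	"time"
)

// expireLoop sketches per-iteration locking; oldestTS and removeOldest are
// assumed helpers over the store, both guarded by mu.
func expireLoop(mu *sync.Mutex, maxAge time.Duration, oldestTS func() (int64, bool), removeOldest func()) {
	minAge := time.Now().UnixNano() - int64(maxAge)
	for {
		mu.Lock()
		ts, ok := oldestTS()
		if !ok || ts > minAge {
			mu.Unlock()
			return
		}
		removeOldest()
		// Recompute the cutoff in case we are expiring a large batch.
		minAge = time.Now().UnixNano() - int64(maxAge)
		mu.Unlock()
	}
}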
@@ -792,10 +799,11 @@ func (ms *memStore) Compact(seq uint64) (uint64, error) {
ms.mu.Unlock()
return 0, ErrStoreMsgNotFound
}
fseq := ms.state.FirstSeq
ms.state.FirstSeq = seq
ms.state.FirstTime = time.Unix(0, sm.ts).UTC()
for seq := seq - 1; seq > 0; seq-- {
for seq := seq - 1; seq >= fseq; seq-- {
if sm := ms.msgs[seq]; sm != nil {
bytes += memStoreMsgSize(sm.subj, sm.hdr, sm.msg)
purged++
@@ -1127,7 +1135,7 @@ func (ms *memStore) removeSeqPerSubject(subj string, seq uint64) {
}
}
// Will recalulate the first sequence for this subject in this block.
// Will recalculate the first sequence for this subject in this block.
// Lock should be held.
func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *SimpleState) {
for tseq := startSeq + 1; tseq <= ss.Last; tseq++ {
@@ -1140,7 +1148,7 @@ func (ms *memStore) recalculateFirstForSubj(subj string, startSeq uint64, ss *Si
}
// Removes the message referenced by seq.
// Lock should he held.
// Lock should be held.
func (ms *memStore) removeMsg(seq uint64, secure bool) bool {
var ss uint64
sm, ok := ms.msgs[seq]
@@ -1479,7 +1487,7 @@ func (o *consumerMemStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) erro
if o.state.Redelivered == nil {
o.state.Redelivered = make(map[uint64]uint64)
}
// Only update if greater then what we already have.
// Only update if greater than what we already have.
if o.state.Redelivered[sseq] < dc-1 {
o.state.Redelivered[sseq] = dc - 1
}
+44 -30
@@ -234,6 +234,8 @@ type mqttSessionManager struct {
sessions map[string]*mqttAccountSessionManager // key is account name
}
var testDisableRMSCache = false
type mqttAccountSessionManager struct {
mu sync.RWMutex
sessions map[string]*mqttSession // key is MQTT client ID
@@ -243,7 +245,7 @@ type mqttAccountSessionManager struct {
flapTimer *time.Timer // Timer to perform some cleanup of the flappers map
sl *Sublist // sublist allowing to find retained messages for given subscription
retmsgs map[string]*mqttRetainedMsgRef // retained messages
rmsCache sync.Map // map[subject]*mqttRetainedMsg
rmsCache *sync.Map // map[subject]mqttRetainedMsg
jsa mqttJSA
rrmLastSeq uint64 // Restore retained messages expected last sequence
rrmDoneCh chan struct{} // To notify the caller that all retained messages have been loaded
@@ -1142,6 +1144,9 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
quitCh: quitCh,
},
}
if !testDisableRMSCache {
as.rmsCache = &sync.Map{}
}
// TODO record domain name in as here
// The domain to communicate with may be required for JS calls.
@@ -1236,10 +1241,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
})
// Start the go routine that will clean up cached retained messages that expired.
s.startGoRoutine(func() {
defer s.grWG.Done()
as.cleanupRetainedMessageCache(s, closeCh)
})
if as.rmsCache != nil {
s.startGoRoutine(func() {
defer s.grWG.Done()
as.cleanupRetainedMessageCache(s, closeCh)
})
}
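Making rmsCache a *sync.Map lets tests (via testDisableRMSCache) run with the cache absent: every accessor nil-checks before touching it, and the cleanup goroutine above is only started when a cache exists. The nil-able cache pattern in isolation:
package sketch

import "sync"

type cacheHolder struct {
	cache *sync.Map // nil means caching is disabled
}

func (h *cacheHolder) get(key string) (any, bool) {
	if h.cache == nil {
		return nil, false
	}
	return h.cache.Load(key)
}

func (h *cacheHolder) set(key string, v any) {
	if h.cache == nil {
		return
	}
	h.cache.Store(key, v)
}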
lookupStream := func(stream, txt string) (*StreamInfo, error) {
si, err := jsa.lookupStream(stream)
@@ -1444,21 +1451,12 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
rmLegacyDurName := mqttRetainedMsgsStreamName + "_" + jsa.id
jsa.deleteConsumer(mqttRetainedMsgsStreamName, rmLegacyDurName)
// Using ephemeral consumer is too risky because if this server were to be
// disconnected from the rest for few seconds, then the leader would remove
// the consumer, so even after a reconnect, we would no longer receive
// retained messages.
//
// So we use a durable consumer, and create a new one each time we start.
// The old one should expire and get deleted due to inactivity. The name for
// the durable is $MQTT_rmsgs_{uuid}_{server-name}, the server name is just
// for readability.
rmDurName := mqttRetainedMsgsStreamName + "_" + nuid.Next() + "_" + s.String()
// Create a new, uniquely named consumer for retained messages for this
// server. The prior one will expire eventually.
ccfg := &CreateConsumerRequest{
Stream: mqttRetainedMsgsStreamName,
Config: ConsumerConfig{
Durable: rmDurName,
Name: mqttRetainedMsgsStreamName + "_" + nuid.Next(),
FilterSubject: mqttRetainedMsgsStreamSubject + ">",
DeliverSubject: rmsubj,
ReplayPolicy: ReplayInstant,
@@ -1466,7 +1464,7 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc
InactiveThreshold: 5 * time.Minute,
},
}
if _, err := jsa.createConsumer(ccfg); err != nil {
if _, err := jsa.createEphemeralConsumer(ccfg); err != nil {
return nil, fmt.Errorf("create retained messages consumer for account %q: %v", accName, err)
}
@@ -1646,17 +1644,26 @@ func (jsa *mqttJSA) sendAck(ackSubject string) {
jsa.sendq.push(&mqttJSPubMsg{subj: ackSubject, hdr: -1})
}
func (jsa *mqttJSA) createConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) {
func (jsa *mqttJSA) createEphemeralConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) {
cfgb, err := json.Marshal(cfg)
if err != nil {
return nil, err
}
var subj string
if cfg.Config.Durable != _EMPTY_ {
subj = fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable)
} else {
subj = fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream)
subj := fmt.Sprintf(JSApiConsumerCreateT, cfg.Stream)
ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb)
if err != nil {
return nil, err
}
ccr := ccri.(*JSApiConsumerCreateResponse)
return ccr, ccr.ToError()
}
func (jsa *mqttJSA) createDurableConsumer(cfg *CreateConsumerRequest) (*JSApiConsumerCreateResponse, error) {
cfgb, err := json.Marshal(cfg)
if err != nil {
return nil, err
}
subj := fmt.Sprintf(JSApiDurableCreateT, cfg.Stream, cfg.Config.Durable)
ccri, err := jsa.newRequest(mqttJSAConsumerCreate, subj, 0, cfgb)
if err != nil {
return nil, err
@@ -2243,9 +2250,7 @@ func (as *mqttAccountSessionManager) handleRetainedMsg(key string, rf *mqttRetai
// Update the in-memory retained message cache but only for messages
// that are already in the cache, i.e. have been (recently) used.
if rm != nil {
as.setCachedRetainedMsg(key, rm, true, copyBytesToCache)
}
as.setCachedRetainedMsg(key, rm, true, copyBytesToCache)
return
}
}
@@ -2269,7 +2274,9 @@ func (as *mqttAccountSessionManager) handleRetainedMsgDel(subject string, seq ui
as.sl = NewSublistWithCache()
}
if erm, ok := as.retmsgs[subject]; ok {
as.rmsCache.Delete(subject)
if as.rmsCache != nil {
as.rmsCache.Delete(subject)
}
if erm.sub != nil {
as.sl.Remove(erm.sub)
erm.sub = nil
@@ -2770,6 +2777,7 @@ func (as *mqttAccountSessionManager) loadRetainedMessages(subjects map[string]st
w.Warnf("failed to decode retained message for subject %q: %v", ss[i], err)
continue
}
// Add the loaded retained message to the cache, and to the results map.
key := ss[i][len(mqttRetainedMsgsStreamSubject):]
as.setCachedRetainedMsg(key, &rm, false, false)
@@ -2960,6 +2968,9 @@ func (as *mqttAccountSessionManager) transferRetainedToPerKeySubjectStream(log *
}
func (as *mqttAccountSessionManager) getCachedRetainedMsg(subject string) *mqttRetainedMsg {
if as.rmsCache == nil {
return nil
}
v, ok := as.rmsCache.Load(subject)
if !ok {
return nil
@@ -2973,6 +2984,9 @@ func (as *mqttAccountSessionManager) getCachedRetainedMsg(subject string) *mqttR
}
func (as *mqttAccountSessionManager) setCachedRetainedMsg(subject string, rm *mqttRetainedMsg, onlyReplace bool, copyBytesToCache bool) {
if as.rmsCache == nil || rm == nil {
return
}
rm.expiresFromCache = time.Now().Add(mqttRetainedCacheTTL)
if onlyReplace {
if _, ok := as.rmsCache.Load(subject); !ok {
@@ -4927,7 +4941,7 @@ func (sess *mqttSession) ensurePubRelConsumerSubscription(c *client) error {
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
if _, err := sess.jsa.createConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for PUBREL for client %q: err=%v", id, err)
return err
}
@@ -5033,7 +5047,7 @@ func (sess *mqttSession) processJSConsumer(c *client, subject, sid string,
if opts.MQTT.ConsumerInactiveThreshold > 0 {
ccr.Config.InactiveThreshold = opts.MQTT.ConsumerInactiveThreshold
}
if _, err := sess.jsa.createConsumer(ccr); err != nil {
if _, err := sess.jsa.createDurableConsumer(ccr); err != nil {
c.Errorf("Unable to add JetStream consumer for subscription on %q: err=%v", subject, err)
return nil, nil, err
}
+48 -9
@@ -14,12 +14,14 @@
package server
import (
"bytes"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/asn1"
"encoding/base64"
"encoding/pem"
"errors"
"fmt"
"io"
"net/http"
@@ -161,16 +163,43 @@ func (oc *OCSPMonitor) getRemoteStatus() ([]byte, *ocsp.Response, error) {
if config := opts.OCSPConfig; config != nil {
overrideURLs = config.OverrideURLs
}
getRequestBytes := func(u string, hc *http.Client) ([]byte, error) {
getRequestBytes := func(u string, reqDER []byte, hc *http.Client) ([]byte, error) {
reqEnc := base64.StdEncoding.EncodeToString(reqDER)
u = fmt.Sprintf("%s/%s", u, reqEnc)
start := time.Now()
resp, err := hc.Get(u)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("non-ok http status: %d", resp.StatusCode)
}
oc.srv.Debugf("Received OCSP response (method=GET, status=%v, url=%s, duration=%.3fs)",
resp.StatusCode, u, time.Since(start).Seconds())
if resp.StatusCode > 299 {
return nil, fmt.Errorf("non-ok http status on GET request (reqlen=%d): %d", len(reqEnc), resp.StatusCode)
}
return io.ReadAll(resp.Body)
}
postRequestBytes := func(u string, body []byte, hc *http.Client) ([]byte, error) {
hreq, err := http.NewRequest("POST", u, bytes.NewReader(body))
if err != nil {
return nil, err
}
hreq.Header.Add("Content-Type", "application/ocsp-request")
hreq.Header.Add("Accept", "application/ocsp-response")
start := time.Now()
resp, err := hc.Do(hreq)
if err != nil {
return nil, err
}
defer resp.Body.Close()
oc.srv.Debugf("Received OCSP response (method=POST, status=%v, url=%s, duration=%.3fs)",
resp.StatusCode, u, time.Since(start).Seconds())
if resp.StatusCode > 299 {
return nil, fmt.Errorf("non-ok http status on POST request (reqlen=%d): %d", len(body), resp.StatusCode)
}
return io.ReadAll(resp.Body)
}
@@ -182,8 +211,6 @@ func (oc *OCSPMonitor) getRemoteStatus() ([]byte, *ocsp.Response, error) {
return nil, nil, err
}
reqEnc := base64.StdEncoding.EncodeToString(reqDER)
responders := oc.Leaf.OCSPServer
if len(overrideURLs) > 0 {
responders = overrideURLs
@@ -195,18 +222,30 @@ func (oc *OCSPMonitor) getRemoteStatus() ([]byte, *ocsp.Response, error) {
oc.mu.Lock()
hc := oc.hc
oc.mu.Unlock()
var raw []byte
for _, u := range responders {
var postErr, getErr error
u = strings.TrimSuffix(u, "/")
raw, err = getRequestBytes(fmt.Sprintf("%s/%s", u, reqEnc), hc)
if err == nil {
// Prefer to make POST requests first.
raw, postErr = postRequestBytes(u, reqDER, hc)
if postErr == nil {
err = nil
break
} else {
// Fallback to use a GET request.
raw, getErr = getRequestBytes(u, reqDER, hc)
if getErr == nil {
err = nil
break
} else {
err = errors.Join(postErr, getErr)
}
}
}
if err != nil {
return nil, nil, fmt.Errorf("exhausted ocsp servers: %w", err)
}
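Responders are now tried POST-first with a GET fallback, and errors.Join (Go 1.20+) keeps both failures per responder so the final "exhausted ocsp servers" error explains every attempt. A condensed sketch of the loop, with postOCSP and getOCSP standing in for the request helpers defined above:
package sketch

import (
	"errors"
	"fmt"
	"net/http"
	"strings"
)

// fetchOCSP tries each responder with POST, then GET, joining both errors
// when a responder fails entirely.
func fetchOCSP(responders []string, reqDER []byte, hc *http.Client,
	postOCSP, getOCSP func(string, []byte, *http.Client) ([]byte, error)) ([]byte, error) {
	var err error
	for _, u := range responders {
		u = strings.TrimSuffix(u, "/")
		if raw, postErr := postOCSP(u, reqDER, hc); postErr == nil {
			return raw, nil
		} else if raw, getErr := getOCSP(u, reqDER, hc); getErr == nil {
			return raw, nil
		} else {
			err = errors.Join(postErr, getErr)
		}
	}
	return nil, fmt.Errorf("exhausted ocsp servers: %w", err)
}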
resp, err := ocsp.ParseResponse(raw, oc.Issuer)
if err != nil {
return nil, nil, fmt.Errorf("failed to get remote status: %w", err)
+1 -1
@@ -2293,7 +2293,7 @@ func parseLeafNodes(v interface{}, opts *Options, errors *[]error, warnings *[]e
}
opts.LeafNode.Remotes = remotes
case "reconnect", "reconnect_delay", "reconnect_interval":
opts.LeafNode.ReconnectInterval = time.Duration(int(mv.(int64))) * time.Second
opts.LeafNode.ReconnectInterval = parseDuration("reconnect", tk, mv, errors, warnings)
case "tls":
tc, err := parseTLS(tk, true)
if err != nil {
+39 -75
@@ -184,10 +184,8 @@ type raft struct {
sq *sendq // Send queue for outbound RPC messages
aesub *subscription // Subscription for handleAppendEntry callbacks
wtv []byte // Term and vote to be written
wps []byte // Peer state to be written
wtvch chan struct{} // Signals when a term vote was just written, to kick file writer
wpsch chan struct{} // Signals when a peer state was just written, to kick file writer
wtv []byte // Term and vote to be written
wps []byte // Peer state to be written
catchup *catchupState // For when we need to catch up as a follower.
progress map[string]*ipQueue[uint64] // For leader or server catching up a follower.
@@ -380,8 +378,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
js: s.getJetStream(),
sq: sq,
quit: make(chan struct{}),
wtvch: make(chan struct{}, 1),
wpsch: make(chan struct{}, 1),
reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"),
votes: newIPQueue[*voteResponse](s, qpfx+"vresp"),
prop: newIPQueue[*Entry](s, qpfx+"entry"),
@@ -518,8 +514,6 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig, labels pprofLabe
// Start the run goroutine for the Raft state machine.
s.startGoRoutine(n.run, labels)
// Start the filewriter.
s.startGoRoutine(n.fileWriter)
return n, nil
}
@@ -2799,16 +2793,15 @@ func (n *raft) applyCommit(index uint64) error {
committed = append(committed, e)
}
}
// Pass to the upper layers if we have normal entries.
if len(committed) > 0 {
if fpae {
delete(n.pae, index)
}
n.apply.push(newCommittedEntry(index, committed))
} else {
// If we processed inline update our applied index.
n.applied = index
if fpae {
delete(n.pae, index)
}
// Pass to the upper layers if we have normal entries. It is
// entirely possible that 'committed' might be an empty slice here,
// which will happen if we've processed updates inline (like peer
// states). In which case the upper layer will just call down to
// Applied() with no further action.
n.apply.push(newCommittedEntry(index, committed))
// Place back in the pool.
ae.returnToPool()
return nil
@@ -2920,7 +2913,9 @@ func (n *raft) runAsCandidate() {
n.requestVote()
// We vote for ourselves.
votes := 1
votes := map[string]struct{}{
n.ID(): {},
}
for {
elect := n.electTimer()
@@ -2948,11 +2943,11 @@ func (n *raft) runAsCandidate() {
nterm := n.term
n.RUnlock()
if vresp.granted && nterm >= vresp.term {
if vresp.granted && nterm == vresp.term {
// only track peers that would be our followers
n.trackPeer(vresp.peer)
votes++
if n.wonElection(votes) {
votes[vresp.peer] = struct{}{}
if n.wonElection(len(votes)) {
// Become LEADER if we have won and gotten a quorum with everyone we should hear from.
n.switchToLeader()
return
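Counting votes as a set keyed by peer ID, seeded with our own vote, makes the tally idempotent: a duplicated or re-delivered vote response can no longer push a candidate over quorum, and the term must now match exactly rather than merely not exceed ours. A small sketch of the idea:
package sketch

// voteTally records granted votes per peer; duplicates collapse naturally
// in the map, unlike with a plain counter.
type voteTally struct {
	votes  map[string]struct{}
	quorum int
}

func newVoteTally(selfID string, quorum int) *voteTally {
	return &voteTally{votes: map[string]struct{}{selfID: {}}, quorum: quorum}
}

// record returns true once a quorum of distinct peers have granted their
// vote in the exact term we requested.
func (t *voteTally) record(peer string, granted bool, respTerm, ourTerm uint64) bool {
	if granted && respTerm == ourTerm {
		t.votes[peer] = struct{}{}
	}
	return len(t.votes) >= t.quorum
}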
@@ -3686,11 +3681,11 @@ func (n *raft) writePeerState(ps *peerState) {
if bytes.Equal(n.wps, pse) {
return
}
// Stamp latest and kick writer.
// Stamp latest and write the peer state file.
n.wps = pse
select {
case n.wpsch <- struct{}{}:
default:
if err := writePeerState(n.sd, ps); err != nil && !n.isClosed() {
n.setWriteErrLocked(err)
n.warn("Error writing peer state file for %q: %v", n.group, err)
}
}
@@ -3705,10 +3700,7 @@ func writePeerState(sd string, ps *peerState) error {
err := os.WriteFile(psf, encodePeerState(ps), defaultFilePerms)
dios <- struct{}{}
if err != nil {
return err
}
return nil
return err
}
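With the dedicated file-writer goroutine gone, peer-state and term-vote files are written inline, still funneled through the dios channel, which here appears to act as a semaphore bounding concurrent disk I/O. The write path in isolation, under that assumption:
package sketch

import (
	"os"
	"path/filepath"
)

// dios is assumed to be a buffered channel pre-filled with tokens; receiving
// acquires an I/O slot and sending the token back releases it.
var dios = make(chan struct{}, 1)

func init() { dios <- struct{}{} }

func writeStateFile(dir, name string, data []byte) error {
	path := filepath.Join(dir, name)
	<-dios
	err := os.WriteFile(path, data, 0640)
	dios <- struct{}{}
	return err
}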
func readPeerState(sd string) (ps *peerState, err error) {
@@ -3725,6 +3717,20 @@ func readPeerState(sd string) (ps *peerState, err error) {
const termVoteFile = "tav.idx"
const termVoteLen = idLen + 8
// Writes out our term & vote outside of a specific raft context.
func writeTermVote(sd string, wtv []byte) error {
psf := filepath.Join(sd, termVoteFile)
if _, err := os.Stat(psf); err != nil && !os.IsNotExist(err) {
return err
}
<-dios
err := os.WriteFile(psf, wtv, defaultFilePerms)
dios <- struct{}{}
return err
}
// readTermVote will read the largest term and who we voted from to stable storage.
// Lock should be held.
func (n *raft) readTermVote() (term uint64, voted string, err error) {
@@ -3791,48 +3797,6 @@ func (n *raft) setWriteErr(err error) {
n.setWriteErrLocked(err)
}
func (n *raft) fileWriter() {
s := n.s
defer s.grWG.Done()
n.RLock()
tvf := filepath.Join(n.sd, termVoteFile)
psf := filepath.Join(n.sd, peerStateFile)
n.RUnlock()
for s.isRunning() {
select {
case <-n.quit:
return
case <-n.wtvch:
// We've been asked to write out the term-and-vote file.
var buf [termVoteLen]byte
n.RLock()
copy(buf[0:], n.wtv)
n.RUnlock()
<-dios
err := os.WriteFile(tvf, buf[:], defaultFilePerms)
dios <- struct{}{}
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
case <-n.wpsch:
// We've been asked to write out the peer state file.
n.RLock()
buf := copyBytes(n.wps)
n.RUnlock()
<-dios
err := os.WriteFile(psf, buf, defaultFilePerms)
dios <- struct{}{}
if err != nil && !n.isClosed() {
n.setWriteErr(err)
n.warn("Error writing peer state file for %q: %v", n.group, err)
}
}
}
}
// writeTermVote will record the largest term and who we voted for to stable storage.
// Lock should be held.
func (n *raft) writeTermVote() {
@@ -3846,11 +3810,11 @@ func (n *raft) writeTermVote() {
if bytes.Equal(n.wtv, b) {
return
}
// Stamp latest and kick writer.
// Stamp latest and write the term & vote file.
n.wtv = b
select {
case n.wtvch <- struct{}{}:
default:
if err := writeTermVote(n.sd, n.wtv); err != nil && !n.isClosed() {
n.setWriteErrLocked(err)
n.warn("Error writing term and vote file for %q: %v", n.group, err)
}
}
+3
@@ -999,6 +999,9 @@ func (s *Server) Reload() error {
// type. This returns an error if an option which doesn't support
// hot-swapping was changed.
func (s *Server) ReloadOptions(newOpts *Options) error {
s.reloadMu.Lock()
defer s.reloadMu.Unlock()
s.mu.Lock()
curOpts := s.getOpts()
+11 -3
@@ -124,6 +124,7 @@ type Server struct {
stats
scStats
mu sync.RWMutex
reloadMu sync.RWMutex // Write-locked when a config reload is taking place ONLY
kp nkeys.KeyPair
xkp nkeys.KeyPair
xpub string
@@ -1143,9 +1144,11 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
// If we have leafnodes they need to be updated.
if reloading && a == s.gacc {
a.mu.Lock()
var mappings []*mapping
mappings := make(map[string]*mapping)
if len(a.mappings) > 0 && a.nleafs > 0 {
mappings = append(mappings, a.mappings...)
for _, em := range a.mappings {
mappings[em.src] = em
}
}
a.mu.Unlock()
if len(mappings) > 0 || len(oldGMappings) > 0 {
@@ -1156,7 +1159,10 @@ func (s *Server) configureAccounts(reloading bool) (map[string]struct{}, error)
}
// Remove any old ones if needed.
for _, em := range oldGMappings {
lc.forceRemoveFromSmap(em.src)
// Only remove if not in the new ones.
if _, ok := mappings[em.src]; !ok {
lc.forceRemoveFromSmap(em.src)
}
}
}
a.lmu.RUnlock()
@@ -2679,7 +2685,9 @@ func (s *Server) acceptConnections(l net.Listener, acceptName string, createFunc
}
tmpDelay = ACCEPT_MIN_SLEEP
if !s.startGoRoutine(func() {
s.reloadMu.RLock()
createFunc(conn)
s.reloadMu.RUnlock()
s.grWG.Done()
}) {
conn.Close()
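reloadMu is write-locked only for the duration of ReloadOptions, while each accepted connection takes a read lock around registration, so a reload waits for in-flight accepts and blocks new ones until the new options are in place. The gate reduced to its essentials:
package sketch

import "sync"

var reloadMu sync.RWMutex

// reload applies new options while no connection is being registered.
func reload(apply func()) {
	reloadMu.Lock()
	defer reloadMu.Unlock()
	apply()
}

// register runs connection setup; many registrations may proceed
// concurrently, but none while a reload holds the write lock.
func register(setup func()) {
	reloadMu.RLock()
	setup()
	reloadMu.RUnlock()
}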
+72 -45
@@ -283,6 +283,7 @@ type stream struct {
clMu sync.Mutex
clseq uint64
clfs uint64
inflight map[uint64]uint64
leader string
lqsent time.Time
catchups map[string]uint64
@@ -991,9 +992,7 @@ func (mset *stream) rebuildDedupe() {
}
func (mset *stream) lastSeqAndCLFS() (uint64, uint64) {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.lseq, mset.getCLFS()
return mset.lastSeq(), mset.getCLFS()
}
func (mset *stream) getCLFS() uint64 {
@@ -1010,9 +1009,8 @@ func (mset *stream) setCLFS(clfs uint64) {
func (mset *stream) lastSeq() uint64 {
mset.mu.RLock()
lseq := mset.lseq
mset.mu.RUnlock()
return lseq
defer mset.mu.RUnlock()
return mset.lseq
}
func (mset *stream) setLastSeq(lseq uint64) {
@@ -1998,12 +1996,13 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// Purge consumers.
// Check for filtered purge.
if preq != nil && preq.Subject != _EMPTY_ {
ss := store.FilteredState(state.FirstSeq, preq.Subject)
ss := store.FilteredState(fseq, preq.Subject)
fseq = ss.First
}
mset.clsMu.RLock()
for _, o := range mset.cList {
start := fseq
o.mu.RLock()
// we update consumer sequences if:
// no subject was specified, we can purge all consumers sequences
@@ -2013,10 +2012,15 @@ func (mset *stream) purge(preq *JSApiStreamPurgeRequest) (purged uint64, err err
// or consumer filter subject is subset of purged subject,
// but not the other way around.
o.isEqualOrSubsetMatch(preq.Subject)
// Check if a consumer has a wider subject space than what we purged.
var isWider bool
if !doPurge && preq != nil && o.isFilteredMatch(preq.Subject) {
doPurge, isWider = true, true
start = state.FirstSeq
}
o.mu.RUnlock()
if doPurge {
o.purge(fseq, lseq)
o.purge(start, lseq, isWider)
}
}
mset.clsMu.RUnlock()
@@ -4125,7 +4129,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\n\r\n"
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
hdr = fmt.Appendf(nil, ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano))
} else {
hdr = copyBytes(hdr)
hdr = genHeader(hdr, JSStream, name)
@@ -4158,6 +4162,11 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.mu.Lock()
s, store := mset.srv, mset.store
bumpCLFS := func() {
mset.clMu.Lock()
mset.clfs++
mset.clMu.Unlock()
}
// Apply the input subject transform if any
if mset.itr != nil {
ts, err := mset.itr.Match(subject)
@@ -4187,6 +4196,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
if isSealed {
outq := mset.outq
mset.mu.Unlock()
bumpCLFS()
if canRespond && outq != nil {
resp.PubAck = &PubAck{Stream: name}
resp.Error = ApiErrors[JSStreamSealedErr]
@@ -4242,17 +4252,17 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Process additional msg headers if still present.
var msgId string
var rollupSub, rollupAll bool
isClustered := mset.isClustered()
if len(hdr) > 0 {
outq := mset.outq
isClustered := mset.isClustered()
// Certain checks have already been performed if in clustered mode, so only check if not.
if !isClustered {
// Expected stream.
if sname := getExpectedStream(hdr); sname != _EMPTY_ && sname != name {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamNotMatchError()
@@ -4266,8 +4276,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Dedupe detection.
if msgId = getMsgId(hdr); msgId != _EMPTY_ {
if dde := mset.checkMsgId(msgId); dde != nil {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
response := append(pubAck, strconv.FormatUint(dde.seq, 10)...)
response = append(response, ",\"duplicate\": true}"...)
@@ -4290,8 +4300,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
fseq, err = 0, nil
}
if err != nil || fseq != seq {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(fseq)
@@ -4305,8 +4315,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Expected last sequence.
if seq, exists := getExpectedLastSeq(hdr); exists && seq != mset.lseq {
mlseq := mset.lseq
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastSequenceError(mlseq)
@@ -4322,8 +4332,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
if lmsgId != mset.lmsgId {
last := mset.lmsgId
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamWrongLastMsgIDError(last)
@@ -4336,8 +4346,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check for any rollups.
if rollup := getRollup(hdr); rollup != _EMPTY_ {
if !mset.cfg.AllowRollup || mset.cfg.DenyPurge {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamRollupFailedError(errors.New("rollup not permitted"))
@@ -4353,6 +4363,7 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
rollupAll = true
default:
mset.mu.Unlock()
bumpCLFS()
return fmt.Errorf("rollup value invalid: %q", rollup)
}
}
@@ -4367,8 +4378,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we are over the max msg size.
if maxMsgSize >= 0 && (len(hdr)+len(msg)) > maxMsgSize {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamMessageExceedsMaximumError()
@@ -4379,8 +4390,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
if len(hdr) > math.MaxUint16 {
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSStreamHeaderExceedsMaximumError()
@@ -4393,8 +4404,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
// Check to see if we have exceeded our limits.
if js.limitsExceeded(stype) {
s.resourcesExceededError()
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = NewJSInsufficientResourcesError()
@@ -4478,6 +4489,22 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
}
}
// If clustered this was already checked and we do not want to check here and possibly introduce skew.
if !isClustered {
if exceeded, err := jsa.wouldExceedLimits(stype, tierName, mset.cfg.Replicas, subject, hdr, msg); exceeded {
if err == nil {
err = NewJSAccountResourcesExceededError()
}
s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
resp.Error = err
response, _ = json.Marshal(resp)
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, nil, response, nil, 0))
}
}
}
// Store actual msg.
if lseq == 0 && ts == 0 {
seq, ts, err = store.StoreMsg(subject, hdr, msg)
@@ -4499,8 +4526,8 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
mset.clfs++
mset.mu.Unlock()
bumpCLFS()
switch err {
case ErrMaxMsgs, ErrMaxBytes, ErrMaxMsgsPerSubject, ErrMsgTooLarge:
@@ -4519,28 +4546,6 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
return err
}
if exceeded, apiErr := jsa.limitsExceeded(stype, tierName, mset.cfg.Replicas); exceeded {
s.RateLimitWarnf("JetStream resource limits exceeded for account: %q", accName)
if canRespond {
resp.PubAck = &PubAck{Stream: name}
if apiErr == nil {
resp.Error = NewJSAccountResourcesExceededError()
} else {
resp.Error = apiErr
}
response, _ = json.Marshal(resp)
mset.outq.sendMsg(reply, response)
}
// If we did not succeed put those values back.
var state StreamState
mset.store.FastState(&state)
mset.lseq = state.LastSeq
mset.lmsgId = olmsgId
mset.mu.Unlock()
store.RemoveMsg(seq)
return nil
}
// If we have a msgId make sure to save.
if msgId != _EMPTY_ {
mset.storeMsgIdLocked(&ddentry{msgId, seq, ts})
@@ -4564,10 +4569,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tsStr, tlseq))
hdr = fmt.Appendf(nil, ht, name, subject, seq, tsStr, tlseq)
rpMsg = copyBytes(msg)
} else {
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tsStr, tlseq, len(msg)))
hdr = fmt.Appendf(nil, htho, name, subject, seq, tsStr, tlseq, len(msg))
}
} else {
// Slow path.
@@ -5170,6 +5175,28 @@ func (mset *stream) getPublicConsumers() []*consumer {
return obs
}
// Will check for interest retention and make sure messages
// that have been acked are processed.
func (mset *stream) checkInterestState() {
if mset == nil {
return
}
mset.mu.RLock()
// If we are limits based nothing to do.
if mset.cfg.Retention == LimitsPolicy {
mset.mu.RUnlock()
return
}
consumers := make([]*consumer, 0, len(mset.consumers))
for _, o := range mset.consumers {
consumers = append(consumers, o)
}
mset.mu.RUnlock()
for _, o := range consumers {
o.checkStateForInterestStream()
}
}
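checkInterestState uses the snapshot-then-release idiom: the consumer list is copied under the read lock and each consumer is checked afterwards, so checkStateForInterestStream never runs with mset.mu held. The generic shape of that idiom, as a hedged sketch:
package sketch

import "sync"

// forEachOutsideLock copies the map's values under the read lock and then
// invokes fn on each copy after the lock has been released.
func forEachOutsideLock[T any](mu *sync.RWMutex, m map[string]T, fn func(T)) {
	mu.RLock()
	items := make([]T, 0, len(m))
	for _, v := range m {
		items = append(items, v)
	}
	mu.RUnlock()
	for _, v := range items {
		fn(v)
	}
}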
func (mset *stream) isInterestRetention() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
+1 -2
@@ -34,8 +34,7 @@ type node interface {
// Maximum prefix len
// We expect the most savings to come from long shared prefixes.
// This also makes the meta base layer exactly 64 bytes, a normal L1 cache line.
const maxPrefixLen = 60
const maxPrefixLen = 24
// 64 bytes total - an L1 cache line.
type meta struct {
+8 -9
@@ -65,13 +65,13 @@ func genParts(filter []byte, parts [][]byte) [][]byte {
// Match our parts against a fragment, which could be prefix for nodes or a suffix for leafs.
func matchParts(parts [][]byte, frag []byte) ([][]byte, bool) {
if len(frag) == 0 {
lf := len(frag)
if lf == 0 {
return parts, true
}
var si int
lpi := len(parts) - 1
lf := len(frag)
for i, part := range parts {
if si >= lf {
@@ -97,12 +97,11 @@ func matchParts(parts [][]byte, frag []byte) ([][]byte, bool) {
return nil, true
}
}
end := min(si+lp, len(frag))
// If part is bigger then the fragment, adjust to a portion on the part.
partialPart := lp > end
if partialPart {
end := min(si+lp, lf)
// If the part is bigger than the remaining fragment, adjust to a portion of the part.
if si+lp > end {
// Frag is smaller than the part itself.
part = part[:end]
part = part[:end-si]
}
if !bytes.Equal(part, frag[si:end]) {
return parts, false
@@ -114,10 +113,10 @@ func matchParts(parts [][]byte, frag []byte) ([][]byte, bool) {
}
// If we matched a partial, do not move past current part
// but update the part to what was consumed. This allows upper layers to continue.
if end < lp {
if end < si+lp {
if end >= lf {
parts = append([][]byte{}, parts...) // Create a copy before modifying.
parts[i] = parts[i][lf:]
parts[i] = parts[i][lf-si:]
} else {
i++
}
+1 -1
@@ -278,7 +278,7 @@ func (t *SubjectTree[T]) match(n node, parts [][]byte, pre []byte, cb func(subje
}
// We have matched here. If we are a leaf and have exhausted all parts, or we have a FWC, fire the callback.
if n.isLeaf() {
if len(nparts) == 0 || hasFWC {
if len(nparts) == 0 || (hasFWC && len(nparts) == 1) {
ln := n.(*leaf[T])
cb(append(pre, ln.suffix...), &ln.value)
}
+11 -2
@@ -1,4 +1,4 @@
// Copyright 2016-2023 The NATS Authors
// Copyright 2016-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -1052,7 +1052,16 @@ func visitLevel(l *level, depth int) int {
// Determine if a subject has any wildcard tokens.
func subjectHasWildcard(subject string) bool {
return !subjectIsLiteral(subject)
// This one exits earlier than !subjectIsLiteral(subject).
for i, c := range subject {
if c == pwc || c == fwc {
if (i == 0 || subject[i-1] == btsep) &&
(i+1 == len(subject) || subject[i+1] == btsep) {
return true
}
}
}
return false
}
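The scan only treats pwc ('*') and fwc ('>') as wildcards when they form a whole token, i.e. bounded by btsep ('.') or the subject's ends, which is what lets it exit earlier than the full literal check. Illustrative expectations, written as an in-package example since the function is unexported (this would live in a _test.go file):
package server

import "fmt"

func ExampleSubjectHasWildcard() {
	fmt.Println(subjectHasWildcard("foo.*.bar")) // true: '*' is a whole token
	fmt.Println(subjectHasWildcard("foo.a*b"))   // false: '*' embedded in a literal
	fmt.Println(subjectHasWildcard("foo.>"))     // true: terminal full wildcard
	// Output:
	// true
	// false
	// true
}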
// Determine if the subject has any wildcards. Fast version, does not check for
+6 -8
@@ -19,15 +19,14 @@
#define POLY1305_MUL(h0, h1, h2, r0, r1, t0, t1, t2, t3, t4, t5) \
MULLD r0, h0, t0; \
MULLD r0, h1, t4; \
MULHDU r0, h0, t1; \
MULLD r0, h1, t4; \
MULHDU r0, h1, t5; \
ADDC t4, t1, t1; \
MULLD r0, h2, t2; \
ADDZE t5; \
MULHDU r1, h0, t4; \
MULLD r1, h0, h0; \
ADD t5, t2, t2; \
ADDE t5, t2, t2; \
ADDC h0, t1, t1; \
MULLD h2, r1, t3; \
ADDZE t4, h0; \
@@ -37,13 +36,11 @@
ADDE t5, t3, t3; \
ADDC h0, t2, t2; \
MOVD $-4, t4; \
MOVD t0, h0; \
MOVD t1, h1; \
ADDZE t3; \
ANDCC $3, t2, h2; \
AND t2, t4, t0; \
RLDICL $0, t2, $62, h2; \
AND t2, t4, h0; \
ADDC t0, h0, h0; \
ADDE t3, h1, h1; \
ADDE t3, t1, h1; \
SLD $62, t3, t4; \
SRD $2, t2; \
ADDZE h2; \
@@ -75,6 +72,7 @@ TEXT ·update(SB), $0-32
loop:
POLY1305_ADD(R4, R8, R9, R10, R20, R21, R22)
PCALIGN $16
multiply:
POLY1305_MUL(R8, R9, R10, R11, R12, R16, R17, R18, R14, R20, R21)
ADD $-16, R5
+7 -6
@@ -279,21 +279,22 @@ func getOIDFromHashAlgorithm(target crypto.Hash) asn1.ObjectIdentifier {
// This is the exposed reflection of the internal OCSP structures.
// The status values that can be expressed in OCSP. See RFC 6960.
// These are used for the Response.Status field.
const (
// Good means that the certificate is valid.
Good = iota
Good = 0
// Revoked means that the certificate has been deliberately revoked.
Revoked
Revoked = 1
// Unknown means that the OCSP responder doesn't know about the certificate.
Unknown
Unknown = 2
// ServerFailed is unused and was never used (see
// https://go-review.googlesource.com/#/c/18944). ParseResponse will
// return a ResponseError when an error response is parsed.
ServerFailed
ServerFailed = 3
)
// The enumerated reasons for revoking a certificate. See RFC 5280.
const (
Unspecified = 0
KeyCompromise = 1
+1 -1
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build (aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos) && go1.9
//go:build aix || darwin || dragonfly || freebsd || linux || netbsd || openbsd || solaris || zos
package unix
+1 -1
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build darwin && go1.12
//go:build darwin
package unix
+7 -5
@@ -13,6 +13,7 @@
package unix
import (
"errors"
"sync"
"unsafe"
)
@@ -169,25 +170,26 @@ func Getfsstat(buf []Statfs_t, flags int) (n int, err error) {
func Uname(uname *Utsname) error {
mib := []_C_int{CTL_KERN, KERN_OSTYPE}
n := unsafe.Sizeof(uname.Sysname)
if err := sysctl(mib, &uname.Sysname[0], &n, nil, 0); err != nil {
// Suppress ENOMEM errors to be compatible with the C library __xuname() implementation.
if err := sysctl(mib, &uname.Sysname[0], &n, nil, 0); err != nil && !errors.Is(err, ENOMEM) {
return err
}
mib = []_C_int{CTL_KERN, KERN_HOSTNAME}
n = unsafe.Sizeof(uname.Nodename)
if err := sysctl(mib, &uname.Nodename[0], &n, nil, 0); err != nil {
if err := sysctl(mib, &uname.Nodename[0], &n, nil, 0); err != nil && !errors.Is(err, ENOMEM) {
return err
}
mib = []_C_int{CTL_KERN, KERN_OSRELEASE}
n = unsafe.Sizeof(uname.Release)
if err := sysctl(mib, &uname.Release[0], &n, nil, 0); err != nil {
if err := sysctl(mib, &uname.Release[0], &n, nil, 0); err != nil && !errors.Is(err, ENOMEM) {
return err
}
mib = []_C_int{CTL_KERN, KERN_VERSION}
n = unsafe.Sizeof(uname.Version)
if err := sysctl(mib, &uname.Version[0], &n, nil, 0); err != nil {
if err := sysctl(mib, &uname.Version[0], &n, nil, 0); err != nil && !errors.Is(err, ENOMEM) {
return err
}
@@ -205,7 +207,7 @@ func Uname(uname *Utsname) error {
mib = []_C_int{CTL_HW, HW_MACHINE}
n = unsafe.Sizeof(uname.Machine)
if err := sysctl(mib, &uname.Machine[0], &n, nil, 0); err != nil {
if err := sysctl(mib, &uname.Machine[0], &n, nil, 0); err != nil && !errors.Is(err, ENOMEM) {
return err
}
+99
@@ -1849,6 +1849,105 @@ func Dup2(oldfd, newfd int) error {
//sys Fsmount(fd int, flags int, mountAttrs int) (fsfd int, err error)
//sys Fsopen(fsName string, flags int) (fd int, err error)
//sys Fspick(dirfd int, pathName string, flags int) (fd int, err error)
//sys fsconfig(fd int, cmd uint, key *byte, value *byte, aux int) (err error)
func fsconfigCommon(fd int, cmd uint, key string, value *byte, aux int) (err error) {
var keyp *byte
if keyp, err = BytePtrFromString(key); err != nil {
return
}
return fsconfig(fd, cmd, keyp, value, aux)
}
// FsconfigSetFlag is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_FLAG.
//
// fd is the filesystem context to act upon.
// key is the parameter key to set.
func FsconfigSetFlag(fd int, key string) (err error) {
return fsconfigCommon(fd, FSCONFIG_SET_FLAG, key, nil, 0)
}
// FsconfigSetString is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_STRING.
//
// fd is the filesystem context to act upon.
// key is the parameter key to set.
// value is the parameter value to set.
func FsconfigSetString(fd int, key string, value string) (err error) {
var valuep *byte
if valuep, err = BytePtrFromString(value); err != nil {
return
}
return fsconfigCommon(fd, FSCONFIG_SET_STRING, key, valuep, 0)
}
// FsconfigSetBinary is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_BINARY.
//
// fd is the filesystem context to act upon.
// key is the parameter key to set.
// value is the parameter value to set.
func FsconfigSetBinary(fd int, key string, value []byte) (err error) {
if len(value) == 0 {
return EINVAL
}
return fsconfigCommon(fd, FSCONFIG_SET_BINARY, key, &value[0], len(value))
}
// FsconfigSetPath is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_PATH.
//
// fd is the filesystem context to act upon.
// key is the parameter key to set.
// path is a non-empty path for the specified key.
// atfd is a file descriptor from which to start the lookup, or AT_FDCWD.
func FsconfigSetPath(fd int, key string, path string, atfd int) (err error) {
var valuep *byte
if valuep, err = BytePtrFromString(path); err != nil {
return
}
return fsconfigCommon(fd, FSCONFIG_SET_PATH, key, valuep, atfd)
}
// FsconfigSetPathEmpty is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_PATH_EMPTY. The same as
// FsconfigSetPath but with AT_EMPTY_PATH implied.
func FsconfigSetPathEmpty(fd int, key string, path string, atfd int) (err error) {
var valuep *byte
if valuep, err = BytePtrFromString(path); err != nil {
return
}
return fsconfigCommon(fd, FSCONFIG_SET_PATH_EMPTY, key, valuep, atfd)
}
// FsconfigSetFd is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_SET_FD.
//
// fd is the filesystem context to act upon.
// key is the parameter key to set.
// value is a file descriptor to be assigned to specified key.
func FsconfigSetFd(fd int, key string, value int) (err error) {
return fsconfigCommon(fd, FSCONFIG_SET_FD, key, nil, value)
}
// FsconfigCreate is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_CMD_CREATE.
//
// fd is the filesystem context to act upon.
func FsconfigCreate(fd int) (err error) {
return fsconfig(fd, FSCONFIG_CMD_CREATE, nil, nil, 0)
}
// FsconfigReconfigure is equivalent to fsconfig(2) called
// with cmd == FSCONFIG_CMD_RECONFIGURE.
//
// fd is the filesystem context to act upon.
func FsconfigReconfigure(fd int) (err error) {
return fsconfig(fd, FSCONFIG_CMD_RECONFIGURE, nil, nil, 0)
}
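A hedged usage sketch of the new wrappers, chaining Fsopen, the Fsconfig* setters, and Fsmount to configure and mount a tmpfs through the new mount API (Linux only; the filesystem parameters are illustrative):
package sketch

import "golang.org/x/sys/unix"

// mountTmpfs returns a mount fd for a freshly configured tmpfs; the caller
// can attach it into the tree with unix.MoveMount.
func mountTmpfs() (int, error) {
	fsfd, err := unix.Fsopen("tmpfs", unix.FSOPEN_CLOEXEC)
	if err != nil {
		return -1, err
	}
	defer unix.Close(fsfd)
	if err := unix.FsconfigSetString(fsfd, "size", "16m"); err != nil {
		return -1, err
	}
	if err := unix.FsconfigCreate(fsfd); err != nil {
		return -1, err
	}
	return unix.Fsmount(fsfd, unix.FSMOUNT_CLOEXEC, 0)
}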
//sys Getdents(fd int, buf []byte) (n int, err error) = SYS_GETDENTS64
//sysnb Getpgid(pid int) (pgid int, err error)
+10
@@ -906,6 +906,16 @@ func Fspick(dirfd int, pathName string, flags int) (fd int, err error) {
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func fsconfig(fd int, cmd uint, key *byte, value *byte, aux int) (err error) {
_, _, e1 := Syscall6(SYS_FSCONFIG, uintptr(fd), uintptr(cmd), uintptr(unsafe.Pointer(key)), uintptr(unsafe.Pointer(value)), uintptr(aux), 0)
if e1 != 0 {
err = errnoErr(e1)
}
return
}
// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT
func Getdents(fd int, buf []byte) (n int, err error) {
var _p0 unsafe.Pointer
if len(buf) > 0 {
+60
@@ -836,6 +836,15 @@ const (
FSPICK_EMPTY_PATH = 0x8
FSMOUNT_CLOEXEC = 0x1
FSCONFIG_SET_FLAG = 0x0
FSCONFIG_SET_STRING = 0x1
FSCONFIG_SET_BINARY = 0x2
FSCONFIG_SET_PATH = 0x3
FSCONFIG_SET_PATH_EMPTY = 0x4
FSCONFIG_SET_FD = 0x5
FSCONFIG_CMD_CREATE = 0x6
FSCONFIG_CMD_RECONFIGURE = 0x7
)
type OpenHow struct {
@@ -1550,6 +1559,7 @@ const (
IFLA_DEVLINK_PORT = 0x3e
IFLA_GSO_IPV4_MAX_SIZE = 0x3f
IFLA_GRO_IPV4_MAX_SIZE = 0x40
IFLA_DPLL_PIN = 0x41
IFLA_PROTO_DOWN_REASON_UNSPEC = 0x0
IFLA_PROTO_DOWN_REASON_MASK = 0x1
IFLA_PROTO_DOWN_REASON_VALUE = 0x2
@@ -1565,6 +1575,7 @@ const (
IFLA_INET6_ICMP6STATS = 0x6
IFLA_INET6_TOKEN = 0x7
IFLA_INET6_ADDR_GEN_MODE = 0x8
IFLA_INET6_RA_MTU = 0x9
IFLA_BR_UNSPEC = 0x0
IFLA_BR_FORWARD_DELAY = 0x1
IFLA_BR_HELLO_TIME = 0x2
@@ -1612,6 +1623,9 @@ const (
IFLA_BR_MCAST_MLD_VERSION = 0x2c
IFLA_BR_VLAN_STATS_PER_PORT = 0x2d
IFLA_BR_MULTI_BOOLOPT = 0x2e
IFLA_BR_MCAST_QUERIER_STATE = 0x2f
IFLA_BR_FDB_N_LEARNED = 0x30
IFLA_BR_FDB_MAX_LEARNED = 0x31
IFLA_BRPORT_UNSPEC = 0x0
IFLA_BRPORT_STATE = 0x1
IFLA_BRPORT_PRIORITY = 0x2
@@ -1649,6 +1663,14 @@ const (
IFLA_BRPORT_BACKUP_PORT = 0x22
IFLA_BRPORT_MRP_RING_OPEN = 0x23
IFLA_BRPORT_MRP_IN_OPEN = 0x24
IFLA_BRPORT_MCAST_EHT_HOSTS_LIMIT = 0x25
IFLA_BRPORT_MCAST_EHT_HOSTS_CNT = 0x26
IFLA_BRPORT_LOCKED = 0x27
IFLA_BRPORT_MAB = 0x28
IFLA_BRPORT_MCAST_N_GROUPS = 0x29
IFLA_BRPORT_MCAST_MAX_GROUPS = 0x2a
IFLA_BRPORT_NEIGH_VLAN_SUPPRESS = 0x2b
IFLA_BRPORT_BACKUP_NHID = 0x2c
IFLA_INFO_UNSPEC = 0x0
IFLA_INFO_KIND = 0x1
IFLA_INFO_DATA = 0x2
@@ -1670,6 +1692,9 @@ const (
IFLA_MACVLAN_MACADDR = 0x4
IFLA_MACVLAN_MACADDR_DATA = 0x5
IFLA_MACVLAN_MACADDR_COUNT = 0x6
IFLA_MACVLAN_BC_QUEUE_LEN = 0x7
IFLA_MACVLAN_BC_QUEUE_LEN_USED = 0x8
IFLA_MACVLAN_BC_CUTOFF = 0x9
IFLA_VRF_UNSPEC = 0x0
IFLA_VRF_TABLE = 0x1
IFLA_VRF_PORT_UNSPEC = 0x0
@@ -1693,9 +1718,22 @@ const (
IFLA_XFRM_UNSPEC = 0x0
IFLA_XFRM_LINK = 0x1
IFLA_XFRM_IF_ID = 0x2
IFLA_XFRM_COLLECT_METADATA = 0x3
IFLA_IPVLAN_UNSPEC = 0x0
IFLA_IPVLAN_MODE = 0x1
IFLA_IPVLAN_FLAGS = 0x2
NETKIT_NEXT = -0x1
NETKIT_PASS = 0x0
NETKIT_DROP = 0x2
NETKIT_REDIRECT = 0x7
NETKIT_L2 = 0x0
NETKIT_L3 = 0x1
IFLA_NETKIT_UNSPEC = 0x0
IFLA_NETKIT_PEER_INFO = 0x1
IFLA_NETKIT_PRIMARY = 0x2
IFLA_NETKIT_POLICY = 0x3
IFLA_NETKIT_PEER_POLICY = 0x4
IFLA_NETKIT_MODE = 0x5
IFLA_VXLAN_UNSPEC = 0x0
IFLA_VXLAN_ID = 0x1
IFLA_VXLAN_GROUP = 0x2
@@ -1726,6 +1764,8 @@ const (
IFLA_VXLAN_GPE = 0x1b
IFLA_VXLAN_TTL_INHERIT = 0x1c
IFLA_VXLAN_DF = 0x1d
IFLA_VXLAN_VNIFILTER = 0x1e
IFLA_VXLAN_LOCALBYPASS = 0x1f
IFLA_GENEVE_UNSPEC = 0x0
IFLA_GENEVE_ID = 0x1
IFLA_GENEVE_REMOTE = 0x2
@@ -1740,6 +1780,7 @@ const (
IFLA_GENEVE_LABEL = 0xb
IFLA_GENEVE_TTL_INHERIT = 0xc
IFLA_GENEVE_DF = 0xd
IFLA_GENEVE_INNER_PROTO_INHERIT = 0xe
IFLA_BAREUDP_UNSPEC = 0x0
IFLA_BAREUDP_PORT = 0x1
IFLA_BAREUDP_ETHERTYPE = 0x2
@@ -1752,6 +1793,8 @@ const (
IFLA_GTP_FD1 = 0x2
IFLA_GTP_PDP_HASHSIZE = 0x3
IFLA_GTP_ROLE = 0x4
IFLA_GTP_CREATE_SOCKETS = 0x5
IFLA_GTP_RESTART_COUNT = 0x6
IFLA_BOND_UNSPEC = 0x0
IFLA_BOND_MODE = 0x1
IFLA_BOND_ACTIVE_SLAVE = 0x2
@@ -1781,6 +1824,9 @@ const (
IFLA_BOND_AD_ACTOR_SYSTEM = 0x1a
IFLA_BOND_TLB_DYNAMIC_LB = 0x1b
IFLA_BOND_PEER_NOTIF_DELAY = 0x1c
IFLA_BOND_AD_LACP_ACTIVE = 0x1d
IFLA_BOND_MISSED_MAX = 0x1e
IFLA_BOND_NS_IP6_TARGET = 0x1f
IFLA_BOND_AD_INFO_UNSPEC = 0x0
IFLA_BOND_AD_INFO_AGGREGATOR = 0x1
IFLA_BOND_AD_INFO_NUM_PORTS = 0x2
@@ -1796,6 +1842,7 @@ const (
IFLA_BOND_SLAVE_AD_AGGREGATOR_ID = 0x6
IFLA_BOND_SLAVE_AD_ACTOR_OPER_PORT_STATE = 0x7
IFLA_BOND_SLAVE_AD_PARTNER_OPER_PORT_STATE = 0x8
IFLA_BOND_SLAVE_PRIO = 0x9
IFLA_VF_INFO_UNSPEC = 0x0
IFLA_VF_INFO = 0x1
IFLA_VF_UNSPEC = 0x0
@@ -1854,8 +1901,16 @@ const (
IFLA_STATS_LINK_XSTATS_SLAVE = 0x3
IFLA_STATS_LINK_OFFLOAD_XSTATS = 0x4
IFLA_STATS_AF_SPEC = 0x5
IFLA_STATS_GETSET_UNSPEC = 0x0
IFLA_STATS_GET_FILTERS = 0x1
IFLA_STATS_SET_OFFLOAD_XSTATS_L3_STATS = 0x2
IFLA_OFFLOAD_XSTATS_UNSPEC = 0x0
IFLA_OFFLOAD_XSTATS_CPU_HIT = 0x1
IFLA_OFFLOAD_XSTATS_HW_S_INFO = 0x2
IFLA_OFFLOAD_XSTATS_L3_STATS = 0x3
IFLA_OFFLOAD_XSTATS_HW_S_INFO_UNSPEC = 0x0
IFLA_OFFLOAD_XSTATS_HW_S_INFO_REQUEST = 0x1
IFLA_OFFLOAD_XSTATS_HW_S_INFO_USED = 0x2
IFLA_XDP_UNSPEC = 0x0
IFLA_XDP_FD = 0x1
IFLA_XDP_ATTACHED = 0x2
@@ -1885,6 +1940,11 @@ const (
IFLA_RMNET_UNSPEC = 0x0
IFLA_RMNET_MUX_ID = 0x1
IFLA_RMNET_FLAGS = 0x2
IFLA_MCTP_UNSPEC = 0x0
IFLA_MCTP_NET = 0x1
IFLA_DSA_UNSPEC = 0x0
IFLA_DSA_CONDUIT = 0x1
IFLA_DSA_MASTER = 0x1
)
const (
+7 -7
@@ -1237,8 +1237,8 @@ github.com/justinas/alice
# github.com/kevinburke/ssh_config v1.2.0
## explicit
github.com/kevinburke/ssh_config
# github.com/klauspost/compress v1.17.5
## explicit; go 1.19
# github.com/klauspost/compress v1.17.7
## explicit; go 1.20
github.com/klauspost/compress/flate
github.com/klauspost/compress/internal/race
github.com/klauspost/compress/s2
@@ -1390,10 +1390,10 @@ github.com/mohae/deepcopy
# github.com/mschoch/smat v0.2.0
## explicit; go 1.13
github.com/mschoch/smat
# github.com/nats-io/jwt/v2 v2.5.3
# github.com/nats-io/jwt/v2 v2.5.5
## explicit; go 1.18
github.com/nats-io/jwt/v2
# github.com/nats-io/nats-server/v2 v2.10.10
# github.com/nats-io/nats-server/v2 v2.10.12
## explicit; go 1.20
github.com/nats-io/nats-server/v2/conf
github.com/nats-io/nats-server/v2/internal/fastrand
@@ -2017,7 +2017,7 @@ go.uber.org/zap/internal/color
go.uber.org/zap/internal/exit
go.uber.org/zap/zapcore
go.uber.org/zap/zapgrpc
# golang.org/x/crypto v0.19.0
# golang.org/x/crypto v0.21.0
## explicit; go 1.18
golang.org/x/crypto/argon2
golang.org/x/crypto/bcrypt
@@ -2099,7 +2099,7 @@ golang.org/x/oauth2/internal
golang.org/x/sync/errgroup
golang.org/x/sync/semaphore
golang.org/x/sync/singleflight
# golang.org/x/sys v0.17.0
# golang.org/x/sys v0.18.0
## explicit; go 1.18
golang.org/x/sys/cpu
golang.org/x/sys/execabs
@@ -2110,7 +2110,7 @@ golang.org/x/sys/windows/registry
golang.org/x/sys/windows/svc
golang.org/x/sys/windows/svc/eventlog
golang.org/x/sys/windows/svc/mgr
# golang.org/x/term v0.17.0
# golang.org/x/term v0.18.0
## explicit; go 1.18
golang.org/x/term
# golang.org/x/text v0.14.0