chore(deps): bump google.golang.org/grpc from 1.67.1 to 1.68.0

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.67.1 to 1.68.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.67.1...v1.68.0)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-11-12 06:30:59 +00:00
committed by Ralf Haferkamp
parent 932e23a410
commit 613214f4f8
37 changed files with 931 additions and 961 deletions
+2 -3
View File
@@ -1,6 +1,7 @@
module github.com/owncloud/ocis/v2
go 1.22.0
toolchain go1.22.9
require (
dario.cat/mergo v1.0.1
@@ -32,7 +33,6 @@ require (
github.com/go-micro/plugins/v4/server/grpc v1.2.0
github.com/go-micro/plugins/v4/server/http v1.2.2
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20240726082623-6831adfdcdc4
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0
github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0
github.com/go-playground/validator/v10 v10.22.1
@@ -107,7 +107,7 @@ require (
golang.org/x/term v0.26.0
golang.org/x/text v0.20.0
google.golang.org/genproto/googleapis/api v0.0.0-20241021214115-324edc3d5d38
google.golang.org/grpc v1.67.1
google.golang.org/grpc v1.68.0
google.golang.org/protobuf v1.35.1
gopkg.in/yaml.v2 v2.4.0
gotest.tools/v3 v3.5.1
@@ -301,7 +301,6 @@ require (
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
github.com/skeema/knownhosts v1.2.1 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
+2 -6
View File
@@ -413,8 +413,6 @@ github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgB
github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0/go.mod h1:J5xMp70xXZzm8yafICrDrWaUDd8Gwy8vt0xif7NcOPg=
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0 h1:EQj4l7fuOSz8ueUYhFlpZPp9+tN4JeONL32ARRKXW/U=
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0/go.mod h1:JR9Ox/iJIrcXm8nCWdAEBsyG7Q7lyMLzsTZPfXrqvwo=
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0 h1:UWBUYtMXCxQ9bIGOYcbLOjtPv8ovvCRjWWM6tHhB4S8=
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0/go.mod h1:8BYxs/wEE4ZJayHZQffw4A8s9rcPumyoNms0hYoNocM=
github.com/go-micro/plugins/v4/wrapper/trace/opentelemetry v1.2.0 h1:e2hgtWMNqJ3DmbMt9ZxzmH/BkVAw9Xg23l6CHrXQfKw=
@@ -1046,8 +1044,6 @@ github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:s
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/gunit v1.0.4/go.mod h1:EH5qMBab2UclzXUcpR8b93eHsIlp9u+pDQIRp5DZNzQ=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784 h1:0jjO3HdJfOn6gYHD/ZNZh0LLMxEAqkYX7xoDPQReEgs=
github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784/go.mod h1:ff/5myEGgtsAwf26goQCO905GrEm5ugEZSd6OWTsrhM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
@@ -1617,8 +1613,8 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E=
google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0=
google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA=
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e h1:m7aQHHqd0q89mRwhwS9Bx2rjyl/hsFAeta+uGrHsQaU=
google.golang.org/grpc/examples v0.0.0-20211102180624-670c133e568e/go.mod h1:gID3PKrg7pWKntu9Ss6zTLJ0ttC0X9IHgREOCZwbCVU=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
-191
View File
@@ -1,191 +0,0 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
Copyright 2015 Asim Aslam.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
@@ -1,93 +0,0 @@
package gobreaker
import (
"context"
"sync"
"github.com/sony/gobreaker"
"go-micro.dev/v4/client"
"go-micro.dev/v4/errors"
)
type BreakerMethod int
const (
BreakService BreakerMethod = iota
BreakServiceEndpoint
)
type clientWrapper struct {
bs gobreaker.Settings
bm BreakerMethod
cbs map[string]*gobreaker.TwoStepCircuitBreaker
mu sync.Mutex
client.Client
}
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
var svc string
switch c.bm {
case BreakService:
svc = req.Service()
case BreakServiceEndpoint:
svc = req.Service() + "." + req.Endpoint()
}
c.mu.Lock()
cb, ok := c.cbs[svc]
if !ok {
cb = gobreaker.NewTwoStepCircuitBreaker(c.bs)
c.cbs[svc] = cb
}
c.mu.Unlock()
cbAllow, err := cb.Allow()
if err != nil {
return errors.New(req.Service(), err.Error(), 502)
}
if err = c.Client.Call(ctx, req, rsp, opts...); err == nil {
cbAllow(true)
return nil
}
merr := errors.Parse(err.Error())
switch {
case merr.Code == 0:
merr.Code = 503
case len(merr.Id) == 0:
merr.Id = req.Service()
}
if merr.Code >= 500 {
cbAllow(false)
} else {
cbAllow(true)
}
return merr
}
// NewClientWrapper returns a client Wrapper.
func NewClientWrapper() client.Wrapper {
return func(c client.Client) client.Client {
w := &clientWrapper{}
w.bs = gobreaker.Settings{}
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
w.Client = c
return w
}
}
// NewCustomClientWrapper takes a gobreaker.Settings and BreakerMethod. Returns a client Wrapper.
func NewCustomClientWrapper(bs gobreaker.Settings, bm BreakerMethod) client.Wrapper {
return func(c client.Client) client.Client {
w := &clientWrapper{}
w.bm = bm
w.bs = bs
w.cbs = make(map[string]*gobreaker.TwoStepCircuitBreaker)
w.Client = c
return w
}
}
-21
View File
@@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright 2015 Sony Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
-132
View File
@@ -1,132 +0,0 @@
gobreaker
=========
[![GoDoc](https://godoc.org/github.com/sony/gobreaker?status.svg)](http://godoc.org/github.com/sony/gobreaker)
[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go.
Installation
------------
```
go get github.com/sony/gobreaker
```
Usage
-----
The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail.
The function `NewCircuitBreaker` creates a new `CircuitBreaker`.
```go
func NewCircuitBreaker(st Settings) *CircuitBreaker
```
You can configure `CircuitBreaker` by the struct `Settings`:
```go
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
```
- `Name` is the name of the `CircuitBreaker`.
- `MaxRequests` is the maximum number of requests allowed to pass through
when the `CircuitBreaker` is half-open.
If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request.
- `Interval` is the cyclic period of the closed state
for `CircuitBreaker` to clear the internal `Counts`, described later in this section.
If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state.
- `Timeout` is the period of the open state,
after which the state of `CircuitBreaker` becomes half-open.
If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds.
- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state.
If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state.
If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used.
Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5.
- `OnStateChange` is called whenever the state of `CircuitBreaker` changes.
- `IsSuccessful` is called with the error returned from a request.
If `IsSuccessful` returns true, the error is counted as a success.
Otherwise the error is counted as a failure.
If `IsSuccessful` is nil, default `IsSuccessful` is used, which returns false for all non-nil errors.
The struct `Counts` holds the numbers of requests and their successes/failures:
```go
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
```
`CircuitBreaker` clears the internal `Counts` either
on the change of the state or at the closed-state intervals.
`Counts` ignores the results of the requests sent before clearing.
`CircuitBreaker` can wrap any function to send a request:
```go
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error)
```
The method `Execute` runs the given request if `CircuitBreaker` accepts it.
`Execute` returns an error instantly if `CircuitBreaker` rejects the request.
Otherwise, `Execute` returns the result of the request.
If a panic occurs in the request, `CircuitBreaker` handles it as an error
and causes the same panic again.
Example
-------
```go
var cb *breaker.CircuitBreaker
func Get(url string) ([]byte, error) {
body, err := cb.Execute(func() (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
})
if err != nil {
return nil, err
}
return body.([]byte), nil
}
```
See [example](https://github.com/sony/gobreaker/blob/master/example) for details.
License
-------
The MIT License (MIT)
See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details.
[repo-url]: https://github.com/sony/gobreaker
-380
View File
@@ -1,380 +0,0 @@
// Package gobreaker implements the Circuit Breaker pattern.
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
package gobreaker
import (
"errors"
"fmt"
"sync"
"time"
)
// State is a type that represents a state of CircuitBreaker.
type State int
// These constants are states of CircuitBreaker.
const (
StateClosed State = iota
StateHalfOpen
StateOpen
)
var (
// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
ErrTooManyRequests = errors.New("too many requests")
// ErrOpenState is returned when the CB state is open
ErrOpenState = errors.New("circuit breaker is open")
)
// String implements stringer interface.
func (s State) String() string {
switch s {
case StateClosed:
return "closed"
case StateHalfOpen:
return "half-open"
case StateOpen:
return "open"
default:
return fmt.Sprintf("unknown state: %d", s)
}
}
// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
func (c *Counts) onRequest() {
c.Requests++
}
func (c *Counts) onSuccess() {
c.TotalSuccesses++
c.ConsecutiveSuccesses++
c.ConsecutiveFailures = 0
}
func (c *Counts) onFailure() {
c.TotalFailures++
c.ConsecutiveFailures++
c.ConsecutiveSuccesses = 0
}
func (c *Counts) clear() {
c.Requests = 0
c.TotalSuccesses = 0
c.TotalFailures = 0
c.ConsecutiveSuccesses = 0
c.ConsecutiveFailures = 0
}
// Settings configures CircuitBreaker:
//
// Name is the name of the CircuitBreaker.
//
// MaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
//
// Interval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
//
// Timeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
//
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
// If ReadyToTrip is nil, default ReadyToTrip is used.
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
//
// OnStateChange is called whenever the state of the CircuitBreaker changes.
//
// IsSuccessful is called with the error returned from a request.
// If IsSuccessful returns true, the error is counted as a success.
// Otherwise the error is counted as a failure.
// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
name string
maxRequests uint32
interval time.Duration
timeout time.Duration
readyToTrip func(counts Counts) bool
isSuccessful func(err error) bool
onStateChange func(name string, from State, to State)
mutex sync.Mutex
state State
generation uint64
counts Counts
expiry time.Time
}
// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
cb *CircuitBreaker
}
// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = st.Name
cb.onStateChange = st.OnStateChange
if st.MaxRequests == 0 {
cb.maxRequests = 1
} else {
cb.maxRequests = st.MaxRequests
}
if st.Interval <= 0 {
cb.interval = defaultInterval
} else {
cb.interval = st.Interval
}
if st.Timeout <= 0 {
cb.timeout = defaultTimeout
} else {
cb.timeout = st.Timeout
}
if st.ReadyToTrip == nil {
cb.readyToTrip = defaultReadyToTrip
} else {
cb.readyToTrip = st.ReadyToTrip
}
if st.IsSuccessful == nil {
cb.isSuccessful = defaultIsSuccessful
} else {
cb.isSuccessful = st.IsSuccessful
}
cb.toNewGeneration(time.Now())
return cb
}
// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
return &TwoStepCircuitBreaker{
cb: NewCircuitBreaker(st),
}
}
const defaultInterval = time.Duration(0) * time.Second
const defaultTimeout = time.Duration(60) * time.Second
func defaultReadyToTrip(counts Counts) bool {
return counts.ConsecutiveFailures > 5
}
func defaultIsSuccessful(err error) bool {
return err == nil
}
// Name returns the name of the CircuitBreaker.
func (cb *CircuitBreaker) Name() string {
return cb.name
}
// State returns the current state of the CircuitBreaker.
func (cb *CircuitBreaker) State() State {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, _ := cb.currentState(now)
return state
}
// Counts returns internal counters
func (cb *CircuitBreaker) Counts() Counts {
cb.mutex.Lock()
defer cb.mutex.Unlock()
return cb.counts
}
// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e)
}
}()
result, err := req()
cb.afterRequest(generation, cb.isSuccessful(err))
return result, err
}
// Name returns the name of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) Name() string {
return tscb.cb.Name()
}
// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
return tscb.cb.State()
}
// Counts returns internal counters
func (tscb *TwoStepCircuitBreaker) Counts() Counts {
return tscb.cb.Counts()
}
// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the circuit breaker doesn't allow
// requests, it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
generation, err := tscb.cb.beforeRequest()
if err != nil {
return nil, err
}
return func(success bool) {
tscb.cb.afterRequest(generation, success)
}, nil
}
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if state == StateOpen {
return generation, ErrOpenState
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
return generation, ErrTooManyRequests
}
cb.counts.onRequest()
return generation, nil
}
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
return
}
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onSuccess()
case StateHalfOpen:
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed, now)
}
}
}
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onFailure()
if cb.readyToTrip(cb.counts) {
cb.setState(StateOpen, now)
}
case StateHalfOpen:
cb.setState(StateOpen, now)
}
}
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
case StateClosed:
if !cb.expiry.IsZero() && cb.expiry.Before(now) {
cb.toNewGeneration(now)
}
case StateOpen:
if cb.expiry.Before(now) {
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
func (cb *CircuitBreaker) setState(state State, now time.Time) {
if cb.state == state {
return
}
prev := cb.state
cb.state = state
cb.toNewGeneration(now)
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++
cb.counts.clear()
var zero time.Time
switch cb.state {
case StateClosed:
if cb.interval == 0 {
cb.expiry = zero
} else {
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen
cb.expiry = zero
}
}
+8 -8
View File
@@ -4,7 +4,7 @@ We definitely welcome your patches and contributions to gRPC! Please read the gR
organization's [governance rules](https://github.com/grpc/grpc-community/blob/master/governance.md)
and [contribution guidelines](https://github.com/grpc/grpc-community/blob/master/CONTRIBUTING.md) before proceeding.
If you are new to github, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/)
If you are new to GitHub, please start by reading [Pull Request howto](https://help.github.com/articles/about-pull-requests/)
## Legal requirements
@@ -25,8 +25,8 @@ How to get your contributions merged smoothly and quickly.
is a great place to start. These issues are well-documented and usually can be
resolved with a single pull request.
- If you are adding a new file, make sure it has the copyright message template
at the top as a comment. You can copy over the message from an existing file
- If you are adding a new file, make sure it has the copyright message template
at the top as a comment. You can copy over the message from an existing file
and update the year.
- The grpc package should only depend on standard Go packages and a small number
@@ -39,12 +39,12 @@ How to get your contributions merged smoothly and quickly.
proposal](https://github.com/grpc/proposal).
- Provide a good **PR description** as a record of **what** change is being made
and **why** it was made. Link to a github issue if it exists.
and **why** it was made. Link to a GitHub issue if it exists.
- If you want to fix formatting or style, consider whether your changes are an
obvious improvement or might be considered a personal preference. If a style
change is based on preference, it likely will not be accepted. If it corrects
widely agreed-upon anti-patterns, then please do create a PR and explain the
- If you want to fix formatting or style, consider whether your changes are an
obvious improvement or might be considered a personal preference. If a style
change is based on preference, it likely will not be accepted. If it corrects
widely agreed-upon anti-patterns, then please do create a PR and explain the
benefits of the change.
- Unless your PR is trivial, you should expect there will be reviewer comments
+10 -5
View File
@@ -130,7 +130,7 @@ type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
// If it's in the list, the connection will be kept.
// If it's not in the list, the connection will gracefully closed, and
// If it's not in the list, the connection will gracefully close, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
@@ -142,8 +142,11 @@ type SubConn interface {
Connect()
// GetOrBuildProducer returns a reference to the existing Producer for this
// ProducerBuilder in this SubConn, or, if one does not currently exist,
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
// creates a new one and returns it. Returns a close function which may be
// called when the Producer is no longer needed. Otherwise the producer
// will automatically be closed upon connection loss or subchannel close.
// Should only be called on a SubConn in state Ready. Otherwise the
// producer will be unable to create streams.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn gracefully. Any started RPCs will be
// allowed to complete. No future calls should be made on the SubConn.
@@ -452,8 +455,10 @@ type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as `any` to avoid a dependency
// cycle. Should also return a close function that will be called when all
// references to the Producer have been given up.
// cycle. Build also returns a close function that will be called when all
// references to the Producer have been given up for a SubConn, or when a
// connectivity state change occurs on the SubConn. The close function
// should always block until all asynchronous cleanup work is completed.
Build(grpcClientConnInterface any) (p Producer, close func())
}
+1 -1
View File
@@ -133,7 +133,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
}
}
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
// will trigger re-resolve. Also records this as a resolver error, so when
// the overall state turns transient failure, the error message will have
// the zero address information.
if len(s.ResolverState.Addresses) == 0 {
+24
View File
@@ -0,0 +1,24 @@
/*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package internal contains code internal to the pickfirst package.
package internal
import "math/rand"
// RandShuffle pseudo-randomizes the order of addresses.
var RandShuffle = rand.Shuffle
+11 -3
View File
@@ -26,18 +26,23 @@ import (
"math/rand"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/envconfig"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
_ "google.golang.org/grpc/balancer/pickfirst/pickfirstleaf" // For automatically registering the new pickfirst if required.
)
func init() {
if envconfig.NewPickFirstEnabled {
return
}
balancer.Register(pickfirstBuilder{})
internal.ShuffleAddressListForTesting = func(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }
}
var logger = grpclog.Component("pick-first-lb")
@@ -103,10 +108,13 @@ func (b *pickfirstBalancer) ResolverError(err error) {
})
}
// Shuffler is an interface for shuffling an address list.
type Shuffler interface {
ShuffleAddressListForTesting(n int, swap func(i, j int))
}
// ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n
// is the number of elements. swap swaps the elements with indexes i and j.
func ShuffleAddressListForTesting(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
@@ -140,7 +148,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// within each endpoint. - A61
if cfg.ShuffleAddressList {
endpoints = append([]resolver.Endpoint{}, endpoints...)
internal.ShuffleAddressListForTesting.(func(int, func(int, int)))(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
}
// "Flatten the list by concatenating the ordered list of addresses for each
@@ -0,0 +1,625 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package pickfirstleaf contains the pick_first load balancing policy which
// will be the universal leaf policy after dualstack changes are implemented.
//
// # Experimental
//
// Notice: This package is EXPERIMENTAL and may be changed or removed in a
// later release.
package pickfirstleaf
import (
"encoding/json"
"errors"
"fmt"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/pickfirst/internal"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/envconfig"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
func init() {
if envconfig.NewPickFirstEnabled {
// Register as the default pick_first balancer.
Name = "pick_first"
}
balancer.Register(pickfirstBuilder{})
}
var (
logger = grpclog.Component("pick-first-leaf-lb")
// Name is the name of the pick_first_leaf balancer.
// It is changed to "pick_first" in init() if this balancer is to be
// registered as the default pickfirst.
Name = "pick_first_leaf"
)
// TODO: change to pick-first when this becomes the default pick_first policy.
const logPrefix = "[pick-first-leaf-lb %p] "
type pickfirstBuilder struct{}
func (pickfirstBuilder) Build(cc balancer.ClientConn, _ balancer.BuildOptions) balancer.Balancer {
b := &pickfirstBalancer{
cc: cc,
addressList: addressList{},
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,
mu: sync.Mutex{},
}
b.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(logPrefix, b))
return b
}
func (b pickfirstBuilder) Name() string {
return Name
}
func (pickfirstBuilder) ParseConfig(js json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
var cfg pfConfig
if err := json.Unmarshal(js, &cfg); err != nil {
return nil, fmt.Errorf("pickfirst: unable to unmarshal LB policy config: %s, error: %v", string(js), err)
}
return cfg, nil
}
type pfConfig struct {
serviceconfig.LoadBalancingConfig `json:"-"`
// If set to true, instructs the LB policy to shuffle the order of the list
// of endpoints received from the name resolver before attempting to
// connect to them.
ShuffleAddressList bool `json:"shuffleAddressList"`
}
// scData keeps track of the current state of the subConn.
// It is not safe for concurrent access.
type scData struct {
// The following fields are initialized at build time and read-only after
// that.
subConn balancer.SubConn
addr resolver.Address
state connectivity.State
lastErr error
}
func (b *pickfirstBalancer) newSCData(addr resolver.Address) (*scData, error) {
sd := &scData{
state: connectivity.Idle,
addr: addr,
}
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{
StateListener: func(state balancer.SubConnState) {
b.updateSubConnState(sd, state)
},
})
if err != nil {
return nil, err
}
sd.subConn = sc
return sd, nil
}
type pickfirstBalancer struct {
// The following fields are initialized at build time and read-only after
// that and therefore do not need to be guarded by a mutex.
logger *internalgrpclog.PrefixLogger
cc balancer.ClientConn
// The mutex is used to ensure synchronization of updates triggered
// from the idle picker and the already serialized resolver,
// SubConn state updates.
mu sync.Mutex
state connectivity.State
// scData for active subonns mapped by address.
subConns *resolver.AddressMap
addressList addressList
firstPass bool
numTF int
}
// ResolverError is called by the ClientConn when the name resolver produces
// an error or when pickfirst determined the resolver update to be invalid.
func (b *pickfirstBalancer) ResolverError(err error) {
b.mu.Lock()
defer b.mu.Unlock()
b.resolverErrorLocked(err)
}
func (b *pickfirstBalancer) resolverErrorLocked(err error) {
if b.logger.V(2) {
b.logger.Infof("Received error from the name resolver: %v", err)
}
// The picker will not change since the balancer does not currently
// report an error. If the balancer hasn't received a single good resolver
// update yet, transition to TRANSIENT_FAILURE.
if b.state != connectivity.TransientFailure && b.addressList.size() > 0 {
if b.logger.V(2) {
b.logger.Infof("Ignoring resolver error because balancer is using a previous good update.")
}
return
}
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
b.mu.Lock()
defer b.mu.Unlock()
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// Cleanup state pertaining to the previous resolver state.
// Treat an empty address list like an error by calling b.ResolverError.
b.state = connectivity.TransientFailure
b.closeSubConnsLocked()
b.addressList.updateAddrs(nil)
b.resolverErrorLocked(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
cfg, ok := state.BalancerConfig.(pfConfig)
if state.BalancerConfig != nil && !ok {
return fmt.Errorf("pickfirst: received illegal BalancerConfig (type %T): %v: %w", state.BalancerConfig, state.BalancerConfig, balancer.ErrBadResolverState)
}
if b.logger.V(2) {
b.logger.Infof("Received new config %s, resolver state %s", pretty.ToJSON(cfg), pretty.ToJSON(state.ResolverState))
}
var newAddrs []resolver.Address
if endpoints := state.ResolverState.Endpoints; len(endpoints) != 0 {
// Perform the optional shuffling described in gRFC A62. The shuffling
// will change the order of endpoints but not touch the order of the
// addresses within each endpoint. - A61
if cfg.ShuffleAddressList {
endpoints = append([]resolver.Endpoint{}, endpoints...)
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
}
// "Flatten the list by concatenating the ordered list of addresses for
// each of the endpoints, in order." - A61
for _, endpoint := range endpoints {
// "In the flattened list, interleave addresses from the two address
// families, as per RFC-8305 section 4." - A61
// TODO: support the above language.
newAddrs = append(newAddrs, endpoint.Addresses...)
}
} else {
// Endpoints not set, process addresses until we migrate resolver
// emissions fully to Endpoints. The top channel does wrap emitted
// addresses with endpoints, however some balancers such as weighted
// target do not forward the corresponding correct endpoints down/split
// endpoints properly. Once all balancers correctly forward endpoints
// down, can delete this else conditional.
newAddrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
newAddrs = append([]resolver.Address{}, newAddrs...)
internal.RandShuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
}
}
// If an address appears in multiple endpoints or in the same endpoint
// multiple times, we keep it only once. We will create only one SubConn
// for the address because an AddressMap is used to store SubConns.
// Not de-duplicating would result in attempting to connect to the same
// SubConn multiple times in the same pass. We don't want this.
newAddrs = deDupAddresses(newAddrs)
// Since we have a new set of addresses, we are again at first pass.
b.firstPass = true
// If the previous ready SubConn exists in new address list,
// keep this connection and don't create new SubConns.
prevAddr := b.addressList.currentAddress()
prevAddrsCount := b.addressList.size()
b.addressList.updateAddrs(newAddrs)
if b.state == connectivity.Ready && b.addressList.seekTo(prevAddr) {
return nil
}
b.reconcileSubConnsLocked(newAddrs)
// If it's the first resolver update or the balancer was already READY
// (but the new address list does not contain the ready SubConn) or
// CONNECTING, enter CONNECTING.
// We may be in TRANSIENT_FAILURE due to a previous empty address list,
// we should still enter CONNECTING because the sticky TF behaviour
// mentioned in A62 applies only when the TRANSIENT_FAILURE is reported
// due to connectivity failures.
if b.state == connectivity.Ready || b.state == connectivity.Connecting || prevAddrsCount == 0 {
// Start connection attempt at first address.
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
b.requestConnectionLocked()
} else if b.state == connectivity.TransientFailure {
// If we're in TRANSIENT_FAILURE, we stay in TRANSIENT_FAILURE until
// we're READY. See A62.
b.requestConnectionLocked()
}
return nil
}
// UpdateSubConnState is unused as a StateListener is always registered when
// creating SubConns.
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", subConn, state)
}
func (b *pickfirstBalancer) Close() {
b.mu.Lock()
defer b.mu.Unlock()
b.closeSubConnsLocked()
b.state = connectivity.Shutdown
}
// ExitIdle moves the balancer out of idle state. It can be called concurrently
// by the idlePicker and clientConn so access to variables should be
// synchronized.
func (b *pickfirstBalancer) ExitIdle() {
b.mu.Lock()
defer b.mu.Unlock()
if b.state == connectivity.Idle && b.addressList.currentAddress() == b.addressList.first() {
b.firstPass = true
b.requestConnectionLocked()
}
}
func (b *pickfirstBalancer) closeSubConnsLocked() {
for _, sd := range b.subConns.Values() {
sd.(*scData).subConn.Shutdown()
}
b.subConns = resolver.NewAddressMap()
}
// deDupAddresses ensures that each address appears only once in the slice.
func deDupAddresses(addrs []resolver.Address) []resolver.Address {
seenAddrs := resolver.NewAddressMap()
retAddrs := []resolver.Address{}
for _, addr := range addrs {
if _, ok := seenAddrs.Get(addr); ok {
continue
}
retAddrs = append(retAddrs, addr)
}
return retAddrs
}
// reconcileSubConnsLocked updates the active subchannels based on a new address
// list from the resolver. It does this by:
// - closing subchannels: any existing subchannels associated with addresses
// that are no longer in the updated list are shut down.
// - removing subchannels: entries for these closed subchannels are removed
// from the subchannel map.
//
// This ensures that the subchannel map accurately reflects the current set of
// addresses received from the name resolver.
func (b *pickfirstBalancer) reconcileSubConnsLocked(newAddrs []resolver.Address) {
newAddrsMap := resolver.NewAddressMap()
for _, addr := range newAddrs {
newAddrsMap.Set(addr, true)
}
for _, oldAddr := range b.subConns.Keys() {
if _, ok := newAddrsMap.Get(oldAddr); ok {
continue
}
val, _ := b.subConns.Get(oldAddr)
val.(*scData).subConn.Shutdown()
b.subConns.Delete(oldAddr)
}
}
// shutdownRemainingLocked shuts down remaining subConns. Called when a subConn
// becomes ready, which means that all other subConn must be shutdown.
func (b *pickfirstBalancer) shutdownRemainingLocked(selected *scData) {
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.subConn != selected.subConn {
sd.subConn.Shutdown()
}
}
b.subConns = resolver.NewAddressMap()
b.subConns.Set(selected.addr, selected)
}
// requestConnectionLocked starts connecting on the subchannel corresponding to
// the current address. If no subchannel exists, one is created. If the current
// subchannel is in TransientFailure, a connection to the next address is
// attempted until a subchannel is found.
func (b *pickfirstBalancer) requestConnectionLocked() {
if !b.addressList.isValid() {
return
}
var lastErr error
for valid := true; valid; valid = b.addressList.increment() {
curAddr := b.addressList.currentAddress()
sd, ok := b.subConns.Get(curAddr)
if !ok {
var err error
// We want to assign the new scData to sd from the outer scope,
// hence we can't use := below.
sd, err = b.newSCData(curAddr)
if err != nil {
// This should never happen, unless the clientConn is being shut
// down.
if b.logger.V(2) {
b.logger.Infof("Failed to create a subConn for address %v: %v", curAddr.String(), err)
}
// Do nothing, the LB policy will be closed soon.
return
}
b.subConns.Set(curAddr, sd)
}
scd := sd.(*scData)
switch scd.state {
case connectivity.Idle:
scd.subConn.Connect()
case connectivity.TransientFailure:
// Try the next address.
lastErr = scd.lastErr
continue
case connectivity.Ready:
// Should never happen.
b.logger.Errorf("Requesting a connection even though we have a READY SubConn")
case connectivity.Shutdown:
// Should never happen.
b.logger.Errorf("SubConn with state SHUTDOWN present in SubConns map")
case connectivity.Connecting:
// Wait for the SubConn to report success or failure.
}
return
}
// All the remaining addresses in the list are in TRANSIENT_FAILURE, end the
// first pass.
b.endFirstPassLocked(lastErr)
}
func (b *pickfirstBalancer) updateSubConnState(sd *scData, newState balancer.SubConnState) {
b.mu.Lock()
defer b.mu.Unlock()
oldState := sd.state
sd.state = newState.ConnectivityState
// Previously relevant SubConns can still callback with state updates.
// To prevent pickers from returning these obsolete SubConns, this logic
// is included to check if the current list of active SubConns includes this
// SubConn.
if activeSD, found := b.subConns.Get(sd.addr); !found || activeSD != sd {
return
}
if newState.ConnectivityState == connectivity.Shutdown {
return
}
if newState.ConnectivityState == connectivity.Ready {
b.shutdownRemainingLocked(sd)
if !b.addressList.seekTo(sd.addr) {
// This should not fail as we should have only one SubConn after
// entering READY. The SubConn should be present in the addressList.
b.logger.Errorf("Address %q not found address list in %v", sd.addr, b.addressList.addresses)
return
}
b.state = connectivity.Ready
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Ready,
Picker: &picker{result: balancer.PickResult{SubConn: sd.subConn}},
})
return
}
// If the LB policy is READY, and it receives a subchannel state change,
// it means that the READY subchannel has failed.
// A SubConn can also transition from CONNECTING directly to IDLE when
// a transport is successfully created, but the connection fails
// before the SubConn can send the notification for READY. We treat
// this as a successful connection and transition to IDLE.
if (b.state == connectivity.Ready && newState.ConnectivityState != connectivity.Ready) || (oldState == connectivity.Connecting && newState.ConnectivityState == connectivity.Idle) {
// Once a transport fails, the balancer enters IDLE and starts from
// the first address when the picker is used.
b.shutdownRemainingLocked(sd)
b.state = connectivity.Idle
b.addressList.reset()
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &idlePicker{exitIdle: sync.OnceFunc(b.ExitIdle)},
})
return
}
if b.firstPass {
switch newState.ConnectivityState {
case connectivity.Connecting:
// The balancer can be in either IDLE, CONNECTING or
// TRANSIENT_FAILURE. If it's in TRANSIENT_FAILURE, stay in
// TRANSIENT_FAILURE until it's READY. See A62.
// If the balancer is already in CONNECTING, no update is needed.
if b.state == connectivity.Idle {
b.state = connectivity.Connecting
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
}
case connectivity.TransientFailure:
sd.lastErr = newState.ConnectionError
// Since we're re-using common SubConns while handling resolver
// updates, we could receive an out of turn TRANSIENT_FAILURE from
// a pass over the previous address list. We ignore such updates.
if curAddr := b.addressList.currentAddress(); !equalAddressIgnoringBalAttributes(&curAddr, &sd.addr) {
return
}
if b.addressList.increment() {
b.requestConnectionLocked()
return
}
// End of the first pass.
b.endFirstPassLocked(newState.ConnectionError)
}
return
}
// We have finished the first pass, keep re-connecting failing SubConns.
switch newState.ConnectivityState {
case connectivity.TransientFailure:
b.numTF = (b.numTF + 1) % b.subConns.Len()
sd.lastErr = newState.ConnectionError
if b.numTF%b.subConns.Len() == 0 {
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: newState.ConnectionError},
})
}
// We don't need to request re-resolution since the SubConn already
// does that before reporting TRANSIENT_FAILURE.
// TODO: #7534 - Move re-resolution requests from SubConn into
// pick_first.
case connectivity.Idle:
sd.subConn.Connect()
}
}
func (b *pickfirstBalancer) endFirstPassLocked(lastErr error) {
b.firstPass = false
b.numTF = 0
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: lastErr},
})
// Start re-connecting all the SubConns that are already in IDLE.
for _, v := range b.subConns.Values() {
sd := v.(*scData)
if sd.state == connectivity.Idle {
sd.subConn.Connect()
}
}
}
type picker struct {
result balancer.PickResult
err error
}
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
exitIdle func()
}
func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.exitIdle()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
// addressList manages sequentially iterating over addresses present in a list
// of endpoints. It provides a 1 dimensional view of the addresses present in
// the endpoints.
// This type is not safe for concurrent access.
type addressList struct {
addresses []resolver.Address
idx int
}
func (al *addressList) isValid() bool {
return al.idx < len(al.addresses)
}
func (al *addressList) size() int {
return len(al.addresses)
}
// increment moves to the next index in the address list.
// This method returns false if it went off the list, true otherwise.
func (al *addressList) increment() bool {
if !al.isValid() {
return false
}
al.idx++
return al.idx < len(al.addresses)
}
// currentAddress returns the current address pointed to in the addressList.
// If the list is in an invalid state, it returns an empty address instead.
func (al *addressList) currentAddress() resolver.Address {
if !al.isValid() {
return resolver.Address{}
}
return al.addresses[al.idx]
}
// first returns the first address in the list. If the list is empty, it returns
// an empty address instead.
func (al *addressList) first() resolver.Address {
if len(al.addresses) == 0 {
return resolver.Address{}
}
return al.addresses[0]
}
func (al *addressList) reset() {
al.idx = 0
}
func (al *addressList) updateAddrs(addrs []resolver.Address) {
al.addresses = addrs
al.reset()
}
// seekTo returns false if the needle was not found and the current index was
// left unchanged.
func (al *addressList) seekTo(needle resolver.Address) bool {
for ai, addr := range al.addresses {
if !equalAddressIgnoringBalAttributes(&addr, &needle) {
continue
}
al.idx = ai
return true
}
return false
}
// equalAddressIgnoringBalAttributes returns true is a and b are considered
// equal. This is different from the Equal method on the resolver.Address type
// which considers all fields to determine equality. Here, we only consider
// fields that are meaningful to the SubConn.
func equalAddressIgnoringBalAttributes(a, b *resolver.Address) bool {
return a.Addr == b.Addr && a.ServerName == b.ServerName &&
a.Attributes.Equal(b.Attributes) &&
a.Metadata == b.Metadata
}
+29 -19
View File
@@ -24,12 +24,14 @@ import (
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
var setConnectedAddress = internal.SetConnectedAddress.(func(*balancer.SubConnState, resolver.Address))
@@ -256,8 +258,8 @@ type acBalancerWrapper struct {
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)
mu sync.Mutex
producers map[balancer.ProducerBuilder]*refCountedProducer
producersMu sync.Mutex
producers map[balancer.ProducerBuilder]*refCountedProducer
}
// updateState is invoked by grpc to push a subConn state update to the
@@ -267,6 +269,9 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
if ctx.Err() != nil || acbw.ccb.balancer == nil {
return
}
// Invalidate all producers on any state change.
acbw.closeProducers()
// Even though it is optional for balancers, gracefulswitch ensures
// opts.StateListener is set, so this cannot ever be nil.
// TODO: delete this comment when UpdateSubConnState is removed.
@@ -275,16 +280,6 @@ func (acbw *acBalancerWrapper) updateState(s connectivity.State, curAddr resolve
setConnectedAddress(&scs, curAddr)
}
acbw.stateListener(scs)
acbw.ac.mu.Lock()
defer acbw.ac.mu.Unlock()
if s == connectivity.Ready {
// When changing states to READY, reset stateReadyChan. Wait until
// after we notify the LB policy's listener(s) in order to prevent
// ac.getTransport() from unblocking before the LB policy starts
// tracking the subchannel as READY.
close(acbw.ac.stateReadyChan)
acbw.ac.stateReadyChan = make(chan struct{})
}
})
}
@@ -301,6 +296,7 @@ func (acbw *acBalancerWrapper) Connect() {
}
func (acbw *acBalancerWrapper) Shutdown() {
acbw.closeProducers()
acbw.ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
@@ -308,9 +304,10 @@ func (acbw *acBalancerWrapper) Shutdown() {
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
transport, err := acbw.ac.getTransport(ctx)
if err != nil {
return nil, err
transport := acbw.ac.getReadyTransport()
if transport == nil {
return nil, status.Errorf(codes.Unavailable, "SubConn state is not Ready")
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}
@@ -335,8 +332,8 @@ type refCountedProducer struct {
}
func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
acbw.mu.Lock()
defer acbw.mu.Unlock()
acbw.producersMu.Lock()
defer acbw.producersMu.Unlock()
// Look up existing producer from this builder.
pData := acbw.producers[pb]
@@ -353,13 +350,26 @@ func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func() {
acbw.mu.Lock()
acbw.producersMu.Lock()
// If closeProducers has already closed this producer instance, refs is
// set to 0, so the check after decrementing will never pass, and the
// producer will not be double-closed.
pData.refs--
if pData.refs == 0 {
defer pData.close() // Run outside the acbw mutex
delete(acbw.producers, pb)
}
acbw.mu.Unlock()
acbw.producersMu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
}
func (acbw *acBalancerWrapper) closeProducers() {
acbw.producersMu.Lock()
defer acbw.producersMu.Unlock()
for pb, pData := range acbw.producers {
pData.refs = 0
pData.close()
delete(acbw.producers, pb)
}
}
+18 -36
View File
@@ -825,14 +825,13 @@ func (cc *ClientConn) newAddrConnLocked(addrs []resolver.Address, opts balancer.
}
ac := &addrConn{
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
stateReadyChan: make(chan struct{}),
state: connectivity.Idle,
cc: cc,
addrs: copyAddresses(addrs),
scopts: opts,
dopts: cc.dopts,
channelz: channelz.RegisterSubChannel(cc.channelz, ""),
resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Start with our address set to the first address; this may be updated if
@@ -1141,10 +1140,15 @@ func (cc *ClientConn) Close() error {
<-cc.resolverWrapper.serializer.Done()
<-cc.balancerWrapper.serializer.Done()
var wg sync.WaitGroup
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
wg.Add(1)
go func(ac *addrConn) {
defer wg.Done()
ac.tearDown(ErrClientConnClosing)
}(ac)
}
wg.Wait()
cc.addTraceEvent("deleted")
// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add
// trace reference to the entity being deleted, and thus prevent it from being
@@ -1179,8 +1183,7 @@ type addrConn struct {
addrs []resolver.Address // All addresses that the resolver resolved to.
// Use updateConnectivityState for updating addrConn's connectivity state.
state connectivity.State
stateReadyChan chan struct{} // closed and recreated on every READY state change.
state connectivity.State
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
@@ -1251,6 +1254,8 @@ func (ac *addrConn) resetTransportAndUnlock() {
ac.mu.Unlock()
if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
// to ensure one resolution request per pass instead of per subconn failure.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
ac.mu.Lock()
if acCtx.Err() != nil {
@@ -1292,7 +1297,7 @@ func (ac *addrConn) resetTransportAndUnlock() {
ac.mu.Unlock()
}
// tryAllAddrs tries to creates a connection to the addresses, and stop when at
// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, connectDeadline time.Time) error {
@@ -1504,29 +1509,6 @@ func (ac *addrConn) getReadyTransport() transport.ClientTransport {
return nil
}
// getTransport waits until the addrconn is ready and returns the transport.
// If the context expires first, returns an appropriate status. If the
// addrConn is stopped first, returns an Unavailable status error.
func (ac *addrConn) getTransport(ctx context.Context) (transport.ClientTransport, error) {
for ctx.Err() == nil {
ac.mu.Lock()
t, state, sc := ac.transport, ac.state, ac.stateReadyChan
ac.mu.Unlock()
if state == connectivity.Ready {
return t, nil
}
if state == connectivity.Shutdown {
return nil, status.Errorf(codes.Unavailable, "SubConn shutting down")
}
select {
case <-ctx.Done():
case <-sc:
}
}
return nil, status.FromContextError(ctx.Err()).Err()
}
// tearDown starts to tear down the addrConn.
//
// Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
+22 -7
View File
@@ -200,25 +200,40 @@ var tls12ForbiddenCipherSuites = map[uint16]struct{}{
// NewTLS uses c to construct a TransportCredentials based on TLS.
func NewTLS(c *tls.Config) TransportCredentials {
tc := &tlsCreds{credinternal.CloneTLSConfig(c)}
tc.config.NextProtos = credinternal.AppendH2ToNextProtos(tc.config.NextProtos)
config := applyDefaults(c)
if config.GetConfigForClient != nil {
oldFn := config.GetConfigForClient
config.GetConfigForClient = func(hello *tls.ClientHelloInfo) (*tls.Config, error) {
cfgForClient, err := oldFn(hello)
if err != nil || cfgForClient == nil {
return cfgForClient, err
}
return applyDefaults(cfgForClient), nil
}
}
return &tlsCreds{config: config}
}
func applyDefaults(c *tls.Config) *tls.Config {
config := credinternal.CloneTLSConfig(c)
config.NextProtos = credinternal.AppendH2ToNextProtos(config.NextProtos)
// If the user did not configure a MinVersion and did not configure a
// MaxVersion < 1.2, use MinVersion=1.2, which is required by
// https://datatracker.ietf.org/doc/html/rfc7540#section-9.2
if tc.config.MinVersion == 0 && (tc.config.MaxVersion == 0 || tc.config.MaxVersion >= tls.VersionTLS12) {
tc.config.MinVersion = tls.VersionTLS12
if config.MinVersion == 0 && (config.MaxVersion == 0 || config.MaxVersion >= tls.VersionTLS12) {
config.MinVersion = tls.VersionTLS12
}
// If the user did not configure CipherSuites, use all "secure" cipher
// suites reported by the TLS package, but remove some explicitly forbidden
// by https://datatracker.ietf.org/doc/html/rfc7540#appendix-A
if tc.config.CipherSuites == nil {
if config.CipherSuites == nil {
for _, cs := range tls.CipherSuites() {
if _, ok := tls12ForbiddenCipherSuites[cs.ID]; !ok {
tc.config.CipherSuites = append(tc.config.CipherSuites, cs.ID)
config.CipherSuites = append(config.CipherSuites, cs.ID)
}
}
}
return tc
return config
}
// NewClientTLSFromCert constructs TLS credentials from the provided root
+1 -1
View File
@@ -436,7 +436,7 @@ func WithTimeout(d time.Duration) DialOption {
// option to true from the Control field. For a concrete example of how to do
// this, see internal.NetDialerWithTCPKeepalive().
//
// For more information, please see [issue 23459] in the Go github repo.
// For more information, please see [issue 23459] in the Go GitHub repo.
//
// [issue 23459]: https://github.com/golang/go/issues/23459
func WithContextDialer(f func(context.Context, string) (net.Conn, error)) DialOption {
@@ -33,6 +33,8 @@ type lbConfig struct {
childConfig serviceconfig.LoadBalancingConfig
}
// ChildName returns the name of the child balancer of the gracefulswitch
// Balancer.
func ChildName(l serviceconfig.LoadBalancingConfig) string {
return l.(*lbConfig).childBuilder.Name()
}
+15
View File
@@ -43,6 +43,8 @@ type Channel struct {
// Non-zero traceRefCount means the trace of this channel cannot be deleted.
traceRefCount int32
// ChannelMetrics holds connectivity state, target and call metrics for the
// channel within channelz.
ChannelMetrics ChannelMetrics
}
@@ -50,6 +52,8 @@ type Channel struct {
// nesting.
func (c *Channel) channelzIdentifier() {}
// String returns a string representation of the Channel, including its parent
// entity and ID.
func (c *Channel) String() string {
if c.Parent == nil {
return fmt.Sprintf("Channel #%d", c.ID)
@@ -61,24 +65,31 @@ func (c *Channel) id() int64 {
return c.ID
}
// SubChans returns a copy of the map of sub-channels associated with the
// Channel.
func (c *Channel) SubChans() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(c.subChans)
}
// NestedChans returns a copy of the map of nested channels associated with the
// Channel.
func (c *Channel) NestedChans() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(c.nestedChans)
}
// Trace returns a copy of the Channel's trace data.
func (c *Channel) Trace() *ChannelTrace {
db.mu.RLock()
defer db.mu.RUnlock()
return c.trace.copy()
}
// ChannelMetrics holds connectivity state, target and call metrics for the
// channel within channelz.
type ChannelMetrics struct {
// The current connectivity state of the channel.
State atomic.Pointer[connectivity.State]
@@ -136,12 +147,16 @@ func strFromPointer(s *string) string {
return *s
}
// String returns a string representation of the ChannelMetrics, including its
// state, target, and call metrics.
func (c *ChannelMetrics) String() string {
return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",
c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),
)
}
// NewChannelMetricForTesting creates a new instance of ChannelMetrics with
// specified initial values for testing purposes.
func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {
c := &ChannelMetrics{}
c.State.Store(&state)
+2
View File
@@ -59,6 +59,8 @@ func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *Se
return sm
}
// CopyFrom copies the metrics data from the provided ServerMetrics
// instance into the current instance.
func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) {
sm.CallsStarted.Store(o.CallsStarted.Load())
sm.CallsSucceeded.Store(o.CallsSucceeded.Load())
+7
View File
@@ -70,13 +70,18 @@ type EphemeralSocketMetrics struct {
RemoteFlowControlWindow int64
}
// SocketType represents the type of socket.
type SocketType string
// SocketType can be one of these.
const (
SocketTypeNormal = "NormalSocket"
SocketTypeListen = "ListenSocket"
)
// Socket represents a socket within channelz which includes socket
// metrics and data related to socket activity and provides methods
// for managing and interacting with sockets.
type Socket struct {
Entity
SocketType SocketType
@@ -100,6 +105,8 @@ type Socket struct {
Security credentials.ChannelzSecurityValue
}
// String returns a string representation of the Socket, including its parent
// entity, socket type, and ID.
func (ls *Socket) String() string {
return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID)
}
+2
View File
@@ -47,12 +47,14 @@ func (sc *SubChannel) id() int64 {
return sc.ID
}
// Sockets returns a copy of the sockets map associated with the SubChannel.
func (sc *SubChannel) Sockets() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(sc.sockets)
}
// Trace returns a copy of the ChannelTrace associated with the SubChannel.
func (sc *SubChannel) Trace() *ChannelTrace {
db.mu.RLock()
defer db.mu.RUnlock()
+14 -5
View File
@@ -79,13 +79,21 @@ type TraceEvent struct {
Parent *TraceEvent
}
// ChannelTrace provides tracing information for a channel.
// It tracks various events and metadata related to the channel's lifecycle
// and operations.
type ChannelTrace struct {
cm *channelMap
clearCalled bool
cm *channelMap
clearCalled bool
// The time when the trace was created.
CreationTime time.Time
EventNum int64
mu sync.Mutex
Events []*traceEvent
// A counter for the number of events recorded in the
// trace.
EventNum int64
mu sync.Mutex
// A slice of traceEvent pointers representing the events recorded for
// this channel.
Events []*traceEvent
}
func (c *ChannelTrace) copy() *ChannelTrace {
@@ -175,6 +183,7 @@ var refChannelTypeToString = map[RefChannelType]string{
RefNormalSocket: "NormalSocket",
}
// String returns a string representation of the RefChannelType
func (r RefChannelType) String() string {
return refChannelTypeToString[r]
}
+5
View File
@@ -50,6 +50,11 @@ var (
// xDS fallback is turned on. If this is unset or is false, only the first
// xDS server in the list of server configs will be used.
XDSFallbackSupport = boolFromEnv("GRPC_EXPERIMENTAL_XDS_FALLBACK", false)
// NewPickFirstEnabled is set if the new pickfirst leaf policy is to be used
// instead of the exiting pickfirst implementation. This can be enabled by
// setting the environment variable "GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST"
// to "true".
NewPickFirstEnabled = boolFromEnv("GRPC_EXPERIMENTAL_ENABLE_NEW_PICK_FIRST", false)
)
func boolFromEnv(envVar string, def bool) bool {
+1 -1
View File
@@ -53,7 +53,7 @@ func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
return cs
}
// TrySchedule tries to schedules the provided callback function f to be
// TrySchedule tries to schedule the provided callback function f to be
// executed in the order it was added. This is a best-effort operation. If the
// context passed to NewCallbackSerializer was canceled before this method is
// called, the callback will not be scheduled.
+1 -1
View File
@@ -39,7 +39,7 @@ func ParseMethod(methodName string) (service, method string, _ error) {
}
// baseContentType is the base content-type for gRPC. This is a valid
// content-type on it's own, but can also include a content-subtype such as
// content-type on its own, but can also include a content-subtype such as
// "proto" as a suffix after "+" or ";". See
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
// for more details.
+3 -1
View File
@@ -182,6 +182,7 @@ func (m *Manager) tryEnterIdleMode() bool {
return true
}
// EnterIdleModeForTesting instructs the channel to enter idle mode.
func (m *Manager) EnterIdleModeForTesting() {
m.tryEnterIdleMode()
}
@@ -225,7 +226,7 @@ func (m *Manager) ExitIdleMode() error {
// came in and OnCallBegin() noticed that the calls count is negative.
// - Channel is in idle mode, and multiple new RPCs come in at the same
// time, all of them notice a negative calls count in OnCallBegin and get
// here. The first one to get the lock would got the channel to exit idle.
// here. The first one to get the lock would get the channel to exit idle.
// - Channel is not in idle mode, and the user calls Connect which calls
// m.ExitIdleMode.
//
@@ -266,6 +267,7 @@ func (m *Manager) isClosed() bool {
return atomic.LoadInt32(&m.closed) == 1
}
// Close stops the timer associated with the Manager, if it exists.
func (m *Manager) Close() {
atomic.StoreInt32(&m.closed, 1)
+3 -5
View File
@@ -191,6 +191,8 @@ var (
// ExitIdleModeForTesting gets the ClientConn to exit IDLE mode.
ExitIdleModeForTesting any // func(*grpc.ClientConn) error
// ChannelzTurnOffForTesting disables the Channelz service for testing
// purposes.
ChannelzTurnOffForTesting func()
// TriggerXDSResourceNotFoundForTesting causes the provided xDS Client to
@@ -205,10 +207,6 @@ var (
// default resolver scheme.
UserSetDefaultScheme = false
// ShuffleAddressListForTesting pseudo-randomizes the order of addresses. n
// is the number of elements. swap swaps the elements with indexes i and j.
ShuffleAddressListForTesting any // func(n int, swap func(i, j int))
// ConnectedAddress returns the connected address for a SubConnState. The
// address is only valid if the state is READY.
ConnectedAddress any // func (scs SubConnState) resolver.Address
@@ -235,7 +233,7 @@ var (
//
// The implementation is expected to create a health checking RPC stream by
// calling newStream(), watch for the health status of serviceName, and report
// it's health back by calling setConnectivityState().
// its health back by calling setConnectivityState().
//
// The health checking protocol is defined at:
// https://github.com/grpc/grpc/blob/master/doc/health-checking.md
+1 -1
View File
@@ -177,7 +177,7 @@ type dnsResolver struct {
// finished. Otherwise, data race will be possible. [Race Example] in
// dns_resolver_test we replace the real lookup functions with mocked ones to
// facilitate testing. If Close() doesn't wait for watcher() goroutine
// finishes, race detector sometimes will warns lookup (READ the lookup
// finishes, race detector sometimes will warn lookup (READ the lookup
// function pointers) inside watcher() goroutine has data race with
// replaceNetFunc (WRITE the lookup function pointers).
wg sync.WaitGroup
+10
View File
@@ -54,6 +54,8 @@ func verifyLabels(desc *estats.MetricDescriptor, labelsRecv ...string) {
}
}
// RecordInt64Count records the measurement alongside labels on the int
// count associated with the provided handle.
func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
@@ -62,6 +64,8 @@ func (l *MetricsRecorderList) RecordInt64Count(handle *estats.Int64CountHandle,
}
}
// RecordFloat64Count records the measurement alongside labels on the float
// count associated with the provided handle.
func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
@@ -70,6 +74,8 @@ func (l *MetricsRecorderList) RecordFloat64Count(handle *estats.Float64CountHand
}
}
// RecordInt64Histo records the measurement alongside labels on the int
// histo associated with the provided handle.
func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
@@ -78,6 +84,8 @@ func (l *MetricsRecorderList) RecordInt64Histo(handle *estats.Int64HistoHandle,
}
}
// RecordFloat64Histo records the measurement alongside labels on the float
// histo associated with the provided handle.
func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
@@ -86,6 +94,8 @@ func (l *MetricsRecorderList) RecordFloat64Histo(handle *estats.Float64HistoHand
}
}
// RecordInt64Gauge records the measurement alongside labels on the int
// gauge associated with the provided handle.
func (l *MetricsRecorderList) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) {
verifyLabels(handle.Descriptor(), labels...)
+34 -1
View File
@@ -149,6 +149,8 @@ func (s *Status) WithDetails(details ...protoadapt.MessageV1) (*Status, error) {
// Details returns a slice of details messages attached to the status.
// If a detail cannot be decoded, the error is returned in place of the detail.
// If the detail can be decoded, the proto message returned is of the same
// type that was given to WithDetails().
func (s *Status) Details() []any {
if s == nil || s.s == nil {
return nil
@@ -160,7 +162,38 @@ func (s *Status) Details() []any {
details = append(details, err)
continue
}
details = append(details, detail)
// The call to MessageV1Of is required to unwrap the proto message if
// it implemented only the MessageV1 API. The proto message would have
// been wrapped in a V2 wrapper in Status.WithDetails. V2 messages are
// added to a global registry used by any.UnmarshalNew().
// MessageV1Of has the following behaviour:
// 1. If the given message is a wrapped MessageV1, it returns the
// unwrapped value.
// 2. If the given message already implements MessageV1, it returns it
// as is.
// 3. Else, it wraps the MessageV2 in a MessageV1 wrapper.
//
// Since the Status.WithDetails() API only accepts MessageV1, calling
// MessageV1Of ensures we return the same type that was given to
// WithDetails:
// * If the give type implemented only MessageV1, the unwrapping from
// point 1 above will restore the type.
// * If the given type implemented both MessageV1 and MessageV2, point 2
// above will ensure no wrapping is performed.
// * If the given type implemented only MessageV2 and was wrapped using
// MessageV1Of before passing to WithDetails(), it would be unwrapped
// in WithDetails by calling MessageV2Of(). Point 3 above will ensure
// that the type is wrapped in a MessageV1 wrapper again before
// returning. Note that protoc-gen-go doesn't generate code which
// implements ONLY MessageV2 at the time of writing.
//
// NOTE: Status details can also be added using the FromProto method.
// This could theoretically allow passing a Detail message that only
// implements the V2 API. In such a case the message will be wrapped in
// a MessageV1 wrapper when fetched using Details().
// Since protoc-gen-go generates only code that implements both V1 and
// V2 APIs for backward compatibility, this is not a concern.
details = append(details, protoadapt.MessageV1Of(detail))
}
return details
}
+39 -21
View File
@@ -86,9 +86,9 @@ type http2Client struct {
writerDone chan struct{} // sync point to enable testing.
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{}
framer *framer
goAway chan struct{}
keepaliveDone chan struct{} // Closed when the keepalive goroutine exits.
framer *framer
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
// Do not access controlBuf with mu held.
@@ -335,6 +335,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
readerDone: make(chan struct{}),
writerDone: make(chan struct{}),
goAway: make(chan struct{}),
keepaliveDone: make(chan struct{}),
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
fc: &trInFlow{limit: uint32(icwz)},
scheme: scheme,
@@ -527,8 +528,9 @@ func (t *http2Client) getPeer() *peer.Peer {
// to be the last frame loopy writes to the transport.
func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
maxStreamID := t.nextID - 2
t.mu.Unlock()
if err := t.framer.fr.WriteGoAway(maxStreamID, http2.ErrCodeNo, g.debugData); err != nil {
return false, err
}
return false, g.closeConn
@@ -1008,6 +1010,9 @@ func (t *http2Client) Close(err error) {
// should unblock it so that the goroutine eventually exits.
t.kpDormancyCond.Signal()
}
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
goAwayDebugMessage := t.goAwayDebugMessage
t.mu.Unlock()
// Per HTTP/2 spec, a GOAWAY frame must be sent before closing the
@@ -1025,11 +1030,13 @@ func (t *http2Client) Close(err error) {
}
t.cancel()
t.conn.Close()
// Waits for the reader and keepalive goroutines to exit before returning to
// ensure all resources are cleaned up before Close can return.
<-t.readerDone
if t.keepaliveEnabled {
<-t.keepaliveDone
}
channelz.RemoveEntry(t.channelz.ID)
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
var st *status.Status
if len(goAwayDebugMessage) > 0 {
st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
@@ -1316,11 +1323,11 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
t.controlBuf.put(pingAck)
}
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
t.mu.Lock()
if t.state == closing {
t.mu.Unlock()
return
return nil
}
if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
// When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
@@ -1332,8 +1339,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
return
return connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id)
}
// A client can receive multiple GoAways from the server (see
// https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
@@ -1350,8 +1356,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return
return connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID)
}
default:
t.setGoAwayReason(f)
@@ -1375,8 +1380,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.prevGoAwayID = id
if len(t.activeStreams) == 0 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
return
return connectionErrorf(true, nil, "received goaway and there are no active streams")
}
streamsToClose := make([]*Stream, 0)
@@ -1393,6 +1397,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
for _, stream := range streamsToClose {
t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
}
return nil
}
// setGoAwayReason sets the value of t.goAwayReason based
@@ -1628,7 +1633,13 @@ func (t *http2Client) readServerPreface() error {
// network connection. If the server preface is not read successfully, an
// error is pushed to errCh; otherwise errCh is closed with no error.
func (t *http2Client) reader(errCh chan<- error) {
defer close(t.readerDone)
var errClose error
defer func() {
close(t.readerDone)
if errClose != nil {
t.Close(errClose)
}
}()
if err := t.readServerPreface(); err != nil {
errCh <- err
@@ -1669,7 +1680,7 @@ func (t *http2Client) reader(errCh chan<- error) {
continue
}
// Transport error.
t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
errClose = connectionErrorf(true, err, "error reading from server: %v", err)
return
}
switch frame := frame.(type) {
@@ -1684,7 +1695,7 @@ func (t *http2Client) reader(errCh chan<- error) {
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame:
t.handleGoAway(frame)
errClose = t.handleGoAway(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
default:
@@ -1697,6 +1708,13 @@ func (t *http2Client) reader(errCh chan<- error) {
// keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() {
var err error
defer func() {
close(t.keepaliveDone)
if err != nil {
t.Close(err)
}
}()
p := &ping{data: [8]byte{}}
// True iff a ping has been sent, and no data has been received since then.
outstandingPing := false
@@ -1720,7 +1738,7 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
err = connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout")
return
}
t.mu.Lock()
+9
View File
@@ -547,6 +547,15 @@ func (s *Stream) write(m recvMsg) {
s.buf.put(m)
}
// ReadHeader reads data into the provided header slice from the stream. It
// first checks if there was an error during a previous read operation and
// returns it if present. It then requests a read operation for the length of
// the header. It continues to read from the stream until the entire header
// slice is filled or an error occurs. If an `io.EOF` error is encountered
// with partially read data, it is converted to `io.ErrUnexpectedEOF` to
// indicate an unexpected end of the stream. The method returns any error
// encountered during the read process or nil if the header was successfully
// read.
func (s *Stream) ReadHeader(header []byte) (err error) {
// Don't request a read if there was an error earlier
if er := s.trReader.er; er != nil {
+24 -8
View File
@@ -65,6 +65,9 @@ var (
refObjectPool = sync.Pool{New: func() any { return new(atomic.Int32) }}
)
// IsBelowBufferPoolingThreshold returns true if the given size is less than or
// equal to the threshold for buffer pooling. This is used to determine whether
// to pool buffers or allocate them directly.
func IsBelowBufferPoolingThreshold(size int) bool {
return size <= bufferPoolingThreshold
}
@@ -89,7 +92,11 @@ func newBuffer() *buffer {
//
// Note that the backing array of the given data is not copied.
func NewBuffer(data *[]byte, pool BufferPool) Buffer {
if pool == nil || IsBelowBufferPoolingThreshold(len(*data)) {
// Use the buffer's capacity instead of the length, otherwise buffers may
// not be reused under certain conditions. For example, if a large buffer
// is acquired from the pool, but fewer bytes than the buffering threshold
// are written to it, the buffer will not be returned to the pool.
if pool == nil || IsBelowBufferPoolingThreshold(cap(*data)) {
return (SliceBuffer)(*data)
}
b := newBuffer()
@@ -194,19 +201,19 @@ func (b *buffer) read(buf []byte) (int, Buffer) {
return n, b
}
// String returns a string representation of the buffer. May be used for
// debugging purposes.
func (b *buffer) String() string {
return fmt.Sprintf("mem.Buffer(%p, data: %p, length: %d)", b, b.ReadOnlyData(), len(b.ReadOnlyData()))
}
// ReadUnsafe reads bytes from the given Buffer into the provided slice.
// It does not perform safety checks.
func ReadUnsafe(dst []byte, buf Buffer) (int, Buffer) {
return buf.read(dst)
}
// SplitUnsafe modifies the receiver to point to the first n bytes while it
// returns a new reference to the remaining bytes. The returned Buffer functions
// just like a normal reference acquired using Ref().
// returns a new reference to the remaining bytes. The returned Buffer
// functions just like a normal reference acquired using Ref().
func SplitUnsafe(buf Buffer, n int) (left, right Buffer) {
return buf.split(n)
}
@@ -232,12 +239,21 @@ func (e emptyBuffer) read([]byte) (int, Buffer) {
return 0, e
}
// SliceBuffer is a Buffer implementation that wraps a byte slice. It provides
// methods for reading, splitting, and managing the byte slice.
type SliceBuffer []byte
// ReadOnlyData returns the byte slice.
func (s SliceBuffer) ReadOnlyData() []byte { return s }
func (s SliceBuffer) Ref() {}
func (s SliceBuffer) Free() {}
func (s SliceBuffer) Len() int { return len(s) }
// Ref is a noop implementation of Ref.
func (s SliceBuffer) Ref() {}
// Free is a noop implementation of Free.
func (s SliceBuffer) Free() {}
// Len is a noop implementation of Len.
func (s SliceBuffer) Len() int { return len(s) }
func (s SliceBuffer) split(n int) (left, right Buffer) {
return s[:n], s[n:]
+1 -2
View File
@@ -791,9 +791,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool
if !haveCompressor {
if isServer {
return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
} else {
return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
return status.Newf(codes.Internal, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
+1 -1
View File
@@ -19,4 +19,4 @@
package grpc
// Version is the current grpc version.
const Version = "1.67.1"
const Version = "1.68.0"
+4 -8
View File
@@ -961,9 +961,6 @@ github.com/go-micro/plugins/v4/store/nats-js-kv
# github.com/go-micro/plugins/v4/store/redis v1.2.1
## explicit; go 1.17
github.com/go-micro/plugins/v4/store/redis
# github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker v1.2.0
## explicit; go 1.17
github.com/go-micro/plugins/v4/wrapper/breaker/gobreaker
# github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus v1.2.0
## explicit; go 1.17
github.com/go-micro/plugins/v4/wrapper/monitoring/prometheus
@@ -1798,9 +1795,6 @@ github.com/sirupsen/logrus
# github.com/skeema/knownhosts v1.2.1
## explicit; go 1.17
github.com/skeema/knownhosts
# github.com/sony/gobreaker v0.5.0
## explicit; go 1.12
github.com/sony/gobreaker
# github.com/spacewander/go-suffix-tree v0.0.0-20191010040751-0865e368c784
## explicit
github.com/spacewander/go-suffix-tree
@@ -2266,8 +2260,8 @@ google.golang.org/genproto/googleapis/api/httpbody
## explicit; go 1.21
google.golang.org/genproto/googleapis/rpc/errdetails
google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.67.1
## explicit; go 1.21
# google.golang.org/grpc v1.68.0
## explicit; go 1.22.7
google.golang.org/grpc
google.golang.org/grpc/attributes
google.golang.org/grpc/backoff
@@ -2275,6 +2269,8 @@ google.golang.org/grpc/balancer
google.golang.org/grpc/balancer/base
google.golang.org/grpc/balancer/grpclb/state
google.golang.org/grpc/balancer/pickfirst
google.golang.org/grpc/balancer/pickfirst/internal
google.golang.org/grpc/balancer/pickfirst/pickfirstleaf
google.golang.org/grpc/balancer/roundrobin
google.golang.org/grpc/binarylog/grpc_binarylog_v1
google.golang.org/grpc/channelz