mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 19:29:58 -05:00
go/libraries/doltcore/remotestorage: Migrate to backoff/v4, fix some misusage of the backoff API.
This commit is contained in:
@@ -10,7 +10,6 @@ require (
|
||||
github.com/aws/aws-sdk-go v1.32.6
|
||||
github.com/bcicen/jstream v1.0.0
|
||||
github.com/boltdb/bolt v1.3.1
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible
|
||||
github.com/denisbrodbeck/machineid v1.0.1
|
||||
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
|
||||
github.com/dolthub/fslock v0.0.3
|
||||
@@ -48,7 +47,7 @@ require (
|
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
|
||||
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
|
||||
google.golang.org/api v0.32.0
|
||||
google.golang.org/grpc v1.37.0
|
||||
google.golang.org/grpc v1.49.0
|
||||
google.golang.org/protobuf v1.27.1
|
||||
gopkg.in/square/go-jose.v2 v2.5.1
|
||||
gopkg.in/src-d/go-errors.v1 v1.0.0
|
||||
@@ -56,6 +55,7 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.1.3
|
||||
github.com/dolthub/go-mysql-server v0.12.1-0.20220920214908-aa94dc1d23d7
|
||||
github.com/google/flatbuffers v2.0.6+incompatible
|
||||
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
|
||||
|
||||
@@ -128,8 +128,9 @@ github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2
|
||||
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
|
||||
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
|
||||
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
@@ -199,7 +200,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
|
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
|
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
|
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
|
||||
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
|
||||
@@ -1168,8 +1168,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
|
||||
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
|
||||
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
|
||||
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
|
||||
google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c=
|
||||
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
|
||||
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
|
||||
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
|
||||
@@ -30,7 +30,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
@@ -80,13 +80,16 @@ const (
|
||||
|
||||
var tracer = otel.Tracer("github.com/dolthub/dolt/go/libraries/doltcore/remotestorage")
|
||||
|
||||
var uploadRetryParams = backoff.NewExponentialBackOff()
|
||||
var downRetryParams = backoff.NewExponentialBackOff()
|
||||
func downloadBackOff(ctx context.Context) backoff.BackOff {
|
||||
ret := backoff.NewExponentialBackOff()
|
||||
ret.MaxInterval = 5 * time.Second
|
||||
return backoff.WithContext(backoff.WithMaxRetries(ret, downRetryCount), ctx)
|
||||
}
|
||||
|
||||
func init() {
|
||||
uploadRetryParams.MaxInterval = 5 * time.Second
|
||||
|
||||
downRetryParams.MaxInterval = 5 * time.Second
|
||||
func uploadBackOff(ctx context.Context) backoff.BackOff {
|
||||
ret := backoff.NewExponentialBackOff()
|
||||
ret.MaxInterval = 5 * time.Second
|
||||
return backoff.WithContext(backoff.WithMaxRetries(ret, uploadRetryCount), ctx)
|
||||
}
|
||||
|
||||
// Only hedge downloads of ranges < 4MB in length for now.
|
||||
@@ -617,7 +620,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
|
||||
}
|
||||
return processGrpcErr(err)
|
||||
}
|
||||
return backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
|
||||
return backoff.Retry(op, grpcBackOff(ctx))
|
||||
})
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
@@ -1009,7 +1012,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
|
||||
return nil
|
||||
}
|
||||
|
||||
return backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))
|
||||
return backoff.Retry(op, uploadBackOff(ctx))
|
||||
}
|
||||
|
||||
type Sizer interface {
|
||||
@@ -1213,7 +1216,7 @@ func rangeDownloadWithRetries(ctx context.Context, stats StatsRecorder, fetcher
|
||||
}
|
||||
|
||||
dstart := time.Now()
|
||||
err := backoff.Retry(op, backoff.WithMaxRetries(downRetryParams, downRetryCount))
|
||||
err := backoff.Retry(op, downloadBackOff(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
@@ -46,34 +46,12 @@ func processHttpResp(resp *http.Response, err error) error {
|
||||
|
||||
// ProcessGrpcErr converts an error from a Grpc call into a RetriableCallState
|
||||
func processGrpcErr(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
st, ok := status.FromError(err)
|
||||
|
||||
if !ok {
|
||||
return err
|
||||
}
|
||||
|
||||
switch st.Code() {
|
||||
case codes.OK:
|
||||
return nil
|
||||
|
||||
case codes.Canceled,
|
||||
codes.Unknown,
|
||||
codes.DeadlineExceeded,
|
||||
codes.Aborted,
|
||||
codes.Internal,
|
||||
codes.DataLoss,
|
||||
codes.ResourceExhausted,
|
||||
codes.Unavailable:
|
||||
return err
|
||||
|
||||
case codes.InvalidArgument,
|
||||
codes.NotFound,
|
||||
codes.AlreadyExists,
|
||||
|
||||
@@ -17,20 +17,22 @@ package remotestorage
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
const (
|
||||
csClientRetries = 5
|
||||
grpcRetries = 5
|
||||
)
|
||||
|
||||
var csRetryParams = backoff.NewExponentialBackOff()
|
||||
func grpcBackOff(ctx context.Context) backoff.BackOff {
|
||||
ret := backoff.NewExponentialBackOff()
|
||||
return backoff.WithContext(backoff.WithMaxRetries(ret, grpcRetries), ctx)
|
||||
}
|
||||
|
||||
func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
doit := func() error {
|
||||
err := invoker(ctx, method, req, reply, cc, opts...)
|
||||
return processGrpcErr(err)
|
||||
return processGrpcErr(invoker(ctx, method, req, reply, cc, opts...))
|
||||
}
|
||||
return backoff.Retry(doit, backoff.WithMaxRetries(csRetryParams, csClientRetries))
|
||||
return backoff.Retry(doit, grpcBackOff(ctx))
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user