Merge pull request #686 from liquidata-inc/aaron/doltremotestorage-metric-client

go/libraries/doltcore/remotestorage: Move retrying and metrics instrumentation for doltremoteapi interactions to grpc client interceptors.
This commit is contained in:
Aaron Son
2020-05-19 10:08:27 -07:00
committed by GitHub
12 changed files with 217 additions and 263 deletions
+12 -4
View File
@@ -18,16 +18,18 @@ import (
"context"
"fmt"
eventsapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
"google.golang.org/grpc"
"github.com/liquidata-inc/dolt/go/cmd/dolt/cli"
"github.com/liquidata-inc/dolt/go/cmd/dolt/commands"
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
eventsapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
remotesapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/creds"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
)
var checkShortDesc = "Check authenticating with a credential keypair against a doltremoteapi."
@@ -136,8 +138,14 @@ func loadCred(dEnv *env.DoltEnv, apr *argparser.ArgParseResults) (creds.DoltCred
}
func checkCredAndPrintSuccess(ctx context.Context, dEnv *env.DoltEnv, dc creds.DoltCreds, endpoint string) errhand.VerboseError {
conn, err := dEnv.GrpcConnWithCreds(endpoint, false, dc)
endpoint, opts, err := dEnv.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: endpoint,
Creds: dc,
})
if err != nil {
return errhand.BuildDError("error: unable to build server endpoint options.").AddCause(err).Build()
}
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return errhand.BuildDError("error: unable to connect to server with credentials.").AddCause(err).Build()
}
+12 -1
View File
@@ -20,6 +20,8 @@ import (
"io"
"os"
"google.golang.org/grpc"
"github.com/liquidata-inc/dolt/go/cmd/dolt/cli"
"github.com/liquidata-inc/dolt/go/cmd/dolt/commands"
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
@@ -28,6 +30,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/creds"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env/actions"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
)
@@ -159,7 +162,15 @@ func updateProfileWithCredentials(ctx context.Context, dEnv *env.DoltEnv, c cred
host := dEnv.Config.GetStringOrDefault(env.RemotesApiHostKey, env.DefaultRemotesApiHost)
port := dEnv.Config.GetStringOrDefault(env.RemotesApiHostPortKey, env.DefaultRemotesApiPort)
conn, err := dEnv.GrpcConnWithCreds(fmt.Sprintf("%s:%s", *host, *port), false, c)
hostAndPort := fmt.Sprintf("%s:%s", *host, *port)
endpoint, opts, err := dEnv.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: hostAndPort,
Creds: c,
})
if err != nil {
return fmt.Errorf("error: unable to build dial options server with credentials: %w", err)
}
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return fmt.Errorf("error: unable to connect to server with credentials: %w", err)
}
+10 -1
View File
@@ -20,6 +20,7 @@ import (
"time"
"github.com/skratchdot/open-golang/open"
"google.golang.org/grpc"
"github.com/liquidata-inc/dolt/go/cmd/dolt/cli"
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
@@ -28,6 +29,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/creds"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env/actions"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
)
@@ -198,7 +200,14 @@ func openBrowserForCredsAdd(dEnv *env.DoltEnv, dc creds.DoltCreds) {
func getCredentialsClient(dEnv *env.DoltEnv, dc creds.DoltCreds) (remotesapi.CredentialsServiceClient, errhand.VerboseError) {
host := dEnv.Config.GetStringOrDefault(env.RemotesApiHostKey, env.DefaultRemotesApiHost)
port := dEnv.Config.GetStringOrDefault(env.RemotesApiHostPortKey, env.DefaultRemotesApiPort)
conn, err := dEnv.GrpcConnWithCreds(fmt.Sprintf("%s:%s", *host, *port), false, dc)
endpoint, opts, err := dEnv.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: fmt.Sprintf("%s:%s", *host, *port),
Creds: dc,
})
if err != nil {
return nil, errhand.BuildDError("error: unable to build dial options for connecting to server with credentials.").AddCause(err).Build()
}
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, errhand.BuildDError("error: unable to connect to server with credentials.").AddCause(err).Build()
}
+13 -2
View File
@@ -22,10 +22,12 @@ import (
"time"
"github.com/fatih/color"
"google.golang.org/grpc"
"github.com/liquidata-inc/dolt/go/cmd/dolt/cli"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/dbfactory"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/events"
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
@@ -146,7 +148,16 @@ func getGRPCEmitter(dEnv *env.DoltEnv) *events.GrpcEmitter {
}
hostAndPort := fmt.Sprintf("%s:%d", *host, port)
conn, _ := dEnv.GrpcConnWithCreds(hostAndPort, insecure, nil)
endpoint, opts, err := dEnv.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: hostAndPort,
Insecure: insecure,
})
if err != nil {
return nil
}
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil
}
return events.NewGrpcEmitter(conn)
}
+3 -3
View File
@@ -63,9 +63,9 @@ var DBFactories = map[string]DBFactory{
}
// InitializeFactories initializes any factories that rely on a GRPCConnectionProvider (Namely http and https)
func InitializeFactories(grpcCP GRPCConnectionProvider) {
DBFactories[HTTPScheme] = NewDoltRemoteFactory(grpcCP, true)
DBFactories[HTTPSScheme] = NewDoltRemoteFactory(grpcCP, false)
func InitializeFactories(dp GRPCDialProvider) {
DBFactories[HTTPScheme] = NewDoltRemoteFactory(dp, true)
DBFactories[HTTPSScheme] = NewDoltRemoteFactory(dp, false)
}
// CreateDB creates a database based on the supplied urlStr, and creation params. The DBFactory used for creation is
+20 -7
View File
@@ -22,27 +22,29 @@ import (
"google.golang.org/grpc"
remotesapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/remotestorage"
"github.com/liquidata-inc/dolt/go/libraries/events"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/datas"
"github.com/liquidata-inc/dolt/go/store/types"
)
// GRPCConnectionProvider is an interface for getting a *grpc.ClientConn.
type GRPCConnectionProvider interface {
GrpcConn(hostAndPort string, insecure bool) (*grpc.ClientConn, error)
// GRPCDialProvider is an interface for getting a *grpc.ClientConn.
type GRPCDialProvider interface {
GetGRPCDialParams(grpcendpoint.Config) (string, []grpc.DialOption, error)
}
// DoldRemoteFactory is a DBFactory implementation for creating databases backed by a remote server that implements the
// GRPC rpcs defined by remoteapis.ChunkStoreServiceClient
type DoltRemoteFactory struct {
grpcCP GRPCConnectionProvider
dp GRPCDialProvider
insecure bool
}
// NewDoltRemoteFactory creates a DoltRemoteFactory instance using the given GRPCConnectionProvider, and insecure setting
func NewDoltRemoteFactory(grpcCP GRPCConnectionProvider, insecure bool) DoltRemoteFactory {
return DoltRemoteFactory{grpcCP, insecure}
func NewDoltRemoteFactory(dp GRPCDialProvider, insecure bool) DoltRemoteFactory {
return DoltRemoteFactory{dp, insecure}
}
// CreateDB creates a database backed by a remote server that implements the GRPC rpcs defined by
@@ -62,8 +64,19 @@ func (fact DoltRemoteFactory) CreateDB(ctx context.Context, nbf *types.NomsBinFo
}
func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.NomsBinFormat, urlObj *url.URL, params map[string]string) (chunks.ChunkStore, error) {
conn, err := fact.grpcCP.GrpcConn(urlObj.Host, fact.insecure)
endpoint, opts, err := fact.dp.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: urlObj.Host,
Insecure: fact.insecure,
WithEnvCreds: true,
})
if err != nil {
return nil, err
}
opts = append(opts, grpc.WithChainUnaryInterceptor(remotestorage.EventsUnaryClientInterceptor(events.GlobalCollector)))
opts = append(opts, grpc.WithChainUnaryInterceptor(remotestorage.RetryingUnaryClientInterceptor))
conn, err := grpc.Dial(endpoint, opts...)
if err != nil {
return nil, err
}
+26 -32
View File
@@ -1,4 +1,4 @@
// Copyright 2019 Liquidata, Inc.
// Copyright 2019-2020 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/creds"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/dbfactory"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/ref"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
@@ -624,47 +625,40 @@ func (dEnv *DoltEnv) getUserAgentString() string {
return strings.Join(tokens, " ")
}
func (dEnv *DoltEnv) GrpcConnWithCreds(hostAndPort string, insecure bool, rpcCreds credentials.PerRPCCredentials) (*grpc.ClientConn, error) {
if strings.IndexRune(hostAndPort, ':') == -1 {
if insecure {
hostAndPort += ":80"
func (dEnv *DoltEnv) GetGRPCDialParams(config grpcendpoint.Config) (string, []grpc.DialOption, error) {
endpoint := config.Endpoint
if strings.IndexRune(endpoint, ':') == -1 {
if config.Insecure {
endpoint += ":80"
} else {
hostAndPort += ":443"
endpoint += ":443"
}
}
var dialOpt grpc.DialOption
if insecure {
dialOpt = grpc.WithInsecure()
var opts []grpc.DialOption
if config.Insecure {
opts = append(opts, grpc.WithInsecure())
} else {
tc := credentials.NewTLS(&tls.Config{})
dialOpt = grpc.WithTransportCredentials(tc)
opts = append(opts, grpc.WithTransportCredentials(tc))
}
opts := []grpc.DialOption{
dialOpt,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128 * 1024 * 1024)),
grpc.WithUserAgent(dEnv.getUserAgentString()),
opts = append(opts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(128*1024*1024)))
opts = append(opts, grpc.WithUserAgent(dEnv.getUserAgentString()))
if config.Creds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(config.Creds))
} else if config.WithEnvCreds {
rpcCreds, err := dEnv.getRPCCreds()
if err != nil {
return "", nil, err
}
if rpcCreds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(rpcCreds))
}
}
if rpcCreds != nil {
opts = append(opts, grpc.WithPerRPCCredentials(rpcCreds))
}
conn, err := grpc.Dial(hostAndPort, opts...)
return conn, err
}
func (dEnv *DoltEnv) GrpcConn(hostAndPort string, insecure bool) (*grpc.ClientConn, error) {
rpcCreds, err := dEnv.getRPCCreds()
if err != nil {
return nil, err
}
return dEnv.GrpcConnWithCreds(hostAndPort, insecure, rpcCreds)
return endpoint, opts, nil
}
func (dEnv *DoltEnv) GetRemotes() (map[string]Remote, error) {
@@ -0,0 +1,26 @@
// Copyright 2020 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package grpcendpoint
import (
"google.golang.org/grpc/credentials"
)
type Config struct {
Endpoint string
Insecure bool
Creds credentials.PerRPCCredentials
WithEnvCreds bool
}
@@ -31,9 +31,7 @@ import (
"github.com/cenkalti/backoff"
eventsapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
remotesapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/events"
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
"github.com/liquidata-inc/dolt/go/store/chunks"
@@ -93,23 +91,10 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
org := tokens[0]
repoName := tokens[1]
if _, ok := csClient.(RetryingChunkStoreServiceClient); !ok {
csClient = RetryingChunkStoreServiceClient{csClient}
}
return NewDoltChunkStore(ctx, nbf, org, repoName, host, RetryingChunkStoreServiceClient{csClient})
return NewDoltChunkStore(ctx, nbf, org, repoName, host, csClient)
}
func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoName, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
if _, ok := csClient.(RetryingChunkStoreServiceClient); !ok {
csClient = RetryingChunkStoreServiceClient{csClient}
}
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_GET_REPO_METADATA)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
metadata, err := csClient.GetRepoMetadata(ctx, &remotesapi.GetRepoMetadataRequest{
RepoId: &remotesapi.RepoId{
Org: org,
@@ -122,7 +107,6 @@ func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoN
})
if err != nil {
counter.Inc()
return nil, err
}
@@ -283,16 +267,10 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m
batchItr(len(hashesBytes), getLocsBatchSize, func(st, end int) (stop bool) {
batch := hashesBytes[st:end]
f := func() error {
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_GET_DOWNLOAD_LOCATIONS)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
req := remotesapi.GetDownloadLocsRequest{RepoId: dcs.getRepoId(), ChunkHashes: batch}
resp, err := dcs.csClient.GetDownloadLocations(ctx, &req)
if err != nil {
counter.Inc()
return NewRpcError(err, "GetDownloadLocations", dcs.host, req)
}
@@ -407,19 +385,12 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
currHashSl := hashSl[st:end]
currByteSl := byteSl[st:end]
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_HAS_CHUNKS)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
// send a request to the remote api to determine which chunks the remote api already has
req := remotesapi.HasChunksRequest{RepoId: dcs.getRepoId(), Hashes: currByteSl}
resp, err := dcs.csClient.HasChunks(ctx, &req)
if err != nil {
err = NewRpcError(err, "HasMany", dcs.host, req)
counter.Inc()
return true
}
@@ -483,16 +454,10 @@ func (dcs *DoltChunkStore) Version() string {
// Rebase brings this ChunkStore into sync with the persistent storage's
// current root.
func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_REBASE)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
req := &remotesapi.RebaseRequest{RepoId: dcs.getRepoId()}
_, err := dcs.csClient.Rebase(ctx, req)
if err != nil {
counter.Inc()
return NewRpcError(err, "Rebase", dcs.host, req)
}
@@ -502,16 +467,10 @@ func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
// Root returns the root of the database as of the time the ChunkStore
// was opened or the most recent call to Rebase.
func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error) {
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_ROOT)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
req := &remotesapi.RootRequest{RepoId: dcs.getRepoId()}
resp, err := dcs.csClient.Root(ctx, req)
if err != nil {
counter.Inc()
return hash.Hash{}, NewRpcError(err, "Root", dcs.host, req)
}
@@ -522,15 +481,9 @@ func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error) {
// persisted root hash from last to current (or keeps it the same).
// If last doesn't match the root in persistent storage, returns false.
func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_COMMIT)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
hashToChunkCount, err := dcs.uploadChunks(ctx)
if err != nil {
counter.Inc()
return false, err
}
@@ -552,7 +505,6 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
resp, err := dcs.csClient.Commit(ctx, req)
if err != nil {
counter.Inc()
return false, NewRpcError(err, "Commit", dcs.host, req)
}
@@ -630,16 +582,10 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
tfds = append(tfds, &v)
}
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_GET_UPLOAD_LOCATIONS)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), TableFileDetails: tfds}
resp, err := dcs.csClient.GetUploadLocations(ctx, req)
if err != nil {
counter.Inc()
return map[hash.Hash]int{}, err
}
@@ -656,7 +602,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
}
if err != nil {
counter.Inc()
return map[hash.Hash]int{}, err
}
}
@@ -981,16 +926,10 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
// Sources retrieves the current root hash, and a list of all the table files
func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, error) {
evt := events.NewEvent(eventsapi.ClientEventType_REMOTEAPI_LIST_TABLE_FILES)
defer events.GlobalCollector.CloseEventAndAdd(evt)
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
req := &remotesapi.ListTableFilesRequest{RepoId: dcs.getRepoId()}
resp, err := dcs.csClient.ListTableFiles(ctx, req)
if err != nil {
counter.Inc()
return hash.Hash{}, nil, err
}
@@ -0,0 +1,58 @@
// Copyright 2020 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotestorage
import (
"context"
"google.golang.org/grpc"
eventsapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/events"
)
func EventsUnaryClientInterceptor(collector *events.Collector) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
errCount := int32(0)
if event, ok := remoteAPIMethodsToEvents[method]; ok {
evt := events.NewEvent(event)
defer func() {
if errCount > 0 {
counter := events.NewCounter(eventsapi.MetricID_REMOTEAPI_RPC_ERROR)
counter.Add(errCount)
evt.AddMetric(counter)
}
collector.CloseEventAndAdd(evt)
}()
}
err := invoker(ctx, method, req, reply, cc, opts...)
if err != nil {
errCount++
}
return err
}
}
var remoteAPIMethodsToEvents map[string]eventsapi.ClientEventType = map[string]eventsapi.ClientEventType{
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/GetRepoMetadata": eventsapi.ClientEventType_REMOTEAPI_GET_REPO_METADATA,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/HasChunks": eventsapi.ClientEventType_REMOTEAPI_HAS_CHUNKS,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/GetDownloadLocations": eventsapi.ClientEventType_REMOTEAPI_GET_DOWNLOAD_LOCATIONS,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/GetUploadLocations": eventsapi.ClientEventType_REMOTEAPI_GET_UPLOAD_LOCATIONS,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/Rebase": eventsapi.ClientEventType_REMOTEAPI_REBASE,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/Root": eventsapi.ClientEventType_REMOTEAPI_ROOT,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/Commit": eventsapi.ClientEventType_REMOTEAPI_COMMIT,
"/dolt.services.remotesapi.v1alpha1.ChunkStoreService/ListTableFiles": eventsapi.ClientEventType_REMOTEAPI_LIST_TABLE_FILES,
// "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/AddTableFiles": eventsapi.ClientEventType_REMOTEAPI_ADD_TABLE_FILES,
}
@@ -1,151 +0,0 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotestorage
import (
"context"
"github.com/cenkalti/backoff"
"google.golang.org/grpc"
remotesapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
)
const (
csClientRetries = 5
)
var csRetryParams = backoff.NewExponentialBackOff()
type RetryingChunkStoreServiceClient struct {
client remotesapi.ChunkStoreServiceClient
}
func (c RetryingChunkStoreServiceClient) HasChunks(ctx context.Context, in *remotesapi.HasChunksRequest, opts ...grpc.CallOption) (*remotesapi.HasChunksResponse, error) {
var resp *remotesapi.HasChunksResponse
op := func() error {
var err error
resp, err = c.client.HasChunks(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) GetDownloadLocations(ctx context.Context, in *remotesapi.GetDownloadLocsRequest, opts ...grpc.CallOption) (*remotesapi.GetDownloadLocsResponse, error) {
var resp *remotesapi.GetDownloadLocsResponse
op := func() error {
var err error
resp, err = c.client.GetDownloadLocations(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) GetUploadLocations(ctx context.Context, in *remotesapi.GetUploadLocsRequest, opts ...grpc.CallOption) (*remotesapi.GetUploadLocsResponse, error) {
var resp *remotesapi.GetUploadLocsResponse
op := func() error {
var err error
resp, err = c.client.GetUploadLocations(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) GetRepoMetadata(ctx context.Context, in *remotesapi.GetRepoMetadataRequest, opts ...grpc.CallOption) (*remotesapi.GetRepoMetadataResponse, error) {
var resp *remotesapi.GetRepoMetadataResponse
op := func() error {
var err error
resp, err = c.client.GetRepoMetadata(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) Rebase(ctx context.Context, in *remotesapi.RebaseRequest, opts ...grpc.CallOption) (*remotesapi.RebaseResponse, error) {
var resp *remotesapi.RebaseResponse
op := func() error {
var err error
resp, err = c.client.Rebase(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) Root(ctx context.Context, in *remotesapi.RootRequest, opts ...grpc.CallOption) (*remotesapi.RootResponse, error) {
var resp *remotesapi.RootResponse
op := func() error {
var err error
resp, err = c.client.Root(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) Commit(ctx context.Context, in *remotesapi.CommitRequest, opts ...grpc.CallOption) (*remotesapi.CommitResponse, error) {
var resp *remotesapi.CommitResponse
op := func() error {
var err error
resp, err = c.client.Commit(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) ListTableFiles(ctx context.Context, in *remotesapi.ListTableFilesRequest, opts ...grpc.CallOption) (*remotesapi.ListTableFilesResponse, error) {
var resp *remotesapi.ListTableFilesResponse
op := func() error {
var err error
resp, err = c.client.ListTableFiles(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
func (c RetryingChunkStoreServiceClient) AddTableFiles(ctx context.Context, in *remotesapi.AddTableFilesRequest, opts ...grpc.CallOption) (*remotesapi.AddTableFilesResponse, error) {
var resp *remotesapi.AddTableFilesResponse
op := func() error {
var err error
resp, err = c.client.AddTableFiles(ctx, in, opts...)
return processGrpcErr(err)
}
err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return resp, err
}
@@ -0,0 +1,36 @@
// Copyright 2020 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package remotestorage
import (
"context"
"github.com/cenkalti/backoff"
"google.golang.org/grpc"
)
const (
csClientRetries = 5
)
var csRetryParams = backoff.NewExponentialBackOff()
func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
doit := func() error {
err := invoker(ctx, method, req, reply, cc, opts...)
return processGrpcErr(err)
}
return backoff.Retry(doit, backoff.WithMaxRetries(csRetryParams, csClientRetries))
}