go/libraries/doltcore/dbfactory, misc: Ensure we close a *grpc.ClientConn when we are done with it.

In the cluster commithook, we had a failure mode where we would leak a
*grpc.ClientConn without closing it. It turns out, if an outbound request has
been made on a ClientConn, it will continue retrying the connection. And we would
leak one on a fixed interval, resulting in ever-increasing CPU utilization.
This commit is contained in:
Aaron Son
2024-04-07 15:14:46 -07:00
parent 5732cda698
commit c487288938
9 changed files with 47 additions and 18 deletions

View File

@@ -158,6 +158,7 @@ func checkCredAndPrintSuccess(ctx context.Context, dEnv *env.DoltEnv, dc creds.D
if err != nil {
return errhand.BuildDError("error: unable to connect to server with credentials.").AddCause(err).Build()
}
defer conn.Close()
grpcClient := remotesapi.NewCredentialsServiceClient(conn)

View File

@@ -173,6 +173,7 @@ func updateProfileWithCredentials(ctx context.Context, dEnv *env.DoltEnv, c cred
if err != nil {
return fmt.Errorf("error: unable to connect to server with credentials: %w", err)
}
defer conn.Close()
grpcClient := remotesapi.NewCredentialsServiceClient(conn)
resp, err := grpcClient.WhoAmI(ctx, &remotesapi.WhoAmIRequest{})
if err != nil {

View File

@@ -130,52 +130,52 @@ func (cmd SendMetricsCmd) Exec(ctx context.Context, commandStr string, args []st
// FlushLoggedEvents flushes any logged events in the directory given to an appropriate event emitter
func FlushLoggedEvents(ctx context.Context, dEnv *env.DoltEnv, userHomeDir string, outputType string) error {
emitter, err := NewEmitter(outputType, dEnv)
emitter, closer, err := NewEmitter(outputType, dEnv)
if err != nil {
return err
}
defer closer()
flusher := events.NewFileFlusher(dEnv.FS, userHomeDir, dbfactory.DoltDir, emitter)
return flusher.Flush(ctx)
}
// NewEmitter returns an emitter for the given configuration provider, of the type named. If an empty name is provided,
// defaults to a file-based emitter.
func NewEmitter(emitterType string, pro EmitterConfigProvider) (events.Emitter, error) {
func NewEmitter(emitterType string, pro EmitterConfigProvider) (events.Emitter, func() error, error) {
switch emitterType {
case events.EmitterTypeNull:
return events.NullEmitter{}, nil
return events.NullEmitter{}, func() error { return nil }, nil
case events.EmitterTypeStdout:
return events.WriterEmitter{Wr: os.Stdout}, nil
return events.WriterEmitter{Wr: os.Stdout}, func() error { return nil }, nil
case events.EmitterTypeGrpc:
return GRPCEmitterForConfig(pro)
case events.EmitterTypeFile:
homeDir, err := pro.GetUserHomeDir()
if err != nil {
return nil, err
return nil, nil, err
}
return events.NewFileEmitter(homeDir, dbfactory.DoltDir), nil
return events.NewFileEmitter(homeDir, dbfactory.DoltDir), func() error { return nil }, nil
case events.EmitterTypeLogger:
return events.NewLoggerEmitter(logrus.DebugLevel), nil
return events.NewLoggerEmitter(logrus.DebugLevel), func() error { return nil }, nil
default:
return nil, fmt.Errorf("unknown emitter type: %s", emitterType)
return nil, nil, fmt.Errorf("unknown emitter type: %s", emitterType)
}
}
// GRPCEmitterForConfig returns an event emitter for the given environment, or nil if the environment cannot
// provide one
func GRPCEmitterForConfig(pro EmitterConfigProvider) (*events.GrpcEmitter, error) {
func GRPCEmitterForConfig(pro EmitterConfigProvider) (*events.GrpcEmitter, func() error, error) {
cfg, err := GRPCEventRemoteConfig(pro)
if err != nil {
return nil, err
return nil, nil, err
}
conn, err := grpc.Dial(cfg.Endpoint, cfg.DialOptions...)
if err != nil {
return nil, err
return nil, nil, err
}
return events.NewGrpcEmitter(conn), nil
return events.NewGrpcEmitter(conn), conn.Close, nil
}
// GRPCEventRemoteConfig returns a GRPCRemoteConfig for the given configuration provider

View File

@@ -596,6 +596,7 @@ type heartbeatService struct {
version string
eventEmitter events.Emitter
interval time.Duration
closer func() error
}
func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
@@ -620,7 +621,7 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
return &heartbeatService{} // will be defunct on Run()
}
emitter, err := commands.NewEmitter(emitterType, dEnv)
emitter, closer, err := commands.NewEmitter(emitterType, dEnv)
if err != nil {
return &heartbeatService{} // will be defunct on Run()
}
@@ -631,11 +632,18 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
version: version,
eventEmitter: emitter,
interval: duration,
closer: closer,
}
}
func (h *heartbeatService) Init(ctx context.Context) error { return nil }
func (h *heartbeatService) Stop() error { return nil }
func (h *heartbeatService) Stop() error {
if h.closer != nil {
return h.closer()
}
return nil
}
func (h *heartbeatService) Run(ctx context.Context) {
// Faulty config settings or disabled metrics can cause us to not have a valid event emitter

View File

@@ -128,13 +128,15 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
csClient := remotesapi.NewChunkStoreServiceClient(conn)
cs, err := remotestorage.NewDoltChunkStoreFromPath(ctx, nbf, urlObj.Path, urlObj.Host, wsValidate, csClient)
if err != nil {
conn.Close()
return nil, fmt.Errorf("could not access dolt url '%s': %w", urlObj.String(), err)
}
cs = cs.WithHTTPFetcher(cfg.HTTPFetcher)
cs.SetFinalizer(conn.Close)
if _, ok := params[NoCachingParameter]; ok {
cs = cs.WithNoopChunkCache()
}
return cs, err
return cs, nil
}

View File

@@ -112,6 +112,7 @@ type DoltChunkStore struct {
host string
root hash.Hash
csClient remotesapi.ChunkStoreServiceClient
finalizer func() error
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
@@ -159,6 +160,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
repoToken: repoToken,
host: host,
csClient: csClient,
finalizer: func() error { return nil },
cache: newMapChunkCache(),
metadata: metadata,
nbf: nbf,
@@ -181,6 +183,7 @@ func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
@@ -198,6 +201,7 @@ func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: noopChunkCache,
metadata: dcs.metadata,
nbf: dcs.nbf,
@@ -216,6 +220,7 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
@@ -234,6 +239,7 @@ func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams
host: dcs.host,
root: dcs.root,
csClient: dcs.csClient,
finalizer: dcs.finalizer,
cache: dcs.cache,
metadata: dcs.metadata,
nbf: dcs.nbf,
@@ -248,6 +254,10 @@ func (dcs *DoltChunkStore) SetLogger(logger chunks.DebugLogger) {
dcs.logger = logger
}
func (dcs *DoltChunkStore) SetFinalizer(f func() error) {
dcs.finalizer = f
}
func (dcs *DoltChunkStore) logf(fmt string, args ...interface{}) {
if dcs.logger != nil {
dcs.logger.Logf(fmt, args...)
@@ -961,7 +971,7 @@ func (dcs *DoltChunkStore) PersistGhostHashes(ctx context.Context, refs hash.Has
// Close() concurrently with any other ChunkStore method; behavior is
// undefined and probably crashy.
func (dcs *DoltChunkStore) Close() error {
return nil
return dcs.finalizer()
}
// getting this working using the simplest approach first

View File

@@ -224,6 +224,9 @@ func (c *Controller) Run() {
c.bcReplication.Run()
}()
wg.Wait()
for _, client := range c.replicationClients {
client.closer()
}
}
func (c *Controller) GracefulStop() error {
@@ -1127,6 +1130,7 @@ type replicationServiceClient struct {
url string
tls bool
client replicationapi.ReplicationServiceClient
closer func() error
}
func (c *Controller) replicationServiceDialOptions() []grpc.DialOption {
@@ -1164,6 +1168,7 @@ func (c *Controller) replicationServiceClients(ctx context.Context) ([]*replicat
url: grpcTarget,
tls: c.tlsCfg != nil,
client: client,
closer: cc.Close,
})
}
return ret, nil

View File

@@ -106,6 +106,7 @@ func (r *mysqlDbReplica) Run() {
r.mu.Lock()
defer r.mu.Unlock()
r.lgr.Tracef("mysqlDbReplica[%s]: running", r.client.remote)
defer r.client.closer()
for !r.shutdown {
if r.role != RolePrimary {
r.wait()

View File

@@ -116,12 +116,13 @@ func (we WriterEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.Log
// GrpcEmitter sends events to a GRPC service implementing the eventsapi
type GrpcEmitter struct {
client eventsapi.ClientEventsServiceClient
conn *grpc.ClientConn
}
// NewGrpcEmitter creates a new GrpcEmitter
func NewGrpcEmitter(conn *grpc.ClientConn) *GrpcEmitter {
client := eventsapi.NewClientEventsServiceClient(conn)
return &GrpcEmitter{client}
return &GrpcEmitter{client, conn}
}
func (em *GrpcEmitter) LogEvents(ctx context.Context, version string, evts []*eventsapi.ClientEvent) error {