Moved emitter constructor logic out of events packge, where it causes a cycle

This commit is contained in:
Zach Musgrave
2023-12-04 16:22:48 -08:00
parent a7b31a3736
commit 41637d5078
4 changed files with 86 additions and 81 deletions

View File

@@ -16,6 +16,8 @@ package commands
import (
"context"
"fmt"
"os"
"strconv"
"time"
@@ -23,10 +25,12 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/dolthub/dolt/go/libraries/events"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/fatih/color"
"google.golang.org/grpc"
)
// SendMetricsCommand is the command used for sending metrics
@@ -127,7 +131,7 @@ func (cmd SendMetricsCmd) Exec(ctx context.Context, commandStr string, args []st
// FlushLoggedEvents flushes any logged events in the directory given to an appropriate event emitter
func FlushLoggedEvents(ctx context.Context, dEnv *env.DoltEnv, userHomeDir string, outputType string) error {
emitter, err := events.NewEmitter(outputType, dEnv)
emitter, err := NewEmitter(outputType, dEnv)
if err != nil {
return err
}
@@ -136,3 +140,71 @@ func FlushLoggedEvents(ctx context.Context, dEnv *env.DoltEnv, userHomeDir strin
return flusher.Flush(ctx)
}
// NewEmitter returns an emitter for the given configuration provider, of the type named. If an empty name is provided,
// defaults to a file-based emitter.
func NewEmitter(emitterType string, pro EmitterConfigProvider) (events.Emitter, error) {
switch emitterType {
case events.EmitterTypeNull:
return events.NullEmitter{}, nil
case events.EmitterTypeStdout:
return events.WriterEmitter{Wr: os.Stdout}, nil
case events.EmitterTypeGrpc:
return GRPCEmitterForConfig(pro), nil
case events.EmitterTypeFile:
homeDir, err := pro.GetUserHomeDir()
if err != nil {
return nil, err
}
return events.NewFileEmitter(homeDir, dbfactory.DoltDir), nil
default:
return nil, fmt.Errorf("unknown emitter type: %s", emitterType)
}
}
// GRPCEmitterForConfig returns an event emitter for the given environment, or nil if the environment cannot
// provide one
func GRPCEmitterForConfig(pro EmitterConfigProvider) *events.GrpcEmitter {
cfg, err := GRPCEventRemoteConfig(pro)
if err != nil {
return nil
}
conn, err := grpc.Dial(cfg.Endpoint, cfg.DialOptions...)
if err != nil {
return nil
}
return events.NewGrpcEmitter(conn)
}
// GRPCEventRemoteConfig returns a GRPCRemoteConfig for the given configuration provider
func GRPCEventRemoteConfig(pro EmitterConfigProvider) (dbfactory.GRPCRemoteConfig, error) {
host := pro.GetConfig().GetStringOrDefault(config.MetricsHost, events.DefaultMetricsHost)
portStr := pro.GetConfig().GetStringOrDefault(config.MetricsPort, events.DefaultMetricsPort)
insecureStr := pro.GetConfig().GetStringOrDefault(config.MetricsInsecure, "false")
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return dbfactory.GRPCRemoteConfig{}, nil
}
insecure, _ := strconv.ParseBool(insecureStr)
hostAndPort := fmt.Sprintf("%s:%d", host, port)
cfg, err := pro.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: hostAndPort,
Insecure: insecure,
})
if err != nil {
return dbfactory.GRPCRemoteConfig{}, nil
}
return cfg, nil
}
// EmitterConfigProvider is an interface used to get the configuration to create an emitter
type EmitterConfigProvider interface {
GetGRPCDialParams(config grpcendpoint.Config) (dbfactory.GRPCRemoteConfig, error)
GetConfig() config.ReadableConfig
GetUserHomeDir() (string, error)
}

View File

@@ -590,11 +590,6 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
emitterType = events.EmitterTypeGrpc
}
emitter, err := events.NewEmitter(emitterType, dEnv)
if err != nil {
return &heartbeatService{} // will be defunct on Run()
}
interval, ok := os.LookupEnv(sqlServerHeartbeatIntervalEnvVar)
if !ok {
interval = "24h"
@@ -602,7 +597,12 @@ func newHeartbeatService(version string, dEnv *env.DoltEnv) *heartbeatService {
duration, err := time.ParseDuration(interval)
if err != nil {
return nil
return &heartbeatService{} // will be defunct on Run()
}
emitter, err := commands.NewEmitter(emitterType, dEnv)
if err != nil {
return &heartbeatService{} // will be defunct on Run()
}
return &heartbeatService{

View File

@@ -19,6 +19,7 @@ import (
"fmt"
"strings"
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/fatih/color"
@@ -72,6 +73,7 @@ func (cmd StatusCmd) RequiresRepo() bool {
}
var _ cli.RepoNotRequiredCommand = StatusCmd{}
var _ cli.EventMonitoredCommand = StatusCmd{}
// Name is returns the name of the Dolt cli command. This is what is used on the command line to invoke the command
func (cmd StatusCmd) Name() string {
@@ -94,6 +96,10 @@ func (cmd StatusCmd) ArgParser() *argparser.ArgParser {
return ap
}
func (cmd StatusCmd) EventType() eventsapi.ClientEventType {
return eventsapi.ClientEventType_STATUS
}
// Exec executes the command
func (cmd StatusCmd) Exec(ctx context.Context, commandStr string, args []string, _ *env.DoltEnv, cliCtx cli.CliContext) int {
// parse arguments

View File

@@ -18,15 +18,10 @@ import (
"context"
"fmt"
"io"
"os"
"runtime"
"strconv"
"strings"
"time"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/grpcendpoint"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/fatih/color"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/prototext"
@@ -59,13 +54,6 @@ type Emitter interface {
LogEventsRequest(ctx context.Context, req *eventsapi.LogEventsRequest) error
}
// EmitterConfigProvider is an interface used to get the configuration to create an emitter
type EmitterConfigProvider interface {
GetGRPCDialParams(config grpcendpoint.Config) (dbfactory.GRPCRemoteConfig, error)
GetConfig() config.ReadableConfig
GetUserHomeDir() (string, error)
}
// NullEmitter is an emitter that drops events
type NullEmitter struct{}
@@ -198,65 +186,4 @@ func (fe *FileEmitter) LogEventsRequest(ctx context.Context, req *eventsapi.LogE
}
return nil
}
// NewEmitter returns an emitter for the given configuration provider, of the type named. If an empty name is provided,
// defaults to a file-based emitter.
func NewEmitter(emitterType string, pro EmitterConfigProvider) (Emitter, error) {
switch emitterType {
case EmitterTypeNull:
return NullEmitter{}, nil
case EmitterTypeStdout:
return WriterEmitter{Wr: os.Stdout}, nil
case EmitterTypeGrpc:
return GRPCEmitterForConfig(pro), nil
case EmitterTypeFile:
homeDir, err := pro.GetUserHomeDir()
if err != nil {
return nil, err
}
return NewFileEmitter(homeDir, dbfactory.DoltDir), nil
default:
return nil, fmt.Errorf("unknown emitter type: %s", emitterType)
}
}
// GRPCEmitterForConfig returns an event emitter for the given environment, or nil if the environment cannot
// provide one
func GRPCEmitterForConfig(pro EmitterConfigProvider) *GrpcEmitter {
cfg, err := GRPCEventRemoteConfig(pro)
if err != nil {
return nil
}
conn, err := grpc.Dial(cfg.Endpoint, cfg.DialOptions...)
if err != nil {
return nil
}
return NewGrpcEmitter(conn)
}
// GRPCEventRemoteConfig returns a GRPCRemoteConfig for the given configuration provider
func GRPCEventRemoteConfig(pro EmitterConfigProvider) (dbfactory.GRPCRemoteConfig, error) {
host := pro.GetConfig().GetStringOrDefault(config.MetricsHost, DefaultMetricsHost)
portStr := pro.GetConfig().GetStringOrDefault(config.MetricsPort, DefaultMetricsPort)
insecureStr := pro.GetConfig().GetStringOrDefault(config.MetricsInsecure, "false")
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return dbfactory.GRPCRemoteConfig{}, nil
}
insecure, _ := strconv.ParseBool(insecureStr)
hostAndPort := fmt.Sprintf("%s:%d", host, port)
cfg, err := pro.GetGRPCDialParams(grpcendpoint.Config{
Endpoint: hostAndPort,
Insecure: insecure,
})
if err != nil {
return dbfactory.GRPCRemoteConfig{}, nil
}
return cfg, nil
}
}