From e4aa6ffad86dcb2aade0baeb85712e01718b1ca4 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 26 Sep 2023 14:22:24 -0700 Subject: [PATCH] go/libraries/doltcore/sqle/cluster: First pass at replicating drop database. --- go/cmd/dolt/commands/engine/sqlengine.go | 26 ++++-- .../eventsapi/v1alpha1/client_event.pb.go | 5 +- .../v1alpha1/client_event_grpc.pb.go | 1 + .../eventsapi/v1alpha1/event_constants.pb.go | 5 +- .../remotesapi/v1alpha1/chunkstore.pb.go | 5 +- .../remotesapi/v1alpha1/chunkstore_grpc.pb.go | 1 + .../remotesapi/v1alpha1/credentials.pb.go | 5 +- .../v1alpha1/credentials_grpc.pb.go | 1 + .../replicationapi/v1alpha1/replication.pb.go | 5 +- .../v1alpha1/replication_grpc.pb.go | 1 + .../doltcore/sqle/cluster/controller.go | 59 ++++++++++++- .../sqle/cluster/replication_service.go | 17 ++++ .../tests/sql-server-cluster.yaml | 88 +++++++++++++++++++ 13 files changed, 201 insertions(+), 18 deletions(-) diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 57e289b7ef..de99d697cf 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -135,7 +135,19 @@ func NewSqlEngine( config.ClusterController.RegisterStoredProcedures(pro) pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) - pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook + + sqlEngine := &SqlEngine{} + + var dropDatabaseProvider = func(ctx context.Context, name string) error { + sqlCtx, err := sqlEngine.NewDefaultContext(ctx) + if err != nil { + return err + } + return pro.DropDatabase(sqlCtx, name) + } + + config.ClusterController.SetDropDatabaseProvider(dropDatabaseProvider) + pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook() // Create the engine engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{ @@ -223,12 +235,12 @@ func NewSqlEngine( } } - return &SqlEngine{ - provider: pro, - contextFactory: sqlContextFactory(), - dsessFactory: sessFactory, - engine: engine, - }, nil + sqlEngine.provider = pro + sqlEngine.contextFactory = sqlContextFactory() + sqlEngine.dsessFactory = sessFactory + sqlEngine.engine = engine + + return sqlEngine, nil } // NewRebasedSqlEngine returns a smalled rebased engine primarily used in filterbranch. diff --git a/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event.pb.go b/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event.pb.go index 50e349ba74..6e09d77b48 100644 --- a/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event.pb.go +++ b/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event.pb.go @@ -23,12 +23,13 @@ package eventsapi import ( + reflect "reflect" + sync "sync" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" durationpb "google.golang.org/protobuf/types/known/durationpb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" ) const ( diff --git a/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event_grpc.pb.go b/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event_grpc.pb.go index 085a9fda2b..8a09ad1cf6 100644 --- a/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event_grpc.pb.go +++ b/go/gen/proto/dolt/services/eventsapi/v1alpha1/client_event_grpc.pb.go @@ -24,6 +24,7 @@ package eventsapi import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/go/gen/proto/dolt/services/eventsapi/v1alpha1/event_constants.pb.go b/go/gen/proto/dolt/services/eventsapi/v1alpha1/event_constants.pb.go index 9e558db062..188068cfc1 100644 --- a/go/gen/proto/dolt/services/eventsapi/v1alpha1/event_constants.pb.go +++ b/go/gen/proto/dolt/services/eventsapi/v1alpha1/event_constants.pb.go @@ -23,10 +23,11 @@ package eventsapi import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go index 0fc8ae7e15..dde8ed815b 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go @@ -21,11 +21,12 @@ package remotesapi import ( + reflect "reflect" + sync "sync" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" - reflect "reflect" - sync "sync" ) const ( diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore_grpc.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore_grpc.pb.go index 828da30d62..28cfc20104 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore_grpc.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore_grpc.pb.go @@ -22,6 +22,7 @@ package remotesapi import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials.pb.go index 8164b2bb9a..7c2e689b33 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials.pb.go @@ -21,10 +21,11 @@ package remotesapi import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials_grpc.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials_grpc.pb.go index e859388139..8ae0dfc1eb 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials_grpc.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/credentials_grpc.pb.go @@ -22,6 +22,7 @@ package remotesapi import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go index c0dd217790..6ac797942a 100644 --- a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication.pb.go @@ -21,10 +21,11 @@ package replicationapi import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go index 3af833933d..275cc34a39 100644 --- a/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go +++ b/go/gen/proto/dolt/services/replicationapi/v1alpha1/replication_grpc.pb.go @@ -22,6 +22,7 @@ package replicationapi import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index a5bc5f8e0b..963cd97407 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -36,7 +36,9 @@ import ( gmstypes "github.com/dolthub/go-mysql-server/sql/types" "github.com/sirupsen/logrus" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/status" replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" @@ -94,6 +96,8 @@ type Controller struct { branchControlController *branch_control.Controller branchControlFilesys filesys.Filesys bcReplication *branchControlReplication + + dropDatabaseProvider func(context.Context, string) error } type sqlvars interface { @@ -310,12 +314,27 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) { store.Register(newTransitionToStandbyProcedure(c)) } -func (c *Controller) DropDatabaseHook(dbname string) { +func (c *Controller) SetDropDatabaseProvider(dropDatabaseProvider func(context.Context, string) error) { if c == nil { return } c.mu.Lock() defer c.mu.Unlock() + c.dropDatabaseProvider = dropDatabaseProvider +} + +func (c *Controller) DropDatabaseHook() func(string) { + if c == nil { + return nil + } + return c.dropDatabaseHook +} + +func (c *Controller) dropDatabaseHook(dbname string) { + c.mu.Lock() + defer c.mu.Unlock() + + // We always cleanup the commithooks associated with that database. j := 0 for i := 0; i < len(c.commithooks); i++ { @@ -329,6 +348,43 @@ func (c *Controller) DropDatabaseHook(dbname string) { j += 1 } c.commithooks = c.commithooks[:j] + + if c.role != RolePrimary { + return + } + + // If we are the primary, we will replicate the drop to our standby replicas. + + for _, client := range c.replicationClients { + client := client + go c.replicateDropDatabase(client, dbname) + } +} + +func (c *Controller) replicateDropDatabase(client *replicationServiceClient, dbname string) { + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = time.Millisecond + bo.MaxInterval = time.Minute + bo.MaxElapsedTime = 0 + for { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + _, err := client.client.DropDatabase(ctx, &replicationapi.DropDatabaseRequest{ + Name: dbname, + }) + cancel() + if err == nil { + c.lgr.Tracef("successfully replicated drop of [%s] to %s", dbname, client.remote) + return + } + if status.Code(err) == codes.FailedPrecondition { + c.lgr.Warnf("drop of [%s] to %s will note be replicated; FailedPrecondition", dbname, client.remote) + return + } + c.lgr.Warnf("failed to replicate drop of [%s] to %s: %v", dbname, client.remote, err) + d := bo.NextBackOff() + c.lgr.Tracef("sleeping %v before next drop attempt for database [%s] at %s", d, dbname, client.remote) + time.Sleep(d) + } } func (c *Controller) ClusterDatabase() sql.Database { @@ -639,6 +695,7 @@ func (c *Controller) RegisterGrpcServices(srv *grpc.Server) { mysqlDb: c.mysqlDb, branchControl: c.branchControlController, branchControlFilesys: c.branchControlFilesys, + dropDatabaseProvider: c.dropDatabaseProvider, lgr: c.lgr.WithFields(logrus.Fields{}), }) } diff --git a/go/libraries/doltcore/sqle/cluster/replication_service.go b/go/libraries/doltcore/sqle/cluster/replication_service.go index 5e569f5a61..0a9fac6676 100644 --- a/go/libraries/doltcore/sqle/cluster/replication_service.go +++ b/go/libraries/doltcore/sqle/cluster/replication_service.go @@ -20,6 +20,8 @@ import ( "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/mysql_db" "github.com/sirupsen/logrus" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/utils/filesys" @@ -38,6 +40,8 @@ type replicationServiceServer struct { branchControl BranchControlPersistence branchControlFilesys filesys.Filesys + + dropDatabaseProvider func(context.Context, string) error } func (s *replicationServiceServer) UpdateUsersAndGrants(ctx context.Context, req *replicationapi.UpdateUsersAndGrantsRequest) (*replicationapi.UpdateUsersAndGrantsResponse, error) { @@ -66,3 +70,16 @@ func (s *replicationServiceServer) UpdateBranchControl(ctx context.Context, req } return &replicationapi.UpdateBranchControlResponse{}, nil } + +func (s *replicationServiceServer) DropDatabase(ctx context.Context, req *replicationapi.DropDatabaseRequest) (*replicationapi.DropDatabaseResponse, error) { + if s.dropDatabaseProvider == nil { + return nil, status.Error(codes.Unimplemented, "unimplemented") + } + + err := s.dropDatabaseProvider(ctx, req.Name) + s.lgr.Tracef("dropped database [%s] through dropDatabaseProvider. err: %v", req.Name, err) + if err != nil && !sql.ErrDatabaseNotFound.Is(err) { + return nil, err + } + return &replicationapi.DropDatabaseResponse{}, nil +} diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 8ffc7a8d0d..635d78a9cb 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -1482,3 +1482,91 @@ tests: result: columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] rows: [] +- name: dropped database is no longer present on replica + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'SET @@GLOBAL.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'create database repo1' + - exec: 'use repo1' + - exec: 'create table vals (i int primary key)' + - exec: 'create database repo2' + - exec: 'use repo2' + - exec: 'create table vals (i int primary key)' + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: + - ["repo1", "standby", "primary", "1", "1", "NULL"] + - ["repo2", "standby", "primary", "1", "1", "NULL"] + - exec: 'use repo1' + - exec: 'drop database repo2' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - query: 'show databases' + result: + columns: ["Database"] + rows: + - ["dolt_cluster"] + - ["information_schema"] + - ["mysql"] + - ["repo1"] + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: + - ["repo1", "standby", "primary", "1", "1", "NULL"] + - on: server2 + queries: + - query: 'show databases' + result: + columns: ["Database"] + rows: + - ["dolt_cluster"] + - ["information_schema"] + - ["mysql"] + - ["repo1"] + retry_attempts: 100 + - query: "select `database`, standby_remote, role, epoch from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch",] + rows: + - ["repo1", "standby", "standby", "1"]