mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-28 10:19:56 -06:00
go/libraries/doltcore/sqle/cluster: First pass at replicating drop database.
This commit is contained in:
@@ -135,7 +135,19 @@ func NewSqlEngine(
|
||||
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook)
|
||||
pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook
|
||||
|
||||
sqlEngine := &SqlEngine{}
|
||||
|
||||
var dropDatabaseProvider = func(ctx context.Context, name string) error {
|
||||
sqlCtx, err := sqlEngine.NewDefaultContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return pro.DropDatabase(sqlCtx, name)
|
||||
}
|
||||
|
||||
config.ClusterController.SetDropDatabaseProvider(dropDatabaseProvider)
|
||||
pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook()
|
||||
|
||||
// Create the engine
|
||||
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{
|
||||
@@ -223,12 +235,12 @@ func NewSqlEngine(
|
||||
}
|
||||
}
|
||||
|
||||
return &SqlEngine{
|
||||
provider: pro,
|
||||
contextFactory: sqlContextFactory(),
|
||||
dsessFactory: sessFactory,
|
||||
engine: engine,
|
||||
}, nil
|
||||
sqlEngine.provider = pro
|
||||
sqlEngine.contextFactory = sqlContextFactory()
|
||||
sqlEngine.dsessFactory = sessFactory
|
||||
sqlEngine.engine = engine
|
||||
|
||||
return sqlEngine, nil
|
||||
}
|
||||
|
||||
// NewRebasedSqlEngine returns a smalled rebased engine primarily used in filterbranch.
|
||||
|
||||
@@ -23,12 +23,13 @@
|
||||
package eventsapi
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
durationpb "google.golang.org/protobuf/types/known/durationpb"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -24,6 +24,7 @@ package eventsapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -23,10 +23,11 @@
|
||||
package eventsapi
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -21,11 +21,12 @@
|
||||
package remotesapi
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -22,6 +22,7 @@ package remotesapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -21,10 +21,11 @@
|
||||
package remotesapi
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -22,6 +22,7 @@ package remotesapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -21,10 +21,11 @@
|
||||
package replicationapi
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -22,6 +22,7 @@ package replicationapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -36,7 +36,9 @@ import (
|
||||
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
|
||||
@@ -94,6 +96,8 @@ type Controller struct {
|
||||
branchControlController *branch_control.Controller
|
||||
branchControlFilesys filesys.Filesys
|
||||
bcReplication *branchControlReplication
|
||||
|
||||
dropDatabaseProvider func(context.Context, string) error
|
||||
}
|
||||
|
||||
type sqlvars interface {
|
||||
@@ -310,12 +314,27 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) {
|
||||
store.Register(newTransitionToStandbyProcedure(c))
|
||||
}
|
||||
|
||||
func (c *Controller) DropDatabaseHook(dbname string) {
|
||||
func (c *Controller) SetDropDatabaseProvider(dropDatabaseProvider func(context.Context, string) error) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.dropDatabaseProvider = dropDatabaseProvider
|
||||
}
|
||||
|
||||
func (c *Controller) DropDatabaseHook() func(string) {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
return c.dropDatabaseHook
|
||||
}
|
||||
|
||||
func (c *Controller) dropDatabaseHook(dbname string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// We always cleanup the commithooks associated with that database.
|
||||
|
||||
j := 0
|
||||
for i := 0; i < len(c.commithooks); i++ {
|
||||
@@ -329,6 +348,43 @@ func (c *Controller) DropDatabaseHook(dbname string) {
|
||||
j += 1
|
||||
}
|
||||
c.commithooks = c.commithooks[:j]
|
||||
|
||||
if c.role != RolePrimary {
|
||||
return
|
||||
}
|
||||
|
||||
// If we are the primary, we will replicate the drop to our standby replicas.
|
||||
|
||||
for _, client := range c.replicationClients {
|
||||
client := client
|
||||
go c.replicateDropDatabase(client, dbname)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) replicateDropDatabase(client *replicationServiceClient, dbname string) {
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
bo.InitialInterval = time.Millisecond
|
||||
bo.MaxInterval = time.Minute
|
||||
bo.MaxElapsedTime = 0
|
||||
for {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||
_, err := client.client.DropDatabase(ctx, &replicationapi.DropDatabaseRequest{
|
||||
Name: dbname,
|
||||
})
|
||||
cancel()
|
||||
if err == nil {
|
||||
c.lgr.Tracef("successfully replicated drop of [%s] to %s", dbname, client.remote)
|
||||
return
|
||||
}
|
||||
if status.Code(err) == codes.FailedPrecondition {
|
||||
c.lgr.Warnf("drop of [%s] to %s will note be replicated; FailedPrecondition", dbname, client.remote)
|
||||
return
|
||||
}
|
||||
c.lgr.Warnf("failed to replicate drop of [%s] to %s: %v", dbname, client.remote, err)
|
||||
d := bo.NextBackOff()
|
||||
c.lgr.Tracef("sleeping %v before next drop attempt for database [%s] at %s", d, dbname, client.remote)
|
||||
time.Sleep(d)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) ClusterDatabase() sql.Database {
|
||||
@@ -639,6 +695,7 @@ func (c *Controller) RegisterGrpcServices(srv *grpc.Server) {
|
||||
mysqlDb: c.mysqlDb,
|
||||
branchControl: c.branchControlController,
|
||||
branchControlFilesys: c.branchControlFilesys,
|
||||
dropDatabaseProvider: c.dropDatabaseProvider,
|
||||
lgr: c.lgr.WithFields(logrus.Fields{}),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ import (
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/dolthub/go-mysql-server/sql/mysql_db"
|
||||
"github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
replicationapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/replicationapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
@@ -38,6 +40,8 @@ type replicationServiceServer struct {
|
||||
|
||||
branchControl BranchControlPersistence
|
||||
branchControlFilesys filesys.Filesys
|
||||
|
||||
dropDatabaseProvider func(context.Context, string) error
|
||||
}
|
||||
|
||||
func (s *replicationServiceServer) UpdateUsersAndGrants(ctx context.Context, req *replicationapi.UpdateUsersAndGrantsRequest) (*replicationapi.UpdateUsersAndGrantsResponse, error) {
|
||||
@@ -66,3 +70,16 @@ func (s *replicationServiceServer) UpdateBranchControl(ctx context.Context, req
|
||||
}
|
||||
return &replicationapi.UpdateBranchControlResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *replicationServiceServer) DropDatabase(ctx context.Context, req *replicationapi.DropDatabaseRequest) (*replicationapi.DropDatabaseResponse, error) {
|
||||
if s.dropDatabaseProvider == nil {
|
||||
return nil, status.Error(codes.Unimplemented, "unimplemented")
|
||||
}
|
||||
|
||||
err := s.dropDatabaseProvider(ctx, req.Name)
|
||||
s.lgr.Tracef("dropped database [%s] through dropDatabaseProvider. err: %v", req.Name, err)
|
||||
if err != nil && !sql.ErrDatabaseNotFound.Is(err) {
|
||||
return nil, err
|
||||
}
|
||||
return &replicationapi.DropDatabaseResponse{}, nil
|
||||
}
|
||||
|
||||
@@ -1482,3 +1482,91 @@ tests:
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows: []
|
||||
- name: dropped database is no longer present on replica
|
||||
multi_repos:
|
||||
- name: server1
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3309
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3852/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3851
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3309
|
||||
- name: server2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3851/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3852
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3310
|
||||
connections:
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'SET @@GLOBAL.dolt_cluster_ack_writes_timeout_secs = 10'
|
||||
- exec: 'create database repo1'
|
||||
- exec: 'use repo1'
|
||||
- exec: 'create table vals (i int primary key)'
|
||||
- exec: 'create database repo2'
|
||||
- exec: 'use repo2'
|
||||
- exec: 'create table vals (i int primary key)'
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows:
|
||||
- ["repo1", "standby", "primary", "1", "1", "NULL"]
|
||||
- ["repo2", "standby", "primary", "1", "1", "NULL"]
|
||||
- exec: 'use repo1'
|
||||
- exec: 'drop database repo2'
|
||||
- exec: 'insert into vals values (0),(1),(2),(3),(4)'
|
||||
- query: 'show databases'
|
||||
result:
|
||||
columns: ["Database"]
|
||||
rows:
|
||||
- ["dolt_cluster"]
|
||||
- ["information_schema"]
|
||||
- ["mysql"]
|
||||
- ["repo1"]
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows:
|
||||
- ["repo1", "standby", "primary", "1", "1", "NULL"]
|
||||
- on: server2
|
||||
queries:
|
||||
- query: 'show databases'
|
||||
result:
|
||||
columns: ["Database"]
|
||||
rows:
|
||||
- ["dolt_cluster"]
|
||||
- ["information_schema"]
|
||||
- ["mysql"]
|
||||
- ["repo1"]
|
||||
retry_attempts: 100
|
||||
- query: "select `database`, standby_remote, role, epoch from dolt_cluster.dolt_cluster_status"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch",]
|
||||
rows:
|
||||
- ["repo1", "standby", "standby", "1"]
|
||||
|
||||
Reference in New Issue
Block a user