diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index fd8602ad2e..f87e4819f2 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -130,6 +130,7 @@ func NewSqlEngine( config.ClusterController.RegisterStoredProcedures(pro) pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) + pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook // Create the engine engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{ diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index b917522b40..3dfb169fbd 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -43,6 +43,7 @@ type commithook struct { mu sync.Mutex wg sync.WaitGroup cond *sync.Cond + shutdown atomic.Bool nextHead hash.Hash lastPushedHead hash.Hash nextPushAttempt time.Time @@ -126,7 +127,7 @@ func (h *commithook) replicate(ctx context.Context) { h.mu.Lock() defer h.mu.Unlock() shouldHeartbeat := false - for { + for !h.shutdown.Load() { lgr := h.logger() // Shutdown for context canceled. if ctx.Err() != nil { @@ -371,7 +372,7 @@ func (h *commithook) tick(ctx context.Context) { defer h.wg.Done() ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() - for { + for !h.shutdown.Load() { select { case <-ctx.Done(): return @@ -381,6 +382,11 @@ func (h *commithook) tick(ctx context.Context) { } } +func (h *commithook) databaseWasDropped() { + h.shutdown.Store(true) + h.cond.Signal() +} + func (h *commithook) recordSuccessfulRemoteSrvCommit() { h.mu.Lock() defer h.mu.Unlock() diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 4d50d44480..a5bc5f8e0b 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -310,6 +310,27 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) { store.Register(newTransitionToStandbyProcedure(c)) } +func (c *Controller) DropDatabaseHook(dbname string) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + + j := 0 + for i := 0; i < len(c.commithooks); i++ { + if c.commithooks[i].dbname == dbname { + c.commithooks[i].databaseWasDropped() + continue + } + if j != i { + c.commithooks[j] = c.commithooks[i] + } + j += 1 + } + c.commithooks = c.commithooks[:j] +} + func (c *Controller) ClusterDatabase() sql.Database { if c == nil { return nil diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go index 260b2ab600..224d599b46 100644 --- a/go/libraries/doltcore/sqle/database_provider.go +++ b/go/libraries/doltcore/sqle/database_provider.go @@ -48,6 +48,7 @@ type DoltDatabaseProvider struct { functions map[string]sql.Function externalProcedures sql.ExternalStoredProcedureRegistry InitDatabaseHook InitDatabaseHook + DropDatabaseHook DropDatabaseHook mu *sync.RWMutex defaultBranch string @@ -446,6 +447,7 @@ func (p DoltDatabaseProvider) CreateCollatedDatabase(ctx *sql.Context, name stri } type InitDatabaseHook func(ctx *sql.Context, pro DoltDatabaseProvider, name string, env *env.DoltEnv) error +type DropDatabaseHook func(name string) // ConfigureReplicationDatabaseHook sets up replication for a newly created database as necessary // TODO: consider the replication heads / all heads setting @@ -635,6 +637,12 @@ func (p DoltDatabaseProvider) DropDatabase(ctx *sql.Context, name string) error return err } + if p.DropDatabaseHook != nil { + // For symmetry with InitDatabaseHook and the names we see in + // MultiEnv initialization, we use `name` here, not `dbKey`. + p.DropDatabaseHook(name) + } + rootDbLoc, err := p.fs.Abs("") if err != nil { return err diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 638747ffd3..8ffc7a8d0d 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -1417,3 +1417,68 @@ tests: - exec: 'call dolt_gc()' error_match: "must be the primary" - exec: 'call dolt_gc("--shallow")' +- name: dropped database no longer in dolt_cluster_status + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'create database repo1' + - exec: 'use repo1' + - exec: 'SET @@GLOBAL.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'create table vals (i int primary key)' + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: + - ["repo1", "standby", "primary", "1", "1", "NULL"] + - exec: 'use dolt_cluster' + - exec: 'drop database repo1' + - query: 'show databases' + result: + columns: ["Database"] + rows: + - ["dolt_cluster"] + - ["information_schema"] + - ["mysql"] + - query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status" + result: + columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"] + rows: []