mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-28 10:19:56 -06:00
go/libraries/doltcore/sqle: cluster: When a database is dropped, shutdown the replication goroutines and stop returning its status in the dolt_cluster_status table.
This commit is contained in:
@@ -130,6 +130,7 @@ func NewSqlEngine(
|
||||
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook)
|
||||
pro.DropDatabaseHook = config.ClusterController.DropDatabaseHook
|
||||
|
||||
// Create the engine
|
||||
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{
|
||||
|
||||
@@ -43,6 +43,7 @@ type commithook struct {
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
cond *sync.Cond
|
||||
shutdown atomic.Bool
|
||||
nextHead hash.Hash
|
||||
lastPushedHead hash.Hash
|
||||
nextPushAttempt time.Time
|
||||
@@ -126,7 +127,7 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
shouldHeartbeat := false
|
||||
for {
|
||||
for !h.shutdown.Load() {
|
||||
lgr := h.logger()
|
||||
// Shutdown for context canceled.
|
||||
if ctx.Err() != nil {
|
||||
@@ -371,7 +372,7 @@ func (h *commithook) tick(ctx context.Context) {
|
||||
defer h.wg.Done()
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
for !h.shutdown.Load() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -381,6 +382,11 @@ func (h *commithook) tick(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *commithook) databaseWasDropped() {
|
||||
h.shutdown.Store(true)
|
||||
h.cond.Signal()
|
||||
}
|
||||
|
||||
func (h *commithook) recordSuccessfulRemoteSrvCommit() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
@@ -310,6 +310,27 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) {
|
||||
store.Register(newTransitionToStandbyProcedure(c))
|
||||
}
|
||||
|
||||
func (c *Controller) DropDatabaseHook(dbname string) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
j := 0
|
||||
for i := 0; i < len(c.commithooks); i++ {
|
||||
if c.commithooks[i].dbname == dbname {
|
||||
c.commithooks[i].databaseWasDropped()
|
||||
continue
|
||||
}
|
||||
if j != i {
|
||||
c.commithooks[j] = c.commithooks[i]
|
||||
}
|
||||
j += 1
|
||||
}
|
||||
c.commithooks = c.commithooks[:j]
|
||||
}
|
||||
|
||||
func (c *Controller) ClusterDatabase() sql.Database {
|
||||
if c == nil {
|
||||
return nil
|
||||
|
||||
@@ -48,6 +48,7 @@ type DoltDatabaseProvider struct {
|
||||
functions map[string]sql.Function
|
||||
externalProcedures sql.ExternalStoredProcedureRegistry
|
||||
InitDatabaseHook InitDatabaseHook
|
||||
DropDatabaseHook DropDatabaseHook
|
||||
mu *sync.RWMutex
|
||||
|
||||
defaultBranch string
|
||||
@@ -446,6 +447,7 @@ func (p DoltDatabaseProvider) CreateCollatedDatabase(ctx *sql.Context, name stri
|
||||
}
|
||||
|
||||
type InitDatabaseHook func(ctx *sql.Context, pro DoltDatabaseProvider, name string, env *env.DoltEnv) error
|
||||
type DropDatabaseHook func(name string)
|
||||
|
||||
// ConfigureReplicationDatabaseHook sets up replication for a newly created database as necessary
|
||||
// TODO: consider the replication heads / all heads setting
|
||||
@@ -635,6 +637,12 @@ func (p DoltDatabaseProvider) DropDatabase(ctx *sql.Context, name string) error
|
||||
return err
|
||||
}
|
||||
|
||||
if p.DropDatabaseHook != nil {
|
||||
// For symmetry with InitDatabaseHook and the names we see in
|
||||
// MultiEnv initialization, we use `name` here, not `dbKey`.
|
||||
p.DropDatabaseHook(name)
|
||||
}
|
||||
|
||||
rootDbLoc, err := p.fs.Abs("")
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -1417,3 +1417,68 @@ tests:
|
||||
- exec: 'call dolt_gc()'
|
||||
error_match: "must be the primary"
|
||||
- exec: 'call dolt_gc("--shallow")'
|
||||
- name: dropped database no longer in dolt_cluster_status
|
||||
multi_repos:
|
||||
- name: server1
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3309
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3852/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3851
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3309
|
||||
- name: server2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3851/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3852
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3310
|
||||
connections:
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: 'create database repo1'
|
||||
- exec: 'use repo1'
|
||||
- exec: 'SET @@GLOBAL.dolt_cluster_ack_writes_timeout_secs = 10'
|
||||
- exec: 'create table vals (i int primary key)'
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows:
|
||||
- ["repo1", "standby", "primary", "1", "1", "NULL"]
|
||||
- exec: 'use dolt_cluster'
|
||||
- exec: 'drop database repo1'
|
||||
- query: 'show databases'
|
||||
result:
|
||||
columns: ["Database"]
|
||||
rows:
|
||||
- ["dolt_cluster"]
|
||||
- ["information_schema"]
|
||||
- ["mysql"]
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis is not null as `replication_lag_millis`, current_error from dolt_cluster.dolt_cluster_status"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows: []
|
||||
|
||||
Reference in New Issue
Block a user