diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go
index 0f2eabc84f..34539696f7 100644
--- a/go/cmd/dolt/commands/sqlserver/server.go
+++ b/go/cmd/dolt/commands/sqlserver/server.go
@@ -266,11 +266,17 @@ func Serve(
 					clusterRemoteSrv.Serve(listeners)
 				}()
 			}
+
+			clusterController.ManageQueryConnections(
+				mySQLServer.SessionManager().Iter,
+				sqlEngine.GetUnderlyingEngine().ProcessList.Kill,
+				mySQLServer.SessionManager().KillConnection,
+			)
 		} else {
 			lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
 			startError = err
 			return
 		}
 	}
 
 	if ok, f := mrEnv.IsLocked(); ok {
diff --git a/go/libraries/doltcore/sqle/cluster/assume_role.go b/go/libraries/doltcore/sqle/cluster/assume_role.go
index 52ea6cf7e6..8291dc1141 100644
--- a/go/libraries/doltcore/sqle/cluster/assume_role.go
+++ b/go/libraries/doltcore/sqle/cluster/assume_role.go
@@ -29,7 +29,7 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
 			},
 		},
 		Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) {
-			err := controller.setRoleAndEpoch(role, epoch)
+			err := controller.setRoleAndEpoch(role, epoch, true /* graceful */)
 			if err != nil {
 				return nil, err
 			}
diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go
index 9014fb56c7..0f5c26c06f 100644
--- a/go/libraries/doltcore/sqle/cluster/controller.go
+++ b/go/libraries/doltcore/sqle/cluster/controller.go
@@ -52,6 +52,11 @@ type Controller struct {
 	sinterceptor serverinterceptor
 	cinterceptor clientinterceptor
 	lgr          *logrus.Logger
+
+	// Callbacks installed by ManageQueryConnections.
+	iterSessions   IterSessions
+	killQuery      func(uint32)
+	killConnection func(uint32) error
 }
 
 type sqlvars interface {
@@ -122,6 +127,20 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
 	return nil
 }
 
+// IterSessions is the signature of a function that visits sessions, passing
+// each one to the supplied callback (e.g. the server's SessionManager().Iter).
+type IterSessions func(func(sql.Session) (bool, error)) error
+
+// ManageQueryConnections registers the callbacks the controller uses to
+// iterate sessions, kill a query and kill a connection.
+func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.iterSessions = iterSessions
+	c.killQuery = killQuery
+	c.killConnection = killConnection
+}
+
 func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) {
 	ttfdir, err := denv.TempTableFilesDir()
 	if err != nil {
@@ -246,7 +265,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
 	return Role(persistentRole), epochi, nil
 }
 
-func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
+func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if epoch == c.epoch && role == string(c.role) {
@@ -269,7 +288,21 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
 	c.epoch = epoch
 
 	if changedrole {
-		// TODO: Role is transitioning...lots of stuff to do.
+		var err error
+		if c.role == RoleStandby {
+			if graceful {
+				err = c.gracefulTransitionToStandby()
+			} else {
+				err = c.immediateTransitionToStandby()
+				// TODO: this should not fail; if it does, we still prevent transition for now.
+			}
+		} else {
+			err = c.transitionToPrimary()
+			// TODO: this should not fail; if it does, we still prevent transition for now.
+		}
+		if err != nil {
+			return err
+		}
 	}
 
 	c.refreshSystemVars()
@@ -339,3 +372,53 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
 	args.DBCache = remotesrvStoreCache{args.DBCache, c}
 	return args
 }
+
+// gracefulTransitionToStandby is currently a stub that returns nil. The planned order of operations is:
+// * Set all databases in database_provider to read-only.
+// * Kill all running queries in GMS.
+// * Replicate all databases to their standby remotes.
+// * If success, return success.
+// * If failure, set all databases in database_provider back to their original state. Return failure.
+func (c *Controller) gracefulTransitionToStandby() error {
+	return nil
+}
+
+// immediateTransitionToStandby is currently a stub that returns nil. The planned order of operations is:
+// * Set all databases in database_provider to read-only.
+// * Kill all running queries in GMS.
+// * Return success. NOTE: we do not attempt to replicate to the standby.
+func (c *Controller) immediateTransitionToStandby() error {
+	return nil
+}
+
+// transitionToPrimary is currently a stub that returns nil. The planned order of operations is:
+// * Set all databases in database_provider back to their original mode: read-write or read only.
+// * Kill all running queries in GMS.
+// * Return success.
+func (c *Controller) transitionToPrimary() error {
+	return nil
+}
+
+// killRunningQueries kills the running query and the connection of every
+// session visited by the registered IterSessions callback. It does nothing
+// when no callback has been registered via ManageQueryConnections.
+//
+// NOTE: this method acquires c.mu, so it must not be called with c.mu held;
+// setRoleAndEpoch holds c.mu while it invokes the transition functions.
+func (c *Controller) killRunningQueries() {
+	c.mu.Lock()
+	iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection
+	c.mu.Unlock()
+	// Test the snapshot taken under the lock rather than the struct field:
+	// reading c.iterSessions here would race with ManageQueryConnections.
+	if iterSessions == nil {
+		return
+	}
+	// Best effort: the callback never reports an error or asks to stop, and
+	// the error returned by the iteration itself is deliberately ignored.
+	_ = iterSessions(func(session sql.Session) (stop bool, err error) {
+		killQuery(session.ID())
+		_ = killConnection(session.ID())
+		return false, nil
+	})
+}
diff --git a/integration-tests/bats/sql-server-cluster.bats b/integration-tests/bats/sql-server-cluster.bats
index 083295b65c..2cf4da1a93 100644
--- a/integration-tests/bats/sql-server-cluster.bats
+++ b/integration-tests/bats/sql-server-cluster.bats
@@ -130,8 +130,9 @@ cluster:
     server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '10');" "status\n0"
     # same role, new epoch
     server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
-    # new role, new epoch
-    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
+    # new role, new epoch (this can drop the connection, so check the results in a new connection)
+    run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "" 1
+    server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
 
     # Server comes back up with latest assumed role.
     kill $SERVER_PID