go: sqle: cluster: Add a mechanism to kill inflight queries and connections when transition server role.

This commit is contained in:
Aaron Son
2022-09-29 13:26:14 -07:00
parent ff743b2ab9
commit 635bbd46c8
4 changed files with 81 additions and 5 deletions

View File

@@ -266,11 +266,18 @@ func Serve(
clusterRemoteSrv.Serve(listeners)
}()
}
clusterController.ManageQueryConnections(
mySQLServer.SessionManager().Iter,
sqlEngine.GetUnderlyingEngine().ProcessList.Kill,
mySQLServer.SessionManager().KillConnection,
)
} else {
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
startError = err
return
}
}
if ok, f := mrEnv.IsLocked(); ok {

View File

@@ -29,7 +29,7 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
},
},
Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) {
err := controller.setRoleAndEpoch(role, epoch)
err := controller.setRoleAndEpoch(role, epoch, true /* graceful */)
if err != nil {
return nil, err
}

View File

@@ -52,6 +52,10 @@ type Controller struct {
sinterceptor serverinterceptor
cinterceptor clientinterceptor
lgr *logrus.Logger
iterSessions IterSessions
killQuery func(uint32)
killConnection func(uint32) error
}
type sqlvars interface {
@@ -122,6 +126,16 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
return nil
}
type IterSessions func(func(sql.Session) (bool, error)) error
func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
c.mu.Lock()
defer c.mu.Unlock()
c.iterSessions = iterSessions
c.killQuery = killQuery
c.killConnection = killConnection
}
func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) {
ttfdir, err := denv.TempTableFilesDir()
if err != nil {
@@ -246,7 +260,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
return Role(persistentRole), epochi, nil
}
func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) error {
c.mu.Lock()
defer c.mu.Unlock()
if epoch == c.epoch && role == string(c.role) {
@@ -269,7 +283,21 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
c.epoch = epoch
if changedrole {
// TODO: Role is transitioning...lots of stuff to do.
var err error
if c.role == RoleStandby {
if graceful {
err = c.gracefulTransitionToStandby()
} else {
err = c.immediateTransitionToStandby()
// TODO: this should not fail; if it does, we still prevent transition for now.
}
} else {
err = c.transitionToPrimary()
// TODO: this should not fail; if it does, we still prevent transition for now.
}
if err != nil {
return err
}
}
c.refreshSystemVars()
@@ -339,3 +367,43 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
args.DBCache = remotesrvStoreCache{args.DBCache, c}
return args
}
// The order of operations is:
// * Set all databases in database_provider to read-only.
// * Kill all running queries in GMS.
// * Replicate all databases to their standby remotes.
// * If success, return success.
// * If failure, set all databases in database_provider back to their original state. Return failure.
func (c *Controller) gracefulTransitionToStandby() error {
return nil
}
// The order of operations is:
// * Set all databases in database_provider to read-only.
// * Kill all running queries in GMS.
// * Return success. NOTE: we do not attempt to replicate to the standby.
func (c *Controller) immediateTransitionToStandby() error {
return nil
}
// The order of operations is:
// * Set all databases in database_provider back to their original mode: read-write or read only.
// * Kill all running queries in GMS.
// * Return success.
func (c *Controller) transitionToPrimary() error {
return nil
}
// Kills all running queries in the managed GMS engine.
func (c *Controller) killRunningQueries() {
c.mu.Lock()
iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection
c.mu.Unlock()
if c.iterSessions != nil {
iterSessions(func(session sql.Session) (stop bool, err error) {
killQuery(session.ID())
killConnection(session.ID())
return
})
}
}

View File

@@ -130,8 +130,9 @@ cluster:
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '10');" "status\n0"
# same role, new epoch
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
# new role, new epoch
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
# new role, new epoch (this can drop the connection, so check the results in a new connection)
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "" 1
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
# Server comes back up with latest assumed role.
kill $SERVER_PID