From 4e8ef842959ca5b0b6520512b2d5c848a56aba78 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 29 Sep 2022 13:26:14 -0700 Subject: [PATCH 1/7] go: sqle: cluster: Add a mechanism to kill inflight queries and connections when transition server role. --- go/cmd/dolt/commands/sqlserver/server.go | 7 ++ .../doltcore/sqle/cluster/assume_role.go | 2 +- .../doltcore/sqle/cluster/controller.go | 72 ++++++++++++++++++- 3 files changed, 78 insertions(+), 3 deletions(-) diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 0f2eabc84f..267d387cc5 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -266,11 +266,18 @@ func Serve( clusterRemoteSrv.Serve(listeners) }() } + + clusterController.ManageQueryConnections( + mySQLServer.SessionManager().Iter, + sqlEngine.GetUnderlyingEngine().ProcessList.Kill, + func(uint32) {}, // TODO: mySQLServer.SessionManager().KillConnection, + ) } else { lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err) startError = err return } + } if ok, f := mrEnv.IsLocked(); ok { diff --git a/go/libraries/doltcore/sqle/cluster/assume_role.go b/go/libraries/doltcore/sqle/cluster/assume_role.go index 52ea6cf7e6..8291dc1141 100644 --- a/go/libraries/doltcore/sqle/cluster/assume_role.go +++ b/go/libraries/doltcore/sqle/cluster/assume_role.go @@ -29,7 +29,7 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD }, }, Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) { - err := controller.setRoleAndEpoch(role, epoch) + err := controller.setRoleAndEpoch(role, epoch, true /* graceful */) if err != nil { return nil, err } diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 9014fb56c7..1975d1573a 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -52,6 +52,10 @@ type Controller struct { sinterceptor serverinterceptor cinterceptor clientinterceptor lgr *logrus.Logger + + iterSessions IterSessions + killQuery func(uint32) + killConnection func(uint32) } type sqlvars interface { @@ -122,6 +126,16 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. return nil } +type IterSessions func(func(sql.Session) (bool, error)) error + +func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery, killConnection func(uint32)) { + c.mu.Lock() + defer c.mu.Unlock() + c.iterSessions = iterSessions + c.killQuery = killQuery + c.killConnection = killConnection +} + func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) { ttfdir, err := denv.TempTableFilesDir() if err != nil { @@ -246,7 +260,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea return Role(persistentRole), epochi, nil } -func (c *Controller) setRoleAndEpoch(role string, epoch int) error { +func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) error { c.mu.Lock() defer c.mu.Unlock() if epoch == c.epoch && role == string(c.role) { @@ -269,7 +283,21 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int) error { c.epoch = epoch if changedrole { - // TODO: Role is transitioning...lots of stuff to do. + var err error + if c.role == RoleStandby { + if graceful { + err = c.gracefulTransitionToStandby() + } else { + err = c.immediateTransitionToStandby() + // TODO: this should not fail; if it does, we still prevent transition for now. + } + } else { + err = c.transitionToPrimary() + // TODO: this should not fail; if it does, we still prevent transition for now. + } + if err != nil { + return err + } } c.refreshSystemVars() @@ -339,3 +367,43 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server args.DBCache = remotesrvStoreCache{args.DBCache, c} return args } + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Replicate all databases to their standby remotes. +// * If success, return success. +// * If failure, set all databases in database_provider back to their original state. Return failure. +func (c *Controller) gracefulTransitionToStandby() error { + return nil +} + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Return success. NOTE: we do not attempt to replicate to the standby. +func (c *Controller) immediateTransitionToStandby() error { + return nil +} + +// The order of operations is: +// * Set all databases in database_provider back to their original mode: read-write or read only. +// * Kill all running queries in GMS. +// * Return success. +func (c *Controller) transitionToPrimary() error { + return nil +} + +// Kills all running queries in the managed GMS engine. +func (c *Controller) killRunningQueries() { + c.mu.Lock() + iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection + c.mu.Unlock() + if c.iterSessions != nil { + iterSessions(func(session sql.Session) (stop bool, err error) { + killQuery(session.ID()) + killConnection(session.ID()) + return + }) + } +} From b5271155c2597dcd4e3cf03d03be08e6c67f8141 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 29 Sep 2022 15:40:30 -0700 Subject: [PATCH 2/7] go: sqle: cluster: Dolt databases on a standby are read only. They accept writes when the server transitions to primary. --- go/cmd/dolt/commands/engine/sqlengine.go | 1 + go/cmd/dolt/commands/sqlserver/server.go | 2 +- go/libraries/doltcore/remotesrv/grpc.go | 73 ++++-- .../doltcore/sqle/cluster/controller.go | 88 +++++-- .../doltcore/sqle/database_provider.go | 30 ++- go/libraries/doltcore/sqle/remotesrv.go | 10 +- .../bats/sql-server-cluster.bats | 218 +++++++++++++++++- 7 files changed, 376 insertions(+), 46 deletions(-) diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 9984d8a8d8..cd354afc44 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -114,6 +114,7 @@ func NewSqlEngine( config.ClusterController.RegisterStoredProcedures(pro) pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook) + config.ClusterController.ManageDatabaseProvider(pro) // Load in privileges from file, if it exists persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath) diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 267d387cc5..c37d13c0f9 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -270,7 +270,7 @@ func Serve( clusterController.ManageQueryConnections( mySQLServer.SessionManager().Iter, sqlEngine.GetUnderlyingEngine().ProcessList.Kill, - func(uint32) {}, // TODO: mySQLServer.SessionManager().KillConnection, + func(uint32) {}, // TODO: mySQLServer.SessionManager().KillConnection, ) } else { lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err) diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index f7ef49c52c..4f8407a6f4 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -17,6 +17,7 @@ package remotesrv import ( "context" "encoding/base64" + "errors" "fmt" "io" "net/url" @@ -38,6 +39,8 @@ import ( "github.com/dolthub/dolt/go/store/types" ) +var ErrUnimplemented = errors.New("unimplemented") + type RemoteChunkStore struct { HttpHost string csCache DBCache @@ -79,7 +82,10 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -133,7 +139,10 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -199,7 +208,10 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore nextPath := getRepoPath(req) if nextPath != repoPath { repoPath = nextPath - cs = rs.getStore(logger, repoPath) + cs, err = rs.getStore(logger, repoPath) + if err != nil { + return err + } if cs == nil { return status.Error(codes.Internal, "Could not get chunkstore") } @@ -292,7 +304,10 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -341,7 +356,10 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -349,7 +367,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe logger.Printf("found %s", repoPath) - err := cs.Rebase(ctx) + err = cs.Rebase(ctx) if err != nil { logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err) @@ -364,7 +382,10 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -385,7 +406,10 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -399,7 +423,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - err := cs.AddTableFilesToManifest(ctx, updates) + err = cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger.Printf("error occurred updating the manifest: %s", err.Error()) @@ -426,12 +450,15 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion) + cs, err := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") } - err := cs.Rebase(ctx) + err = cs.Rebase(ctx) if err != nil { return nil, err } @@ -453,7 +480,10 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -522,7 +552,10 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A defer func() { logger.Println("finished") }() repoPath := getRepoPath(req) - cs := rs.getStore(logger, repoPath) + cs, err := rs.getStore(logger, repoPath) + if err != nil { + return nil, err + } if cs == nil { return nil, status.Error(codes.Internal, "Could not get chunkstore") @@ -536,7 +569,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount) } - err := cs.AddTableFilesToManifest(ctx, updates) + err = cs.AddTableFilesToManifest(ctx, updates) if err != nil { logger.Printf("error occurred updating the manifest: %s", err.Error()) @@ -546,18 +579,20 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A return &remotesapi.AddTableFilesResponse{Success: true}, nil } -func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore { +func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) (RemoteSrvStore, error) { return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString()) } -func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore { +func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) (RemoteSrvStore, error) { cs, err := rs.csCache.Get(repoPath, nbfVerStr) - if err != nil { logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath) + if errors.Is(err, ErrUnimplemented) { + return nil, status.Error(codes.Unimplemented, err.Error()) + } + return nil, err } - - return cs + return cs, nil } var requestId int32 diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 1975d1573a..c7fc28fed7 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -53,6 +53,7 @@ type Controller struct { cinterceptor clientinterceptor lgr *logrus.Logger + provider dbProvider iterSessions IterSessions killQuery func(uint32) killConnection func(uint32) @@ -62,6 +63,12 @@ type sqlvars interface { AddSystemVariables(sysVars []sql.SystemVariable) } +// We can manage certain aspects of the exposed databases on the server through +// this. +type dbProvider interface { + SetIsStandby(bool) +} + type procedurestore interface { Register(sql.ExternalStoredProcedureDetails) } @@ -128,7 +135,20 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. type IterSessions func(func(sql.Session) (bool, error)) error +func (c *Controller) ManageDatabaseProvider(p dbProvider) { + if c == nil { + return + } + c.mu.Lock() + defer c.mu.Unlock() + c.provider = p + c.provider.SetIsStandby(c.role == RoleStandby) +} + func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery, killConnection func(uint32)) { + if c == nil { + return + } c.mu.Lock() defer c.mu.Unlock() c.iterSessions = iterSessions @@ -279,27 +299,25 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro changedrole := role != string(c.role) - c.role = Role(role) - c.epoch = epoch - if changedrole { var err error - if c.role == RoleStandby { + if role == string(RoleStandby) { if graceful { err = c.gracefulTransitionToStandby() + if err != nil { + return err + } } else { - err = c.immediateTransitionToStandby() - // TODO: this should not fail; if it does, we still prevent transition for now. + c.immediateTransitionToStandby() } } else { - err = c.transitionToPrimary() - // TODO: this should not fail; if it does, we still prevent transition for now. - } - if err != nil { - return err + c.transitionToPrimary() } } + c.role = Role(role) + c.epoch = epoch + c.refreshSystemVars() c.cinterceptor.setRole(c.role, c.epoch) c.sinterceptor.setRole(c.role, c.epoch) @@ -374,7 +392,19 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server // * Replicate all databases to their standby remotes. // * If success, return success. // * If failure, set all databases in database_provider back to their original state. Return failure. +// +// called with c.mu held func (c *Controller) gracefulTransitionToStandby() error { + c.setProviderIsStandby(true) + c.killRunningQueries() + // TODO: this can block with c.mu held, although we are not too + // interested in the server proceeding gracefully while this is + // happening. + if err := c.waitForHooksToReplicate(); err != nil { + c.setProviderIsStandby(false) + c.killRunningQueries() + return err + } return nil } @@ -382,7 +412,11 @@ func (c *Controller) gracefulTransitionToStandby() error { // * Set all databases in database_provider to read-only. // * Kill all running queries in GMS. // * Return success. NOTE: we do not attempt to replicate to the standby. +// +// called with c.mu held func (c *Controller) immediateTransitionToStandby() error { + c.setProviderIsStandby(true) + c.killRunningQueries() return nil } @@ -390,20 +424,40 @@ func (c *Controller) immediateTransitionToStandby() error { // * Set all databases in database_provider back to their original mode: read-write or read only. // * Kill all running queries in GMS. // * Return success. +// +// called with c.mu held func (c *Controller) transitionToPrimary() error { + c.setProviderIsStandby(false) + c.killRunningQueries() return nil } // Kills all running queries in the managed GMS engine. +// called with c.mu held func (c *Controller) killRunningQueries() { - c.mu.Lock() - iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection - c.mu.Unlock() if c.iterSessions != nil { - iterSessions(func(session sql.Session) (stop bool, err error) { - killQuery(session.ID()) - killConnection(session.ID()) + c.iterSessions(func(session sql.Session) (stop bool, err error) { + c.killQuery(session.ID()) + c.killConnection(session.ID()) return }) } } + +// called with c.mu held +func (c *Controller) setProviderIsStandby(standby bool) { + if c.provider != nil { + c.provider.SetIsStandby(standby) + } +} + +// Called during a graceful transition from primary to standby. Waits until all +// commithooks report nextHead == lastPushedHead. +// +// TODO: Implement this. +// TODO: Report errors from commit hooks or add a deadline here or something. +// +// called with c.mu held +func (c *Controller) waitForHooksToReplicate() error { + return nil +} diff --git a/go/libraries/doltcore/sqle/database_provider.go b/go/libraries/doltcore/sqle/database_provider.go index af7a8e2ea0..690fafca0c 100644 --- a/go/libraries/doltcore/sqle/database_provider.go +++ b/go/libraries/doltcore/sqle/database_provider.go @@ -55,6 +55,7 @@ type DoltDatabaseProvider struct { remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object dbFactoryUrl string + isStandby *bool } var _ sql.DatabaseProvider = (*DoltDatabaseProvider)(nil) @@ -116,6 +117,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files defaultBranch: defaultBranch, dbFactoryUrl: doltdb.LocalDirDoltDB, InitDatabaseHook: ConfigureReplicationDatabaseHook, + isStandby: new(bool), }, nil } @@ -148,6 +150,15 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys { return p.fs } +// If this DatabaseProvider is set to standby |true|, it returns every dolt +// database as a read only database. Set back to |false| to get read-write +// behavior from dolt databases again. +func (p DoltDatabaseProvider) SetIsStandby(standby bool) { + p.mu.Lock() + defer p.mu.Unlock() + *p.isStandby = standby +} + // FileSystemForDatabase returns a filesystem, with the working directory set to the root directory // of the requested database. If the requested database isn't found, a database not found error // is returned. @@ -168,9 +179,10 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da var ok bool p.mu.RLock() db, ok = p.databases[formatDbMapKeyName(name)] + standby := *p.isStandby p.mu.RUnlock() if ok { - return db, nil + return wrapForStandby(db, standby), nil } db, _, ok, err = p.databaseForRevision(ctx, name) @@ -189,7 +201,21 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da } // Don't track revision databases, just instantiate them on demand - return db, nil + return wrapForStandby(db, standby), nil +} + +func wrapForStandby(db sql.Database, standby bool) sql.Database { + if !standby { + return db + } + if _, ok := db.(ReadOnlyDatabase); ok { + return db + } + if db, ok := db.(Database); ok { + // :-/. Hopefully it's not too sliced. + return ReadOnlyDatabase{db} + } + return db } // attemptCloneReplica attempts to clone a database from the configured replication remote URL template, returning an error diff --git a/go/libraries/doltcore/sqle/remotesrv.go b/go/libraries/doltcore/sqle/remotesrv.go index 8932cd040d..d172df5bce 100644 --- a/go/libraries/doltcore/sqle/remotesrv.go +++ b/go/libraries/doltcore/sqle/remotesrv.go @@ -15,8 +15,6 @@ package sqle import ( - "errors" - "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -37,15 +35,15 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e if err != nil { return nil, err } - ddb, ok := db.(Database) + sdb, ok := db.(SqlDatabase) if !ok { - return nil, errors.New("unimplemented") + return nil, remotesrv.ErrUnimplemented } - datasdb := doltdb.HackDatasDatabaseFromDoltDB(ddb.DbData().Ddb) + datasdb := doltdb.HackDatasDatabaseFromDoltDB(sdb.DbData().Ddb) cs := datas.ChunkStoreFromDatabase(datasdb) rss, ok := cs.(remotesrv.RemoteSrvStore) if !ok { - return nil, errors.New("unimplemented") + return nil, remotesrv.ErrUnimplemented } return rss, nil } diff --git a/integration-tests/bats/sql-server-cluster.bats b/integration-tests/bats/sql-server-cluster.bats index 083295b65c..ca828a99f8 100644 --- a/integration-tests/bats/sql-server-cluster.bats +++ b/integration-tests/bats/sql-server-cluster.bats @@ -131,7 +131,9 @@ cluster: # same role, new epoch server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12" # new role, new epoch - server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "" 1 + # we assert on a new connection, since the server may have killed the old one on the transition. + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" # Server comes back up with latest assumed role. kill $SERVER_PID @@ -205,6 +207,7 @@ cluster: cd serverone echo " +log_level: trace user: name: dolt listener: @@ -236,6 +239,7 @@ cluster: cd ../servertwo echo " +log_level: trace user: name: dolt listener: @@ -280,3 +284,215 @@ cluster: [ "$status" -eq 0 ] [[ "$output" =~ "| 5 " ]] || false } + +@test "sql-server-cluster: booted standby server is read only" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" +} + +@test "sql-server-cluster: booted primary server is read write" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10" +} + +@test "sql-server-cluster: standby transitioned to primary becomes writable" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)' + cd .. + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + SERVER_PID=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', 11)" "" 1 + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 0 + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10" +} + +@test "sql-server-cluster: primary transitioned to standby becomes read only" { + # In order to gracefully transition to standby, we will need the standby running. + cd serverone + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: standby + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! + + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)" + run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', 11)" "" 1 + run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1 + [[ "$output" =~ "Database repo1 is read-only" ]] || false + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} From b6e133d99a6ce7d0767e8d76f3773b7fead3f5d5 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Fri, 30 Sep 2022 11:50:27 -0700 Subject: [PATCH 3/7] noop --- go/store/constants/version.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/store/constants/version.go b/go/store/constants/version.go index 7a9814756b..495dee6aaa 100644 --- a/go/store/constants/version.go +++ b/go/store/constants/version.go @@ -28,6 +28,8 @@ func init() { nbfVerStr := os.Getenv("DOLT_DEFAULT_BIN_FORMAT") if nbfVerStr != "" { FormatDefaultString = nbfVerStr + } else { + FormatDefaultString = FormatDefaultString } } From 99b8305ffd4f61e16667ad3669d2f544d533d3e4 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Fri, 30 Sep 2022 12:08:13 -0700 Subject: [PATCH 4/7] fix schema_table_test --- .../doltcore/sqle/schema_table_test.go | 53 ++++++++++++++----- 1 file changed, 39 insertions(+), 14 deletions(-) diff --git a/go/libraries/doltcore/sqle/schema_table_test.go b/go/libraries/doltcore/sqle/schema_table_test.go index 1529593460..a51471beeb 100644 --- a/go/libraries/doltcore/sqle/schema_table_test.go +++ b/go/libraries/doltcore/sqle/schema_table_test.go @@ -54,34 +54,59 @@ func TestSchemaTableRecreationOlder(t *testing.T) { {Name: doltdb.SchemasTablesFragmentCol, Type: sql.Text, Source: doltdb.SchemasTableName, PrimaryKey: false}, }), sql.Collation_Default) require.NoError(t, err) - root, err := db.GetRoot(ctx) + sqlTbl, found, err := db.GetTableInsensitive(ctx, doltdb.SchemasTableName) require.NoError(t, err) - err = dEnv.UpdateWorkingRoot(ctx, root) + require.True(t, found) + inserter := sqlTbl.(*WritableDoltTable).Inserter(ctx) + err = inserter.Insert(ctx, sql.Row{"view", "view1", "SELECT v1 FROM test;"}) + require.NoError(t, err) + err = inserter.Insert(ctx, sql.Row{"view", "view2", "SELECT v2 FROM test;"}) + require.NoError(t, err) + err = inserter.Close(ctx) require.NoError(t, err) - expected := []sql.Row{ + table, err := sqlTbl.(*WritableDoltTable).DoltTable.DoltTable(ctx) + require.NoError(t, err) + + rowData, err := table.GetNomsRowData(ctx) + require.NoError(t, err) + expectedVals := []sql.Row{ {"view", "view1", "SELECT v1 FROM test;"}, {"view", "view2", "SELECT v2 FROM test;"}, } - actual, err := ExecuteSelect(t, dEnv, root, "SELECT * FROM "+doltdb.SchemasTableName) - require.NoError(t, err) - assert.Equal(t, expected, actual) + index := 0 + _ = rowData.IterAll(ctx, func(keyTpl, valTpl types.Value) error { + dRow, err := row.FromNoms(sqlTbl.(*WritableDoltTable).sch, keyTpl.(types.Tuple), valTpl.(types.Tuple)) + require.NoError(t, err) + sqlRow, err := sqlutil.DoltRowToSqlRow(dRow, sqlTbl.(*WritableDoltTable).sch) + require.NoError(t, err) + assert.Equal(t, expectedVals[index], sqlRow) + index++ + return nil + }) - // removes the old table and recreates it with the new schema - tbl, err := GetOrCreateDoltSchemasTable(ctx, db) + tbl, err := GetOrCreateDoltSchemasTable(ctx, db) // removes the old table and recreates it with the new schema require.NoError(t, err) - root, err = db.GetRoot(ctx) - require.NoError(t, err) - err = dEnv.UpdateWorkingRoot(ctx, root) + table, err = tbl.DoltTable.DoltTable(ctx) require.NoError(t, err) - expected = []sql.Row{ + rowData, err = table.GetNomsRowData(ctx) + require.NoError(t, err) + expectedVals = []sql.Row{ {"view", "view1", "SELECT v1 FROM test;", int64(1), nil}, {"view", "view2", "SELECT v2 FROM test;", int64(2), nil}, } - actual, err = ExecuteSelect(t, dEnv, root, "SELECT * FROM "+doltdb.SchemasTableName) - require.NoError(t, err) + index = 0 + _ = rowData.IterAll(ctx, func(keyTpl, valTpl types.Value) error { + dRow, err := row.FromNoms(tbl.sch, keyTpl.(types.Tuple), valTpl.(types.Tuple)) + require.NoError(t, err) + sqlRow, err := sqlutil.DoltRowToSqlRow(dRow, tbl.sch) + require.NoError(t, err) + assert.Equal(t, expectedVals[index], sqlRow) + index++ + return nil + }) indexes := tbl.sch.Indexes().AllIndexes() require.Len(t, indexes, 1) From 43d36df90fc92c62595549ab3d0501398aa6177b Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Fri, 30 Sep 2022 12:17:04 -0700 Subject: [PATCH 5/7] Revert "noop" This reverts commit b6e133d99a6ce7d0767e8d76f3773b7fead3f5d5. --- go/store/constants/version.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/store/constants/version.go b/go/store/constants/version.go index 495dee6aaa..7a9814756b 100644 --- a/go/store/constants/version.go +++ b/go/store/constants/version.go @@ -28,8 +28,6 @@ func init() { nbfVerStr := os.Getenv("DOLT_DEFAULT_BIN_FORMAT") if nbfVerStr != "" { FormatDefaultString = nbfVerStr - } else { - FormatDefaultString = FormatDefaultString } } From 4160bda8acf52110c0a7c92de0c2d6c35c3f9bfa Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 29 Sep 2022 13:26:14 -0700 Subject: [PATCH 6/7] go: sqle: cluster: Add a mechanism to kill inflight queries and connections when transition server role. --- go/cmd/dolt/commands/sqlserver/server.go | 7 ++ .../doltcore/sqle/cluster/assume_role.go | 2 +- .../doltcore/sqle/cluster/controller.go | 72 ++++++++++++++++++- .../bats/sql-server-cluster.bats | 5 +- 4 files changed, 81 insertions(+), 5 deletions(-) diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 0f2eabc84f..34539696f7 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -266,11 +266,18 @@ func Serve( clusterRemoteSrv.Serve(listeners) }() } + + clusterController.ManageQueryConnections( + mySQLServer.SessionManager().Iter, + sqlEngine.GetUnderlyingEngine().ProcessList.Kill, + mySQLServer.SessionManager().KillConnection, + ) } else { lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err) startError = err return } + } if ok, f := mrEnv.IsLocked(); ok { diff --git a/go/libraries/doltcore/sqle/cluster/assume_role.go b/go/libraries/doltcore/sqle/cluster/assume_role.go index 52ea6cf7e6..8291dc1141 100644 --- a/go/libraries/doltcore/sqle/cluster/assume_role.go +++ b/go/libraries/doltcore/sqle/cluster/assume_role.go @@ -29,7 +29,7 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD }, }, Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) { - err := controller.setRoleAndEpoch(role, epoch) + err := controller.setRoleAndEpoch(role, epoch, true /* graceful */) if err != nil { return nil, err } diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 9014fb56c7..0f5c26c06f 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -52,6 +52,10 @@ type Controller struct { sinterceptor serverinterceptor cinterceptor clientinterceptor lgr *logrus.Logger + + iterSessions IterSessions + killQuery func(uint32) + killConnection func(uint32) error } type sqlvars interface { @@ -122,6 +126,16 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. return nil } +type IterSessions func(func(sql.Session) (bool, error)) error + +func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) { + c.mu.Lock() + defer c.mu.Unlock() + c.iterSessions = iterSessions + c.killQuery = killQuery + c.killConnection = killConnection +} + func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) { ttfdir, err := denv.TempTableFilesDir() if err != nil { @@ -246,7 +260,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea return Role(persistentRole), epochi, nil } -func (c *Controller) setRoleAndEpoch(role string, epoch int) error { +func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) error { c.mu.Lock() defer c.mu.Unlock() if epoch == c.epoch && role == string(c.role) { @@ -269,7 +283,21 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int) error { c.epoch = epoch if changedrole { - // TODO: Role is transitioning...lots of stuff to do. + var err error + if c.role == RoleStandby { + if graceful { + err = c.gracefulTransitionToStandby() + } else { + err = c.immediateTransitionToStandby() + // TODO: this should not fail; if it does, we still prevent transition for now. + } + } else { + err = c.transitionToPrimary() + // TODO: this should not fail; if it does, we still prevent transition for now. + } + if err != nil { + return err + } } c.refreshSystemVars() @@ -339,3 +367,43 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server args.DBCache = remotesrvStoreCache{args.DBCache, c} return args } + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Replicate all databases to their standby remotes. +// * If success, return success. +// * If failure, set all databases in database_provider back to their original state. Return failure. +func (c *Controller) gracefulTransitionToStandby() error { + return nil +} + +// The order of operations is: +// * Set all databases in database_provider to read-only. +// * Kill all running queries in GMS. +// * Return success. NOTE: we do not attempt to replicate to the standby. +func (c *Controller) immediateTransitionToStandby() error { + return nil +} + +// The order of operations is: +// * Set all databases in database_provider back to their original mode: read-write or read only. +// * Kill all running queries in GMS. +// * Return success. +func (c *Controller) transitionToPrimary() error { + return nil +} + +// Kills all running queries in the managed GMS engine. +func (c *Controller) killRunningQueries() { + c.mu.Lock() + iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection + c.mu.Unlock() + if c.iterSessions != nil { + iterSessions(func(session sql.Session) (stop bool, err error) { + killQuery(session.ID()) + killConnection(session.ID()) + return + }) + } +} diff --git a/integration-tests/bats/sql-server-cluster.bats b/integration-tests/bats/sql-server-cluster.bats index 083295b65c..2cf4da1a93 100644 --- a/integration-tests/bats/sql-server-cluster.bats +++ b/integration-tests/bats/sql-server-cluster.bats @@ -130,8 +130,9 @@ cluster: server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '10');" "status\n0" # same role, new epoch server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12" - # new role, new epoch - server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" + # new role, new epoch (this can drop the connection, so check the results in a new connection) + run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "" 1 + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13" # Server comes back up with latest assumed role. kill $SERVER_PID From 922069a5a113d7b62317ce225c1bdc7032437236 Mon Sep 17 00:00:00 2001 From: reltuk Date: Fri, 30 Sep 2022 19:24:06 +0000 Subject: [PATCH 7/7] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/cluster/controller.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 0f5c26c06f..f67a58467d 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -372,8 +372,8 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server // * Set all databases in database_provider to read-only. // * Kill all running queries in GMS. // * Replicate all databases to their standby remotes. -// * If success, return success. -// * If failure, set all databases in database_provider back to their original state. Return failure. +// - If success, return success. +// - If failure, set all databases in database_provider back to their original state. Return failure. func (c *Controller) gracefulTransitionToStandby() error { return nil }