diff --git a/go/libraries/doltcore/sqle/cluster/assume_role.go b/go/libraries/doltcore/sqle/cluster/assume_role.go index aa2ca581a0..3f53865b96 100644 --- a/go/libraries/doltcore/sqle/cluster/assume_role.go +++ b/go/libraries/doltcore/sqle/cluster/assume_role.go @@ -1,4 +1,4 @@ -// Copyright 2022 Dolthub, Inc. +// Copyright 2022-2023 Dolthub, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package cluster import ( "errors" + "fmt" "github.com/dolthub/go-mysql-server/sql" "github.com/dolthub/go-mysql-server/sql/types" @@ -39,12 +40,16 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD if role == string(RoleDetectedBrokenConfig) { return nil, errors.New("cannot set role to detected_broken_config; valid values are 'primary' and 'standby'") } - changerole, err := controller.setRoleAndEpoch(role, epoch, true /* graceful */, int(ctx.Session.ID())) + saveConnID := int(ctx.Session.ID()) + res, err := controller.setRoleAndEpoch(role, epoch, roleTransitionOptions{ + graceful: true, + saveConnID: &saveConnID, + }) if err != nil { // We did not transition, no need to set our session to read-only, etc. return nil, err } - if changerole { + if res.changedRole { // We transitioned, make sure we do not run anymore queries on this session. ctx.Session.SetTransaction(nil) dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr) @@ -53,3 +58,64 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD }, } } + +func newTransitionToStandbyProcedure(controller *Controller) sql.ExternalStoredProcedureDetails { + return sql.ExternalStoredProcedureDetails{ + Name: "dolt_cluster_transition_to_standby", + Schema: sql.Schema{ + &sql.Column{ + Name: "caught_up", + Type: types.Int8, + Nullable: false, + }, + &sql.Column{ + Name: "database", + Type: types.LongText, + Nullable: false, + }, + &sql.Column{ + Name: "remote", + Type: types.LongText, + Nullable: false, + }, + &sql.Column{ + Name: "remote_url", + Type: types.LongText, + Nullable: false, + }, + }, + Function: func(ctx *sql.Context, epoch, minCaughtUpStandbys int) (sql.RowIter, error) { + saveConnID := int(ctx.Session.ID()) + res, err := controller.setRoleAndEpoch("standby", epoch, roleTransitionOptions{ + graceful: true, + minCaughtUpStandbys: minCaughtUpStandbys, + saveConnID: &saveConnID, + }) + if err != nil { + // We did not transition, no need to set our session to read-only, etc. + return nil, err + } + if res.changedRole { + // We transitioned, make sure we do not run anymore queries on this session. + ctx.Session.SetTransaction(nil) + dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr) + rows := make([]sql.Row, len(res.gracefulTransitionResults)) + for i, r := range res.gracefulTransitionResults { + var caughtUp int8 + if r.caughtUp { + caughtUp = 1 + } + rows[i] = sql.Row{ + caughtUp, + r.database, + r.remote, + r.remoteUrl, + } + } + return sql.RowsToRowIter(rows...), nil + } else { + return nil, fmt.Errorf("failed to transition server to standby; it is already standby.") + } + }, + } +} diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 91fe10bfbb..b917522b40 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -38,6 +38,7 @@ type commithook struct { rootLgr *logrus.Entry lgr atomic.Value // *logrus.Entry remotename string + remoteurl string dbname string mu sync.Mutex wg sync.WaitGroup @@ -87,11 +88,12 @@ var errDestDBRootHashMoved error = errors.New("cluster/commithook: standby repli const logFieldThread = "thread" const logFieldRole = "role" -func newCommitHook(lgr *logrus.Logger, remotename, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook { +func newCommitHook(lgr *logrus.Logger, remotename, remoteurl, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook { var ret commithook ret.rootLgr = lgr.WithField(logFieldThread, "Standby Replication - "+dbname+" to "+remotename) ret.lgr.Store(ret.rootLgr.WithField(logFieldRole, string(role))) ret.remotename = remotename + ret.remoteurl = remoteurl ret.dbname = dbname ret.role = role ret.destDBF = destDBF diff --git a/go/libraries/doltcore/sqle/cluster/commithook_test.go b/go/libraries/doltcore/sqle/cluster/commithook_test.go index 08496a7ed7..7075a2f372 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook_test.go +++ b/go/libraries/doltcore/sqle/cluster/commithook_test.go @@ -35,7 +35,7 @@ func TestCommitHookStartsNotCaughtUp(t *testing.T) { destEnv.DoltDB.Close() }) - hook := newCommitHook(logrus.StandardLogger(), "origin", "mydb", RolePrimary, func(context.Context) (*doltdb.DoltDB, error) { + hook := newCommitHook(logrus.StandardLogger(), "origin", "https://localhost:50051/mydb", "mydb", RolePrimary, func(context.Context) (*doltdb.DoltDB, error) { return destEnv.DoltDB, nil }, srcEnv.DoltDB, t.TempDir()) diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 7c821aa6e0..bd8b047cf1 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "net/http" + "net/url" "os" "strconv" "strings" @@ -118,7 +119,9 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) lgr: lgr, } roleSetter := func(role string, epoch int) { - ret.setRoleAndEpoch(role, epoch, false /* graceful */, -1 /* saveConnID */) + ret.setRoleAndEpoch(role, epoch, roleTransitionOptions{ + graceful: false, + }) } ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{}) ret.sinterceptor.setRole(role, epoch) @@ -239,7 +242,7 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql. if !ok { return nil, fmt.Errorf("sqle: cluster: standby replication: destination remote %s does not exist on database %s", r.Name(), name) } - commitHook := newCommitHook(c.lgr, r.Name(), name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) { + commitHook := newCommitHook(c.lgr, r.Name(), remote.Url, name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) { return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider) }, denv.DoltDB, ttfdir) denv.DoltDB.PrependCommitHook(ctx, commitHook) @@ -260,6 +263,7 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) { return } store.Register(newAssumeRoleProcedure(c)) + store.Register(newTransitionToStandbyProcedure(c)) } func (c *Controller) ClusterDatabase() sql.Database { @@ -352,37 +356,69 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea return Role(persistentRole), epochi, nil } -func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, saveConnID int) (bool, error) { +type roleTransitionOptions struct { + // If true, all standby replicas must be caught up in order to + // transition from primary to standby. + graceful bool + + // If non-zero and |graceful| is true, will allow a transition from + // primary to standby to succeed only if this many standby replicas + // are known to be caught up at the finalization of the replication + // hooks. + minCaughtUpStandbys int + + // If non-nil, this connection will be saved if and when the connection + // process needs to terminate existing connections. + saveConnID *int +} + +type roleTransitionResult struct { + // true if the role changed as a result of this call. + changedRole bool + + // filled in with graceful transition results if this was a graceful + // transition and it was successful. + gracefulTransitionResults []graceTransitionResult +} + +func (c *Controller) setRoleAndEpoch(role string, epoch int, opts roleTransitionOptions) (roleTransitionResult, error) { + graceful := opts.graceful + saveConnID := -1 + if opts.saveConnID != nil { + saveConnID = *opts.saveConnID + } + c.mu.Lock() defer c.mu.Unlock() if epoch == c.epoch && role == string(c.role) { - return false, nil + return roleTransitionResult{false, nil}, nil } if role != string(RolePrimary) && role != string(RoleStandby) && role != string(RoleDetectedBrokenConfig) { - return false, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role) + return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role) } if epoch < c.epoch { - return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch) + return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch) } if epoch == c.epoch { // This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and // other signals that the cluster is misconfigured. isallowed := !graceful && (role == string(RoleStandby) || role == string(RoleDetectedBrokenConfig)) if !isallowed { - return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role) + return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role) } } changedrole := role != string(c.role) + var gracefulResults []graceTransitionResult if changedrole { var err error if role == string(RoleStandby) { if graceful { beforeRole, beforeEpoch := c.role, c.epoch - err = c.gracefulTransitionToStandby(saveConnID) + gracefulResults, err = c.gracefulTransitionToStandby(saveConnID, opts.minCaughtUpStandbys) if err == nil && (beforeRole != c.role || beforeEpoch != c.epoch) { // The role or epoch moved out from under us while we were unlocked and transitioning to standby. err = fmt.Errorf("error assuming role '%s' at epoch %d: the role configuration changed while we were replicating to our standbys. Please try again", role, epoch) @@ -390,7 +426,7 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, save if err != nil { c.setProviderIsStandby(c.role != RolePrimary) c.killRunningQueries(saveConnID) - return false, err + return roleTransitionResult{false, nil}, err } } else { c.immediateTransitionToStandby() @@ -413,7 +449,11 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, save h.setRole(c.role) } } - return changedrole, c.persistVariables() + _ = c.persistVariables() + return roleTransitionResult{ + changedRole: changedrole, + gracefulTransitionResults: gracefulResults, + }, nil } func (c *Controller) roleAndEpoch() (Role, int) { @@ -481,6 +521,16 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server return args } +// TODO: make the deadline here configurable or something. +const waitForHooksToReplicateTimeout = 10 * time.Second + +type graceTransitionResult struct { + caughtUp bool + database string + remote string + remoteUrl string +} + // The order of operations is: // * Set all databases in database_provider to read-only. // * Kill all running queries in GMS. @@ -494,15 +544,70 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server // after returning the results of dolt_assume_cluster_role(). // // called with c.mu held -func (c *Controller) gracefulTransitionToStandby(saveConnID int) error { +func (c *Controller) gracefulTransitionToStandby(saveConnID, minCaughtUpStandbys int) ([]graceTransitionResult, error) { c.setProviderIsStandby(true) c.killRunningQueries(saveConnID) + // waitForHooksToReplicate will release the lock while it // blocks, but will return with the lock held. - if err := c.waitForHooksToReplicate(); err != nil { - return err + states, err := c.waitForHooksToReplicate(waitForHooksToReplicateTimeout) + if err != nil { + return nil, err } - return nil + + if len(states) != len(c.commithooks) { + c.lgr.Warnf("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.") + return nil, errors.New("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.") + } + + res := make([]graceTransitionResult, len(states)) + for i := range states { + hook := c.commithooks[i] + res[i] = graceTransitionResult{ + caughtUp: states[i], + database: hook.dbname, + remote: hook.remotename, + remoteUrl: hook.remoteurl, + } + } + + if minCaughtUpStandbys == 0 { + for _, caughtUp := range states { + if !caughtUp { + c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.") + return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.") + } + } + c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.") + } else { + databases := make(map[string]struct{}) + replicas := make(map[string]int) + for _, r := range res { + databases[r.database] = struct{}{} + url, err := url.Parse(r.remoteUrl) + if err != nil { + return nil, fmt.Errorf("cluster/controller: could not parse remote_url (%s) for remote %s on database %s: %w", r.remoteUrl, r.remote, r.database, err) + } + if _, ok := replicas[url.Host]; !ok { + replicas[url.Host] = 0 + } + if r.caughtUp { + replicas[url.Host] = replicas[url.Host] + 1 + } + } + numCaughtUp := 0 + for _, v := range replicas { + if v == len(databases) { + numCaughtUp += 1 + } + } + if numCaughtUp < minCaughtUpStandbys { + return nil, fmt.Errorf("cluster/controller: failed to transition from primary to standby gracefully; could not ensure %d replicas were caught up on all %d databases. Only caught up %d standbys fully.", minCaughtUpStandbys, len(databases), numCaughtUp) + } + c.lgr.Tracef("cluster/controller: successfully replicated all databases to %d out of %d standbys; transitioning to standby.", numCaughtUp, len(replicas)) + } + + return res, nil } // The order of operations is: @@ -553,15 +658,15 @@ func (c *Controller) setProviderIsStandby(standby bool) { } } -const waitForHooksToReplicateTimeout = 10 * time.Second - // Called during a graceful transition from primary to standby. Waits until all // commithooks report nextHead == lastPushedHead. // -// TODO: make the deadline here configurable or something. +// Returns `[]bool` with an entry for each `commithook` which existed at the +// start of the call. The entry will be `true` if that `commithook` was caught +// up as part of this wait, and `false` otherwise. // // called with c.mu held -func (c *Controller) waitForHooksToReplicate() error { +func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, error) { commithooks := make([]*commithook, len(c.commithooks)) copy(commithooks, c.commithooks) caughtup := make([]bool, len(commithooks)) @@ -582,7 +687,7 @@ func (c *Controller) waitForHooksToReplicate() error { commithooks[j].setWaitNotify(nil) } c.lgr.Warnf("cluster/controller: failed to wait for graceful transition to standby; there were concurrent attempts to transition..") - return errors.New("cluster/controller: failed to transition from primary to standby gracefully; did not gain exclusive access to commithooks.") + return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; did not gain exclusive access to commithooks.") } } c.mu.Unlock() @@ -591,33 +696,26 @@ func (c *Controller) waitForHooksToReplicate() error { wg.Wait() close(done) }() - var success bool select { case <-done: - success = true - case <-time.After(waitForHooksToReplicateTimeout): - success = false + case <-time.After(timeout): } c.mu.Lock() for _, ch := range commithooks { ch.setWaitNotify(nil) } - if !success { - // make certain we don't leak the wg.Wait goroutine in the failure case. - // at this point, none of the callbacks will ever be called again and - // ch.setWaitNotify grabs a lock and so establishes the happens before. - for _, b := range caughtup { - if !b { - wg.Done() - } + + // Make certain we don't leak the wg.Wait goroutine in the failure case. + // At this point, none of the callbacks will ever be called again and + // ch.setWaitNotify grabs a lock and so establishes the happens before. + for _, b := range caughtup { + if !b { + wg.Done() } - <-done - c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.") - return errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.") - } else { - c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.") - return nil } + <-done + + return caughtup, nil } // Within a cluster, if remotesapi is configured with a tls_ca, we take the diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 7713c91540..ce8f453f5d 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -40,6 +40,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig dialprovider := controller.gRPCDialProvider(denv) var remoteDBs []func(context.Context) (*doltdb.DoltDB, error) + var remoteUrls []string for _, r := range controller.cfg.StandbyRemotes() { // TODO: url sanitize name remoteUrl := strings.Replace(r.RemoteURLTemplate(), dsess.URLTemplateDatabasePlaceholder, name, -1) @@ -55,6 +56,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) { return r.GetRemoteDB(ctx, types.Format_Default, dialprovider) }) + remoteUrls = append(remoteUrls, remoteUrl) } role, _ := controller.roleAndEpoch() @@ -63,7 +65,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig if err != nil { return err } - commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir) + commitHook := newCommitHook(controller.lgr, r.Name(), remoteUrls[i], name, role, remoteDBs[i], denv.DoltDB, ttfdir) denv.DoltDB.PrependCommitHook(ctx, commitHook) controller.registerCommitHook(commitHook) if err := commitHook.Run(bt); err != nil { diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index cf231d1ae3..638747ffd3 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -836,6 +836,106 @@ tests: error_match: failed to transition from primary to standby gracefully - exec: "create table vals (i int primary key)" - exec: "insert into vals values (0)" +- name: dolt_cluster_transition_to_standby + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'create database repo1' + - exec: "use repo1" + - exec: 'create table vals (i int primary key)' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - query: "call dolt_cluster_transition_to_standby('2', '1')" + result: + columns: ["caught_up", "database", "remote", "remote_url"] + rows: [["1", "repo1", "standby", "http://localhost:3852/repo1"]] +- name: dolt_cluster_transition_to_standby too many standbys provided + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'create database repo1' + - exec: "use repo1" + - exec: 'create table vals (i int primary key)' + - exec: 'insert into vals values (0),(1),(2),(3),(4)' + - query: "call dolt_cluster_transition_to_standby('2', '2')" + error_match: failed to transition from primary to standby gracefully; could not ensure 2 replicas were caught up on all 1 databases. Only caught up 1 standbys fully. - name: create new database, primary replicates to standby, fails over, new primary replicates to standby, fails over, new primary has all writes multi_repos: - name: server1