Merge pull request #6204 from dolthub/aaron/cluster-assume-role-standby-changes

sqle: cluster: Add dolt_cluster_transition_to_standby stored procedure.
This commit is contained in:
Aaron Son
2023-06-22 14:03:13 -07:00
committed by GitHub
6 changed files with 311 additions and 43 deletions

View File

@@ -1,4 +1,4 @@
// Copyright 2022 Dolthub, Inc.
// Copyright 2022-2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ package cluster
import (
"errors"
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
@@ -39,12 +40,16 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
if role == string(RoleDetectedBrokenConfig) {
return nil, errors.New("cannot set role to detected_broken_config; valid values are 'primary' and 'standby'")
}
changerole, err := controller.setRoleAndEpoch(role, epoch, true /* graceful */, int(ctx.Session.ID()))
saveConnID := int(ctx.Session.ID())
res, err := controller.setRoleAndEpoch(role, epoch, roleTransitionOptions{
graceful: true,
saveConnID: &saveConnID,
})
if err != nil {
// We did not transition, no need to set our session to read-only, etc.
return nil, err
}
if changerole {
if res.changedRole {
// We transitioned, make sure we do not run any more queries on this session.
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr)
@@ -53,3 +58,64 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
},
}
}
func newTransitionToStandbyProcedure(controller *Controller) sql.ExternalStoredProcedureDetails {
return sql.ExternalStoredProcedureDetails{
Name: "dolt_cluster_transition_to_standby",
Schema: sql.Schema{
&sql.Column{
Name: "caught_up",
Type: types.Int8,
Nullable: false,
},
&sql.Column{
Name: "database",
Type: types.LongText,
Nullable: false,
},
&sql.Column{
Name: "remote",
Type: types.LongText,
Nullable: false,
},
&sql.Column{
Name: "remote_url",
Type: types.LongText,
Nullable: false,
},
},
Function: func(ctx *sql.Context, epoch, minCaughtUpStandbys int) (sql.RowIter, error) {
saveConnID := int(ctx.Session.ID())
res, err := controller.setRoleAndEpoch("standby", epoch, roleTransitionOptions{
graceful: true,
minCaughtUpStandbys: minCaughtUpStandbys,
saveConnID: &saveConnID,
})
if err != nil {
// We did not transition, no need to set our session to read-only, etc.
return nil, err
}
if res.changedRole {
// We transitioned, make sure we do not run any more queries on this session.
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerTransitionedRolesErr)
rows := make([]sql.Row, len(res.gracefulTransitionResults))
for i, r := range res.gracefulTransitionResults {
var caughtUp int8
if r.caughtUp {
caughtUp = 1
}
rows[i] = sql.Row{
caughtUp,
r.database,
r.remote,
r.remoteUrl,
}
}
return sql.RowsToRowIter(rows...), nil
} else {
return nil, fmt.Errorf("failed to transition server to standby; it is already standby.")
}
},
}
}

View File

@@ -38,6 +38,7 @@ type commithook struct {
rootLgr *logrus.Entry
lgr atomic.Value // *logrus.Entry
remotename string
remoteurl string
dbname string
mu sync.Mutex
wg sync.WaitGroup
@@ -87,11 +88,12 @@ var errDestDBRootHashMoved error = errors.New("cluster/commithook: standby repli
const logFieldThread = "thread"
const logFieldRole = "role"
func newCommitHook(lgr *logrus.Logger, remotename, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
func newCommitHook(lgr *logrus.Logger, remotename, remoteurl, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
var ret commithook
ret.rootLgr = lgr.WithField(logFieldThread, "Standby Replication - "+dbname+" to "+remotename)
ret.lgr.Store(ret.rootLgr.WithField(logFieldRole, string(role)))
ret.remotename = remotename
ret.remoteurl = remoteurl
ret.dbname = dbname
ret.role = role
ret.destDBF = destDBF

View File

@@ -35,7 +35,7 @@ func TestCommitHookStartsNotCaughtUp(t *testing.T) {
destEnv.DoltDB.Close()
})
hook := newCommitHook(logrus.StandardLogger(), "origin", "mydb", RolePrimary, func(context.Context) (*doltdb.DoltDB, error) {
hook := newCommitHook(logrus.StandardLogger(), "origin", "https://localhost:50051/mydb", "mydb", RolePrimary, func(context.Context) (*doltdb.DoltDB, error) {
return destEnv.DoltDB, nil
}, srcEnv.DoltDB, t.TempDir())

View File

@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net/http"
"net/url"
"os"
"strconv"
"strings"
@@ -118,7 +119,9 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig)
lgr: lgr,
}
roleSetter := func(role string, epoch int) {
ret.setRoleAndEpoch(role, epoch, false /* graceful */, -1 /* saveConnID */)
ret.setRoleAndEpoch(role, epoch, roleTransitionOptions{
graceful: false,
})
}
ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.sinterceptor.setRole(role, epoch)
@@ -239,7 +242,7 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.
if !ok {
return nil, fmt.Errorf("sqle: cluster: standby replication: destination remote %s does not exist on database %s", r.Name(), name)
}
commitHook := newCommitHook(c.lgr, r.Name(), name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) {
commitHook := newCommitHook(c.lgr, r.Name(), remote.Url, name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) {
return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider)
}, denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
@@ -260,6 +263,7 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) {
return
}
store.Register(newAssumeRoleProcedure(c))
store.Register(newTransitionToStandbyProcedure(c))
}
func (c *Controller) ClusterDatabase() sql.Database {
@@ -352,37 +356,69 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
return Role(persistentRole), epochi, nil
}
func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, saveConnID int) (bool, error) {
type roleTransitionOptions struct {
// If true, all standby replicas must be caught up in order to
// transition from primary to standby.
graceful bool
// If non-zero and |graceful| is true, will allow a transition from
// primary to standby to succeed only if this many standby replicas
// are known to be caught up at the finalization of the replication
// hooks.
minCaughtUpStandbys int
// If non-nil, this connection will be spared if and when the transition
// process needs to terminate existing connections.
saveConnID *int
}
type roleTransitionResult struct {
// true if the role changed as a result of this call.
changedRole bool
// filled in with graceful transition results if this was a graceful
// transition and it was successful.
gracefulTransitionResults []graceTransitionResult
}
func (c *Controller) setRoleAndEpoch(role string, epoch int, opts roleTransitionOptions) (roleTransitionResult, error) {
graceful := opts.graceful
saveConnID := -1
if opts.saveConnID != nil {
saveConnID = *opts.saveConnID
}
c.mu.Lock()
defer c.mu.Unlock()
if epoch == c.epoch && role == string(c.role) {
return false, nil
return roleTransitionResult{false, nil}, nil
}
if role != string(RolePrimary) && role != string(RoleStandby) && role != string(RoleDetectedBrokenConfig) {
return false, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
}
if epoch < c.epoch {
return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
}
if epoch == c.epoch {
// This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and
// other signals that the cluster is misconfigured.
isallowed := !graceful && (role == string(RoleStandby) || role == string(RoleDetectedBrokenConfig))
if !isallowed {
return false, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
return roleTransitionResult{false, nil}, fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
}
}
changedrole := role != string(c.role)
var gracefulResults []graceTransitionResult
if changedrole {
var err error
if role == string(RoleStandby) {
if graceful {
beforeRole, beforeEpoch := c.role, c.epoch
err = c.gracefulTransitionToStandby(saveConnID)
gracefulResults, err = c.gracefulTransitionToStandby(saveConnID, opts.minCaughtUpStandbys)
if err == nil && (beforeRole != c.role || beforeEpoch != c.epoch) {
// The role or epoch moved out from under us while we were unlocked and transitioning to standby.
err = fmt.Errorf("error assuming role '%s' at epoch %d: the role configuration changed while we were replicating to our standbys. Please try again", role, epoch)
@@ -390,7 +426,7 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, save
if err != nil {
c.setProviderIsStandby(c.role != RolePrimary)
c.killRunningQueries(saveConnID)
return false, err
return roleTransitionResult{false, nil}, err
}
} else {
c.immediateTransitionToStandby()
@@ -413,7 +449,11 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, save
h.setRole(c.role)
}
}
return changedrole, c.persistVariables()
_ = c.persistVariables()
return roleTransitionResult{
changedRole: changedrole,
gracefulTransitionResults: gracefulResults,
}, nil
}
func (c *Controller) roleAndEpoch() (Role, int) {
@@ -481,6 +521,16 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
return args
}
// TODO: make the deadline here configurable or something.
const waitForHooksToReplicateTimeout = 10 * time.Second
type graceTransitionResult struct {
caughtUp bool
database string
remote string
remoteUrl string
}
// The order of operations is:
// * Set all databases in database_provider to read-only.
// * Kill all running queries in GMS.
@@ -494,15 +544,70 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
// after returning the results of dolt_assume_cluster_role().
//
// called with c.mu held
func (c *Controller) gracefulTransitionToStandby(saveConnID int) error {
func (c *Controller) gracefulTransitionToStandby(saveConnID, minCaughtUpStandbys int) ([]graceTransitionResult, error) {
c.setProviderIsStandby(true)
c.killRunningQueries(saveConnID)
// waitForHooksToReplicate will release the lock while it
// blocks, but will return with the lock held.
if err := c.waitForHooksToReplicate(); err != nil {
return err
states, err := c.waitForHooksToReplicate(waitForHooksToReplicateTimeout)
if err != nil {
return nil, err
}
return nil
if len(states) != len(c.commithooks) {
c.lgr.Warnf("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.")
return nil, errors.New("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.")
}
res := make([]graceTransitionResult, len(states))
for i := range states {
hook := c.commithooks[i]
res[i] = graceTransitionResult{
caughtUp: states[i],
database: hook.dbname,
remote: hook.remotename,
remoteUrl: hook.remoteurl,
}
}
if minCaughtUpStandbys == 0 {
for _, caughtUp := range states {
if !caughtUp {
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
}
}
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
} else {
databases := make(map[string]struct{})
replicas := make(map[string]int)
for _, r := range res {
databases[r.database] = struct{}{}
url, err := url.Parse(r.remoteUrl)
if err != nil {
return nil, fmt.Errorf("cluster/controller: could not parse remote_url (%s) for remote %s on database %s: %w", r.remoteUrl, r.remote, r.database, err)
}
if _, ok := replicas[url.Host]; !ok {
replicas[url.Host] = 0
}
if r.caughtUp {
replicas[url.Host] = replicas[url.Host] + 1
}
}
numCaughtUp := 0
for _, v := range replicas {
if v == len(databases) {
numCaughtUp += 1
}
}
if numCaughtUp < minCaughtUpStandbys {
return nil, fmt.Errorf("cluster/controller: failed to transition from primary to standby gracefully; could not ensure %d replicas were caught up on all %d databases. Only caught up %d standbys fully.", minCaughtUpStandbys, len(databases), numCaughtUp)
}
c.lgr.Tracef("cluster/controller: successfully replicated all databases to %d out of %d standbys; transitioning to standby.", numCaughtUp, len(replicas))
}
return res, nil
}
// The order of operations is:
@@ -553,15 +658,15 @@ func (c *Controller) setProviderIsStandby(standby bool) {
}
}
const waitForHooksToReplicateTimeout = 10 * time.Second
// Called during a graceful transition from primary to standby. Waits until all
// commithooks report nextHead == lastPushedHead.
//
// TODO: make the deadline here configurable or something.
// Returns `[]bool` with an entry for each `commithook` which existed at the
// start of the call. The entry will be `true` if that `commithook` was caught
// up as part of this wait, and `false` otherwise.
//
// called with c.mu held
func (c *Controller) waitForHooksToReplicate() error {
func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, error) {
commithooks := make([]*commithook, len(c.commithooks))
copy(commithooks, c.commithooks)
caughtup := make([]bool, len(commithooks))
@@ -582,7 +687,7 @@ func (c *Controller) waitForHooksToReplicate() error {
commithooks[j].setWaitNotify(nil)
}
c.lgr.Warnf("cluster/controller: failed to wait for graceful transition to standby; there were concurrent attempts to transition..")
return errors.New("cluster/controller: failed to transition from primary to standby gracefully; did not gain exclusive access to commithooks.")
return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; did not gain exclusive access to commithooks.")
}
}
c.mu.Unlock()
@@ -591,33 +696,26 @@ func (c *Controller) waitForHooksToReplicate() error {
wg.Wait()
close(done)
}()
var success bool
select {
case <-done:
success = true
case <-time.After(waitForHooksToReplicateTimeout):
success = false
case <-time.After(timeout):
}
c.mu.Lock()
for _, ch := range commithooks {
ch.setWaitNotify(nil)
}
if !success {
// make certain we don't leak the wg.Wait goroutine in the failure case.
// at this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
for _, b := range caughtup {
if !b {
wg.Done()
}
// Make certain we don't leak the wg.Wait goroutine in the failure case.
// At this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
for _, b := range caughtup {
if !b {
wg.Done()
}
<-done
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
return errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
} else {
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
return nil
}
<-done
return caughtup, nil
}
// Within a cluster, if remotesapi is configured with a tls_ca, we take the

View File

@@ -40,6 +40,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
dialprovider := controller.gRPCDialProvider(denv)
var remoteDBs []func(context.Context) (*doltdb.DoltDB, error)
var remoteUrls []string
for _, r := range controller.cfg.StandbyRemotes() {
// TODO: url sanitize name
remoteUrl := strings.Replace(r.RemoteURLTemplate(), dsess.URLTemplateDatabasePlaceholder, name, -1)
@@ -55,6 +56,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) {
return r.GetRemoteDB(ctx, types.Format_Default, dialprovider)
})
remoteUrls = append(remoteUrls, remoteUrl)
}
role, _ := controller.roleAndEpoch()
@@ -63,7 +65,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
if err != nil {
return err
}
commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir)
commitHook := newCommitHook(controller.lgr, r.Name(), remoteUrls[i], name, role, remoteDBs[i], denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
controller.registerCommitHook(commitHook)
if err := commitHook.Run(bt); err != nil {

View File

@@ -836,6 +836,106 @@ tests:
error_match: failed to transition from primary to standby gracefully
- exec: "create table vals (i int primary key)"
- exec: "insert into vals values (0)"
- name: dolt_cluster_transition_to_standby
multi_repos:
- name: server1
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3309
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3852/{database}
bootstrap_role: primary
bootstrap_epoch: 1
remotesapi:
port: 3851
server:
args: ["--config", "server.yaml"]
port: 3309
- name: server2
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3310
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3851/{database}
bootstrap_role: standby
bootstrap_epoch: 1
remotesapi:
port: 3852
server:
args: ["--config", "server.yaml"]
port: 3310
connections:
- on: server1
queries:
- exec: 'create database repo1'
- exec: "use repo1"
- exec: 'create table vals (i int primary key)'
- exec: 'insert into vals values (0),(1),(2),(3),(4)'
- query: "call dolt_cluster_transition_to_standby('2', '1')"
result:
columns: ["caught_up", "database", "remote", "remote_url"]
rows: [["1", "repo1", "standby", "http://localhost:3852/repo1"]]
- name: dolt_cluster_transition_to_standby too many standbys provided
multi_repos:
- name: server1
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3309
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3852/{database}
bootstrap_role: primary
bootstrap_epoch: 1
remotesapi:
port: 3851
server:
args: ["--config", "server.yaml"]
port: 3309
- name: server2
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3310
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3851/{database}
bootstrap_role: standby
bootstrap_epoch: 1
remotesapi:
port: 3852
server:
args: ["--config", "server.yaml"]
port: 3310
connections:
- on: server1
queries:
- exec: 'create database repo1'
- exec: "use repo1"
- exec: 'create table vals (i int primary key)'
- exec: 'insert into vals values (0),(1),(2),(3),(4)'
- query: "call dolt_cluster_transition_to_standby('2', '2')"
error_match: failed to transition from primary to standby gracefully; could not ensure 2 replicas were caught up on all 1 databases. Only caught up 1 standbys fully.
- name: create new database, primary replicates to standby, fails over, new primary replicates to standby, fails over, new primary has all writes
multi_repos:
- name: server1