Merge pull request #6831 from dolthub/aaron/cluster-nondolt-replication-graceful-standby

go: sqle: cluster: When performing a graceful transition to standby, take mysql and dolt_branch_control replication state into account.
This commit is contained in:
Aaron Son
2023-10-17 16:50:32 -07:00
committed by GitHub
4 changed files with 108 additions and 55 deletions
@@ -148,6 +148,9 @@ func (r *branchControlReplica) Run() {
}
func (r *branchControlReplica) wait() {
if r.waitNotify != nil {
r.waitNotify()
}
if r.isCaughtUp() {
attempt := r.progressNotifier.BeginAttempt()
r.progressNotifier.RecordSuccess(attempt)
@@ -248,11 +251,16 @@ func (p *branchControlReplication) UpdateBranchControlContents(ctx context.Conte
}
}
func (p *branchControlReplication) waitForReplication(timeout time.Duration) bool {
func (p *branchControlReplication) waitForReplication(timeout time.Duration) ([]graceTransitionResult, error) {
p.mu.Lock()
replicas := make([]*branchControlReplica, len(p.replicas))
copy(replicas, p.replicas)
caughtup := make([]bool, len(replicas))
res := make([]graceTransitionResult, len(replicas))
for i := range res {
res[i].database = "dolt_branch_control"
res[i].remote = replicas[i].client.remote
res[i].remoteUrl = replicas[i].client.httpUrl()
}
var wg sync.WaitGroup
wg.Add(len(replicas))
for li, lr := range replicas {
@@ -260,9 +268,9 @@ func (p *branchControlReplication) waitForReplication(timeout time.Duration) boo
r := lr
ok := r.setWaitNotify(func() {
// called with r.mu locked.
if !caughtup[i] {
if !res[i].caughtUp {
if r.isCaughtUp() {
caughtup[i] = true
res[i].caughtUp = true
wg.Done()
} else {
}
@@ -272,7 +280,7 @@ func (p *branchControlReplication) waitForReplication(timeout time.Duration) boo
for j := li - 1; j >= 0; j-- {
replicas[j].setWaitNotify(nil)
}
return false
return nil, errors.New("cluster: dolt_branch_control replication: could not wait for replication. Concurrent waiters conflicted with each other.")
}
}
p.mu.Unlock()
@@ -296,14 +304,12 @@ func (p *branchControlReplication) waitForReplication(timeout time.Duration) boo
// Make certain we don't leak the wg.Wait goroutine in the failure case.
// At this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
all := true
for _, b := range caughtup {
if !b {
for _, b := range res {
if !b.caughtUp {
wg.Done()
all = false
}
}
<-done
return all
return res, nil
}
@@ -786,34 +786,55 @@ func (c *Controller) gracefulTransitionToStandby(saveConnID, minCaughtUpStandbys
c.setProviderIsStandby(true)
c.killRunningQueries(saveConnID)
// waitForHooksToReplicate will release the lock while it
// blocks, but will return with the lock held.
states, err := c.waitForHooksToReplicate(waitForHooksToReplicateTimeout)
if err != nil {
return nil, err
var hookStates, mysqlStates, bcStates []graceTransitionResult
var hookErr, mysqlErr, bcErr error
// We concurrently wait for hooks, mysql and dolt_branch_control replication to true up.
// If we encounter any errors while doing this, we fail the graceful transition.
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
// waitForHooksToReplicate will release the lock while it
// blocks, but will return with the lock held.
hookStates, hookErr = c.waitForHooksToReplicate(waitForHooksToReplicateTimeout)
}()
go func() {
defer wg.Done()
mysqlStates, mysqlErr = c.mysqlDbPersister.waitForReplication(waitForHooksToReplicateTimeout)
}()
go func() {
defer wg.Done()
bcStates, bcErr = c.bcReplication.waitForReplication(waitForHooksToReplicateTimeout)
}()
wg.Wait()
if hookErr != nil {
return nil, hookErr
}
if mysqlErr != nil {
return nil, mysqlErr
}
if bcErr != nil {
return nil, bcErr
}
if len(states) != len(c.commithooks) {
if len(hookStates) != len(c.commithooks) {
c.lgr.Warnf("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.")
return nil, errors.New("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.")
}
res := make([]graceTransitionResult, len(states))
for i := range states {
hook := c.commithooks[i]
res[i] = graceTransitionResult{
caughtUp: states[i],
database: hook.dbname,
remote: hook.remotename,
remoteUrl: hook.remoteurl,
}
}
res := make([]graceTransitionResult, 0, len(hookStates)+len(mysqlStates)+len(bcStates))
res = append(res, hookStates...)
res = append(res, mysqlStates...)
res = append(res, bcStates...)
if minCaughtUpStandbys == 0 {
for _, caughtUp := range states {
if !caughtUp {
for _, state := range res {
if !state.caughtUp {
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
return nil, fmt.Errorf("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
}
}
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
@@ -845,16 +866,18 @@ func (c *Controller) gracefulTransitionToStandby(saveConnID, minCaughtUpStandbys
c.lgr.Tracef("cluster/controller: successfully replicated all databases to %d out of %d standbys; transitioning to standby.", numCaughtUp, len(replicas))
}
if !c.mysqlDbPersister.waitForReplication(waitForHooksToReplicateTimeout) {
c.lgr.Warnf("cluster/controller: when transitioning to standby, did not successfully replicate users and grants to all standbys.")
}
if !c.bcReplication.waitForReplication(waitForHooksToReplicateTimeout) {
c.lgr.Warnf("cluster/controller: when transitioning to standby, did not successfully replicate branch control data to all standbys.")
}
return res, nil
}
// allCaughtUp reports whether every replication target recorded in res
// finished catching up during the graceful-transition wait. An empty
// slice trivially counts as fully caught up.
func allCaughtUp(res []graceTransitionResult) bool {
	for i := range res {
		if !res[i].caughtUp {
			return false
		}
	}
	return true
}
// The order of operations is:
// * Set all databases in database_provider to read-only.
// * Kill all running queries in GMS.
@@ -911,10 +934,15 @@ func (c *Controller) setProviderIsStandby(standby bool) {
// up as part of this wait, and `false` otherwise.
//
// called with c.mu held
func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, error) {
func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]graceTransitionResult, error) {
commithooks := make([]*commithook, len(c.commithooks))
copy(commithooks, c.commithooks)
caughtup := make([]bool, len(commithooks))
res := make([]graceTransitionResult, len(commithooks))
for i := range res {
res[i].database = commithooks[i].dbname
res[i].remote = commithooks[i].remotename
res[i].remoteUrl = commithooks[i].remoteurl
}
var wg sync.WaitGroup
wg.Add(len(commithooks))
for li, lch := range commithooks {
@@ -922,8 +950,8 @@ func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, err
ch := lch
ok := ch.setWaitNotify(func() {
// called with ch.mu locked.
if !caughtup[i] && ch.isCaughtUp() {
caughtup[i] = true
if !res[i].caughtUp && ch.isCaughtUp() {
res[i].caughtUp = true
wg.Done()
}
})
@@ -953,14 +981,14 @@ func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, err
// Make certain we don't leak the wg.Wait goroutine in the failure case.
// At this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
for _, b := range caughtup {
if !b {
for _, b := range res {
if !b.caughtUp {
wg.Done()
}
}
<-done
return caughtup, nil
return res, nil
}
// Within a cluster, if remotesapi is configured with a tls_ca, we take the
@@ -1092,6 +1120,7 @@ func (c *Controller) standbyRemotesJWKS() *jwtauth.MultiJWKS {
// replicationServiceClient bundles a gRPC ReplicationServiceClient for a
// single standby remote together with enough dial metadata (url, tls) to
// reconstruct the remote's HTTP URL when reporting replication state.
type replicationServiceClient struct {
// remote is the name of the Dolt remote this client replicates to.
remote string
// url is the gRPC dial target, e.g. "dns:53.78.2.1:3832".
url string
// tls records whether the connection was dialed with TLS; it selects
// the scheme ("https://" vs "http://") used by httpUrl().
tls bool
client replicationapi.ReplicationServiceClient
}
@@ -1128,8 +1157,20 @@ func (c *Controller) replicationServiceClients(ctx context.Context) ([]*replicat
ret = append(ret, &replicationServiceClient{
remote: r.Name(),
url: grpcTarget,
tls: c.tlsCfg != nil,
client: client,
})
}
return ret, nil
}
// httpUrl converts r.url, a gRPC dial endpoint such as "dns:53.78.2.1:3832",
// into the equivalent HTTP(S) endpoint (e.g. "http://53.78.2.1:3832") so it
// can be matched against Dolt remote URLs.
func (r *replicationServiceClient) httpUrl() string {
	scheme := "http://"
	if r.tls {
		scheme = "https://"
	}
	return scheme + strings.TrimPrefix(r.url, "dns:")
}
@@ -282,11 +282,16 @@ func (p *replicatingMySQLDbPersister) LoadData(ctx context.Context) ([]byte, err
return ret, err
}
func (p *replicatingMySQLDbPersister) waitForReplication(timeout time.Duration) bool {
func (p *replicatingMySQLDbPersister) waitForReplication(timeout time.Duration) ([]graceTransitionResult, error) {
p.mu.Lock()
replicas := make([]*mysqlDbReplica, len(p.replicas))
copy(replicas, p.replicas)
caughtup := make([]bool, len(replicas))
res := make([]graceTransitionResult, len(replicas))
for i := range replicas {
res[i].database = "mysql"
res[i].remote = replicas[i].client.remote
res[i].remoteUrl = replicas[i].client.httpUrl()
}
var wg sync.WaitGroup
wg.Add(len(replicas))
for li, lr := range replicas {
@@ -294,9 +299,9 @@ func (p *replicatingMySQLDbPersister) waitForReplication(timeout time.Duration)
r := lr
ok := r.setWaitNotify(func() {
// called with r.mu locked.
if !caughtup[i] {
if !res[i].caughtUp {
if r.isCaughtUp() {
caughtup[i] = true
res[i].caughtUp = true
wg.Done()
} else {
}
@@ -306,7 +311,7 @@ func (p *replicatingMySQLDbPersister) waitForReplication(timeout time.Duration)
for j := li - 1; j >= 0; j-- {
replicas[j].setWaitNotify(nil)
}
return false
return nil, errors.New("cluster: mysqldb replication: could not wait for replication. Concurrent waiters conflicted with each other.")
}
}
p.mu.Unlock()
@@ -330,14 +335,12 @@ func (p *replicatingMySQLDbPersister) waitForReplication(timeout time.Duration)
// Make certain we don't leak the wg.Wait goroutine in the failure case.
// At this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
all := true
for _, b := range caughtup {
if !b {
for _, b := range res {
if !b.caughtUp {
wg.Done()
all = false
}
}
<-done
return all
return res, nil
}
@@ -886,7 +886,10 @@ tests:
- query: "call dolt_cluster_transition_to_standby('2', '1')"
result:
columns: ["caught_up", "database", "remote", "remote_url"]
rows: [["1", "repo1", "standby", "http://localhost:3852/repo1"]]
rows:
- ["1", "repo1", "standby", "http://localhost:3852/repo1"]
- ["1", "mysql", "standby", "http://localhost:3852"]
- ["1", "dolt_branch_control", "standby", "http://localhost:3852"]
- name: dolt_cluster_transition_to_standby too many standbys provided
multi_repos:
- name: server1
@@ -935,7 +938,7 @@ tests:
- exec: 'create table vals (i int primary key)'
- exec: 'insert into vals values (0),(1),(2),(3),(4)'
- query: "call dolt_cluster_transition_to_standby('2', '2')"
error_match: failed to transition from primary to standby gracefully; could not ensure 2 replicas were caught up on all 1 databases. Only caught up 1 standbys fully.
error_match: failed to transition from primary to standby gracefully; could not ensure 2 replicas were caught up on all 3 databases. Only caught up 1 standbys fully.
- name: create new database, primary replicates to standby, fails over, new primary replicates to standby, fails over, new primary has all writes
multi_repos:
- name: server1