mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-21 19:39:04 -05:00
go: sqle: cluster: Have Controller.waitforHooksToReplicate return []graceTransitionResult.
This commit is contained in:
@@ -798,20 +798,11 @@ func (c *Controller) gracefulTransitionToStandby(saveConnID, minCaughtUpStandbys
|
||||
return nil, errors.New("cluster/controller: failed to transition to standby; the set of replicated databases changed during the transition.")
|
||||
}
|
||||
|
||||
res := make([]graceTransitionResult, len(states))
|
||||
for i := range states {
|
||||
hook := c.commithooks[i]
|
||||
res[i] = graceTransitionResult{
|
||||
caughtUp: states[i],
|
||||
database: hook.dbname,
|
||||
remote: hook.remotename,
|
||||
remoteUrl: hook.remoteurl,
|
||||
}
|
||||
}
|
||||
res := states
|
||||
|
||||
if minCaughtUpStandbys == 0 {
|
||||
for _, caughtUp := range states {
|
||||
if !caughtUp {
|
||||
for _, state := range states {
|
||||
if !state.caughtUp {
|
||||
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
|
||||
return nil, errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
|
||||
}
|
||||
@@ -922,10 +913,15 @@ func (c *Controller) setProviderIsStandby(standby bool) {
|
||||
// up as part of this wait, and `false` otherwise.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, error) {
|
||||
func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]graceTransitionResult, error) {
|
||||
commithooks := make([]*commithook, len(c.commithooks))
|
||||
copy(commithooks, c.commithooks)
|
||||
caughtup := make([]bool, len(commithooks))
|
||||
res := make([]graceTransitionResult, len(commithooks))
|
||||
for i := range res {
|
||||
res[i].database = commithooks[i].dbname
|
||||
res[i].remote = commithooks[i].remotename
|
||||
res[i].remoteUrl = commithooks[i].remoteurl
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(commithooks))
|
||||
for li, lch := range commithooks {
|
||||
@@ -933,8 +929,8 @@ func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, err
|
||||
ch := lch
|
||||
ok := ch.setWaitNotify(func() {
|
||||
// called with ch.mu locked.
|
||||
if !caughtup[i] && ch.isCaughtUp() {
|
||||
caughtup[i] = true
|
||||
if !res[i].caughtUp && ch.isCaughtUp() {
|
||||
res[i].caughtUp = true
|
||||
wg.Done()
|
||||
}
|
||||
})
|
||||
@@ -964,14 +960,14 @@ func (c *Controller) waitForHooksToReplicate(timeout time.Duration) ([]bool, err
|
||||
// Make certain we don't leak the wg.Wait goroutine in the failure case.
|
||||
// At this point, none of the callbacks will ever be called again and
|
||||
// ch.setWaitNotify grabs a lock and so establishes the happens before.
|
||||
for _, b := range caughtup {
|
||||
if !b {
|
||||
for _, b := range res {
|
||||
if !b.caughtUp {
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
<-done
|
||||
|
||||
return caughtup, nil
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// Within a cluster, if remotesapi is configured with a tls_ca, we take the
|
||||
|
||||
Reference in New Issue
Block a user