mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-09 11:19:01 -05:00
go: sqle: cluster: Add a circuit breaker for dolt_cluster_ack_writes_timeout_secs. Once a replica fails the timeout, do not block on it going forward.
This commit is contained in:
@@ -1222,6 +1222,13 @@ type ReplicationStatusController struct {
|
||||
// associated with a commithook to complete. Must return if the
|
||||
// associated Context is canceled.
|
||||
Wait []func(ctx context.Context) error
|
||||
|
||||
// There is an entry here for each function in Wait. If a Wait fails,
|
||||
// you can notify the corresponding function in this slice. This might
|
||||
// control resiliency behaviors like adaptive retry and timeouts,
|
||||
// circuit breakers, etc. and might feed into exposed replication
|
||||
// metrics.
|
||||
NotifyWaitFailed []func()
|
||||
}
|
||||
|
||||
// UpdateWorkingSet updates the working set with the ref given to the root value given
|
||||
|
||||
@@ -81,6 +81,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset
|
||||
rsc := getReplicaState(ctx)
|
||||
if rsc != nil {
|
||||
rsc.Wait = make([]func(context.Context) error, len(db.postCommitHooks))
|
||||
rsc.NotifyWaitFailed = make([]func(), len(db.postCommitHooks))
|
||||
}
|
||||
for il, hook := range db.postCommitHooks {
|
||||
if !onlyWS || hook.ExecuteForWorkingSets() {
|
||||
@@ -95,6 +96,13 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset
|
||||
}
|
||||
if rsc != nil {
|
||||
rsc.Wait[i] = f
|
||||
if nf, ok := hook.(interface{
|
||||
NotifyWaitFailed()
|
||||
}); ok {
|
||||
rsc.NotifyWaitFailed[i] = nf.NotifyWaitFailed
|
||||
} else {
|
||||
rsc.NotifyWaitFailed[i] = func() {}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -105,10 +113,12 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset
|
||||
for i := range rsc.Wait {
|
||||
if rsc.Wait[i] != nil {
|
||||
rsc.Wait[j] = rsc.Wait[i]
|
||||
rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i]
|
||||
j++
|
||||
}
|
||||
}
|
||||
rsc.Wait = rsc.Wait[:j]
|
||||
rsc.NotifyWaitFailed = rsc.NotifyWaitFailed[:j]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -61,6 +61,12 @@ type commithook struct {
|
||||
// 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then.
|
||||
successChs []chan struct{}
|
||||
|
||||
// If this is true, the waitF returned by Execute() will fast fail if
|
||||
// we are not already caught up, instead of blocking on a successCh
|
||||
// actually indicated we are caught up. This is set to by a call to
|
||||
// NotifyWaitFailed(), an optional interface on CommitHook.
|
||||
circuitBreakerOpen bool
|
||||
|
||||
role Role
|
||||
|
||||
// The standby replica to which the new root gets replicated.
|
||||
@@ -157,6 +163,7 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
}
|
||||
h.successChs = nil
|
||||
}
|
||||
h.circuitBreakerOpen = false
|
||||
h.cond.Wait()
|
||||
lgr.Tracef("cluster/commithook: background thread: woken up.")
|
||||
}
|
||||
@@ -406,22 +413,34 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
|
||||
}
|
||||
var waitF func(context.Context) error
|
||||
if !h.isCaughtUp() {
|
||||
if len(h.successChs) == 0 {
|
||||
h.successChs = append(h.successChs, make(chan struct{}))
|
||||
}
|
||||
successCh := h.successChs[0]
|
||||
waitF = func(ctx context.Context) error {
|
||||
select {
|
||||
case <-successCh:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
if h.circuitBreakerOpen {
|
||||
waitF = func(ctx context.Context) error {
|
||||
return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname)
|
||||
}
|
||||
} else {
|
||||
if len(h.successChs) == 0 {
|
||||
h.successChs = append(h.successChs, make(chan struct{}))
|
||||
}
|
||||
successCh := h.successChs[0]
|
||||
waitF = func(ctx context.Context) error {
|
||||
select {
|
||||
case <-successCh:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return waitF, nil
|
||||
}
|
||||
|
||||
func (h *commithook) NotifyWaitFailed() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.circuitBreakerOpen = true
|
||||
}
|
||||
|
||||
func (h *commithook) HandleError(ctx context.Context, err error) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -276,12 +276,14 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
|
||||
close(done)
|
||||
}()
|
||||
|
||||
waitFailed := false
|
||||
select {
|
||||
case <-time.After(time.Duration(timeoutI) * time.Second):
|
||||
// We timed out before all the waiters were done.
|
||||
// First we make certain to finalize everything.
|
||||
cancel()
|
||||
<-done
|
||||
waitFailed = true
|
||||
case <-done:
|
||||
cancel()
|
||||
}
|
||||
@@ -290,9 +292,12 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus
|
||||
// returned nil errors. Any non-nil entries in rsc.Wait returned an
|
||||
// error. We turn those into warnings here.
|
||||
numFailed := 0
|
||||
for _, f := range rsc.Wait {
|
||||
for i, f := range rsc.Wait {
|
||||
if f != nil {
|
||||
numFailed += 1
|
||||
if waitFailed {
|
||||
rsc.NotifyWaitFailed[i]()
|
||||
}
|
||||
}
|
||||
}
|
||||
ctx.Session.Warn(&sql.Warning{
|
||||
|
||||
Reference in New Issue
Block a user