diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index a7d81b508d..a95be4e82d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1222,6 +1222,13 @@ type ReplicationStatusController struct { // associated with a commithook to complete. Must return if the // associated Context is canceled. Wait []func(ctx context.Context) error + + // There is an entry here for each function in Wait. If a Wait fails, + // you can notify the corresponding function in this slice. This might + // control resiliency behaviors like adaptive retry and timeouts, + // circuit breakers, etc. and might feed into exposed replication + // metrics. + NotifyWaitFailed []func() } // UpdateWorkingSet updates the working set with the ref given to the root value given diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 96bf911e3e..464bf48942 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -81,6 +81,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset rsc := getReplicaState(ctx) if rsc != nil { rsc.Wait = make([]func(context.Context) error, len(db.postCommitHooks)) + rsc.NotifyWaitFailed = make([]func(), len(db.postCommitHooks)) } for il, hook := range db.postCommitHooks { if !onlyWS || hook.ExecuteForWorkingSets() { @@ -95,6 +96,13 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } if rsc != nil { rsc.Wait[i] = f + if nf, ok := hook.(interface{ + NotifyWaitFailed() + }); ok { + rsc.NotifyWaitFailed[i] = nf.NotifyWaitFailed + } else { + rsc.NotifyWaitFailed[i] = func() {} + } } }() } @@ -105,10 +113,12 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset for i := range rsc.Wait { if rsc.Wait[i] != nil { rsc.Wait[j] = rsc.Wait[i] + rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i] j++ } } 
rsc.Wait = rsc.Wait[:j] + rsc.NotifyWaitFailed = rsc.NotifyWaitFailed[:j] } } diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index f49ce7123d..475a9fc180 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -61,6 +61,12 @@ type commithook struct { // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} + // If this is true, the waitF returned by Execute() will fast fail if + // we are not already caught up, instead of blocking on a successCh + // actually indicating we are caught up. This is set to true by a call to + // NotifyWaitFailed(), an optional interface on CommitHook. + circuitBreakerOpen bool + role Role // The standby replica to which the new root gets replicated. @@ -157,6 +163,7 @@ func (h *commithook) replicate(ctx context.Context) { } h.successChs = nil } + h.circuitBreakerOpen = false h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -406,22 +413,34 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat } var waitF func(context.Context) error if !h.isCaughtUp() { - if len(h.successChs) == 0 { - h.successChs = append(h.successChs, make(chan struct{})) - } - successCh := h.successChs[0] - waitF = func(ctx context.Context) error { - select { - case <-successCh: - return nil - case <-ctx.Done(): - return ctx.Err() + if h.circuitBreakerOpen { + waitF = func(ctx context.Context) error { + return fmt.Errorf("circuit breaker for replication to %s/%s is open. 
this commit did not necessarily replicate successfully.", h.remotename, h.dbname) + } + } else { + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + successCh := h.successChs[0] + waitF = func(ctx context.Context) error { + select { + case <-successCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } } } return waitF, nil } +func (h *commithook) NotifyWaitFailed() { + h.mu.Lock() + defer h.mu.Unlock() + h.circuitBreakerOpen = true +} + func (h *commithook) HandleError(ctx context.Context, err error) error { return nil } diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index 077c6c0fe2..e259ed29eb 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -276,12 +276,14 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus close(done) }() + waitFailed := false select { case <-time.After(time.Duration(timeoutI) * time.Second): // We timed out before all the waiters were done. // First we make certain to finalize everything. cancel() <-done + waitFailed = true case <-done: cancel() } @@ -290,9 +292,12 @@ func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatus // returned nil errors. Any non-nil entries in rsc.Wait returned an // error. We turn those into warnings here. numFailed := 0 - for _, f := range rsc.Wait { + for i, f := range rsc.Wait { if f != nil { numFailed += 1 + if waitFailed { + rsc.NotifyWaitFailed[i]() + } } } ctx.Session.Warn(&sql.Warning{