diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 9565485ac9..8b14765ca1 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -42,6 +42,13 @@ type CommitHook interface { ExecuteForWorkingSets() bool } +// If a commit hook supports this interface, it can be notified if waiting for +// replication in the callback returned by |Execute| failed to complete in time +// or returned an error. +type NotifyWaitFailedCommitHook interface { + NotifyWaitFailed() +} + func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase { db.postCommitHooks = make([]CommitHook, len(postHooks)) copy(db.postCommitHooks, postHooks) @@ -88,9 +95,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset } if rsc != nil { rsc.Wait[i+ioff] = f - if nf, ok := hook.(interface { - NotifyWaitFailed() - }); ok { + if nf, ok := hook.(NotifyWaitFailedCommitHook); ok { rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed } else { rsc.NotifyWaitFailed[i+ioff] = func() {} diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 1889257f4e..2e3d822ed2 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -32,6 +32,7 @@ import ( ) var _ doltdb.CommitHook = (*commithook)(nil) +var _ doltdb.NotifyWaitFailedCommitHook = (*commithook)(nil) type commithook struct { rootLgr *logrus.Entry @@ -56,7 +57,7 @@ type commithook struct { // This is a slice of notification channels maintained by the // commithook. The semantics are: // 1. All accesses to |successChs| must happen with |mu| held. - // 2. There maybe be |0| or more channels in the slice. + // 2. There may be |0| or more channels in the slice. // 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it. // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} @@ -65,7 +66,7 @@ type commithook struct { // we are not already caught up, instead of blocking on a successCh // actually indicated we are caught up. This is set to by a call to // NotifyWaitFailed(), an optional interface on CommitHook. - circuitBreakerOpen bool + fastFailReplicationWait bool role Role @@ -162,7 +163,7 @@ func (h *commithook) replicate(ctx context.Context) { close(ch) } h.successChs = nil - h.circuitBreakerOpen = false + h.fastFailReplicationWait = false } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") @@ -413,7 +414,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat } var waitF func(context.Context) error if !h.isCaughtUp() { - if h.circuitBreakerOpen { + if h.fastFailReplicationWait { waitF = func(ctx context.Context) error { return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname) } @@ -438,7 +439,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat func (h *commithook) NotifyWaitFailed() { h.mu.Lock() defer h.mu.Unlock() - h.circuitBreakerOpen = true + h.fastFailReplicationWait = true } func (h *commithook) HandleError(ctx context.Context, err error) error {