go: doltdb,sqle/cluster: Some PR feedback. Small cleanups.

This commit is contained in:
Aaron Son
2023-05-16 15:42:14 -07:00
parent 2b159425f5
commit 8bbb49b79e
2 changed files with 14 additions and 8 deletions

View File

@@ -42,6 +42,13 @@ type CommitHook interface {
ExecuteForWorkingSets() bool
}
// If a commit hook supports this interface, it can be notified if waiting for
// replication in the callback returned by |Execute| failed to complete in time
// or returned an error.
type NotifyWaitFailedCommitHook interface {
NotifyWaitFailed()
}
func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase {
db.postCommitHooks = make([]CommitHook, len(postHooks))
copy(db.postCommitHooks, postHooks)
@@ -88,9 +95,7 @@ func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset
}
if rsc != nil {
rsc.Wait[i+ioff] = f
if nf, ok := hook.(interface {
NotifyWaitFailed()
}); ok {
if nf, ok := hook.(NotifyWaitFailedCommitHook); ok {
rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed
} else {
rsc.NotifyWaitFailed[i+ioff] = func() {}

View File

@@ -32,6 +32,7 @@ import (
)
var _ doltdb.CommitHook = (*commithook)(nil)
var _ doltdb.NotifyWaitFailedCommitHook = (*commithook)(nil)
type commithook struct {
rootLgr *logrus.Entry
@@ -56,7 +57,7 @@ type commithook struct {
// This is a slice of notification channels maintained by the
// commithook. The semantics are:
// 1. All accesses to |successChs| must happen with |mu| held.
// 2. There maybe be |0| or more channels in the slice.
// 2. There may be |0| or more channels in the slice.
// 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it.
// 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then.
successChs []chan struct{}
@@ -65,7 +66,7 @@ type commithook struct {
// we are not already caught up, instead of blocking on a successCh
// actually indicated we are caught up. This is set to by a call to
// NotifyWaitFailed(), an optional interface on CommitHook.
circuitBreakerOpen bool
fastFailReplicationWait bool
role Role
@@ -162,7 +163,7 @@ func (h *commithook) replicate(ctx context.Context) {
close(ch)
}
h.successChs = nil
h.circuitBreakerOpen = false
h.fastFailReplicationWait = false
}
h.cond.Wait()
lgr.Tracef("cluster/commithook: background thread: woken up.")
@@ -413,7 +414,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
}
var waitF func(context.Context) error
if !h.isCaughtUp() {
if h.circuitBreakerOpen {
if h.fastFailReplicationWait {
waitF = func(ctx context.Context) error {
return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname)
}
@@ -438,7 +439,7 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
func (h *commithook) NotifyWaitFailed() {
h.mu.Lock()
defer h.mu.Unlock()
h.circuitBreakerOpen = true
h.fastFailReplicationWait = true
}
func (h *commithook) HandleError(ctx context.Context, err error) error {