From 261b10c92bbd7262f4c8daf38d995d4dbb7bfde6 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 12 May 2023 16:40:42 -0700 Subject: [PATCH] go: sqle: transactions,cluster: Move block-for-cluster-replication behavior to Dolt transaction logic and out of the commit hook. --- .../doltcore/sqle/cluster/commithook.go | 67 +++++-------------- .../doltcore/sqle/cluster/controller.go | 18 ----- .../doltcore/sqle/cluster/initdbhook.go | 4 -- .../doltcore/sqle/dsess/transactions.go | 48 ++++++++++++- 4 files changed, 61 insertions(+), 76 deletions(-) diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 0ed653d5dc..f49ce7123d 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -61,8 +61,6 @@ type commithook struct { // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. successChs []chan struct{} - execTimeout time.Duration - role Role // The standby replica to which the new root gets replicated. @@ -379,47 +377,6 @@ func (h *commithook) setWaitNotify(f func()) bool { return true } -type replicationResult int - -const replicationResultTimeout = 0 -const replicationResultContextCanceled = 1 -const replicationResultSuccess = 2 - -// Blocks the current goroutine until: -// 1. There is no replication necessary, i.e., isCaughtUp() == true. This returns replicationResultSuccess. -// 2. The replication of |nextHead|, or a later head, at the time this method was called succeeds. This returns replicationResultSuccess. -// 3. ctx.Done() closes. This returns replicationResultContextCanceled. -// 4. timeout passes. This returns replicationResultSuccess. -func (h *commithook) waitForReplicationSuccess(ctx context.Context, timeout time.Duration) replicationResult { - h.mu.Lock() - if h.isCaughtUp() { - h.mu.Unlock() - return replicationResultSuccess - } - if len(h.successChs) == 0 { - h.successChs = append(h.successChs, make(chan struct{})) - } - ch := h.successChs[0] - h.mu.Unlock() - select { - case <-ch: - return replicationResultSuccess - case <-ctx.Done(): - return replicationResultContextCanceled - case <-time.After(timeout): - return replicationResultTimeout - } -} - -// Set by the controller. If it is non-zero, the Execute() DatabaseHook -// callback will block the calling goroutine for that many seconds waiting for -// replication quiescence. -func (h *commithook) setExecTimeout(timeout time.Duration) { - h.mu.Lock() - h.execTimeout = timeout - h.mu.Unlock() -} - var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch" // Execute on this commithook updates the target root hash we're attempting to @@ -434,10 +391,10 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat return nil, err } h.mu.Lock() + defer h.mu.Unlock() lgr = h.logger() if h.role != RolePrimary { lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) - h.mu.Unlock() return nil, nil } if root != h.nextHead { @@ -447,16 +404,22 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat h.nextPushAttempt = time.Time{} h.cond.Signal() } - execTimeout := h.execTimeout - h.mu.Unlock() - if execTimeout != time.Duration(0) { - res := h.waitForReplicationSuccess(ctx, execTimeout) - if res != replicationResultSuccess { - // TODO: Get this failure into the *sql.Context warnings. - lgr.Warnf("cluster/commithook failed to replicate write before the timeout. timeout: %d, wait result: %v", execTimeout, res) + var waitF func(context.Context) error + if !h.isCaughtUp() { + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + successCh := h.successChs[0] + waitF = func(ctx context.Context) error { + select { + case <-successCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } } - return nil, nil + return waitF, nil } func (h *commithook) HandleError(ctx context.Context, err error) error { diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index e4facd8eab..7c821aa6e0 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -176,20 +176,6 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) { c.mu.Lock() defer c.mu.Unlock() c.systemVars = variables - - // We reset this system variable here to put our NotifyChanged on it. - v, _, ok := variables.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - if !ok { - panic(fmt.Sprintf("internal error: did not find required global system variable %s", dsess.DoltClusterAckWritesTimeoutSecs)) - } - v.NotifyChanged = func(scope sql.SystemVariableScope, v sql.SystemVarValue) { - c.mu.Lock() - defer c.mu.Unlock() - for _, hook := range c.commithooks { - hook.setExecTimeout(time.Duration(v.Val.(int64)) * time.Second) - } - } - variables.AddSystemVariables([]sql.SystemVariable{v}) c.refreshSystemVars() } @@ -209,10 +195,6 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql. if err != nil { return err } - _, execTimeoutVal, _ := c.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - for _, h := range hooks { - h.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) - } c.commithooks = append(c.commithooks, hooks...) } return nil diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 588f35a235..7713c91540 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -17,7 +17,6 @@ package cluster import ( "context" "strings" - "time" "github.com/dolthub/go-mysql-server/sql" @@ -58,8 +57,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig }) } - _, execTimeoutVal, _ := controller.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs) - role, _ := controller.roleAndEpoch() for i, r := range controller.cfg.StandbyRemotes() { ttfdir, err := denv.TempTableFilesDir() @@ -67,7 +64,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig return err } commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir) - commitHook.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second) denv.DoltDB.PrependCommitHook(ctx, commitHook) controller.registerCommitHook(commitHook) if err := commitHook.Run(bt); err != nil { diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index 0c1a66201b..3234ec9e5a 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -15,6 +15,7 @@ package dsess import ( + "context" "errors" "fmt" "strings" @@ -216,7 +217,9 @@ func doltCommit(ctx *sql.Context, workingSet = workingSet.ClearMerge() - newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), nil) + var rsc doltdb.ReplicationStatusController + newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc) + waitForReplicationController(ctx, rsc) return workingSet, newCommit, err } @@ -227,7 +230,10 @@ func txCommit(ctx *sql.Context, workingSet *doltdb.WorkingSet, hash hash.Hash, ) (*doltdb.WorkingSet, *doltdb.Commit, error) { - return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), nil) + var rsc doltdb.ReplicationStatusController + err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc) + waitForReplicationController(ctx, rsc) + return workingSet, nil, err } // DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write @@ -235,6 +241,44 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki return tx.doCommit(ctx, workingSet, commit, doltCommit) } +func waitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) { + if len(rsc.Wait) == 0 { + return + } + _, timeout, ok := sql.SystemVariables.GetGlobal(DoltClusterAckWritesTimeoutSecs) + if !ok { + return + } + timeoutI := timeout.(int64) + if timeoutI == 0 { + return + } + + cCtx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + wg.Add(len(rsc.Wait)) + for _, f := range rsc.Wait { + f := f + go func() error { + defer wg.Done() + return f(cCtx) + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-time.After(time.Duration(timeoutI) * time.Second): + // TODO: Error, warning, something... + case <-done: + } +} + // doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit func (tx *DoltTransaction) doCommit( ctx *sql.Context,