go: sqle: transactions,cluster: Move block-for-cluster-replication behavior to Dolt transaction logic and out of the commit hook.

This commit is contained in:
Aaron Son
2023-05-12 16:40:42 -07:00
parent af966fa9fd
commit 261b10c92b
4 changed files with 61 additions and 76 deletions

View File

@@ -61,8 +61,6 @@ type commithook struct {
// 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then.
successChs []chan struct{}
execTimeout time.Duration
role Role
// The standby replica to which the new root gets replicated.
@@ -379,47 +377,6 @@ func (h *commithook) setWaitNotify(f func()) bool {
return true
}
// replicationResult describes the outcome of waiting for a write to be
// replicated to a standby.
type replicationResult int

const (
	// replicationResultTimeout: the wait timed out before replication succeeded.
	replicationResultTimeout replicationResult = iota
	// replicationResultContextCanceled: the caller's context was canceled while waiting.
	replicationResultContextCanceled
	// replicationResultSuccess: the write was replicated (or no replication was needed).
	replicationResultSuccess
)
// Blocks the current goroutine until:
// 1. There is no replication necessary, i.e., isCaughtUp() == true. This returns replicationResultSuccess.
// 2. The replication of |nextHead|, or a later head, at the time this method was called succeeds. This returns replicationResultSuccess.
// 3. ctx.Done() closes. This returns replicationResultContextCanceled.
// 4. timeout passes. This returns replicationResultTimeout.
func (h *commithook) waitForReplicationSuccess(ctx context.Context, timeout time.Duration) replicationResult {
	h.mu.Lock()
	if h.isCaughtUp() {
		// Nothing pending to replicate; report success immediately.
		h.mu.Unlock()
		return replicationResultSuccess
	}
	// All waiters share the first channel in |successChs|; it is closed on the
	// next successful replication attempt (see the |successChs| invariant).
	if len(h.successChs) == 0 {
		h.successChs = append(h.successChs, make(chan struct{}))
	}
	ch := h.successChs[0]
	h.mu.Unlock()
	select {
	case <-ch:
		return replicationResultSuccess
	case <-ctx.Done():
		return replicationResultContextCanceled
	case <-time.After(timeout):
		return replicationResultTimeout
	}
}
// setExecTimeout is called by the controller. When the stored timeout is
// non-zero, the Execute() DatabaseHook callback will block its calling
// goroutine for up to that duration, waiting for replication quiescence.
func (h *commithook) setExecTimeout(timeout time.Duration) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.execTimeout = timeout
}
// errDetectedBrokenConfigStr is the message surfaced when more than one server
// was configured as primary in the same epoch; this server stops accepting
// writes until an operator selects a primary via dolt_assume_cluster_role().
var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch"
// Execute on this commithook updates the target root hash we're attempting to
@@ -434,10 +391,10 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
return nil, err
}
h.mu.Lock()
defer h.mu.Unlock()
lgr = h.logger()
if h.role != RolePrimary {
lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID())
h.mu.Unlock()
return nil, nil
}
if root != h.nextHead {
@@ -447,16 +404,22 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
h.nextPushAttempt = time.Time{}
h.cond.Signal()
}
execTimeout := h.execTimeout
h.mu.Unlock()
if execTimeout != time.Duration(0) {
res := h.waitForReplicationSuccess(ctx, execTimeout)
if res != replicationResultSuccess {
// TODO: Get this failure into the *sql.Context warnings.
lgr.Warnf("cluster/commithook failed to replicate write before the timeout. timeout: %d, wait result: %v", execTimeout, res)
var waitF func(context.Context) error
if !h.isCaughtUp() {
if len(h.successChs) == 0 {
h.successChs = append(h.successChs, make(chan struct{}))
}
successCh := h.successChs[0]
waitF = func(ctx context.Context) error {
select {
case <-successCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
return nil, nil
return waitF, nil
}
func (h *commithook) HandleError(ctx context.Context, err error) error {

View File

@@ -176,20 +176,6 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) {
c.mu.Lock()
defer c.mu.Unlock()
c.systemVars = variables
// We reset this system variable here to put our NotifyChanged on it.
v, _, ok := variables.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs)
if !ok {
panic(fmt.Sprintf("internal error: did not find required global system variable %s", dsess.DoltClusterAckWritesTimeoutSecs))
}
v.NotifyChanged = func(scope sql.SystemVariableScope, v sql.SystemVarValue) {
c.mu.Lock()
defer c.mu.Unlock()
for _, hook := range c.commithooks {
hook.setExecTimeout(time.Duration(v.Val.(int64)) * time.Second)
}
}
variables.AddSystemVariables([]sql.SystemVariable{v})
c.refreshSystemVars()
}
@@ -209,10 +195,6 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
if err != nil {
return err
}
_, execTimeoutVal, _ := c.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs)
for _, h := range hooks {
h.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second)
}
c.commithooks = append(c.commithooks, hooks...)
}
return nil

View File

@@ -17,7 +17,6 @@ package cluster
import (
"context"
"strings"
"time"
"github.com/dolthub/go-mysql-server/sql"
@@ -58,8 +57,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
})
}
_, execTimeoutVal, _ := controller.systemVars.GetGlobal(dsess.DoltClusterAckWritesTimeoutSecs)
role, _ := controller.roleAndEpoch()
for i, r := range controller.cfg.StandbyRemotes() {
ttfdir, err := denv.TempTableFilesDir()
@@ -67,7 +64,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
return err
}
commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir)
commitHook.setExecTimeout(time.Duration(execTimeoutVal.(int64)) * time.Second)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
controller.registerCommitHook(commitHook)
if err := commitHook.Run(bt); err != nil {

View File

@@ -15,6 +15,7 @@
package dsess
import (
"context"
"errors"
"fmt"
"strings"
@@ -216,7 +217,9 @@ func doltCommit(ctx *sql.Context,
workingSet = workingSet.ClearMerge()
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), nil)
var rsc doltdb.ReplicationStatusController
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc)
waitForReplicationController(ctx, rsc)
return workingSet, newCommit, err
}
@@ -227,7 +230,10 @@ func txCommit(ctx *sql.Context,
workingSet *doltdb.WorkingSet,
hash hash.Hash,
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), nil)
var rsc doltdb.ReplicationStatusController
err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc)
waitForReplicationController(ctx, rsc)
return workingSet, nil, err
}
// DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write
@@ -235,6 +241,44 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki
return tx.doCommit(ctx, workingSet, commit, doltCommit)
}
// waitForReplicationController blocks until every wait function registered in
// |rsc| completes, or until the DoltClusterAckWritesTimeoutSecs timeout
// elapses, whichever comes first. It returns immediately when there is
// nothing to wait on, when the system variable is unavailable, or when the
// configured timeout is zero (waiting disabled).
func waitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) {
	if len(rsc.Wait) == 0 {
		return
	}
	_, timeout, ok := sql.SystemVariables.GetGlobal(DoltClusterAckWritesTimeoutSecs)
	if !ok {
		return
	}
	timeoutI := timeout.(int64)
	if timeoutI == 0 {
		return
	}
	// Canceling cCtx on return unblocks any wait functions still running
	// after the timeout fires, so their goroutines do not leak.
	cCtx, cancel := context.WithCancel(ctx)
	defer cancel()
	var wg sync.WaitGroup
	wg.Add(len(rsc.Wait))
	for _, f := range rsc.Wait {
		f := f
		go func() {
			defer wg.Done()
			// Individual wait errors are currently discarded; surfacing
			// them is the TODO below. (The previous `go func() error` form
			// returned a value the go statement silently dropped.)
			_ = f(cCtx)
		}()
	}
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-time.After(time.Duration(timeoutI) * time.Second):
		// TODO: Error, warning, something...
	case <-done:
	}
}
// doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit
func (tx *DoltTransaction) doCommit(
ctx *sql.Context,