diff --git a/go/cmd/dolt/commands/branch.go b/go/cmd/dolt/commands/branch.go index d1ba4b1856..a63f6d99a6 100644 --- a/go/cmd/dolt/commands/branch.go +++ b/go/cmd/dolt/commands/branch.go @@ -245,7 +245,7 @@ func moveBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgParseR force := apr.Contains(cli.ForceFlag) src := apr.Arg(0) dest := apr.Arg(1) - err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force) + err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force, nil) var verr errhand.VerboseError if err != nil { @@ -306,7 +306,7 @@ func deleteBranches(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPa err := actions.DeleteBranch(ctx, dEnv.DbData(), brName, actions.DeleteOptions{ Force: force, Remote: apr.Contains(cli.RemoteParam), - }, dEnv) + }, dEnv, nil) if err != nil { var verr errhand.VerboseError @@ -379,7 +379,7 @@ func createBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPars } } - err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag)) + err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag), nil) if err != nil { return HandleVErrAndExitCode(errhand.BuildDError(err.Error()).Build(), usage) } diff --git a/go/cmd/dolt/commands/checkout.go b/go/cmd/dolt/commands/checkout.go index 59a2d3f206..5badb3d5e7 100644 --- a/go/cmd/dolt/commands/checkout.go +++ b/go/cmd/dolt/commands/checkout.go @@ -238,7 +238,7 @@ func checkoutRemoteBranchOrSuggestNew(ctx context.Context, dEnv *env.DoltEnv, na } func checkoutNewBranchFromStartPt(ctx context.Context, dEnv *env.DoltEnv, newBranch, startPt string) errhand.VerboseError { - err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false) + err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false, nil) if err != nil { return errhand.BuildDError(err.Error()).Build() } diff --git a/go/cmd/dolt/commands/commit.go b/go/cmd/dolt/commands/commit.go index b0b7a53818..40c197c120 100644 --- a/go/cmd/dolt/commands/commit.go +++ b/go/cmd/dolt/commands/commit.go @@ -229,6 +229,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv * ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), prevHash, dEnv.NewWorkingSetMeta(fmt.Sprintf("Updated by %s %s", commandStr, strings.Join(args, " "))), + nil, ) if err != nil { if apr.Contains(cli.AmendFlag) { diff --git a/go/cmd/dolt/commands/merge.go b/go/cmd/dolt/commands/merge.go index 0b3731ca48..b9751f2fd5 100644 --- a/go/cmd/dolt/commands/merge.go +++ b/go/cmd/dolt/commands/merge.go @@ -538,6 +538,7 @@ func executeNoFFMergeAndCommit(ctx context.Context, dEnv *env.DoltEnv, spec *mer ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), wsHash, dEnv.NewWorkingSetMeta(msg), + nil, ) if err != nil { diff --git a/go/libraries/doltcore/doltdb/commit_hooks.go b/go/libraries/doltcore/doltdb/commit_hooks.go index faf8cac34c..9fc349f78e 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks.go +++ b/go/libraries/doltcore/doltdb/commit_hooks.go @@ -49,8 +49,8 @@ func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook { } // Execute implements CommitHook, replicates head updates to the destDb field -func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { - return pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir) +func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { + return nil, pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir) } func pushDataset(ctx context.Context, destDB, srcDB datas.Database, ds datas.Dataset, tmpDir string) error { @@ -135,16 +135,16 @@ func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool { } // Execute implements CommitHook, replicates head updates to the destDb field -func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { addr, _ := ds.MaybeHeadAddr() select { case ah.ch <- PushArg{ds: ds, db: db, hash: addr}: case <-ctx.Done(): ah.ch <- PushArg{ds: ds, db: db, hash: addr} - return ctx.Err() + return nil, ctx.Err() } - return nil + return nil, nil } // HandleError implements CommitHook @@ -174,12 +174,12 @@ func NewLogHook(msg []byte) *LogHook { } // Execute implements CommitHook, writes message to log channel -func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { if lh.out != nil { _, err := lh.out.Write(lh.msg) - return err + return nil, err } - return nil + return nil, nil } // HandleError implements CommitHook diff --git a/go/libraries/doltcore/doltdb/commit_hooks_test.go b/go/libraries/doltcore/doltdb/commit_hooks_test.go index 02c5806673..80e10facff 100644 --- a/go/libraries/doltcore/doltdb/commit_hooks_test.go +++ b/go/libraries/doltcore/doltdb/commit_hooks_test.go @@ -136,7 +136,7 @@ func TestPushOnWriteHook(t *testing.T) { ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") require.NoError(t, err) - err = hook.Execute(ctx, ds, ddb.db) + _, err = hook.Execute(ctx, ds, ddb.db) require.NoError(t, err) cs, _ = NewCommitSpec(defaultBranch) @@ -269,7 +269,7 @@ func TestAsyncPushOnWrite(t *testing.T) { require.NoError(t, err) ds, err := ddb.db.GetDataset(ctx, "refs/heads/main") require.NoError(t, err) - err = hook.Execute(ctx, ds, ddb.db) + _, err = hook.Execute(ctx, ds, ddb.db) require.NoError(t, err) } }) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 07654e85bb..4722582ab7 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1079,7 +1079,7 @@ func (ddb *DoltDB) GetRefsOfTypeByNomsRoot(ctx context.Context, refTypeFilter ma // NewBranchAtCommit creates a new branch with HEAD at the commit given. Branch names must pass IsValidUserBranchName. // Silently overwrites any existing branch with the same name given, if one exists. -func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit) error { +func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit, replicationStatus *ReplicationStatusController) error { if !IsValidBranchRef(branchRef) { panic(fmt.Sprintf("invalid branch name %s, use IsValidUserBranchName check", branchRef.String())) } @@ -1124,7 +1124,7 @@ func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, } ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta(), replicationStatus) } // CopyWorkingSet copies a WorkingSetRef from one ref to another. If `force` is @@ -1155,15 +1155,15 @@ func (ddb *DoltDB) CopyWorkingSet(ctx context.Context, fromWSRef ref.WorkingSetR } } - return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta(), nil) } // DeleteBranch deletes the branch given, returning an error if it doesn't exist. -func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef) error { - return ddb.deleteRef(ctx, branch) +func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef, replicationStatus *ReplicationStatusController) error { + return ddb.deleteRef(ctx, branch, replicationStatus) } -func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error { +func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef, replicationStatus *ReplicationStatusController) error { ds, err := ddb.db.GetDataset(ctx, dref.String()) if err != nil { @@ -1184,7 +1184,7 @@ func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error { } } - _, err = ddb.db.Delete(ctx, ds) + _, err = ddb.db.withReplicationStatusController(replicationStatus).Delete(ctx, ds) return err } @@ -1216,6 +1216,20 @@ func (ddb *DoltDB) NewTagAtCommit(ctx context.Context, tagRef ref.DoltRef, c *Co return err } +type ReplicationStatusController struct { + // A slice of funcs which can be called to wait for the replication + // associated with a commithook to complete. Must return if the + // associated Context is canceled. + Wait []func(ctx context.Context) error + + // There is an entry here for each function in Wait. If a Wait fails, + // you can notify the corresponding function in this slice. This might + // control resiliency behaviors like adaptive retry and timeouts, + // circuit breakers, etc. and might feed into exposed replication + // metrics. + NotifyWaitFailed []func() +} + // UpdateWorkingSet updates the working set with the ref given to the root value given // |prevHash| is the hash of the expected WorkingSet struct stored in the ref, not the hash of the RootValue there. func (ddb *DoltDB) UpdateWorkingSet( @@ -1224,6 +1238,7 @@ func (ddb *DoltDB) UpdateWorkingSet( workingSet *WorkingSet, prevHash hash.Hash, meta *datas.WorkingSetMeta, + replicationStatus *ReplicationStatusController, ) error { ds, err := ddb.db.GetDataset(ctx, workingSetRef.String()) if err != nil { @@ -1235,7 +1250,7 @@ func (ddb *DoltDB) UpdateWorkingSet( return err } - _, err = ddb.db.UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{ + _, err = ddb.db.withReplicationStatusController(replicationStatus).UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{ Meta: meta, WorkingRoot: workingRootRef, StagedRoot: stagedRef, @@ -1255,6 +1270,7 @@ func (ddb *DoltDB) CommitWithWorkingSet( commit *PendingCommit, workingSet *WorkingSet, prevHash hash.Hash, meta *datas.WorkingSetMeta, + replicationStatus *ReplicationStatusController, ) (*Commit, error) { wsDs, err := ddb.db.GetDataset(ctx, workingSetRef.String()) if err != nil { @@ -1271,12 +1287,13 @@ func (ddb *DoltDB) CommitWithWorkingSet( return nil, err } - commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ - Meta: meta, - WorkingRoot: workingRootRef, - StagedRoot: stagedRef, - MergeState: mergeState, - }, prevHash, commit.CommitOptions) + commitDataset, _, err := ddb.db.withReplicationStatusController(replicationStatus). + CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{ + Meta: meta, + WorkingRoot: workingRootRef, + StagedRoot: stagedRef, + MergeState: mergeState, + }, prevHash, commit.CommitOptions) if err != nil { return nil, err @@ -1310,7 +1327,7 @@ func (ddb *DoltDB) DeleteWorkingSet(ctx context.Context, workingSetRef ref.Worki } func (ddb *DoltDB) DeleteTag(ctx context.Context, tag ref.DoltRef) error { - err := ddb.deleteRef(ctx, tag) + err := ddb.deleteRef(ctx, tag, nil) if err == ErrBranchNotFound { return ErrTagNotFound @@ -1337,7 +1354,7 @@ func (ddb *DoltDB) NewWorkspaceAtCommit(ctx context.Context, workRef ref.DoltRef } func (ddb *DoltDB) DeleteWorkspace(ctx context.Context, workRef ref.DoltRef) error { - err := ddb.deleteRef(ctx, workRef) + err := ddb.deleteRef(ctx, workRef, nil) if err == ErrBranchNotFound { return ErrWorkspaceNotFound @@ -1652,7 +1669,7 @@ func (ddb *DoltDB) RemoveStashAtIdx(ctx context.Context, idx int) error { // RemoveAllStashes removes the stash list Dataset from the database, // which equivalent to removing Stash entries from the stash list. func (ddb *DoltDB) RemoveAllStashes(ctx context.Context) error { - err := ddb.deleteRef(ctx, ref.NewStashRef()) + err := ddb.deleteRef(ctx, ref.NewStashRef(), nil) if err == ErrBranchNotFound { return nil } diff --git a/go/libraries/doltcore/doltdb/hooksdatabase.go b/go/libraries/doltcore/doltdb/hooksdatabase.go index 54d128d043..8b14765ca1 100644 --- a/go/libraries/doltcore/doltdb/hooksdatabase.go +++ b/go/libraries/doltcore/doltdb/hooksdatabase.go @@ -17,23 +17,23 @@ package doltdb import ( "context" "io" + "sync" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" - - "sync" ) type hooksDatabase struct { datas.Database postCommitHooks []CommitHook + rsc *ReplicationStatusController } // CommitHook is an abstraction for executing arbitrary commands after atomic database commits type CommitHook interface { // Execute is arbitrary read-only function whose arguments are new Dataset commit into a specific Database - Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error + Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) // HandleError is an bridge function to handle Execute errors HandleError(ctx context.Context, err error) error // SetLogger lets clients specify an output stream for HandleError @@ -42,8 +42,16 @@ type CommitHook interface { ExecuteForWorkingSets() bool } +// If a commit hook supports this interface, it can be notified if waiting for +// replication in the callback returned by |Execute| failed to complete in time +// or returned an error. +type NotifyWaitFailedCommitHook interface { + NotifyWaitFailed() +} + func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase { - db.postCommitHooks = postHooks + db.postCommitHooks = make([]CommitHook, len(postHooks)) + copy(db.postCommitHooks, postHooks) return db } @@ -54,27 +62,61 @@ func (db hooksDatabase) SetCommitHookLogger(ctx context.Context, wr io.Writer) h return db } +func (db hooksDatabase) withReplicationStatusController(rsc *ReplicationStatusController) hooksDatabase { + db.rsc = rsc + return db +} + func (db hooksDatabase) PostCommitHooks() []CommitHook { - return db.postCommitHooks + toret := make([]CommitHook, len(db.postCommitHooks)) + copy(toret, db.postCommitHooks) + return toret } func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset, onlyWS bool) { - var err error var wg sync.WaitGroup - for _, hook := range db.postCommitHooks { + rsc := db.rsc + var ioff int + if rsc != nil { + ioff = len(rsc.Wait) + rsc.Wait = append(rsc.Wait, make([]func(context.Context) error, len(db.postCommitHooks))...) + rsc.NotifyWaitFailed = append(rsc.NotifyWaitFailed, make([]func(), len(db.postCommitHooks))...) + } + for il, hook := range db.postCommitHooks { if !onlyWS || hook.ExecuteForWorkingSets() { + i := il hook := hook wg.Add(1) go func() { defer wg.Done() - err = hook.Execute(ctx, ds, db) + f, err := hook.Execute(ctx, ds, db) if err != nil { hook.HandleError(ctx, err) } + if rsc != nil { + rsc.Wait[i+ioff] = f + if nf, ok := hook.(NotifyWaitFailedCommitHook); ok { + rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed + } else { + rsc.NotifyWaitFailed[i+ioff] = func() {} + } + } }() } } wg.Wait() + if rsc != nil { + j := ioff + for i := ioff; i < len(rsc.Wait); i++ { + if rsc.Wait[i] != nil { + rsc.Wait[j] = rsc.Wait[i] + rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i] + j++ + } + } + rsc.Wait = rsc.Wait[:j] + rsc.NotifyWaitFailed = rsc.NotifyWaitFailed[:j] + } } func (db hooksDatabase) CommitWithWorkingSet( diff --git a/go/libraries/doltcore/dtestutils/testcommands/multienv.go b/go/libraries/doltcore/dtestutils/testcommands/multienv.go index 202c249e96..f669255be3 100644 --- a/go/libraries/doltcore/dtestutils/testcommands/multienv.go +++ b/go/libraries/doltcore/dtestutils/testcommands/multienv.go @@ -156,7 +156,7 @@ func (mr *MultiRepoTestSetup) NewRemote(remoteName string) { func (mr *MultiRepoTestSetup) NewBranch(dbName, branchName string) { dEnv := mr.envs[dbName] - err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false) + err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false, nil) if err != nil { mr.Errhand(err) } @@ -268,6 +268,7 @@ func (mr *MultiRepoTestSetup) CommitWithWorkingSet(dbName string) *doltdb.Commit ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(), prevHash, doltdb.TodoWorkingSetMeta(), + nil, ) if err != nil { panic("couldn't commit: " + err.Error()) diff --git a/go/libraries/doltcore/env/actions/branch.go b/go/libraries/doltcore/env/actions/branch.go index 313bdf7004..0c182cce86 100644 --- a/go/libraries/doltcore/env/actions/branch.go +++ b/go/libraries/doltcore/env/actions/branch.go @@ -31,11 +31,13 @@ var ErrCOBranchDelete = errors.New("attempted to delete checked out branch") var ErrUnmergedBranch = errors.New("branch is not fully merged") var ErrWorkingSetsOnBothBranches = errors.New("checkout would overwrite uncommitted changes on target branch") -func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool) error { +func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool, rsc *doltdb.ReplicationStatusController) error { oldRef := ref.NewBranchRef(oldBranch) newRef := ref.NewBranchRef(newBranch) - err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force) + // TODO: This function smears the branch updates across multiple commits of the datas.Database. + + err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force, rsc) if err != nil { return err } @@ -66,14 +68,14 @@ func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch s } } - return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro) + return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro, rsc) } func CopyBranch(ctx context.Context, dEnv *env.DoltEnv, oldBranch, newBranch string, force bool) error { - return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force) + return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force, nil) } -func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool) error { +func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool, rsc *doltdb.ReplicationStatusController) error { oldRef := ref.NewBranchRef(oldBranch) newRef := ref.NewBranchRef(newBranch) @@ -104,7 +106,7 @@ func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranc return err } - return ddb.NewBranchAtCommit(ctx, newRef, cm) + return ddb.NewBranchAtCommit(ctx, newRef, cm, rsc) } type DeleteOptions struct { @@ -112,7 +114,7 @@ type DeleteOptions struct { Remote bool } -func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider) error { +func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error { var branchRef ref.DoltRef if opts.Remote { var err error @@ -127,10 +129,10 @@ func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts De } } - return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro) + return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro, rsc) } -func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider) error { +func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error { ddb := dbdata.Ddb hasRef, err := ddb.HasRef(ctx, branchRef) @@ -173,7 +175,7 @@ func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.Dolt } } - return ddb.DeleteBranch(ctx, branchRef) + return ddb.DeleteBranch(ctx, branchRef, rsc) } // validateBranchMergedIntoCurrentWorkingBranch returns an error if the given branch is not fully merged into the HEAD of the current branch. @@ -267,8 +269,8 @@ func validateBranchMergedIntoUpstream(ctx context.Context, dbdata env.DbData, br return nil } -func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool) error { - err := createBranch(ctx, dbData, newBranch, startPt, force) +func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool, rsc *doltdb.ReplicationStatusController) error { + err := createBranch(ctx, dbData, newBranch, startPt, force, rsc) if err != nil { if err == ErrAlreadyExists { @@ -289,7 +291,7 @@ func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, return nil } -func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef) error { +func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef, rsc *doltdb.ReplicationStatusController) error { branchRef := ref.NewBranchRef(newBranch) hasRef, err := ddb.HasRef(ctx, branchRef) if err != nil { @@ -314,7 +316,7 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti return err } - err = ddb.NewBranchAtCommit(ctx, branchRef, cm) + err = ddb.NewBranchAtCommit(ctx, branchRef, cm, rsc) if err != nil { return err } @@ -322,8 +324,8 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti return nil } -func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool) error { - return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef()) +func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool, rsc *doltdb.ReplicationStatusController) error { + return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef(), rsc) } var emptyHash = hash.Hash{} diff --git a/go/libraries/doltcore/env/actions/checkout.go b/go/libraries/doltcore/env/actions/checkout.go index 54b9e07980..9f8a8bc26e 100644 --- a/go/libraries/doltcore/env/actions/checkout.go +++ b/go/libraries/doltcore/env/actions/checkout.go @@ -331,6 +331,7 @@ func cleanOldWorkingSet( initialWs.WithWorkingRoot(newRoots.Working).WithStagedRoot(newRoots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"), + nil, ) if err != nil { return err diff --git a/go/libraries/doltcore/env/actions/clone.go b/go/libraries/doltcore/env/actions/clone.go index a6907d1c60..779a4027b5 100644 --- a/go/libraries/doltcore/env/actions/clone.go +++ b/go/libraries/doltcore/env/actions/clone.go @@ -230,7 +230,7 @@ func CloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch s } if brnch.GetPath() != branch { - err := dEnv.DoltDB.DeleteBranch(ctx, brnch) + err := dEnv.DoltDB.DeleteBranch(ctx, brnch, nil) if err != nil { return fmt.Errorf("%w: %s; %s", ErrFailedToDeleteBranch, brnch.String(), err.Error()) } diff --git a/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go b/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go index 8f79fefc3c..6188662261 100644 --- a/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go +++ b/go/libraries/doltcore/env/actions/commitwalk/commitwalk_test.go @@ -86,7 +86,7 @@ func TestGetDotDotRevisions(t *testing.T) { // Create a feature branch. bref := ref.NewBranchRef("feature") - err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5]) + err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5], nil) require.NoError(t, err) // Create 3 commits on feature branch. diff --git a/go/libraries/doltcore/env/actions/remotes.go b/go/libraries/doltcore/env/actions/remotes.go index 21dab4f914..179e8efeef 100644 --- a/go/libraries/doltcore/env/actions/remotes.go +++ b/go/libraries/doltcore/env/actions/remotes.go @@ -220,14 +220,14 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef } if hasRef { - err = remoteDB.DeleteBranch(ctx, targetRef) + err = remoteDB.DeleteBranch(ctx, targetRef, nil) } if err != nil { return err } - err = localDB.DeleteBranch(ctx, remoteRef) + err = localDB.DeleteBranch(ctx, remoteRef, nil) if err != nil { return err diff --git a/go/libraries/doltcore/env/actions/reset.go b/go/libraries/doltcore/env/actions/reset.go index 0a601ff365..62b8903b0e 100644 --- a/go/libraries/doltcore/env/actions/reset.go +++ b/go/libraries/doltcore/env/actions/reset.go @@ -164,7 +164,7 @@ func ResetHard( return err } - err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard")) + err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"), nil) if err != nil { return err } diff --git a/go/libraries/doltcore/env/environment.go b/go/libraries/doltcore/env/environment.go index d7cb31a44b..3fb3a4e562 100644 --- a/go/libraries/doltcore/env/environment.go +++ b/go/libraries/doltcore/env/environment.go @@ -629,7 +629,7 @@ func (dEnv *DoltEnv) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.Root wsRef = ws.Ref() } - return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta(), nil) } // UpdateWorkingSet updates the working set for the current working branch to the value given. @@ -648,7 +648,7 @@ func (dEnv *DoltEnv) UpdateWorkingSet(ctx context.Context, ws *doltdb.WorkingSet } } - return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta(), nil) } type repoStateReader struct { @@ -758,7 +758,7 @@ func (dEnv *DoltEnv) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.RootV wsRef = ws.Ref() } - return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta(), nil) } func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error { @@ -772,7 +772,7 @@ func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error { return err } - return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta()) + return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta(), nil) } func (dEnv *DoltEnv) workingSetMeta() *datas.WorkingSetMeta { @@ -911,7 +911,7 @@ func (dEnv *DoltEnv) RemoveRemote(ctx context.Context, name string) error { rr := r.(ref.RemoteRef) if rr.GetRemote() == remote.Name { - err = ddb.DeleteBranch(ctx, rr) + err = ddb.DeleteBranch(ctx, rr, nil) if err != nil { return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", ErrFailedToDeleteRemote, rr.String(), err.Error()) diff --git a/go/libraries/doltcore/env/memory.go b/go/libraries/doltcore/env/memory.go index 7be9bdfadf..c973fc428f 100644 --- a/go/libraries/doltcore/env/memory.go +++ b/go/libraries/doltcore/env/memory.go @@ -136,7 +136,7 @@ func (m MemoryRepoState) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.R wsRef = ws.Ref() } - return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta()) + return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta(), nil) } func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.RootValue) error { @@ -162,7 +162,7 @@ func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb. wsRef = ws.Ref() } - return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta()) + return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta(), nil) } func (m MemoryRepoState) WorkingSet(ctx context.Context) (*doltdb.WorkingSet, error) { diff --git a/go/libraries/doltcore/merge/merge_test.go b/go/libraries/doltcore/merge/merge_test.go index 2b5fa3acdf..c1e54c6749 100644 --- a/go/libraries/doltcore/merge/merge_test.go +++ b/go/libraries/doltcore/merge/merge_test.go @@ -743,7 +743,7 @@ func buildLeftRightAncCommitsAndBranches(t *testing.T, ddb *doltdb.DoltDB, rootT commit, err := ddb.Commit(context.Background(), hash, ref.NewBranchRef(env.DefaultInitBranch), meta) require.NoError(t, err) - err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit) + err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit, nil) require.NoError(t, err) mergeCommit, err := ddb.Commit(context.Background(), mergeHash, ref.NewBranchRef("to-merge"), meta) require.NoError(t, err) diff --git a/go/libraries/doltcore/migrate/progress.go b/go/libraries/doltcore/migrate/progress.go index 12139e6b54..7d9656bc57 100644 --- a/go/libraries/doltcore/migrate/progress.go +++ b/go/libraries/doltcore/migrate/progress.go @@ -182,7 +182,7 @@ func persistMigratedCommitMapping(ctx context.Context, ddb *doltdb.DoltDB, mappi } br := ref.NewBranchRef(MigratedCommitsBranch) - err = ddb.NewBranchAtCommit(ctx, br, init) + err = ddb.NewBranchAtCommit(ctx, br, init, nil) if err != nil { return err } @@ -298,6 +298,6 @@ func commitRoot( Name: meta.Name, Email: meta.Email, Timestamp: uint64(time.Now().Unix()), - }) + }, nil) return err } diff --git a/go/libraries/doltcore/migrate/transform.go b/go/libraries/doltcore/migrate/transform.go index 5c17cbc605..5101475c31 100644 --- a/go/libraries/doltcore/migrate/transform.go +++ b/go/libraries/doltcore/migrate/transform.go @@ -93,7 +93,7 @@ func migrateWorkingSet(ctx context.Context, menv Environment, brRef ref.BranchRe newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr) - return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta()) + return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta(), nil) } func migrateCommit(ctx context.Context, menv Environment, oldCm *doltdb.Commit, new *doltdb.DoltDB, prog *progress) error { diff --git a/go/libraries/doltcore/rebase/rebase.go b/go/libraries/doltcore/rebase/rebase.go index 5ade41afba..3cad66c404 100644 --- a/go/libraries/doltcore/rebase/rebase.go +++ b/go/libraries/doltcore/rebase/rebase.go @@ -152,7 +152,7 @@ func rebaseRefs(ctx context.Context, dbData env.DbData, replay ReplayCommitFn, n for i, r := range refs { switch dRef := r.(type) { case ref.BranchRef: - err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i]) + err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i], nil) case ref.TagRef: // rewrite tag with new commit diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go index f286fcbb95..c83c9e25d0 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go @@ -706,7 +706,7 @@ func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string return err } - return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta()) + return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta(), nil) } // getTableSchema returns a sql.Schema for the specified table in the specified database. diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 37ae58896e..91fe10bfbb 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -32,6 +32,7 @@ import ( ) var _ doltdb.CommitHook = (*commithook)(nil) +var _ doltdb.NotifyWaitFailedCommitHook = (*commithook)(nil) type commithook struct { rootLgr *logrus.Entry @@ -53,6 +54,20 @@ type commithook struct { // commithooks are caught up with replicating to the standby. waitNotify func() + // This is a slice of notification channels maintained by the + // commithook. The semantics are: + // 1. All accesses to |successChs| must happen with |mu| held. + // 2. There may be |0| or more channels in the slice. + // 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it. + // 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then. + successChs []chan struct{} + + // If this is true, the waitF returned by Execute() will fast fail if + // we are not already caught up, instead of blocking on a successCh + // actually indicated we are caught up. This is set to by a call to + // NotifyWaitFailed(), an optional interface on CommitHook. + fastFailReplicationWait bool + role Role // The standby replica to which the new root gets replicated. @@ -145,9 +160,17 @@ func (h *commithook) replicate(ctx context.Context) { if h.waitNotify != nil { h.waitNotify() } + caughtUp := h.isCaughtUp() + if len(h.successChs) != 0 && caughtUp { + for _, ch := range h.successChs { + close(ch) + } + h.successChs = nil + h.fastFailReplicationWait = false + } if shouldHeartbeat { h.attemptHeartbeat(ctx) - } else { + } else if caughtUp { shouldHeartbeat = true } h.cond.Wait() @@ -230,6 +253,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) { } h.cancelReplicate = nil }() + successChs := h.successChs + h.successChs = nil + defer func() { + if len(successChs) != 0 { + h.successChs = append(h.successChs, successChs...) + } + }() h.mu.Unlock() if destDB == nil { @@ -280,6 +310,12 @@ func (h *commithook) attemptReplicate(ctx context.Context) { h.lastPushedHead = toPush h.lastSuccess = incomingTime h.nextPushAttempt = time.Time{} + if len(successChs) != 0 { + for _, ch := range successChs { + close(ch) + } + successChs = nil + } } else { h.currentError = new(string) *h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err) @@ -392,21 +428,21 @@ var errDetectedBrokenConfigStr = "error: more than one server was configured as // Execute on this commithook updates the target root hash we're attempting to // replicate and wakes the replication thread. -func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error { +func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) { lgr := h.logger() lgr.Tracef("cluster/commithook: Execute called post commit") cs := datas.ChunkStoreFromDatabase(db) root, err := cs.Root(ctx) if err != nil { lgr.Errorf("cluster/commithook: Execute: error retrieving local database root: %v", err) - return err + return nil, err } h.mu.Lock() defer h.mu.Unlock() lgr = h.logger() if h.role != RolePrimary { lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID()) - return nil + return nil, nil } if root != h.nextHead { lgr.Tracef("signaling replication thread to push new head: %v", root.String()) @@ -415,7 +451,34 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat h.nextPushAttempt = time.Time{} h.cond.Signal() } - return nil + var waitF func(context.Context) error + if !h.isCaughtUp() { + if h.fastFailReplicationWait { + waitF = func(ctx context.Context) error { + return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname) + } + } else { + if len(h.successChs) == 0 { + h.successChs = append(h.successChs, make(chan struct{})) + } + successCh := h.successChs[0] + waitF = func(ctx context.Context) error { + select { + case <-successCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + } + } + return waitF, nil +} + +func (h *commithook) NotifyWaitFailed() { + h.mu.Lock() + defer h.mu.Unlock() + h.fastFailReplicationWait = true } func (h *commithook) HandleError(ctx context.Context, err error) error { diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 322af3ed63..7c821aa6e0 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -82,6 +82,7 @@ type Controller struct { type sqlvars interface { AddSystemVariables(sysVars []sql.SystemVariable) + GetGlobal(name string) (sql.SystemVariable, interface{}, bool) } // We can manage certain aspects of the exposed databases on the server through diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go b/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go index 3d0c7f2776..5002f8d872 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_branch.go @@ -64,30 +64,36 @@ func doDoltBranch(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("Could not load database %s", dbName) } + var rsc doltdb.ReplicationStatusController + switch { case apr.Contains(cli.CopyFlag): - err = copyBranch(ctx, dbData, apr) + err = copyBranch(ctx, dbData, apr, &rsc) case apr.Contains(cli.MoveFlag): - err = renameBranch(ctx, dbData, apr, dSess, dbName) + err = renameBranch(ctx, dbData, apr, dSess, dbName, &rsc) case apr.Contains(cli.DeleteFlag), apr.Contains(cli.DeleteForceFlag): - err = deleteBranches(ctx, dbData, apr, dSess, dbName) + err = deleteBranches(ctx, dbData, apr, dSess, dbName, &rsc) default: - err = createNewBranch(ctx, dbData, apr) + err = createNewBranch(ctx, dbData, apr, &rsc) } if err != nil { return 1, err } else { - return 0, commitTransaction(ctx, dSess) + return 0, commitTransaction(ctx, dSess, &rsc) } } -func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error { +func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession, rsc *doltdb.ReplicationStatusController) error { err := dSess.CommitTransaction(ctx, ctx.GetTransaction()) if err != nil { return err } + if rsc != nil { + dsess.WaitForReplicationController(ctx, *rsc) + } + // Because this transaction manipulation is happening outside the engine's awareness, we need to set it to nil here // to get a fresh transaction started on the next statement. // TODO: put this under engine control @@ -97,7 +103,7 @@ func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error { // renameBranch takes DoltSession and database name to try accessing file system for dolt database. // If the oldBranch being renamed is the current branch on CLI, then RepoState head will be updated with the newBranch ref. -func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error { +func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return InvalidArgErr } @@ -124,7 +130,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe return err } - err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force) + err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force, rsc) if err != nil { return err } @@ -150,7 +156,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe // deleteBranches takes DoltSession and database name to try accessing file system for dolt database. // If the database is not session state db and the branch being deleted is the current branch on CLI, it will update // the RepoState to set head as empty branchRef. -func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error { +func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() == 0 { return InvalidArgErr } @@ -194,7 +200,7 @@ func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParse err = actions.DeleteBranch(ctx, dbData, branchName, actions.DeleteOptions{ Force: force, - }, dSess.Provider()) + }, dSess.Provider(), rsc) if err != nil { return err } @@ -274,7 +280,7 @@ func loadConfig(ctx *sql.Context) *env.DoltCliConfig { return dEnv.Config } -func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error { +func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() == 0 || apr.NArg() > 2 { return InvalidArgErr } @@ -332,7 +338,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars return err } - err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag)) + err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag), rsc) if err != nil { return err } @@ -348,7 +354,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars return nil } -func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error { +func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return InvalidArgErr } @@ -364,10 +370,10 @@ func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResu } force := apr.Contains(cli.ForceFlag) - return copyABranch(ctx, dbData, srcBr, destBr, force) + return copyABranch(ctx, dbData, srcBr, destBr, force, rsc) } -func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool) error { +func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool, rsc *doltdb.ReplicationStatusController) error { if err := branch_control.CanCreateBranch(ctx, destBr); err != nil { return err } @@ -378,7 +384,7 @@ func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr strin return err } } - err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force) + err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force, rsc) if err != nil { if err == doltdb.ErrBranchNotFound { return errors.New(fmt.Sprintf("fatal: A branch named '%s' not found", srcBr)) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go index 21ef428198..4e851e18fc 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_checkout.go @@ -71,9 +71,11 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("Could not load database %s", currentDbName) } + var rsc doltdb.ReplicationStatusController + // Checking out new branch. if branchOrTrack { - err = checkoutNewBranch(ctx, dbName, dbData, apr) + err = checkoutNewBranch(ctx, dbName, dbData, apr, &rsc) if err != nil { return 1, err } else { @@ -121,13 +123,15 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) { err = checkoutTables(ctx, roots, dbName, args) if err != nil && apr.NArg() == 1 { - err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr) + err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr, &rsc) } if err != nil { return 1, err } + dsess.WaitForReplicationController(ctx, rsc) + return 0, nil } @@ -168,7 +172,7 @@ func createWorkingSetForLocalBranch(ctx *sql.Context, ddb *doltdb.DoltDB, branch } ws := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta()) + return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta(), nil) } // getRevisionForRevisionDatabase returns the root database name and revision for a database, or just the root database name if the specified db name is not a revision database. @@ -196,7 +200,7 @@ func getRevisionForRevisionDatabase(ctx *sql.Context, dbName string) (string, st // checkoutRemoteBranch checks out a remote branch creating a new local branch with the same name as the remote branch // and set its upstream. The upstream persists out of sql session. -func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults) error { +func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { remoteRefs, err := actions.GetRemoteBranchRef(ctx, dbData.Ddb, branchName) if err != nil { return errors.New("fatal: unable to read from data repository") @@ -206,7 +210,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br return fmt.Errorf("error: could not find %s", branchName) } else if len(remoteRefs) == 1 { remoteRef := remoteRefs[0] - err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false) + err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false, rsc) if err != nil { return err } @@ -226,7 +230,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br } } -func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults) error { +func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { var newBranchName string var remoteName, remoteBranchName string var startPt = "head" @@ -259,7 +263,7 @@ func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr * newBranchName = newBranch } - err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false) + err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false, rsc) if err != nil { return err } diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go b/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go index aa1b59840d..2e6d32451e 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_remote.go @@ -24,6 +24,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/errhand" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" @@ -65,11 +66,13 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) { return 1, fmt.Errorf("error: invalid argument, use 'dolt_remotes' system table to list remotes") } + var rsc doltdb.ReplicationStatusController + switch apr.Arg(0) { case "add": err = addRemote(ctx, dbName, dbData, apr, dSess) case "remove", "rm": - err = removeRemote(ctx, dbData, apr) + err = removeRemote(ctx, dbData, apr, &rsc) default: err = fmt.Errorf("error: invalid argument") } @@ -77,6 +80,9 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) { if err != nil { return 1, err } + + dsess.WaitForReplicationController(ctx, rsc) + return 0, nil } @@ -106,7 +112,7 @@ func addRemote(_ *sql.Context, dbName string, dbd env.DbData, apr *argparser.Arg return dbd.Rsw.AddRemote(r) } -func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults) error { +func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error { if apr.NArg() != 2 { return fmt.Errorf("error: invalid argument") } @@ -133,7 +139,7 @@ func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResul rr := r.(ref.RemoteRef) if rr.GetRemote() == remote.Name { - err = ddb.DeleteBranch(ctx, rr) + err = ddb.DeleteBranch(ctx, rr, rsc) if err != nil { return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", env.ErrFailedToDeleteRemote, rr.String(), err.Error()) diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index f3786e2379..9be838ed59 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -15,6 +15,7 @@ package dsess import ( + "context" "errors" "fmt" "strings" @@ -22,6 +23,7 @@ import ( "time" "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/vitess/go/mysql" "github.com/sirupsen/logrus" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" @@ -216,7 +218,9 @@ func doltCommit(ctx *sql.Context, workingSet = workingSet.ClearMerge() - newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx)) + var rsc doltdb.ReplicationStatusController + newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc) + WaitForReplicationController(ctx, rsc) return workingSet, newCommit, err } @@ -227,7 +231,10 @@ func txCommit(ctx *sql.Context, workingSet *doltdb.WorkingSet, hash hash.Hash, ) (*doltdb.WorkingSet, *doltdb.Commit, error) { - return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx)) + var rsc doltdb.ReplicationStatusController + err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc) + WaitForReplicationController(ctx, rsc) + return workingSet, nil, err } // DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write @@ -235,6 +242,71 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki return tx.doCommit(ctx, workingSet, commit, doltCommit) } +func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) { + if len(rsc.Wait) == 0 { + return + } + _, timeout, ok := sql.SystemVariables.GetGlobal(DoltClusterAckWritesTimeoutSecs) + if !ok { + return + } + timeoutI := timeout.(int64) + if timeoutI == 0 { + return + } + + cCtx, cancel := context.WithCancel(ctx) + var wg sync.WaitGroup + wg.Add(len(rsc.Wait)) + for i, f := range rsc.Wait { + f := f + i := i + go func() { + defer wg.Done() + err := f(cCtx) + if err == nil { + rsc.Wait[i] = nil + } + }() + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + waitFailed := false + select { + case <-time.After(time.Duration(timeoutI) * time.Second): + // We timed out before all the waiters were done. + // First we make certain to finalize everything. + cancel() + <-done + waitFailed = true + case <-done: + cancel() + } + + // Just because our waiters all completed does not mean they all + // returned nil errors. Any non-nil entries in rsc.Wait returned an + // error. We turn those into warnings here. + numFailed := 0 + for i, f := range rsc.Wait { + if f != nil { + numFailed += 1 + if waitFailed { + rsc.NotifyWaitFailed[i]() + } + } + } + ctx.Session.Warn(&sql.Warning{ + Level: "Warning", + Code: mysql.ERQueryTimeout, + Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)), + }) +} + // doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit func (tx *DoltTransaction) doCommit( ctx *sql.Context, diff --git a/go/libraries/doltcore/sqle/dsess/variables.go b/go/libraries/doltcore/sqle/dsess/variables.go index c979dded55..07b18431e7 100644 --- a/go/libraries/doltcore/sqle/dsess/variables.go +++ b/go/libraries/doltcore/sqle/dsess/variables.go @@ -52,8 +52,9 @@ const ( ShowBranchDatabases = "dolt_show_branch_databases" DoltLogLevel = "dolt_log_level" - DoltClusterRoleVariable = "dolt_cluster_role" - DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch" + DoltClusterRoleVariable = "dolt_cluster_role" + DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch" + DoltClusterAckWritesTimeoutSecs = "dolt_cluster_ack_writes_timeout_secs" ) const URLTemplateDatabasePlaceholder = "{database}" diff --git a/go/libraries/doltcore/sqle/read_replica_database.go b/go/libraries/doltcore/sqle/read_replica_database.go index 511878bc95..1cb591e3ef 100644 --- a/go/libraries/doltcore/sqle/read_replica_database.go +++ b/go/libraries/doltcore/sqle/read_replica_database.go @@ -243,7 +243,7 @@ func (rrd ReadReplicaDatabase) CreateLocalBranchFromRemote(ctx *sql.Context, bra } // create refs/heads/branch dataset - err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm) + err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm, nil) if err != nil { return nil, err } @@ -334,7 +334,7 @@ func pullBranchesAndUpdateWorkingSet( if commitRootHash != wsWorkingRootHash || commitRootHash != wsStagedRootHash { ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot) - err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta()) + err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta(), nil) if err == nil { return nil } @@ -458,7 +458,7 @@ func (rrd ReadReplicaDatabase) createNewBranchFromRemote(ctx *sql.Context, remot return err } - err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm) + err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm, nil) err = rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash) if err != nil { return err @@ -534,7 +534,7 @@ func refsToDelete(remRefs, localRefs []doltdb.RefWithHash) []doltdb.RefWithHash func deleteBranches(ctx *sql.Context, rrd ReadReplicaDatabase, branches []doltdb.RefWithHash) error { for _, b := range branches { - err := rrd.ddb.DeleteBranch(ctx, b.Ref) + err := rrd.ddb.DeleteBranch(ctx, b.Ref, nil) if errors.Is(err, doltdb.ErrBranchNotFound) { continue } else if err != nil { diff --git a/go/libraries/doltcore/sqle/sqlselect_test.go b/go/libraries/doltcore/sqle/sqlselect_test.go index ff3caee88a..88caf6d0b6 100644 --- a/go/libraries/doltcore/sqle/sqlselect_test.go +++ b/go/libraries/doltcore/sqle/sqlselect_test.go @@ -1675,7 +1675,7 @@ func processNode(t *testing.T, ctx context.Context, dEnv *env.DoltEnv, node Hist require.NoError(t, err) if !ok { - err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent) + err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent, nil) require.NoError(t, err) } diff --git a/go/libraries/doltcore/sqle/system_variables.go b/go/libraries/doltcore/sqle/system_variables.go index 9875b7642b..a18ec75336 100644 --- a/go/libraries/doltcore/sqle/system_variables.go +++ b/go/libraries/doltcore/sqle/system_variables.go @@ -166,6 +166,13 @@ func AddDoltSystemVariables() { Type: types.NewSystemBoolType(dsess.ShowBranchDatabases), Default: int8(0), }, + { + Name: dsess.DoltClusterAckWritesTimeoutSecs, + Dynamic: true, + Scope: sql.SystemVariableScope_Persist, + Type: types.NewSystemIntType(dsess.DoltClusterAckWritesTimeoutSecs, 0, 60, false), + Default: int64(0), + }, }) } diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 04c1959a4d..cf231d1ae3 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -1060,6 +1060,157 @@ tests: result: columns: ["count(*)"] rows: [["5"]] +- name: dolt_cluster_ack_writes_timeout_secs behavior + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + # This test writes new commits on the primary and quickly checks for them on + # the secondary. If dolt_cluster_ack_writes_timeout_secs is working as + # intended, the new writes will always be present. + connections: + - on: server1 + queries: + - exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 10' + - exec: 'CREATE DATABASE repo1' + - exec: 'USE repo1' + - exec: 'CREATE TABLE vals (i INT PRIMARY KEY)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: "SHOW TABLES" + result: + columns: ["Tables_in_repo1"] + rows: [["vals"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (0),(1),(2),(3),(4)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["5"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (5),(6),(7),(8),(9)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["10"]] + # Restart both servers to test the behavior of the persisted variable. + - on: server1 + restart_server: {} + - on: server2 + restart_server: {} + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (10),(11),(12),(13),(14)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["15"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'INSERT INTO vals VALUES (15),(16),(17),(18),(19)' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM vals' + result: + columns: ["COUNT(*)"] + rows: [["20"]] + # Assert that branch creation and deletion also blocks on replication. + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["2"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("-d", "new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["1"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["2"]] + - on: server1 + queries: + - exec: 'USE repo1' + - exec: 'CALL DOLT_BRANCH("-d", "new_branch")' + - on: server2 + queries: + - exec: 'USE repo1' + - query: 'SELECT COUNT(*) FROM dolt_branches' + result: + columns: ["COUNT(*)"] + rows: [["1"]] - name: call dolt checkout multi_repos: - name: server1