Merge pull request #5940 from dolthub/aaron/block-on-cluster-replication

go: sqle: cluster: Implement dolt_cluster_ack_writes_timeout_secs, a way to block COMMIT ack until the commit is replicated to cluster standbys.
This commit is contained in:
Aaron Son
2023-05-17 06:11:06 -07:00
committed by GitHub
33 changed files with 490 additions and 114 deletions

View File

@@ -245,7 +245,7 @@ func moveBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgParseR
force := apr.Contains(cli.ForceFlag)
src := apr.Arg(0)
dest := apr.Arg(1)
err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force)
err := actions.RenameBranch(ctx, dEnv.DbData(), src, apr.Arg(1), dEnv, force, nil)
var verr errhand.VerboseError
if err != nil {
@@ -306,7 +306,7 @@ func deleteBranches(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPa
err := actions.DeleteBranch(ctx, dEnv.DbData(), brName, actions.DeleteOptions{
Force: force,
Remote: apr.Contains(cli.RemoteParam),
}, dEnv)
}, dEnv, nil)
if err != nil {
var verr errhand.VerboseError
@@ -379,7 +379,7 @@ func createBranch(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPars
}
}
err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag))
err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, apr.Contains(cli.ForceFlag), nil)
if err != nil {
return HandleVErrAndExitCode(errhand.BuildDError(err.Error()).Build(), usage)
}

View File

@@ -238,7 +238,7 @@ func checkoutRemoteBranchOrSuggestNew(ctx context.Context, dEnv *env.DoltEnv, na
}
func checkoutNewBranchFromStartPt(ctx context.Context, dEnv *env.DoltEnv, newBranch, startPt string) errhand.VerboseError {
err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false)
err := actions.CreateBranchWithStartPt(ctx, dEnv.DbData(), newBranch, startPt, false, nil)
if err != nil {
return errhand.BuildDError(err.Error()).Build()
}

View File

@@ -229,6 +229,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv *
ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(),
prevHash,
dEnv.NewWorkingSetMeta(fmt.Sprintf("Updated by %s %s", commandStr, strings.Join(args, " "))),
nil,
)
if err != nil {
if apr.Contains(cli.AmendFlag) {

View File

@@ -538,6 +538,7 @@ func executeNoFFMergeAndCommit(ctx context.Context, dEnv *env.DoltEnv, spec *mer
ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(),
wsHash,
dEnv.NewWorkingSetMeta(msg),
nil,
)
if err != nil {

View File

@@ -49,8 +49,8 @@ func NewPushOnWriteHook(destDB *DoltDB, tmpDir string) *PushOnWriteHook {
}
// Execute implements CommitHook, replicates head updates to the destDb field
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
return pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir)
func (ph *PushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) {
return nil, pushDataset(ctx, ph.destDB, db, ds, ph.tmpDir)
}
func pushDataset(ctx context.Context, destDB, srcDB datas.Database, ds datas.Dataset, tmpDir string) error {
@@ -135,16 +135,16 @@ func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool {
}
// Execute implements CommitHook, replicates head updates to the destDb field
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) {
addr, _ := ds.MaybeHeadAddr()
select {
case ah.ch <- PushArg{ds: ds, db: db, hash: addr}:
case <-ctx.Done():
ah.ch <- PushArg{ds: ds, db: db, hash: addr}
return ctx.Err()
return nil, ctx.Err()
}
return nil
return nil, nil
}
// HandleError implements CommitHook
@@ -174,12 +174,12 @@ func NewLogHook(msg []byte) *LogHook {
}
// Execute implements CommitHook, writes message to log channel
func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
func (lh *LogHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) {
if lh.out != nil {
_, err := lh.out.Write(lh.msg)
return err
return nil, err
}
return nil
return nil, nil
}
// HandleError implements CommitHook

View File

@@ -136,7 +136,7 @@ func TestPushOnWriteHook(t *testing.T) {
ds, err := ddb.db.GetDataset(ctx, "refs/heads/main")
require.NoError(t, err)
err = hook.Execute(ctx, ds, ddb.db)
_, err = hook.Execute(ctx, ds, ddb.db)
require.NoError(t, err)
cs, _ = NewCommitSpec(defaultBranch)
@@ -269,7 +269,7 @@ func TestAsyncPushOnWrite(t *testing.T) {
require.NoError(t, err)
ds, err := ddb.db.GetDataset(ctx, "refs/heads/main")
require.NoError(t, err)
err = hook.Execute(ctx, ds, ddb.db)
_, err = hook.Execute(ctx, ds, ddb.db)
require.NoError(t, err)
}
})

View File

@@ -1079,7 +1079,7 @@ func (ddb *DoltDB) GetRefsOfTypeByNomsRoot(ctx context.Context, refTypeFilter ma
// NewBranchAtCommit creates a new branch with HEAD at the commit given. Branch names must pass IsValidUserBranchName.
// Silently overwrites any existing branch with the same name given, if one exists.
func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit) error {
func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef, commit *Commit, replicationStatus *ReplicationStatusController) error {
if !IsValidBranchRef(branchRef) {
panic(fmt.Sprintf("invalid branch name %s, use IsValidUserBranchName check", branchRef.String()))
}
@@ -1124,7 +1124,7 @@ func (ddb *DoltDB) NewBranchAtCommit(ctx context.Context, branchRef ref.DoltRef,
}
ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot)
return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta())
return ddb.UpdateWorkingSet(ctx, wsRef, ws, currWsHash, TodoWorkingSetMeta(), replicationStatus)
}
// CopyWorkingSet copies a WorkingSetRef from one ref to another. If `force` is
@@ -1155,15 +1155,15 @@ func (ddb *DoltDB) CopyWorkingSet(ctx context.Context, fromWSRef ref.WorkingSetR
}
}
return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta())
return ddb.UpdateWorkingSet(ctx, toWSRef, ws, currWsHash, TodoWorkingSetMeta(), nil)
}
// DeleteBranch deletes the branch given, returning an error if it doesn't exist.
func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef) error {
return ddb.deleteRef(ctx, branch)
func (ddb *DoltDB) DeleteBranch(ctx context.Context, branch ref.DoltRef, replicationStatus *ReplicationStatusController) error {
return ddb.deleteRef(ctx, branch, replicationStatus)
}
func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error {
func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef, replicationStatus *ReplicationStatusController) error {
ds, err := ddb.db.GetDataset(ctx, dref.String())
if err != nil {
@@ -1184,7 +1184,7 @@ func (ddb *DoltDB) deleteRef(ctx context.Context, dref ref.DoltRef) error {
}
}
_, err = ddb.db.Delete(ctx, ds)
_, err = ddb.db.withReplicationStatusController(replicationStatus).Delete(ctx, ds)
return err
}
@@ -1216,6 +1216,20 @@ func (ddb *DoltDB) NewTagAtCommit(ctx context.Context, tagRef ref.DoltRef, c *Co
return err
}
type ReplicationStatusController struct {
// A slice of funcs which can be called to wait for the replication
// associated with a commithook to complete. Must return if the
// associated Context is canceled.
Wait []func(ctx context.Context) error
// There is an entry here for each function in Wait. If a Wait fails,
// you can notify the corresponding function in this slice. This might
// control resiliency behaviors like adaptive retry and timeouts,
// circuit breakers, etc. and might feed into exposed replication
// metrics.
NotifyWaitFailed []func()
}
// UpdateWorkingSet updates the working set with the ref given to the root value given
// |prevHash| is the hash of the expected WorkingSet struct stored in the ref, not the hash of the RootValue there.
func (ddb *DoltDB) UpdateWorkingSet(
@@ -1224,6 +1238,7 @@ func (ddb *DoltDB) UpdateWorkingSet(
workingSet *WorkingSet,
prevHash hash.Hash,
meta *datas.WorkingSetMeta,
replicationStatus *ReplicationStatusController,
) error {
ds, err := ddb.db.GetDataset(ctx, workingSetRef.String())
if err != nil {
@@ -1235,7 +1250,7 @@ func (ddb *DoltDB) UpdateWorkingSet(
return err
}
_, err = ddb.db.UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{
_, err = ddb.db.withReplicationStatusController(replicationStatus).UpdateWorkingSet(ctx, ds, datas.WorkingSetSpec{
Meta: meta,
WorkingRoot: workingRootRef,
StagedRoot: stagedRef,
@@ -1255,6 +1270,7 @@ func (ddb *DoltDB) CommitWithWorkingSet(
commit *PendingCommit, workingSet *WorkingSet,
prevHash hash.Hash,
meta *datas.WorkingSetMeta,
replicationStatus *ReplicationStatusController,
) (*Commit, error) {
wsDs, err := ddb.db.GetDataset(ctx, workingSetRef.String())
if err != nil {
@@ -1271,12 +1287,13 @@ func (ddb *DoltDB) CommitWithWorkingSet(
return nil, err
}
commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{
Meta: meta,
WorkingRoot: workingRootRef,
StagedRoot: stagedRef,
MergeState: mergeState,
}, prevHash, commit.CommitOptions)
commitDataset, _, err := ddb.db.withReplicationStatusController(replicationStatus).
CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{
Meta: meta,
WorkingRoot: workingRootRef,
StagedRoot: stagedRef,
MergeState: mergeState,
}, prevHash, commit.CommitOptions)
if err != nil {
return nil, err
@@ -1310,7 +1327,7 @@ func (ddb *DoltDB) DeleteWorkingSet(ctx context.Context, workingSetRef ref.Worki
}
func (ddb *DoltDB) DeleteTag(ctx context.Context, tag ref.DoltRef) error {
err := ddb.deleteRef(ctx, tag)
err := ddb.deleteRef(ctx, tag, nil)
if err == ErrBranchNotFound {
return ErrTagNotFound
@@ -1337,7 +1354,7 @@ func (ddb *DoltDB) NewWorkspaceAtCommit(ctx context.Context, workRef ref.DoltRef
}
func (ddb *DoltDB) DeleteWorkspace(ctx context.Context, workRef ref.DoltRef) error {
err := ddb.deleteRef(ctx, workRef)
err := ddb.deleteRef(ctx, workRef, nil)
if err == ErrBranchNotFound {
return ErrWorkspaceNotFound
@@ -1652,7 +1669,7 @@ func (ddb *DoltDB) RemoveStashAtIdx(ctx context.Context, idx int) error {
// RemoveAllStashes removes the stash list Dataset from the database,
// which equivalent to removing Stash entries from the stash list.
func (ddb *DoltDB) RemoveAllStashes(ctx context.Context) error {
err := ddb.deleteRef(ctx, ref.NewStashRef())
err := ddb.deleteRef(ctx, ref.NewStashRef(), nil)
if err == ErrBranchNotFound {
return nil
}

View File

@@ -17,23 +17,23 @@ package doltdb
import (
"context"
"io"
"sync"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
"sync"
)
type hooksDatabase struct {
datas.Database
postCommitHooks []CommitHook
rsc *ReplicationStatusController
}
// CommitHook is an abstraction for executing arbitrary commands after atomic database commits
type CommitHook interface {
// Execute is arbitrary read-only function whose arguments are new Dataset commit into a specific Database
Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error
Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error)
// HandleError is an bridge function to handle Execute errors
HandleError(ctx context.Context, err error) error
// SetLogger lets clients specify an output stream for HandleError
@@ -42,8 +42,16 @@ type CommitHook interface {
ExecuteForWorkingSets() bool
}
// If a commit hook supports this interface, it can be notified if waiting for
// replication in the callback returned by |Execute| failed to complete in time
// or returned an error.
type NotifyWaitFailedCommitHook interface {
NotifyWaitFailed()
}
func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase {
db.postCommitHooks = postHooks
db.postCommitHooks = make([]CommitHook, len(postHooks))
copy(db.postCommitHooks, postHooks)
return db
}
@@ -54,27 +62,61 @@ func (db hooksDatabase) SetCommitHookLogger(ctx context.Context, wr io.Writer) h
return db
}
func (db hooksDatabase) withReplicationStatusController(rsc *ReplicationStatusController) hooksDatabase {
db.rsc = rsc
return db
}
func (db hooksDatabase) PostCommitHooks() []CommitHook {
return db.postCommitHooks
toret := make([]CommitHook, len(db.postCommitHooks))
copy(toret, db.postCommitHooks)
return toret
}
func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset, onlyWS bool) {
var err error
var wg sync.WaitGroup
for _, hook := range db.postCommitHooks {
rsc := db.rsc
var ioff int
if rsc != nil {
ioff = len(rsc.Wait)
rsc.Wait = append(rsc.Wait, make([]func(context.Context) error, len(db.postCommitHooks))...)
rsc.NotifyWaitFailed = append(rsc.NotifyWaitFailed, make([]func(), len(db.postCommitHooks))...)
}
for il, hook := range db.postCommitHooks {
if !onlyWS || hook.ExecuteForWorkingSets() {
i := il
hook := hook
wg.Add(1)
go func() {
defer wg.Done()
err = hook.Execute(ctx, ds, db)
f, err := hook.Execute(ctx, ds, db)
if err != nil {
hook.HandleError(ctx, err)
}
if rsc != nil {
rsc.Wait[i+ioff] = f
if nf, ok := hook.(NotifyWaitFailedCommitHook); ok {
rsc.NotifyWaitFailed[i+ioff] = nf.NotifyWaitFailed
} else {
rsc.NotifyWaitFailed[i+ioff] = func() {}
}
}
}()
}
}
wg.Wait()
if rsc != nil {
j := ioff
for i := ioff; i < len(rsc.Wait); i++ {
if rsc.Wait[i] != nil {
rsc.Wait[j] = rsc.Wait[i]
rsc.NotifyWaitFailed[j] = rsc.NotifyWaitFailed[i]
j++
}
}
rsc.Wait = rsc.Wait[:j]
rsc.NotifyWaitFailed = rsc.NotifyWaitFailed[:j]
}
}
func (db hooksDatabase) CommitWithWorkingSet(

View File

@@ -156,7 +156,7 @@ func (mr *MultiRepoTestSetup) NewRemote(remoteName string) {
func (mr *MultiRepoTestSetup) NewBranch(dbName, branchName string) {
dEnv := mr.envs[dbName]
err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false)
err := actions.CreateBranchWithStartPt(context.Background(), dEnv.DbData(), branchName, "head", false, nil)
if err != nil {
mr.Errhand(err)
}
@@ -268,6 +268,7 @@ func (mr *MultiRepoTestSetup) CommitWithWorkingSet(dbName string) *doltdb.Commit
ws.WithStagedRoot(pendingCommit.Roots.Staged).WithWorkingRoot(pendingCommit.Roots.Working).ClearMerge(),
prevHash,
doltdb.TodoWorkingSetMeta(),
nil,
)
if err != nil {
panic("couldn't commit: " + err.Error())

View File

@@ -31,11 +31,13 @@ var ErrCOBranchDelete = errors.New("attempted to delete checked out branch")
var ErrUnmergedBranch = errors.New("branch is not fully merged")
var ErrWorkingSetsOnBothBranches = errors.New("checkout would overwrite uncommitted changes on target branch")
func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool) error {
func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch string, remoteDbPro env.RemoteDbProvider, force bool, rsc *doltdb.ReplicationStatusController) error {
oldRef := ref.NewBranchRef(oldBranch)
newRef := ref.NewBranchRef(newBranch)
err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force)
// TODO: This function smears the branch updates across multiple commits of the datas.Database.
err := CopyBranchOnDB(ctx, dbData.Ddb, oldBranch, newBranch, force, rsc)
if err != nil {
return err
}
@@ -66,14 +68,14 @@ func RenameBranch(ctx context.Context, dbData env.DbData, oldBranch, newBranch s
}
}
return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro)
return DeleteBranch(ctx, dbData, oldBranch, DeleteOptions{Force: true}, remoteDbPro, rsc)
}
func CopyBranch(ctx context.Context, dEnv *env.DoltEnv, oldBranch, newBranch string, force bool) error {
return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force)
return CopyBranchOnDB(ctx, dEnv.DoltDB, oldBranch, newBranch, force, nil)
}
func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool) error {
func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranch string, force bool, rsc *doltdb.ReplicationStatusController) error {
oldRef := ref.NewBranchRef(oldBranch)
newRef := ref.NewBranchRef(newBranch)
@@ -104,7 +106,7 @@ func CopyBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, oldBranch, newBranc
return err
}
return ddb.NewBranchAtCommit(ctx, newRef, cm)
return ddb.NewBranchAtCommit(ctx, newRef, cm, rsc)
}
type DeleteOptions struct {
@@ -112,7 +114,7 @@ type DeleteOptions struct {
Remote bool
}
func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider) error {
func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts DeleteOptions, remoteDbPro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error {
var branchRef ref.DoltRef
if opts.Remote {
var err error
@@ -127,10 +129,10 @@ func DeleteBranch(ctx context.Context, dbData env.DbData, brName string, opts De
}
}
return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro)
return DeleteBranchOnDB(ctx, dbData, branchRef, opts, remoteDbPro, rsc)
}
func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider) error {
func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.DoltRef, opts DeleteOptions, pro env.RemoteDbProvider, rsc *doltdb.ReplicationStatusController) error {
ddb := dbdata.Ddb
hasRef, err := ddb.HasRef(ctx, branchRef)
@@ -173,7 +175,7 @@ func DeleteBranchOnDB(ctx context.Context, dbdata env.DbData, branchRef ref.Dolt
}
}
return ddb.DeleteBranch(ctx, branchRef)
return ddb.DeleteBranch(ctx, branchRef, rsc)
}
// validateBranchMergedIntoCurrentWorkingBranch returns an error if the given branch is not fully merged into the HEAD of the current branch.
@@ -267,8 +269,8 @@ func validateBranchMergedIntoUpstream(ctx context.Context, dbdata env.DbData, br
return nil
}
func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool) error {
err := createBranch(ctx, dbData, newBranch, startPt, force)
func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch, startPt string, force bool, rsc *doltdb.ReplicationStatusController) error {
err := createBranch(ctx, dbData, newBranch, startPt, force, rsc)
if err != nil {
if err == ErrAlreadyExists {
@@ -289,7 +291,7 @@ func CreateBranchWithStartPt(ctx context.Context, dbData env.DbData, newBranch,
return nil
}
func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef) error {
func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, startingPoint string, force bool, headRef ref.DoltRef, rsc *doltdb.ReplicationStatusController) error {
branchRef := ref.NewBranchRef(newBranch)
hasRef, err := ddb.HasRef(ctx, branchRef)
if err != nil {
@@ -314,7 +316,7 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti
return err
}
err = ddb.NewBranchAtCommit(ctx, branchRef, cm)
err = ddb.NewBranchAtCommit(ctx, branchRef, cm, rsc)
if err != nil {
return err
}
@@ -322,8 +324,8 @@ func CreateBranchOnDB(ctx context.Context, ddb *doltdb.DoltDB, newBranch, starti
return nil
}
func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool) error {
return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef())
func createBranch(ctx context.Context, dbData env.DbData, newBranch, startingPoint string, force bool, rsc *doltdb.ReplicationStatusController) error {
return CreateBranchOnDB(ctx, dbData.Ddb, newBranch, startingPoint, force, dbData.Rsr.CWBHeadRef(), rsc)
}
var emptyHash = hash.Hash{}

View File

@@ -331,6 +331,7 @@ func cleanOldWorkingSet(
initialWs.WithWorkingRoot(newRoots.Working).WithStagedRoot(newRoots.Staged).ClearMerge(),
h,
dEnv.NewWorkingSetMeta("reset hard"),
nil,
)
if err != nil {
return err

View File

@@ -230,7 +230,7 @@ func CloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch s
}
if brnch.GetPath() != branch {
err := dEnv.DoltDB.DeleteBranch(ctx, brnch)
err := dEnv.DoltDB.DeleteBranch(ctx, brnch, nil)
if err != nil {
return fmt.Errorf("%w: %s; %s", ErrFailedToDeleteBranch, brnch.String(), err.Error())
}

View File

@@ -86,7 +86,7 @@ func TestGetDotDotRevisions(t *testing.T) {
// Create a feature branch.
bref := ref.NewBranchRef("feature")
err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5])
err = dEnv.DoltDB.NewBranchAtCommit(context.Background(), bref, mainCommits[5], nil)
require.NoError(t, err)
// Create 3 commits on feature branch.

View File

@@ -220,14 +220,14 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef
}
if hasRef {
err = remoteDB.DeleteBranch(ctx, targetRef)
err = remoteDB.DeleteBranch(ctx, targetRef, nil)
}
if err != nil {
return err
}
err = localDB.DeleteBranch(ctx, remoteRef)
err = localDB.DeleteBranch(ctx, remoteRef, nil)
if err != nil {
return err

View File

@@ -164,7 +164,7 @@ func ResetHard(
return err
}
err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"))
err = dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.WithWorkingRoot(roots.Working).WithStagedRoot(roots.Staged).ClearMerge(), h, dEnv.NewWorkingSetMeta("reset hard"), nil)
if err != nil {
return err
}

View File

@@ -629,7 +629,7 @@ func (dEnv *DoltEnv) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.Root
wsRef = ws.Ref()
}
return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta())
return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, dEnv.workingSetMeta(), nil)
}
// UpdateWorkingSet updates the working set for the current working branch to the value given.
@@ -648,7 +648,7 @@ func (dEnv *DoltEnv) UpdateWorkingSet(ctx context.Context, ws *doltdb.WorkingSet
}
}
return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta())
return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws, h, dEnv.workingSetMeta(), nil)
}
type repoStateReader struct {
@@ -758,7 +758,7 @@ func (dEnv *DoltEnv) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.RootV
wsRef = ws.Ref()
}
return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta())
return dEnv.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, dEnv.workingSetMeta(), nil)
}
func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error {
@@ -772,7 +772,7 @@ func (dEnv *DoltEnv) AbortMerge(ctx context.Context) error {
return err
}
return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta())
return dEnv.DoltDB.UpdateWorkingSet(ctx, ws.Ref(), ws.AbortMerge(), h, dEnv.workingSetMeta(), nil)
}
func (dEnv *DoltEnv) workingSetMeta() *datas.WorkingSetMeta {
@@ -911,7 +911,7 @@ func (dEnv *DoltEnv) RemoveRemote(ctx context.Context, name string) error {
rr := r.(ref.RemoteRef)
if rr.GetRemote() == remote.Name {
err = ddb.DeleteBranch(ctx, rr)
err = ddb.DeleteBranch(ctx, rr, nil)
if err != nil {
return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", ErrFailedToDeleteRemote, rr.String(), err.Error())

View File

@@ -136,7 +136,7 @@ func (m MemoryRepoState) UpdateStagedRoot(ctx context.Context, newRoot *doltdb.R
wsRef = ws.Ref()
}
return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta())
return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithStagedRoot(newRoot), h, m.workingSetMeta(), nil)
}
func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.RootValue) error {
@@ -162,7 +162,7 @@ func (m MemoryRepoState) UpdateWorkingRoot(ctx context.Context, newRoot *doltdb.
wsRef = ws.Ref()
}
return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta())
return m.DoltDB.UpdateWorkingSet(ctx, wsRef, ws.WithWorkingRoot(newRoot), h, m.workingSetMeta(), nil)
}
func (m MemoryRepoState) WorkingSet(ctx context.Context) (*doltdb.WorkingSet, error) {

View File

@@ -743,7 +743,7 @@ func buildLeftRightAncCommitsAndBranches(t *testing.T, ddb *doltdb.DoltDB, rootT
commit, err := ddb.Commit(context.Background(), hash, ref.NewBranchRef(env.DefaultInitBranch), meta)
require.NoError(t, err)
err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit)
err = ddb.NewBranchAtCommit(context.Background(), ref.NewBranchRef("to-merge"), initialCommit, nil)
require.NoError(t, err)
mergeCommit, err := ddb.Commit(context.Background(), mergeHash, ref.NewBranchRef("to-merge"), meta)
require.NoError(t, err)

View File

@@ -182,7 +182,7 @@ func persistMigratedCommitMapping(ctx context.Context, ddb *doltdb.DoltDB, mappi
}
br := ref.NewBranchRef(MigratedCommitsBranch)
err = ddb.NewBranchAtCommit(ctx, br, init)
err = ddb.NewBranchAtCommit(ctx, br, init, nil)
if err != nil {
return err
}
@@ -298,6 +298,6 @@ func commitRoot(
Name: meta.Name,
Email: meta.Email,
Timestamp: uint64(time.Now().Unix()),
})
}, nil)
return err
}

View File

@@ -93,7 +93,7 @@ func migrateWorkingSet(ctx context.Context, menv Environment, brRef ref.BranchRe
newWs := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(wr).WithStagedRoot(sr)
return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta())
return new.UpdateWorkingSet(ctx, wsRef, newWs, hash.Hash{}, oldWs.Meta(), nil)
}
func migrateCommit(ctx context.Context, menv Environment, oldCm *doltdb.Commit, new *doltdb.DoltDB, prog *progress) error {

View File

@@ -152,7 +152,7 @@ func rebaseRefs(ctx context.Context, dbData env.DbData, replay ReplayCommitFn, n
for i, r := range refs {
switch dRef := r.(type) {
case ref.BranchRef:
err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i])
err = ddb.NewBranchAtCommit(ctx, dRef, newHeads[i], nil)
case ref.TagRef:
// rewrite tag with new commit

View File

@@ -706,7 +706,7 @@ func closeWriteSession(ctx *sql.Context, engine *gms.Engine, databaseName string
return err
}
return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta())
return sqlDatabase.DbData().Ddb.UpdateWorkingSet(ctx, newWorkingSet.Ref(), newWorkingSet, hash, newWorkingSet.Meta(), nil)
}
// getTableSchema returns a sql.Schema for the specified table in the specified database.

View File

@@ -32,6 +32,7 @@ import (
)
var _ doltdb.CommitHook = (*commithook)(nil)
var _ doltdb.NotifyWaitFailedCommitHook = (*commithook)(nil)
type commithook struct {
rootLgr *logrus.Entry
@@ -53,6 +54,20 @@ type commithook struct {
// commithooks are caught up with replicating to the standby.
waitNotify func()
// This is a slice of notification channels maintained by the
// commithook. The semantics are:
// 1. All accesses to |successChs| must happen with |mu| held.
// 2. There may be |0| or more channels in the slice.
// 3. As a reader, if |successChs| is non-empty, you should just read a value, for example, |successChs[0]| and use it. All entries will be closed at the same time. If |successChs| is empty when you need a channel, you should add one to it.
// 4. If you read a channel out of |successChs|, that channel will be closed on the next successful replication attempt. It will not be closed before then.
successChs []chan struct{}
// If this is true, the waitF returned by Execute() will fast fail if
// we are not already caught up, instead of blocking on a successCh
// actually indicated we are caught up. This is set to by a call to
// NotifyWaitFailed(), an optional interface on CommitHook.
fastFailReplicationWait bool
role Role
// The standby replica to which the new root gets replicated.
@@ -145,9 +160,17 @@ func (h *commithook) replicate(ctx context.Context) {
if h.waitNotify != nil {
h.waitNotify()
}
caughtUp := h.isCaughtUp()
if len(h.successChs) != 0 && caughtUp {
for _, ch := range h.successChs {
close(ch)
}
h.successChs = nil
h.fastFailReplicationWait = false
}
if shouldHeartbeat {
h.attemptHeartbeat(ctx)
} else {
} else if caughtUp {
shouldHeartbeat = true
}
h.cond.Wait()
@@ -230,6 +253,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
}
h.cancelReplicate = nil
}()
successChs := h.successChs
h.successChs = nil
defer func() {
if len(successChs) != 0 {
h.successChs = append(h.successChs, successChs...)
}
}()
h.mu.Unlock()
if destDB == nil {
@@ -280,6 +310,12 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
h.lastPushedHead = toPush
h.lastSuccess = incomingTime
h.nextPushAttempt = time.Time{}
if len(successChs) != 0 {
for _, ch := range successChs {
close(ch)
}
successChs = nil
}
} else {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
@@ -392,21 +428,21 @@ var errDetectedBrokenConfigStr = "error: more than one server was configured as
// Execute on this commithook updates the target root hash we're attempting to
// replicate and wakes the replication thread.
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) (func(context.Context) error, error) {
lgr := h.logger()
lgr.Tracef("cluster/commithook: Execute called post commit")
cs := datas.ChunkStoreFromDatabase(db)
root, err := cs.Root(ctx)
if err != nil {
lgr.Errorf("cluster/commithook: Execute: error retrieving local database root: %v", err)
return err
return nil, err
}
h.mu.Lock()
defer h.mu.Unlock()
lgr = h.logger()
if h.role != RolePrimary {
lgr.Warnf("cluster/commithook received commit callback for a commit on %s, but we are not role primary; not replicating the commit, which is likely to be lost.", ds.ID())
return nil
return nil, nil
}
if root != h.nextHead {
lgr.Tracef("signaling replication thread to push new head: %v", root.String())
@@ -415,7 +451,34 @@ func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Dat
h.nextPushAttempt = time.Time{}
h.cond.Signal()
}
return nil
var waitF func(context.Context) error
if !h.isCaughtUp() {
if h.fastFailReplicationWait {
waitF = func(ctx context.Context) error {
return fmt.Errorf("circuit breaker for replication to %s/%s is open. this commit did not necessarily replicate successfully.", h.remotename, h.dbname)
}
} else {
if len(h.successChs) == 0 {
h.successChs = append(h.successChs, make(chan struct{}))
}
successCh := h.successChs[0]
waitF = func(ctx context.Context) error {
select {
case <-successCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
}
}
return waitF, nil
}
func (h *commithook) NotifyWaitFailed() {
h.mu.Lock()
defer h.mu.Unlock()
h.fastFailReplicationWait = true
}
func (h *commithook) HandleError(ctx context.Context, err error) error {

View File

@@ -82,6 +82,7 @@ type Controller struct {
type sqlvars interface {
AddSystemVariables(sysVars []sql.SystemVariable)
GetGlobal(name string) (sql.SystemVariable, interface{}, bool)
}
// We can manage certain aspects of the exposed databases on the server through

View File

@@ -64,30 +64,36 @@ func doDoltBranch(ctx *sql.Context, args []string) (int, error) {
return 1, fmt.Errorf("Could not load database %s", dbName)
}
var rsc doltdb.ReplicationStatusController
switch {
case apr.Contains(cli.CopyFlag):
err = copyBranch(ctx, dbData, apr)
err = copyBranch(ctx, dbData, apr, &rsc)
case apr.Contains(cli.MoveFlag):
err = renameBranch(ctx, dbData, apr, dSess, dbName)
err = renameBranch(ctx, dbData, apr, dSess, dbName, &rsc)
case apr.Contains(cli.DeleteFlag), apr.Contains(cli.DeleteForceFlag):
err = deleteBranches(ctx, dbData, apr, dSess, dbName)
err = deleteBranches(ctx, dbData, apr, dSess, dbName, &rsc)
default:
err = createNewBranch(ctx, dbData, apr)
err = createNewBranch(ctx, dbData, apr, &rsc)
}
if err != nil {
return 1, err
} else {
return 0, commitTransaction(ctx, dSess)
return 0, commitTransaction(ctx, dSess, &rsc)
}
}
func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error {
func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession, rsc *doltdb.ReplicationStatusController) error {
err := dSess.CommitTransaction(ctx, ctx.GetTransaction())
if err != nil {
return err
}
if rsc != nil {
dsess.WaitForReplicationController(ctx, *rsc)
}
// Because this transaction manipulation is happening outside the engine's awareness, we need to set it to nil here
// to get a fresh transaction started on the next statement.
// TODO: put this under engine control
@@ -97,7 +103,7 @@ func commitTransaction(ctx *sql.Context, dSess *dsess.DoltSession) error {
// renameBranch takes DoltSession and database name to try accessing file system for dolt database.
// If the oldBranch being renamed is the current branch on CLI, then RepoState head will be updated with the newBranch ref.
func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error {
func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error {
if apr.NArg() != 2 {
return InvalidArgErr
}
@@ -124,7 +130,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe
return err
}
err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force)
err := actions.RenameBranch(ctx, dbData, oldBranchName, newBranchName, sess.Provider(), force, rsc)
if err != nil {
return err
}
@@ -150,7 +156,7 @@ func renameBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseRe
// deleteBranches takes DoltSession and database name to try accessing file system for dolt database.
// If the database is not session state db and the branch being deleted is the current branch on CLI, it will update
// the RepoState to set head as empty branchRef.
func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string) error {
func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, sess *dsess.DoltSession, dbName string, rsc *doltdb.ReplicationStatusController) error {
if apr.NArg() == 0 {
return InvalidArgErr
}
@@ -194,7 +200,7 @@ func deleteBranches(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParse
err = actions.DeleteBranch(ctx, dbData, branchName, actions.DeleteOptions{
Force: force,
}, dSess.Provider())
}, dSess.Provider(), rsc)
if err != nil {
return err
}
@@ -274,7 +280,7 @@ func loadConfig(ctx *sql.Context) *env.DoltCliConfig {
return dEnv.Config
}
func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error {
func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error {
if apr.NArg() == 0 || apr.NArg() > 2 {
return InvalidArgErr
}
@@ -332,7 +338,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars
return err
}
err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag))
err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, startPt, apr.Contains(cli.ForceFlag), rsc)
if err != nil {
return err
}
@@ -348,7 +354,7 @@ func createNewBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgPars
return nil
}
func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults) error {
func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error {
if apr.NArg() != 2 {
return InvalidArgErr
}
@@ -364,10 +370,10 @@ func copyBranch(ctx *sql.Context, dbData env.DbData, apr *argparser.ArgParseResu
}
force := apr.Contains(cli.ForceFlag)
return copyABranch(ctx, dbData, srcBr, destBr, force)
return copyABranch(ctx, dbData, srcBr, destBr, force, rsc)
}
func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool) error {
func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr string, force bool, rsc *doltdb.ReplicationStatusController) error {
if err := branch_control.CanCreateBranch(ctx, destBr); err != nil {
return err
}
@@ -378,7 +384,7 @@ func copyABranch(ctx *sql.Context, dbData env.DbData, srcBr string, destBr strin
return err
}
}
err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force)
err := actions.CopyBranchOnDB(ctx, dbData.Ddb, srcBr, destBr, force, rsc)
if err != nil {
if err == doltdb.ErrBranchNotFound {
return errors.New(fmt.Sprintf("fatal: A branch named '%s' not found", srcBr))

View File

@@ -71,9 +71,11 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) {
return 1, fmt.Errorf("Could not load database %s", currentDbName)
}
var rsc doltdb.ReplicationStatusController
// Checking out new branch.
if branchOrTrack {
err = checkoutNewBranch(ctx, dbName, dbData, apr)
err = checkoutNewBranch(ctx, dbName, dbData, apr, &rsc)
if err != nil {
return 1, err
} else {
@@ -121,13 +123,15 @@ func doDoltCheckout(ctx *sql.Context, args []string) (int, error) {
err = checkoutTables(ctx, roots, dbName, args)
if err != nil && apr.NArg() == 1 {
err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr)
err = checkoutRemoteBranch(ctx, dbName, dbData, branchName, apr, &rsc)
}
if err != nil {
return 1, err
}
dsess.WaitForReplicationController(ctx, rsc)
return 0, nil
}
@@ -168,7 +172,7 @@ func createWorkingSetForLocalBranch(ctx *sql.Context, ddb *doltdb.DoltDB, branch
}
ws := doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot)
return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta())
return ddb.UpdateWorkingSet(ctx, wsRef, ws, hash.Hash{} /* current hash... */, doltdb.TodoWorkingSetMeta(), nil)
}
// getRevisionForRevisionDatabase returns the root database name and revision for a database, or just the root database name if the specified db name is not a revision database.
@@ -196,7 +200,7 @@ func getRevisionForRevisionDatabase(ctx *sql.Context, dbName string) (string, st
// checkoutRemoteBranch checks out a remote branch creating a new local branch with the same name as the remote branch
// and set its upstream. The upstream persists out of sql session.
func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults) error {
func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, branchName string, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error {
remoteRefs, err := actions.GetRemoteBranchRef(ctx, dbData.Ddb, branchName)
if err != nil {
return errors.New("fatal: unable to read from data repository")
@@ -206,7 +210,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br
return fmt.Errorf("error: could not find %s", branchName)
} else if len(remoteRefs) == 1 {
remoteRef := remoteRefs[0]
err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false)
err = actions.CreateBranchWithStartPt(ctx, dbData, branchName, remoteRef.String(), false, rsc)
if err != nil {
return err
}
@@ -226,7 +230,7 @@ func checkoutRemoteBranch(ctx *sql.Context, dbName string, dbData env.DbData, br
}
}
func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults) error {
func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error {
var newBranchName string
var remoteName, remoteBranchName string
var startPt = "head"
@@ -259,7 +263,7 @@ func checkoutNewBranch(ctx *sql.Context, dbName string, dbData env.DbData, apr *
newBranchName = newBranch
}
err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false)
err = actions.CreateBranchWithStartPt(ctx, dbData, newBranchName, startPt, false, rsc)
if err != nil {
return err
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
@@ -65,11 +66,13 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) {
return 1, fmt.Errorf("error: invalid argument, use 'dolt_remotes' system table to list remotes")
}
var rsc doltdb.ReplicationStatusController
switch apr.Arg(0) {
case "add":
err = addRemote(ctx, dbName, dbData, apr, dSess)
case "remove", "rm":
err = removeRemote(ctx, dbData, apr)
err = removeRemote(ctx, dbData, apr, &rsc)
default:
err = fmt.Errorf("error: invalid argument")
}
@@ -77,6 +80,9 @@ func doDoltRemote(ctx *sql.Context, args []string) (int, error) {
if err != nil {
return 1, err
}
dsess.WaitForReplicationController(ctx, rsc)
return 0, nil
}
@@ -106,7 +112,7 @@ func addRemote(_ *sql.Context, dbName string, dbd env.DbData, apr *argparser.Arg
return dbd.Rsw.AddRemote(r)
}
func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults) error {
func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResults, rsc *doltdb.ReplicationStatusController) error {
if apr.NArg() != 2 {
return fmt.Errorf("error: invalid argument")
}
@@ -133,7 +139,7 @@ func removeRemote(ctx *sql.Context, dbd env.DbData, apr *argparser.ArgParseResul
rr := r.(ref.RemoteRef)
if rr.GetRemote() == remote.Name {
err = ddb.DeleteBranch(ctx, rr)
err = ddb.DeleteBranch(ctx, rr, rsc)
if err != nil {
return fmt.Errorf("%w; failed to delete remote tracking ref '%s'; %s", env.ErrFailedToDeleteRemote, rr.String(), err.Error())

View File

@@ -15,6 +15,7 @@
package dsess
import (
"context"
"errors"
"fmt"
"strings"
@@ -22,6 +23,7 @@ import (
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
"github.com/sirupsen/logrus"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
@@ -216,7 +218,9 @@ func doltCommit(ctx *sql.Context,
workingSet = workingSet.ClearMerge()
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx))
var rsc doltdb.ReplicationStatusController
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx), &rsc)
WaitForReplicationController(ctx, rsc)
return workingSet, newCommit, err
}
@@ -227,7 +231,10 @@ func txCommit(ctx *sql.Context,
workingSet *doltdb.WorkingSet,
hash hash.Hash,
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx))
var rsc doltdb.ReplicationStatusController
err := tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx), &rsc)
WaitForReplicationController(ctx, rsc)
return workingSet, nil, err
}
// DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write
@@ -235,6 +242,71 @@ func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.Worki
return tx.doCommit(ctx, workingSet, commit, doltCommit)
}
func WaitForReplicationController(ctx *sql.Context, rsc doltdb.ReplicationStatusController) {
if len(rsc.Wait) == 0 {
return
}
_, timeout, ok := sql.SystemVariables.GetGlobal(DoltClusterAckWritesTimeoutSecs)
if !ok {
return
}
timeoutI := timeout.(int64)
if timeoutI == 0 {
return
}
cCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
wg.Add(len(rsc.Wait))
for i, f := range rsc.Wait {
f := f
i := i
go func() {
defer wg.Done()
err := f(cCtx)
if err == nil {
rsc.Wait[i] = nil
}
}()
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
waitFailed := false
select {
case <-time.After(time.Duration(timeoutI) * time.Second):
// We timed out before all the waiters were done.
// First we make certain to finalize everything.
cancel()
<-done
waitFailed = true
case <-done:
cancel()
}
// Just because our waiters all completed does not mean they all
// returned nil errors. Any non-nil entries in rsc.Wait returned an
// error. We turn those into warnings here.
numFailed := 0
for i, f := range rsc.Wait {
if f != nil {
numFailed += 1
if waitFailed {
rsc.NotifyWaitFailed[i]()
}
}
}
ctx.Session.Warn(&sql.Warning{
Level: "Warning",
Code: mysql.ERQueryTimeout,
Message: fmt.Sprintf("Timed out replication of commit to %d out of %d replicas.", numFailed, len(rsc.Wait)),
})
}
// doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit
func (tx *DoltTransaction) doCommit(
ctx *sql.Context,

View File

@@ -52,8 +52,9 @@ const (
ShowBranchDatabases = "dolt_show_branch_databases"
DoltLogLevel = "dolt_log_level"
DoltClusterRoleVariable = "dolt_cluster_role"
DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch"
DoltClusterRoleVariable = "dolt_cluster_role"
DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch"
DoltClusterAckWritesTimeoutSecs = "dolt_cluster_ack_writes_timeout_secs"
)
const URLTemplateDatabasePlaceholder = "{database}"

View File

@@ -243,7 +243,7 @@ func (rrd ReadReplicaDatabase) CreateLocalBranchFromRemote(ctx *sql.Context, bra
}
// create refs/heads/branch dataset
err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm)
err = rrd.ddb.NewBranchAtCommit(ctx, branchRef, cm, nil)
if err != nil {
return nil, err
}
@@ -334,7 +334,7 @@ func pullBranchesAndUpdateWorkingSet(
if commitRootHash != wsWorkingRootHash || commitRootHash != wsStagedRootHash {
ws = ws.WithWorkingRoot(commitRoot).WithStagedRoot(commitRoot)
err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta())
err = rrd.ddb.UpdateWorkingSet(ctx, ws.Ref(), ws, prevHash, doltdb.TodoWorkingSetMeta(), nil)
if err == nil {
return nil
}
@@ -458,7 +458,7 @@ func (rrd ReadReplicaDatabase) createNewBranchFromRemote(ctx *sql.Context, remot
return err
}
err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm)
err = rrd.ddb.NewBranchAtCommit(ctx, remoteRef.Ref, cm, nil)
err = rrd.ddb.SetHead(ctx, trackingRef, remoteRef.Hash)
if err != nil {
return err
@@ -534,7 +534,7 @@ func refsToDelete(remRefs, localRefs []doltdb.RefWithHash) []doltdb.RefWithHash
func deleteBranches(ctx *sql.Context, rrd ReadReplicaDatabase, branches []doltdb.RefWithHash) error {
for _, b := range branches {
err := rrd.ddb.DeleteBranch(ctx, b.Ref)
err := rrd.ddb.DeleteBranch(ctx, b.Ref, nil)
if errors.Is(err, doltdb.ErrBranchNotFound) {
continue
} else if err != nil {

View File

@@ -1675,7 +1675,7 @@ func processNode(t *testing.T, ctx context.Context, dEnv *env.DoltEnv, node Hist
require.NoError(t, err)
if !ok {
err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent)
err = dEnv.DoltDB.NewBranchAtCommit(ctx, branchRef, parent, nil)
require.NoError(t, err)
}

View File

@@ -166,6 +166,13 @@ func AddDoltSystemVariables() {
Type: types.NewSystemBoolType(dsess.ShowBranchDatabases),
Default: int8(0),
},
{
Name: dsess.DoltClusterAckWritesTimeoutSecs,
Dynamic: true,
Scope: sql.SystemVariableScope_Persist,
Type: types.NewSystemIntType(dsess.DoltClusterAckWritesTimeoutSecs, 0, 60, false),
Default: int64(0),
},
})
}

View File

@@ -1060,6 +1060,157 @@ tests:
result:
columns: ["count(*)"]
rows: [["5"]]
- name: dolt_cluster_ack_writes_timeout_secs behavior
multi_repos:
- name: server1
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3309
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3852/{database}
bootstrap_role: primary
bootstrap_epoch: 1
remotesapi:
port: 3851
server:
args: ["--config", "server.yaml"]
port: 3309
- name: server2
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3310
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3851/{database}
bootstrap_role: standby
bootstrap_epoch: 1
remotesapi:
port: 3852
server:
args: ["--config", "server.yaml"]
port: 3310
# This test writes new commits on the primary and quickly checks for them on
# the secondary. If dolt_cluster_ack_writes_timeout_secs is working as
# intended, the new writes will always be present.
connections:
- on: server1
queries:
- exec: 'SET @@PERSIST.dolt_cluster_ack_writes_timeout_secs = 10'
- exec: 'CREATE DATABASE repo1'
- exec: 'USE repo1'
- exec: 'CREATE TABLE vals (i INT PRIMARY KEY)'
- on: server2
queries:
- exec: 'USE repo1'
- query: "SHOW TABLES"
result:
columns: ["Tables_in_repo1"]
rows: [["vals"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'INSERT INTO vals VALUES (0),(1),(2),(3),(4)'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM vals'
result:
columns: ["COUNT(*)"]
rows: [["5"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'INSERT INTO vals VALUES (5),(6),(7),(8),(9)'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM vals'
result:
columns: ["COUNT(*)"]
rows: [["10"]]
# Restart both servers to test the behavior of the persisted variable.
- on: server1
restart_server: {}
- on: server2
restart_server: {}
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'INSERT INTO vals VALUES (10),(11),(12),(13),(14)'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM vals'
result:
columns: ["COUNT(*)"]
rows: [["15"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'INSERT INTO vals VALUES (15),(16),(17),(18),(19)'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM vals'
result:
columns: ["COUNT(*)"]
rows: [["20"]]
# Assert that branch creation and deletion also blocks on replication.
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'CALL DOLT_BRANCH("new_branch")'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM dolt_branches'
result:
columns: ["COUNT(*)"]
rows: [["2"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'CALL DOLT_BRANCH("-d", "new_branch")'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM dolt_branches'
result:
columns: ["COUNT(*)"]
rows: [["1"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'CALL DOLT_BRANCH("new_branch")'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM dolt_branches'
result:
columns: ["COUNT(*)"]
rows: [["2"]]
- on: server1
queries:
- exec: 'USE repo1'
- exec: 'CALL DOLT_BRANCH("-d", "new_branch")'
- on: server2
queries:
- exec: 'USE repo1'
- query: 'SELECT COUNT(*) FROM dolt_branches'
result:
columns: ["COUNT(*)"]
rows: [["1"]]
- name: call dolt checkout
multi_repos:
- name: server1