Merge pull request #5205 from dolthub/aaron/dsess-txn-does-not-commit-unstaged-stuff

go/libraries/doltcore/sqle/dsess: When commiting a transaction with a  DOLT_COMMIT, merge against the current HEAD if necessary.
This commit is contained in:
Aaron Son
2023-01-25 13:32:04 -08:00
committed by GitHub
7 changed files with 196 additions and 26 deletions
+3 -3
View File
@@ -170,7 +170,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv *
}
}
err = actions.ResetSoftToRef(ctx, dEnv.DbData(), "HEAD~1")
_, err = actions.ResetSoftToRef(ctx, dEnv.DbData(), "HEAD~1")
if err != nil {
return handleResetError(err, usage)
}
@@ -203,7 +203,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv *
})
if err != nil {
if apr.Contains(cli.AmendFlag) {
errRes := actions.ResetSoftToRef(ctx, dEnv.DbData(), headHash.String())
_, errRes := actions.ResetSoftToRef(ctx, dEnv.DbData(), headHash.String())
if errRes != nil {
return handleResetError(errRes, usage)
}
@@ -222,7 +222,7 @@ func performCommit(ctx context.Context, commandStr string, args []string, dEnv *
)
if err != nil {
if apr.Contains(cli.AmendFlag) {
errRes := actions.ResetSoftToRef(ctx, dEnv.DbData(), headHash.String())
_, errRes := actions.ResetSoftToRef(ctx, dEnv.DbData(), headHash.String())
if errRes != nil {
return handleResetError(errRes, usage)
}
+1 -1
View File
@@ -122,7 +122,7 @@ func (cmd ResetCmd) Exec(ctx context.Context, commandStr string, args []string,
// This is a valid ref
if ok {
err = actions.ResetSoftToRef(ctx, dEnv.DbData(), apr.Arg(0))
_, err = actions.ResetSoftToRef(ctx, dEnv.DbData(), apr.Arg(0))
return handleResetError(err, usage)
}
}
+7 -7
View File
@@ -204,34 +204,34 @@ func ResetSoft(ctx context.Context, dbData env.DbData, tables []string, roots do
// ResetSoftToRef matches the `git reset --soft <REF>` pattern. It resets both staged and head to the previous ref
// and leaves the working unset. The user can then choose to create a commit that contains all changes since the ref.
func ResetSoftToRef(ctx context.Context, dbData env.DbData, cSpecStr string) error {
func ResetSoftToRef(ctx context.Context, dbData env.DbData, cSpecStr string) (*doltdb.RootValue, error) {
cs, err := doltdb.NewCommitSpec(cSpecStr)
if err != nil {
return err
return nil, err
}
newHead, err := dbData.Ddb.Resolve(ctx, cs, dbData.Rsr.CWBHeadRef())
if err != nil {
return err
return nil, err
}
foundRoot, err := newHead.GetRootValue(ctx)
if err != nil {
return err
return nil, err
}
// Changed the staged to the old root. Leave the working as is.
err = dbData.Rsw.UpdateStagedRoot(ctx, foundRoot)
if err != nil {
return err
return nil, err
}
// Update the head to this commit
if err = dbData.Ddb.SetHeadToCommit(ctx, dbData.Rsr.CWBHeadRef(), newHead); err != nil {
return err
return nil, err
}
return err
return foundRoot, err
}
func getUnionedTables(ctx context.Context, tables []string, stagedRoot, headRoot *doltdb.RootValue) ([]string, error) {
+5 -2
View File
@@ -570,16 +570,19 @@ func (d *DoltSession) NewPendingCommit(ctx *sql.Context, dbName string, roots do
mergeParentCommits = append(mergeParentCommits, parentCommit)
}
err = actions.ResetSoftToRef(ctx, sessionState.dbData, "HEAD~1")
// TODO: This is not the correct way to write this commit as an amend. While this commit is running
// the branch head moves backwards and concurrency control here is not principled.
root, err := actions.ResetSoftToRef(ctx, sessionState.dbData, "HEAD~1")
if err != nil {
return nil, err
}
roots.Head = root
}
pendingCommit, err := actions.GetCommitStaged(ctx, roots, sessionState.WorkingSet.MergeActive(), mergeParentCommits, sessionState.dbData.Ddb, props)
if err != nil {
if props.Amend {
err = actions.ResetSoftToRef(ctx, sessionState.dbData, headHash.String())
_, err = actions.ResetSoftToRef(ctx, sessionState.dbData, headHash.String())
if err != nil {
return nil, err
}
@@ -143,13 +143,61 @@ func doltCommit(ctx *sql.Context,
workingSet *doltdb.WorkingSet,
currHash hash.Hash,
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
pending := *commit
headRef, err := workingSet.Ref().ToHeadRef()
if err != nil {
return nil, nil, err
}
headSpec, _ := doltdb.NewCommitSpec("HEAD")
curHead, err := tx.dbData.Ddb.Resolve(ctx, headSpec, headRef)
if err != nil {
return nil, nil, err
}
// We check if the branch HEAD has changed since our transaction started.
if curHead != nil {
curRootVal, err := curHead.ResolveRootValue(ctx)
if err != nil {
return nil, nil, err
}
curRootValHash, err := curRootVal.HashOf()
if err != nil {
return nil, nil, err
}
headRootValHash, err := pending.Roots.Head.HashOf()
if err != nil {
return nil, nil, err
}
if curRootValHash != headRootValHash {
// If the branch head changed since our transaction started, then we merge
// the existing branch head (curRootVal) into our staged root value. We
// treat the HEAD of the branch when our transaction started as the common
// ancestor (TODO: This will not be true in the case of destructive branch
// updates). The merged root value becomes our new Staged root value which
// is the value which we are trying to commit.
start := time.Now()
pending.Roots.Staged, _, err = merge.MergeRoots(
ctx,
pending.Roots.Staged,
curRootVal,
pending.Roots.Head,
curHead,
tx.startState,
tx.mergeEditOpts,
merge.MergeOpts{})
if err != nil {
return nil, nil, err
}
logrus.Tracef("staged and HEAD merge took %s", time.Since(start))
}
}
workingSet = workingSet.ClearMerge()
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, commit, workingSet, currHash, tx.getWorkingSetMeta(ctx))
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, &pending, workingSet, currHash, tx.getWorkingSetMeta(ctx))
return workingSet, newCommit, err
}
@@ -224,7 +272,7 @@ func (tx *DoltTransaction) doCommit(
if err != nil {
return nil, nil, err
}
logrus.Tracef("merge took %s", time.Since(start))
logrus.Tracef("working set merge took %s", time.Since(start))
err = tx.validateWorkingSetForCommit(ctx, mergedWorkingSet, notFfMerge)
if err != nil {
@@ -232,9 +280,6 @@ func (tx *DoltTransaction) doCommit(
}
var newCommit *doltdb.Commit
if commit != nil {
commit.Roots.Staged = mergedWorkingSet.WorkingRoot()
}
mergedWorkingSet, newCommit, err = writeFn(ctx, tx, commit, mergedWorkingSet, existingWSHash)
if err == datas.ErrOptimisticLockFailed {
// this is effectively a `continue` in the loop
@@ -264,7 +309,6 @@ func (tx *DoltTransaction) mergeRoots(
existingWorkingRoot *doltdb.WorkingSet,
workingSet *doltdb.WorkingSet,
) (*doltdb.WorkingSet, error) {
mo := merge.MergeOpts{IsCherryPick: false}
mergedRoot, _, err := merge.MergeRoots(
ctx,
existingWorkingRoot.WorkingRoot(),
@@ -272,7 +316,8 @@ func (tx *DoltTransaction) mergeRoots(
tx.startState.WorkingRoot(),
workingSet,
tx.startState,
tx.mergeEditOpts, mo)
tx.mergeEditOpts,
merge.MergeOpts{})
if err != nil {
return nil, err
}
@@ -200,6 +200,22 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
Query: "/* client b */ START TRANSACTION;",
Expected: []sql.Row{},
},
// Concurrent with the two transactions which are going to (dolt_)commit changes, we
// have a transaction which only modifies the working set. At the end of this
// sequence, the changes to the working set should not be committed.
{
Query: "/* client c */ START TRANSACTION;",
Expected: []sql.Row{},
},
{
Query: "/* client c */ INSERT INTO x values (4, 4)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client c */ COMMIT",
Expected: []sql.Row{},
},
// Now we have the two concurrent transactions commit their changes.
{
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
@@ -262,22 +278,25 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
{
Query: "/* client c */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
{
Query: "/* client c */ SELECT * FROM x AS OF 'HEAD' ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
// After we commit both of these transactions, our working set should not have any pending changes.
// In the past, we have merged the working set but failed to land the merged root value in the
// commit itself.
// After we commit both transactions, our working set should still have the change which
// was never dolt_committed.
{
Query: "/* client c */ SELECT COUNT(*) FROM DOLT_DIFF('HEAD', 'WORKING', 'x');",
Expected: []sql.Row{{0}},
Expected: []sql.Row{{1}},
},
},
})
@@ -755,6 +755,109 @@ var DoltTransactionTests = []queries.TransactionTest{
},
},
},
{
Name: "call dolt_commit commits staged stuff, merges with working set and branch head",
SetUpScript: []string{
"create table t1 (id int primary key, val int)",
"create table t2 (id int primary key, val int)",
"insert into t1 values (1, 1), (2, 2)",
"insert into t2 values (1, 1), (2, 2)",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "/* client a */ set autocommit = off",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ call dolt_add('t1')",
Expected: []sql.Row{{0}},
},
{
Query: "/* client a */ call dolt_commit('-m', 'initial commit of t1')",
SkipResultsCheck: true,
},
{
Query: "/* client b */ set autocommit = off",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ start transaction",
Expected: []sql.Row{},
},
{
Query: "/* client b */ start transaction",
Expected: []sql.Row{},
},
{
Query: "/* client a */ insert into t1 values (3, 3)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client b */ insert into t1 values (4, 4)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client a */ insert into t2 values (3, 3)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client b */ insert into t2 values (4, 4)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client a */ call dolt_add('t1')",
Expected: []sql.Row{{0}},
},
{
Query: "/* client b */ call dolt_add('t1')",
Expected: []sql.Row{{0}},
},
{
Query: "/* client a */ insert into t1 values (5, 5)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client b */ insert into t1 values (6, 6)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client c */ insert into t2 values (6, 6)",
Expected: []sql.Row{{types.NewOkResult(1)}},
},
{
Query: "/* client a */ call dolt_commit('-m', 'add 3 to t1')",
SkipResultsCheck: true,
},
{
Query: "/* client a */ select * from t2 order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {6, 6}},
},
{
Query: "/* client a */ select * from t1 order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {5, 5}},
},
{
Query: "/* client a */ select * from t1 as of 'HEAD' order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
{
Query: "/* client b */ call dolt_commit('-m', 'add 4 to t1')",
SkipResultsCheck: true,
},
{
Query: "/* client b */ select * from t2 order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {6, 6}},
},
{
Query: "/* client b */ select * from t1 order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}, {5, 5}, {6, 6}},
},
{
Query: "/* client b */ select * from t1 as of 'HEAD' order by id asc",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}, {4, 4}},
},
},
},
}
var DoltConflictHandlingTests = []queries.TransactionTest{