mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-23 05:13:00 -05:00
Merge pull request #2137 from dolthub/zachmu/sql-tx-commit
Made SQL commits involving a dolt commit atomic
This commit is contained in:
@@ -149,7 +149,7 @@ func (cmd CommitCmd) Exec(ctx context.Context, commandStr string, args []string,
|
||||
return handleCommitErr(ctx, dEnv, err, usage)
|
||||
}
|
||||
|
||||
err = dEnv.DoltDB.CommitWithWorkingSet(
|
||||
_, err = dEnv.DoltDB.CommitWithWorkingSet(
|
||||
ctx,
|
||||
dEnv.RepoStateReader().CWBHeadRef(),
|
||||
ws.Ref(),
|
||||
|
||||
@@ -645,7 +645,7 @@ func (ddb *DoltDB) CommitWithParentCommits(ctx context.Context, valHash hash.Has
|
||||
|
||||
commitSt, ok := ds.MaybeHead()
|
||||
if !ok {
|
||||
return nil, errors.New("commit has no head but commit succeeded (How?!?!?)")
|
||||
return nil, errors.New("Commit has no head but commit succeeded. This is a bug.")
|
||||
}
|
||||
|
||||
return NewCommit(ddb.db, commitSt), nil
|
||||
@@ -1047,38 +1047,45 @@ func (ddb *DoltDB) CommitWithWorkingSet(
|
||||
commit *PendingCommit, workingSet *WorkingSet,
|
||||
prevHash hash.Hash,
|
||||
meta *WorkingSetMeta,
|
||||
) error {
|
||||
) (*Commit, error) {
|
||||
wsDs, err := ddb.db.GetDataset(ctx, workingSetRef.String())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
headDs, err := ddb.db.GetDataset(ctx, headRef.String())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// logrus.Tracef("Updating working set with root %s", workingSet.RootValue().DebugString(ctx, true))
|
||||
|
||||
workingRootRef, stagedRef, mergeStateRef, err := workingSet.writeValues(ctx, ddb)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var metaSt types.Struct
|
||||
metaSt, err = meta.toNomsStruct(ddb.db.Format())
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, _, err = ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.valueSt, datas.WorkingSetSpec{
|
||||
commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.valueSt, datas.WorkingSetSpec{
|
||||
Meta: datas.WorkingSetMeta{Meta: metaSt},
|
||||
WorkingRoot: workingRootRef,
|
||||
StagedRoot: stagedRef,
|
||||
MergeState: mergeStateRef,
|
||||
}, prevHash, commit.CommitOptions)
|
||||
|
||||
return err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
commitSt, ok := commitDataset.MaybeHead()
|
||||
if !ok {
|
||||
return nil, errors.New("Commit has no head but commit succeeded. This is a bug.")
|
||||
}
|
||||
|
||||
return NewCommit(ddb.db, commitSt), nil
|
||||
}
|
||||
|
||||
// DeleteWorkingSet deletes the working set given
|
||||
|
||||
+8
-1
@@ -142,7 +142,14 @@ func CommitStaged(ctx context.Context, roots doltdb.Roots, mergeActive bool, mer
|
||||
|
||||
// GetCommitStaged adds a new commit to HEAD with the given props, returning it as a PendingCommit that can be
|
||||
// committed with doltdb.CommitWithWorkingSet
|
||||
func GetCommitStaged(ctx context.Context, roots doltdb.Roots, mergeActive bool, mergeParents []*doltdb.Commit, dbData env.DbData, props CommitStagedProps) (*doltdb.PendingCommit, error) {
|
||||
func GetCommitStaged(
|
||||
ctx context.Context,
|
||||
roots doltdb.Roots,
|
||||
mergeActive bool,
|
||||
mergeParents []*doltdb.Commit,
|
||||
dbData env.DbData,
|
||||
props CommitStagedProps,
|
||||
) (*doltdb.PendingCommit, error) {
|
||||
ddb := dbData.Ddb
|
||||
rsr := dbData.Rsr
|
||||
drw := dbData.Drw
|
||||
|
||||
@@ -15,13 +15,13 @@
|
||||
package dfunctions
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/dolthub/vitess/go/vt/proto/query"
|
||||
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/cli"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
)
|
||||
@@ -30,6 +30,7 @@ const DoltCommitFuncName = "dolt_commit"
|
||||
|
||||
var hashType = sql.MustCreateString(query.Type_TEXT, 32, sql.Collation_ascii_bin)
|
||||
|
||||
// DoltCommitFunc runs a `dolt commit` in the SQL context, committing staged changes to head.
|
||||
type DoltCommitFunc struct {
|
||||
children []sql.Expression
|
||||
}
|
||||
@@ -39,7 +40,6 @@ func NewDoltCommitFunc(ctx *sql.Context, args ...sql.Expression) (sql.Expression
|
||||
return &DoltCommitFunc{children: args}, nil
|
||||
}
|
||||
|
||||
// Runs DOLT_COMMIT in the sql engine which models the behavior of `dolt commit`. Commits staged staged changes to head.
|
||||
func (d DoltCommitFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
|
||||
// Get the information for the sql context.
|
||||
dbName := ctx.GetCurrentDatabase()
|
||||
@@ -80,7 +80,6 @@ func (d DoltCommitFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error)
|
||||
email = dSess.Email
|
||||
}
|
||||
|
||||
// Get the commit message.
|
||||
msg, msgOk := apr.GetValue(cli.CommitMessageArg)
|
||||
if !msgOk {
|
||||
return nil, fmt.Errorf("Must provide commit message.")
|
||||
@@ -96,38 +95,7 @@ func (d DoltCommitFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error)
|
||||
}
|
||||
}
|
||||
|
||||
// Commit any pending transaction before a dolt_commit
|
||||
tx := ctx.Session.GetTransaction()
|
||||
_, ok = tx.(*dsess.DoltTransaction)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("expected a DoltTransaction, got %T", tx)
|
||||
}
|
||||
|
||||
err = dSess.SetRoots(ctx, dbName, roots)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = dSess.CommitTransaction(ctx, dbName, tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Unsetting the transaction here ensures that it won't be re-committed when this statement concludes
|
||||
ctx.SetTransaction(nil)
|
||||
|
||||
var mergeParentCommits []*doltdb.Commit
|
||||
ws, err := dSess.WorkingSet(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if ws.MergeActive() {
|
||||
mergeParentCommits = []*doltdb.Commit{ws.MergeState().Commit()}
|
||||
}
|
||||
|
||||
// Now do a Dolt commit
|
||||
commit, err := dSess.CommitToDolt(ctx, roots, mergeParentCommits, dbName, actions.CommitStagedProps{
|
||||
pendingCommit, err := dSess.NewPendingCommit(ctx, dbName, roots, actions.CommitStagedProps{
|
||||
Message: msg,
|
||||
Date: t,
|
||||
AllowEmpty: apr.Contains(cli.AllowEmptyFlag),
|
||||
@@ -136,15 +104,25 @@ func (d DoltCommitFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error)
|
||||
Email: email,
|
||||
})
|
||||
if err != nil {
|
||||
return 1, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cmHash, err := commit.HashOf()
|
||||
// Nothing to commit, and we didn't pass --allowEmpty
|
||||
if pendingCommit == nil {
|
||||
return nil, errors.New("nothing to commit")
|
||||
}
|
||||
|
||||
newCommit, err := dSess.DoltCommit(ctx, dbName, dSess.GetTransaction(), pendingCommit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cmHash.String(), nil
|
||||
h, err := newCommit.HashOf()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return h.String(), nil
|
||||
}
|
||||
|
||||
func getDoltArgs(ctx *sql.Context, row sql.Row, children []sql.Expression) ([]string, error) {
|
||||
|
||||
@@ -224,6 +224,8 @@ func DSessFromSess(sess sql.Session) *Session {
|
||||
return sess.(*Session)
|
||||
}
|
||||
|
||||
// LookupDbState returns the session state for the database named
|
||||
// TODO(zachmu) get rid of bool return param, use a not found error or similar
|
||||
func (sess *Session) LookupDbState(ctx *sql.Context, dbName string) (*DatabaseSessionState, bool, error) {
|
||||
dbState, ok := sess.dbStates[dbName]
|
||||
if ok {
|
||||
@@ -325,7 +327,8 @@ func (sess *Session) newWorkingSetForHead(ctx *sql.Context, wsRef ref.WorkingSet
|
||||
return doltdb.EmptyWorkingSet(wsRef).WithWorkingRoot(headRoot).WithStagedRoot(headRoot), nil
|
||||
}
|
||||
|
||||
// CommitTransaction commits the in-progress transaction for the database named
|
||||
// CommitTransaction commits the in-progress transaction for the database named. Depending on session settings, this
|
||||
// may write only a new working set, or may additionally create a new dolt commit for the current HEAD.
|
||||
func (sess *Session) CommitTransaction(ctx *sql.Context, dbName string, tx sql.Transaction) error {
|
||||
if sess.BatchMode == Batched {
|
||||
err := sess.Flush(ctx, dbName)
|
||||
@@ -334,145 +337,149 @@ func (sess *Session) CommitTransaction(ctx *sql.Context, dbName string, tx sql.T
|
||||
}
|
||||
}
|
||||
|
||||
dbState, ok, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if TransactionsDisabled(ctx) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !dbState.dirty {
|
||||
return nil
|
||||
}
|
||||
|
||||
// This is triggered when certain commands are sent to the server (ex. commit) when a database is not selected.
|
||||
// These commands should not error.
|
||||
if dbName == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// It's possible that this returns false if the user has created an in-Memory database. Moreover,
|
||||
// the analyzer will check for us whether a db exists or not.
|
||||
// TODO: fix this
|
||||
performDoltCommitVar, err := sess.Session.GetSessionVariable(ctx, DoltCommitOnTransactionCommit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
peformDoltCommitInt, ok := performDoltCommitVar.(int8)
|
||||
if !ok {
|
||||
return fmt.Errorf(fmt.Sprintf("Unexpected type for var %s: %T", DoltCommitOnTransactionCommit, performDoltCommitVar))
|
||||
}
|
||||
|
||||
if peformDoltCommitInt == 1 {
|
||||
pendingCommit, err := sess.PendingCommitAllStaged(ctx, dbName, actions.CommitStagedProps{
|
||||
Message: "Transaction commit",
|
||||
Date: ctx.QueryTime(),
|
||||
AllowEmpty: false,
|
||||
Force: false,
|
||||
Name: sess.Username,
|
||||
Email: sess.Email,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Nothing to stage, so fall back to CommitWorkingSet logic instead
|
||||
if pendingCommit == nil {
|
||||
return sess.CommitWorkingSet(ctx, dbName, tx)
|
||||
}
|
||||
|
||||
_, err = sess.DoltCommit(ctx, dbName, tx, pendingCommit)
|
||||
return err
|
||||
} else {
|
||||
return sess.CommitWorkingSet(ctx, dbName, tx)
|
||||
}
|
||||
}
|
||||
|
||||
// CommitWorkingSet commits the working set for the transaction given, without creating a new dolt commit.
|
||||
// Clients should typically use CommitTransaction, which performs additional checks, instead of this method.
|
||||
func (sess *Session) CommitWorkingSet(ctx *sql.Context, dbName string, tx sql.Transaction) error {
|
||||
dbState, _, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !dbState.dirty {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Newer commit path does a concurrent merge of the current root with the one other clients are editing, then
|
||||
// updates the session with this new root.
|
||||
commitFunc := func(ctx *sql.Context, dtx *DoltTransaction, workingSet *doltdb.WorkingSet) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
ws, err := dtx.Commit(ctx, workingSet)
|
||||
return ws, nil, err
|
||||
}
|
||||
|
||||
_, err = sess.doCommit(ctx, dbName, tx, commitFunc)
|
||||
return err
|
||||
}
|
||||
|
||||
// DoltCommit commits the working set and a new dolt commit with the properties given.
|
||||
// Clients should typically use CommitTransaction, which performs additional checks, instead of this method.
|
||||
func (sess *Session) DoltCommit(
|
||||
ctx *sql.Context,
|
||||
dbName string,
|
||||
tx sql.Transaction,
|
||||
commit *doltdb.PendingCommit,
|
||||
) (*doltdb.Commit, error) {
|
||||
commitFunc := func(ctx *sql.Context, dtx *DoltTransaction, workingSet *doltdb.WorkingSet) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
return dtx.DoltCommit(
|
||||
ctx,
|
||||
workingSet.WithWorkingRoot(commit.Roots.Working).WithStagedRoot(commit.Roots.Staged),
|
||||
commit)
|
||||
}
|
||||
|
||||
return sess.doCommit(ctx, dbName, tx, commitFunc)
|
||||
}
|
||||
|
||||
// doCommitFunc is a function to write to the database, which involves updating the working set and potentially
|
||||
// updating HEAD with a new commit
|
||||
type doCommitFunc func(ctx *sql.Context, dtx *DoltTransaction, workingSet *doltdb.WorkingSet) (*doltdb.WorkingSet, *doltdb.Commit, error)
|
||||
|
||||
// doCommit exercise the business logic for a particular doCommitFunc
|
||||
func (sess *Session) doCommit(ctx *sql.Context, dbName string, tx sql.Transaction, commitFunc doCommitFunc) (*doltdb.Commit, error) {
|
||||
dbState, ok, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !ok {
|
||||
// It's possible that we don't have dbstate if the user has created an in-Memory database. Moreover,
|
||||
// the analyzer will check for us whether a db exists or not.
|
||||
// TODO: fix this
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// TODO: validate that the transaction belongs to the DB named
|
||||
dtx, ok := tx.(*DoltTransaction)
|
||||
if !ok {
|
||||
return fmt.Errorf("expected a DoltTransaction")
|
||||
return nil, fmt.Errorf("expected a DoltTransaction")
|
||||
}
|
||||
|
||||
// TODO: actual logging
|
||||
// logrus.Errorf("working root to commit is %s", dbstate.workingSet.WorkingRoot().DebugString(ctx, true))
|
||||
|
||||
mergedWorkingSet, err := dtx.Commit(ctx, dbState.WorkingSet)
|
||||
mergedWorkingSet, newCommit, err := commitFunc(ctx, dtx, dbState.WorkingSet)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// logrus.Errorf("committed merged working root %s", dbstate.workingSet.WorkingRoot().DebugString(ctx, true))
|
||||
|
||||
err = sess.SetWorkingSet(ctx, dbName, mergedWorkingSet, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = sess.CreateDoltCommit(ctx, dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dbState.dirty = false
|
||||
return nil
|
||||
return newCommit, nil
|
||||
}
|
||||
|
||||
func (sess *Session) CommitToDolt(
|
||||
ctx *sql.Context,
|
||||
roots doltdb.Roots,
|
||||
mergeParentCommits []*doltdb.Commit,
|
||||
dbName string,
|
||||
props actions.CommitStagedProps,
|
||||
) (*doltdb.Commit, error) {
|
||||
sessionState, _, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dbData := sessionState.dbData
|
||||
|
||||
ws, err := sess.WorkingSet(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// PendingCommitAllStaged returns a pending commit with all tables staged. Returns nil if there are no changes to stage.
|
||||
func (sess *Session) PendingCommitAllStaged(ctx *sql.Context, dbName string, props actions.CommitStagedProps) (*doltdb.PendingCommit, error) {
|
||||
roots, ok := sess.GetRoots(ctx, dbName)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Couldn't get info for database %s", dbName)
|
||||
}
|
||||
|
||||
// TODO: this does several session state updates, and it really needs to just do one
|
||||
// It's also not atomic with the above commit. We need a way to set both new HEAD and update the working
|
||||
// set together, atomically. We can't easily do this in noms right now, because the the data set is the unit of
|
||||
// atomic update at the API layer. There's a root value which is the unit of atomic updates at the storage layer,
|
||||
// just no API which allows one to update more than one dataset in the same atomic transaction. We need to write
|
||||
// one.
|
||||
// Meanwhile, this is all kinds of thread-unsafe
|
||||
commit, err := actions.CommitStaged(ctx, roots, ws.MergeActive(), mergeParentCommits, dbData, props)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Now we have to do *another* SQL transaction, because CommitStaged currently modifies the super schema of the root
|
||||
// value before committing what it's given. We need that exact same root in our working set to stay consistent. It
|
||||
// doesn't happen automatically like outside the SQL context because CommitStaged is writing to a session-based
|
||||
// repo state writer, so we're never persisting the new working set to disk like in a command line context.
|
||||
// TODO: fix this mess
|
||||
|
||||
ws, err = sess.WorkingSet(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// StartTransaction sets the working set for the session, and we want the one we previous had, not the one on disk
|
||||
// Updating the working set like this also updates the head commit and root info for the session
|
||||
tx, err := sess.StartTransaction(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = sess.SetWorkingSet(ctx, dbName, ws.ClearMerge(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = sess.CommitTransaction(ctx, dbName, tx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Unsetting the transaction here ensures that it won't be re-committed when this statement concludes
|
||||
ctx.SetTransaction(nil)
|
||||
return commit, err
|
||||
}
|
||||
|
||||
// CreateDoltCommit stages the working set and then immediately commits the staged changes. This is a Dolt commit
|
||||
// rather than a transaction commit. If there are no changes to be staged, then no commit is created.
|
||||
func (sess *Session) CreateDoltCommit(ctx *sql.Context, dbName string) error {
|
||||
commitBool, err := sess.Session.GetSessionVariable(ctx, DoltCommitOnTransactionCommit)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if commitBool.(int8) != 1 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sessionState, _, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roots := sessionState.GetRoots()
|
||||
|
||||
var err error
|
||||
roots, err = actions.StageAllTablesNoDocs(ctx, roots)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sess.NewPendingCommit(ctx, dbName, roots, props)
|
||||
}
|
||||
|
||||
// NewPendingCommit returns a new |doltdb.PendingCommit| for the database named, using the roots given, adding any
|
||||
// merge parent from an in progress merge as appropriate. The session working set is not updated with these new roots,
|
||||
// but they are set in the returned |doltdb.PendingCommit|. If there are no changes staged, this method returns nil.
|
||||
func (sess *Session) NewPendingCommit(ctx *sql.Context, dbName string, roots doltdb.Roots, props actions.CommitStagedProps) (*doltdb.PendingCommit, error) {
|
||||
sessionState, _, err := sess.LookupDbState(ctx, dbName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var mergeParentCommits []*doltdb.Commit
|
||||
@@ -480,19 +487,12 @@ func (sess *Session) CreateDoltCommit(ctx *sql.Context, dbName string) error {
|
||||
mergeParentCommits = []*doltdb.Commit{sessionState.WorkingSet.MergeState().Commit()}
|
||||
}
|
||||
|
||||
_, err = sess.CommitToDolt(ctx, roots, mergeParentCommits, dbName, actions.CommitStagedProps{
|
||||
Message: fmt.Sprintf("Transaction commit at %s", ctx.QueryTime().UTC().Format("2006-01-02T15:04:05Z")),
|
||||
Date: ctx.QueryTime(),
|
||||
AllowEmpty: false,
|
||||
Force: false,
|
||||
Name: sess.Username,
|
||||
Email: sess.Email,
|
||||
})
|
||||
pendingCommit, err := actions.GetCommitStaged(ctx, roots, sessionState.WorkingSet.MergeActive(), mergeParentCommits, sessionState.dbData, props)
|
||||
if _, ok := err.(actions.NothingStaged); err != nil && !ok {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
return pendingCommit, nil
|
||||
}
|
||||
|
||||
// RollbackTransaction rolls the given transaction back
|
||||
|
||||
@@ -30,6 +30,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -72,7 +73,12 @@ type savepoint struct {
|
||||
root *doltdb.RootValue
|
||||
}
|
||||
|
||||
func NewDoltTransaction(startState *doltdb.WorkingSet, workingSet ref.WorkingSetRef, dbData env.DbData, mergeEditOpts editor.Options) *DoltTransaction {
|
||||
func NewDoltTransaction(
|
||||
startState *doltdb.WorkingSet,
|
||||
workingSet ref.WorkingSetRef,
|
||||
dbData env.DbData,
|
||||
mergeEditOpts editor.Options,
|
||||
) *DoltTransaction {
|
||||
return &DoltTransaction{
|
||||
startState: startState,
|
||||
workingSetRef: workingSet,
|
||||
@@ -88,39 +94,90 @@ func (tx DoltTransaction) String() string {
|
||||
|
||||
var txLock sync.Mutex
|
||||
|
||||
// Commit attempts to merge newRoot into the working set
|
||||
// Commit attempts to merge the working set given into the current working set.
|
||||
// Uses the same algorithm as merge.Merger:
|
||||
// |ws.root| is the root
|
||||
// |newRoot| is the mergeRoot
|
||||
// |current working set working root| is the root
|
||||
// |workingSet.workingRoot| is the mergeRoot
|
||||
// |tx.startRoot| is ancRoot
|
||||
// if working set == ancRoot, attempt a fast-forward merge
|
||||
// if workingSet.workingRoot == ancRoot, attempt a fast-forward merge
|
||||
// TODO: Non-working roots aren't merged into the working set and just stomp any changes made there. We need merge
|
||||
// strategies for staged as well as merge state.
|
||||
func (tx *DoltTransaction) Commit(ctx *sql.Context, workingSet *doltdb.WorkingSet) (*doltdb.WorkingSet, error) {
|
||||
ws, _, err := tx.doCommit(ctx, workingSet, nil, txCommit)
|
||||
return ws, err
|
||||
}
|
||||
|
||||
// transactionWrite is the logic to write an updated working set (and optionally a commit) to the database
|
||||
type transactionWrite func(ctx *sql.Context,
|
||||
tx *DoltTransaction, // the transaction being written
|
||||
commit *doltdb.PendingCommit, // optional
|
||||
workingSet *doltdb.WorkingSet, // must be provided
|
||||
hash hash.Hash, // hash of the current working set to be written
|
||||
) (*doltdb.WorkingSet, *doltdb.Commit, error)
|
||||
|
||||
// doltCommit is a transactionWrite function that updates the working set and commits a pending commit atomically
|
||||
func doltCommit(ctx *sql.Context,
|
||||
tx *DoltTransaction,
|
||||
commit *doltdb.PendingCommit,
|
||||
workingSet *doltdb.WorkingSet,
|
||||
hash hash.Hash,
|
||||
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
headRef, err := workingSet.Ref().ToHeadRef()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
workingSet = workingSet.ClearMerge()
|
||||
newCommit, err := tx.dbData.Ddb.CommitWithWorkingSet(ctx, headRef, tx.workingSetRef, commit, workingSet, hash, tx.getWorkingSetMeta(ctx))
|
||||
return workingSet, newCommit, err
|
||||
}
|
||||
|
||||
// txCommit is a transactionWrite function that updates the working set
|
||||
func txCommit(ctx *sql.Context,
|
||||
tx *DoltTransaction,
|
||||
_ *doltdb.PendingCommit,
|
||||
workingSet *doltdb.WorkingSet,
|
||||
hash hash.Hash,
|
||||
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
return workingSet, nil, tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx))
|
||||
}
|
||||
|
||||
// DoltCommit commits the working set and creates a new DoltCommit as specified, in one atomic write
|
||||
func (tx *DoltTransaction) DoltCommit(ctx *sql.Context, workingSet *doltdb.WorkingSet, commit *doltdb.PendingCommit) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
return tx.doCommit(ctx, workingSet, commit, doltCommit)
|
||||
}
|
||||
|
||||
// doCommit commits this transaction with the write function provided. It takes the same params as DoltCommit
|
||||
func (tx *DoltTransaction) doCommit(
|
||||
ctx *sql.Context,
|
||||
workingSet *doltdb.WorkingSet,
|
||||
commit *doltdb.PendingCommit,
|
||||
writeFn transactionWrite,
|
||||
) (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
forceTransactionCommit, err := ctx.GetSessionVariable(ctx, ForceTransactionCommit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if forceTransactionCommit.(int8) != 1 {
|
||||
workingRoot := workingSet.WorkingRoot()
|
||||
hasConflicts, err := workingRoot.HasConflicts(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if hasConflicts {
|
||||
return nil, doltdb.ErrUnresolvedConflicts
|
||||
return nil, nil, doltdb.ErrUnresolvedConflicts
|
||||
}
|
||||
hasConstraintViolations, err := workingRoot.HasConstraintViolations(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
if hasConstraintViolations {
|
||||
return nil, doltdb.ErrUnresolvedConstraintViolations
|
||||
return nil, nil, doltdb.ErrUnresolvedConstraintViolations
|
||||
}
|
||||
}
|
||||
|
||||
for i := 0; i < maxTxCommitRetries; i++ {
|
||||
updatedWs, err := func() (*doltdb.WorkingSet, error) {
|
||||
updatedWs, newCommit, err := func() (*doltdb.WorkingSet, *doltdb.Commit, error) {
|
||||
// Serialize commits, since only one can possibly succeed at a time anyway
|
||||
txLock.Lock()
|
||||
defer txLock.Unlock()
|
||||
@@ -134,33 +191,33 @@ func (tx *DoltTransaction) Commit(ctx *sql.Context, workingSet *doltdb.WorkingSe
|
||||
ws = doltdb.EmptyWorkingSet(tx.workingSetRef)
|
||||
newWorkingSet = true
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
wsHash, err := ws.HashOf()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
existingWorkingRoot := ws.WorkingRoot()
|
||||
|
||||
hash, err := ws.HashOf()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if newWorkingSet || rootsEqual(existingWorkingRoot, tx.startState.WorkingRoot()) {
|
||||
// ff merge
|
||||
err = tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, workingSet, hash, tx.getWorkingSetMeta(ctx))
|
||||
var newCommit *doltdb.Commit
|
||||
workingSet, newCommit, err = writeFn(ctx, tx, commit, workingSet, wsHash)
|
||||
if err == datas.ErrOptimisticLockFailed {
|
||||
// this is effectively a `continue` in the loop
|
||||
return nil, nil
|
||||
return nil, nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return workingSet, nil
|
||||
return workingSet, newCommit, nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
mergedRoot, stats, err := merge.MergeRoots(ctx, existingWorkingRoot, workingSet.WorkingRoot(), tx.startState.WorkingRoot(), tx.mergeEditOpts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
logrus.Tracef("merge took %s", time.Since(start))
|
||||
|
||||
@@ -171,7 +228,7 @@ func (tx *DoltTransaction) Commit(ctx *sql.Context, workingSet *doltdb.WorkingSe
|
||||
tablesWithConflicts = append(tablesWithConflicts, table)
|
||||
} else {
|
||||
// TODO: surface duplicate key errors as appropriate
|
||||
return nil, fmt.Errorf("conflict in table %s", table)
|
||||
return nil, nil, fmt.Errorf("conflict in table %s", table)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -180,31 +237,32 @@ func (tx *DoltTransaction) Commit(ctx *sql.Context, workingSet *doltdb.WorkingSe
|
||||
if len(tablesWithConflicts) > 0 {
|
||||
mergedRoot, err = tx.stompConflicts(ctx, mergedRoot, tablesWithConflicts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
mergedWorkingSet := workingSet.WithWorkingRoot(mergedRoot)
|
||||
err = tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSetRef, mergedWorkingSet, hash, tx.getWorkingSetMeta(ctx))
|
||||
var newCommit *doltdb.Commit
|
||||
mergedWorkingSet, newCommit, err = writeFn(ctx, tx, commit, mergedWorkingSet, wsHash)
|
||||
if err == datas.ErrOptimisticLockFailed {
|
||||
// this is effectively a `continue` in the loop
|
||||
return nil, nil
|
||||
return nil, nil, nil
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return mergedWorkingSet, nil
|
||||
return mergedWorkingSet, newCommit, nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
} else if updatedWs != nil {
|
||||
return updatedWs, nil
|
||||
return updatedWs, newCommit, nil
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: different error type for retries exhausted
|
||||
return nil, datas.ErrOptimisticLockFailed
|
||||
return nil, nil, datas.ErrOptimisticLockFailed
|
||||
}
|
||||
|
||||
// stompConflicts resolves the conflicted tables in the root given by blindly accepting theirs, and returns the
|
||||
|
||||
@@ -25,12 +25,14 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
)
|
||||
|
||||
// TODO: we need tests for manual DOLT_COMMIT as well, but that's difficult with the way that functions are resolved
|
||||
// in the engine.
|
||||
func TestDoltTransactionCommitOneClient(t *testing.T) {
|
||||
// In this test, we're setting only one client to match transaction commits to dolt commits.
|
||||
// Autocommit is disabled for the enabled client, as it's the recommended way to use this feature.
|
||||
harness := newDoltHarness(t)
|
||||
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
|
||||
Name: "dolt commit after transaction commit one client",
|
||||
Name: "dolt commit on transaction commit one client",
|
||||
SetUpScript: []string{
|
||||
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
|
||||
"INSERT INTO x VALUES (1,1);",
|
||||
@@ -40,6 +42,15 @@ func TestDoltTransactionCommitOneClient(t *testing.T) {
|
||||
Query: "/* client a */ SET @@autocommit=0;",
|
||||
Expected: []sql.Row{{}},
|
||||
},
|
||||
// start transaction implicitly commits the current transaction, so we have to do so before we turn on dolt commits
|
||||
{
|
||||
Query: "/* client a */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client b */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
|
||||
Expected: []sql.Row{{}},
|
||||
@@ -52,14 +63,6 @@ func TestDoltTransactionCommitOneClient(t *testing.T) {
|
||||
Query: "/* client b */ SET @initial_head=@@mydb_head;",
|
||||
Expected: []sql.Row{{}},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client b */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
|
||||
Expected: []sql.Row{{true}},
|
||||
@@ -132,6 +135,10 @@ func TestDoltTransactionCommitOneClient(t *testing.T) {
|
||||
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
|
||||
Expected: []sql.Row{{false}},
|
||||
},
|
||||
{
|
||||
Query: "/* client c */ SELECT * FROM x ORDER BY y;",
|
||||
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
@@ -144,7 +151,7 @@ func TestDoltTransactionCommitOneClient(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
cm, err := commit.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm.Description, "Transaction commit at")
|
||||
require.Contains(t, cm.Description, "Transaction commit")
|
||||
|
||||
as, err := doltdb.NewAncestorSpec("~1")
|
||||
require.NoError(t, err)
|
||||
@@ -160,7 +167,7 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
// Autocommit is disabled, as it's the recommended way to use this feature.
|
||||
harness := newDoltHarness(t)
|
||||
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
|
||||
Name: "dolt commit after transaction commit two clients",
|
||||
Name: "dolt commit on transaction commit two clients",
|
||||
SetUpScript: []string{
|
||||
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
|
||||
"INSERT INTO x VALUES (1,1);",
|
||||
@@ -174,6 +181,15 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
Query: "/* client b */ SET @@autocommit=0;",
|
||||
Expected: []sql.Row{{}},
|
||||
},
|
||||
// start transaction implicitly commits the current transaction, so we have to do so before we turn on dolt commits
|
||||
{
|
||||
Query: "/* client a */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client b */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
|
||||
Expected: []sql.Row{{}},
|
||||
@@ -190,14 +206,6 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
Query: "/* client b */ SET @initial_head=@@mydb_head;",
|
||||
Expected: []sql.Row{{}},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client b */ START TRANSACTION;",
|
||||
Expected: []sql.Row{},
|
||||
},
|
||||
{
|
||||
Query: "/* client a */ INSERT INTO x VALUES (2,2);",
|
||||
Expected: []sql.Row{{sql.NewOkResult(1)}},
|
||||
@@ -250,6 +258,10 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
|
||||
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
|
||||
},
|
||||
{
|
||||
Query: "/* client c */ SELECT * FROM x ORDER BY y;",
|
||||
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
|
||||
},
|
||||
},
|
||||
})
|
||||
db := harness.databases[0].GetDoltDB()
|
||||
@@ -261,7 +273,7 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
cm2, err := commit2.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm2.Description, "Transaction commit at")
|
||||
require.Contains(t, cm2.Description, "Transaction commit")
|
||||
|
||||
as, err := doltdb.NewAncestorSpec("~1")
|
||||
require.NoError(t, err)
|
||||
@@ -269,7 +281,7 @@ func TestDoltTransactionCommitTwoClients(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
cm1, err := commit1.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm1.Description, "Transaction commit at")
|
||||
require.Contains(t, cm1.Description, "Transaction commit")
|
||||
|
||||
commit0, err := commit1.GetAncestor(context.Background(), as)
|
||||
require.NoError(t, err)
|
||||
@@ -283,12 +295,13 @@ func TestDoltTransactionCommitAutocommit(t *testing.T) {
|
||||
// Not the recommended way to use the feature, but it's permitted.
|
||||
harness := newDoltHarness(t)
|
||||
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
|
||||
Name: "dolt commit after transaction commit autocommit",
|
||||
Name: "dolt commit with autocommit",
|
||||
SetUpScript: []string{
|
||||
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
|
||||
"INSERT INTO x VALUES (1,1);",
|
||||
},
|
||||
Assertions: []enginetest.ScriptTestAssertion{
|
||||
// these SET statements currently commit a transaction (since autocommit is on)
|
||||
{
|
||||
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
|
||||
Expected: []sql.Row{{}},
|
||||
@@ -313,6 +326,10 @@ func TestDoltTransactionCommitAutocommit(t *testing.T) {
|
||||
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
|
||||
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
|
||||
},
|
||||
{
|
||||
Query: "/* client c */ SELECT * FROM x ORDER BY y;",
|
||||
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
|
||||
},
|
||||
},
|
||||
})
|
||||
db := harness.databases[0].GetDoltDB()
|
||||
@@ -320,19 +337,25 @@ func TestDoltTransactionCommitAutocommit(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
headRefs, err := db.GetHeadRefs(context.Background())
|
||||
require.NoError(t, err)
|
||||
commit2, err := db.Resolve(context.Background(), cs, headRefs[0])
|
||||
commit3, err := db.Resolve(context.Background(), cs, headRefs[0])
|
||||
require.NoError(t, err)
|
||||
cm2, err := commit2.GetCommitMeta()
|
||||
cm3, err := commit3.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm2.Description, "Transaction commit at")
|
||||
require.Contains(t, cm3.Description, "Transaction commit")
|
||||
|
||||
as, err := doltdb.NewAncestorSpec("~1")
|
||||
require.NoError(t, err)
|
||||
commit2, err := commit3.GetAncestor(context.Background(), as)
|
||||
require.NoError(t, err)
|
||||
cm2, err := commit2.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm2.Description, "Transaction commit")
|
||||
|
||||
commit1, err := commit2.GetAncestor(context.Background(), as)
|
||||
require.NoError(t, err)
|
||||
cm1, err := commit1.GetCommitMeta()
|
||||
require.NoError(t, err)
|
||||
require.Contains(t, cm1.Description, "Transaction commit at")
|
||||
require.Equal(t, "Transaction commit", cm1.Description)
|
||||
|
||||
commit0, err := commit1.GetAncestor(context.Background(), as)
|
||||
require.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user