Added session var that matches dolt and transaction commits

This commit is contained in:
Daylon Wilkins
2021-05-30 20:32:09 -07:00
committed by Daylon Wilkins
parent 86571fd43d
commit ae0de59aa0
12 changed files with 475 additions and 25 deletions
+6 -4
View File
@@ -213,10 +213,12 @@ func (cmd SqlClientCmd) Exec(ctx context.Context, commandStr string, args []stri
if err != nil {
cli.PrintErrln(err.Error())
}
serverController.StopServer()
err = serverController.WaitForClose()
if err != nil {
cli.PrintErrln(err.Error())
if apr.Contains(sqlClientDualFlag) {
serverController.StopServer()
err = serverController.WaitForClose()
if err != nil {
cli.PrintErrln(err.Error())
}
}
return 0
@@ -23,13 +23,14 @@
package eventsapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
duration "github.com/golang/protobuf/ptypes/duration"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package eventsapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -23,11 +23,12 @@
package eventsapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -21,12 +21,13 @@
package remotesapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package remotesapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
@@ -21,11 +21,12 @@
package remotesapi
import (
reflect "reflect"
sync "sync"
proto "github.com/golang/protobuf/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
@@ -4,6 +4,7 @@ package remotesapi
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
+22 -1
View File
@@ -99,7 +99,28 @@ func (db Database) StartTransaction(ctx *sql.Context) (sql.Transaction, error) {
return nil, err
}
return NewDoltTransaction(root, wsRef, db.ddb, db.rsw), nil
err = db.setHeadHash(ctx)
if err != nil {
return nil, err
}
return NewDoltTransaction(root, wsRef, db.DbData()), nil
}
func (db Database) setHeadHash(ctx *sql.Context) error {
headCommit, err := db.ddb.Resolve(ctx, db.rsr.CWBHeadSpec(), db.rsr.CWBHeadRef())
if err != nil {
return err
}
headHash, err := headCommit.HashOf()
if err != nil {
return err
}
if doltSession, ok := ctx.Session.(*DoltSession); ok {
return doltSession.SetSessionVarDirectly(ctx, HeadKey(db.name), headHash.String())
} else {
return ctx.SetSessionVariable(ctx, HeadKey(db.name), headHash.String())
}
}
func (db Database) CommitTransaction(ctx *sql.Context, tx sql.Transaction) error {
+63 -1
View File
@@ -24,6 +24,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/store/hash"
@@ -40,7 +41,10 @@ const (
WorkingKeySuffix = "_working"
)
const EnableTransactionsEnvKey = "DOLT_ENABLE_TRANSACTIONS"
const (
EnableTransactionsEnvKey = "DOLT_ENABLE_TRANSACTIONS"
DoltCommitOnTransactionCommit = "dolt_transaction_commit"
)
// TransactionsEnabled controls whether to use SQL transactions
// Exported only for testing
@@ -53,6 +57,16 @@ func init() {
TransactionsEnabled = true
}
}
sql.SystemVariables.AddSystemVariables([]sql.SystemVariable{
{
Name: DoltCommitOnTransactionCommit,
Scope: sql.SystemVariableScope_Session,
Dynamic: true,
SetVarHintApplies: false,
Type: sql.NewSystemBoolType(DoltCommitOnTransactionCommit),
Default: int8(0),
},
})
}
func IsHeadKey(key string) (bool, string) {
@@ -187,10 +201,58 @@ func (sess *DoltSession) CommitTransaction(ctx *sql.Context, dbName string, tx s
return err
}
err = sess.CommitWorkingSetToDolt(ctx, dtx.dbData, dbName)
if err != nil {
return err
}
sess.dirty[dbName] = false
return nil
}
// CommitWorkingSetToDolt stages the working set and then immediately commits the staged changes. This is a Dolt commit
// rather than a transaction commit. If there are no changes to be staged, then no commit is created.
func (sess *DoltSession) CommitWorkingSetToDolt(ctx *sql.Context, dbData env.DbData, dbName string) error {
if commitBool, err := sess.Session.GetSessionVariable(ctx, DoltCommitOnTransactionCommit); err != nil {
return err
} else if commitBool.(int8) == 1 {
fkChecks, err := sess.Session.GetSessionVariable(ctx, "foreign_key_checks")
if err != nil {
return err
}
err = actions.StageAllTables(ctx, dbData)
if err != nil {
return err
}
queryTime := ctx.QueryTime()
_, err = actions.CommitStaged(ctx, dbData, actions.CommitStagedProps{
Message: fmt.Sprintf("Transaction commit at %s", queryTime.UTC().Format("2006-01-02T15:04:05Z")),
Date: queryTime,
AllowEmpty: false,
CheckForeignKeys: fkChecks.(int8) == 1,
Name: sess.Username,
Email: sess.Email,
})
if _, ok := err.(actions.NothingStaged); err != nil && !ok {
return err
}
headCommit, err := dbData.Ddb.Resolve(ctx, dbData.Rsr.CWBHeadSpec(), dbData.Rsr.CWBHeadRef())
if err != nil {
return err
}
headHash, err := headCommit.HashOf()
if err != nil {
return err
}
err = sess.Session.SetSessionVariable(ctx, HeadKey(dbName), headHash.String())
if err != nil {
return err
}
}
return nil
}
// RollbackTransaction rolls the given transaction back
func (sess *DoltSession) RollbackTransaction(ctx *sql.Context, dbName string, tx sql.Transaction) error {
if !TransactionsEnabled || dbName == "" {
@@ -0,0 +1,360 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package enginetest
import (
"context"
"testing"
"github.com/dolthub/go-mysql-server/enginetest"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
)
func TestDoltTransactionCommitOneClient(t *testing.T) {
// In this test, we're setting only one client to match transaction commits to dolt commits.
// Autocommit is disabled for the enabled client, as it's the recommended way to use this feature.
ote := sqle.TransactionsEnabled
sqle.TransactionsEnabled = true
defer func() {
sqle.TransactionsEnabled = ote
}()
harness := newDoltHarness(t)
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
Name: "dolt commit after transaction commit one client",
SetUpScript: []string{
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
"INSERT INTO x VALUES (1,1);",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "/* client a */ SET @@autocommit=0;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ SET @initial_head=@@mydb_head;",
Expected: []sql.Row{{}},
},
{
Query: "/* client b */ SET @initial_head=@@mydb_head;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ START TRANSACTION;",
Expected: []sql.Row{},
},
{
Query: "/* client b */ START TRANSACTION;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client a */ INSERT INTO x VALUES (2,2);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client b */ INSERT INTO x VALUES (3,3);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {3, 3}},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client a */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
},
})
db := harness.databases[0].GetDoltDB()
cs, err := doltdb.NewCommitSpec("HEAD")
require.NoError(t, err)
headRefs, err := db.GetHeadRefs(context.Background())
require.NoError(t, err)
commit, err := db.Resolve(context.Background(), cs, headRefs[0])
require.NoError(t, err)
cm, err := commit.GetCommitMeta()
require.NoError(t, err)
require.Contains(t, cm.Description, "Transaction commit at")
as, err := doltdb.NewAncestorSpec("~1")
require.NoError(t, err)
initialCommit, err := commit.GetAncestor(context.Background(), as)
require.NoError(t, err)
icm, err := initialCommit.GetCommitMeta()
require.NoError(t, err)
require.Equal(t, "Initialize data repository", icm.Description)
}
func TestDoltTransactionCommitTwoClients(t *testing.T) {
// In this test, we're setting both clients to match transaction commits to dolt commits.
// Autocommit is disabled, as it's the recommended way to use this feature.
ote := sqle.TransactionsEnabled
sqle.TransactionsEnabled = true
defer func() {
sqle.TransactionsEnabled = ote
}()
harness := newDoltHarness(t)
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
Name: "dolt commit after transaction commit two clients",
SetUpScript: []string{
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
"INSERT INTO x VALUES (1,1);",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "/* client a */ SET @@autocommit=0;",
Expected: []sql.Row{{}},
},
{
Query: "/* client b */ SET @@autocommit=0;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
},
{
Query: "/* client b */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ SET @initial_head=@@mydb_head;",
Expected: []sql.Row{{}},
},
{
Query: "/* client b */ SET @initial_head=@@mydb_head;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ START TRANSACTION;",
Expected: []sql.Row{},
},
{
Query: "/* client b */ START TRANSACTION;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ INSERT INTO x VALUES (2,2);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client b */ INSERT INTO x VALUES (3,3);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {3, 3}},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client b */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{true}},
},
{
Query: "/* client a */ COMMIT;",
Expected: []sql.Row{},
},
{
Query: "/* client a */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
{
Query: "/* client b */ SELECT @@mydb_head like @initial_head;",
Expected: []sql.Row{{false}},
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
},
})
db := harness.databases[0].GetDoltDB()
cs, err := doltdb.NewCommitSpec("HEAD")
require.NoError(t, err)
headRefs, err := db.GetHeadRefs(context.Background())
require.NoError(t, err)
commit2, err := db.Resolve(context.Background(), cs, headRefs[0])
require.NoError(t, err)
cm2, err := commit2.GetCommitMeta()
require.NoError(t, err)
require.Contains(t, cm2.Description, "Transaction commit at")
as, err := doltdb.NewAncestorSpec("~1")
require.NoError(t, err)
commit1, err := commit2.GetAncestor(context.Background(), as)
require.NoError(t, err)
cm1, err := commit1.GetCommitMeta()
require.NoError(t, err)
require.Contains(t, cm1.Description, "Transaction commit at")
commit0, err := commit1.GetAncestor(context.Background(), as)
require.NoError(t, err)
cm0, err := commit0.GetCommitMeta()
require.NoError(t, err)
require.Equal(t, "Initialize data repository", cm0.Description)
}
func TestDoltTransactionCommitAutocommit(t *testing.T) {
// In this test, each insertion from both clients cause a commit as autocommit is enabled.
// Not the recommended way to use the feature, but it's permitted.
ote := sqle.TransactionsEnabled
sqle.TransactionsEnabled = true
defer func() {
sqle.TransactionsEnabled = ote
}()
harness := newDoltHarness(t)
enginetest.TestTransactionScript(t, harness, enginetest.TransactionTest{
Name: "dolt commit after transaction commit autocommit",
SetUpScript: []string{
"CREATE TABLE x (y BIGINT PRIMARY KEY, z BIGINT);",
"INSERT INTO x VALUES (1,1);",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "/* client a */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
},
{
Query: "/* client b */ SET @@dolt_transaction_commit=1;",
Expected: []sql.Row{{}},
},
{
Query: "/* client a */ INSERT INTO x VALUES (2,2);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client b */ INSERT INTO x VALUES (3,3);",
Expected: []sql.Row{{sql.NewOkResult(1)}},
},
{
Query: "/* client a */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
{
Query: "/* client b */ SELECT * FROM x ORDER BY y;",
Expected: []sql.Row{{1, 1}, {2, 2}, {3, 3}},
},
},
})
db := harness.databases[0].GetDoltDB()
cs, err := doltdb.NewCommitSpec("HEAD")
require.NoError(t, err)
headRefs, err := db.GetHeadRefs(context.Background())
require.NoError(t, err)
commit2, err := db.Resolve(context.Background(), cs, headRefs[0])
require.NoError(t, err)
cm2, err := commit2.GetCommitMeta()
require.NoError(t, err)
require.Contains(t, cm2.Description, "Transaction commit at")
as, err := doltdb.NewAncestorSpec("~1")
require.NoError(t, err)
commit1, err := commit2.GetAncestor(context.Background(), as)
require.NoError(t, err)
cm1, err := commit1.GetCommitMeta()
require.NoError(t, err)
require.Contains(t, cm1.Description, "Transaction commit at")
commit0, err := commit1.GetAncestor(context.Background(), as)
require.NoError(t, err)
cm0, err := commit0.GetCommitMeta()
require.NoError(t, err)
require.Equal(t, "Initialize data repository", cm0.Description)
}
+9 -11
View File
@@ -35,8 +35,7 @@ const (
type DoltTransaction struct {
startRoot *doltdb.RootValue
workingSet ref.WorkingSetRef
db *doltdb.DoltDB
rsw env.RepoStateWriter
dbData env.DbData
savepoints []savepoint
}
@@ -45,12 +44,11 @@ type savepoint struct {
root *doltdb.RootValue
}
func NewDoltTransaction(startRoot *doltdb.RootValue, workingSet ref.WorkingSetRef, db *doltdb.DoltDB, rsw env.RepoStateWriter) *DoltTransaction {
func NewDoltTransaction(startRoot *doltdb.RootValue, workingSet ref.WorkingSetRef, dbData env.DbData) *DoltTransaction {
return &DoltTransaction{
startRoot: startRoot,
workingSet: workingSet,
db: db,
rsw: rsw,
dbData: dbData,
}
}
@@ -67,10 +65,10 @@ func (tx DoltTransaction) String() string {
// if working set == ancRoot, attempt a fast-forward merge
func (tx *DoltTransaction) Commit(ctx *sql.Context, newRoot *doltdb.RootValue) (*doltdb.RootValue, error) {
for i := 0; i < maxTxCommitRetries; i++ {
ws, err := tx.db.ResolveWorkingSet(ctx, tx.workingSet)
ws, err := tx.dbData.Ddb.ResolveWorkingSet(ctx, tx.workingSet)
if err == doltdb.ErrWorkingSetNotFound {
// initial commit
err = tx.db.UpdateWorkingSet(ctx, tx.workingSet, newRoot, hash.Hash{})
err = tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSet, newRoot, hash.Hash{})
if err == datas.ErrOptimisticLockFailed {
continue
}
@@ -82,14 +80,14 @@ func (tx *DoltTransaction) Commit(ctx *sql.Context, newRoot *doltdb.RootValue) (
root := ws.RootValue()
hash, err := ws.Struct().Hash(tx.db.Format())
hash, err := ws.Struct().Hash(tx.dbData.Ddb.Format())
if err != nil {
return nil, err
}
if rootsEqual(root, tx.startRoot) {
// ff merge
err = tx.db.UpdateWorkingSet(ctx, tx.workingSet, newRoot, hash)
err = tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSet, newRoot, hash)
if err == datas.ErrOptimisticLockFailed {
continue
} else if err != nil {
@@ -111,7 +109,7 @@ func (tx *DoltTransaction) Commit(ctx *sql.Context, newRoot *doltdb.RootValue) (
}
}
err = tx.db.UpdateWorkingSet(ctx, tx.workingSet, mergedRoot, hash)
err = tx.dbData.Ddb.UpdateWorkingSet(ctx, tx.workingSet, mergedRoot, hash)
if err == datas.ErrOptimisticLockFailed {
continue
}
@@ -131,7 +129,7 @@ func (tx *DoltTransaction) updateRepoStateFile(ctx *sql.Context, mergedRoot *dol
return nil, err
}
err = tx.rsw.SetWorkingHash(ctx, hash)
err = tx.dbData.Rsw.SetWorkingHash(ctx, hash)
if err != nil {
return nil, err
}