diff --git a/go/go.mod b/go/go.mod index 406ad473fb..cf3ceff484 100644 --- a/go/go.mod +++ b/go/go.mod @@ -57,7 +57,7 @@ require ( github.com/cespare/xxhash v1.1.0 github.com/creasty/defaults v1.6.0 github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2 - github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3 + github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb github.com/dolthub/swiss v0.1.0 github.com/goccy/go-json v0.10.2 github.com/google/go-github/v57 v57.0.0 diff --git a/go/go.sum b/go/go.sum index 914d706b25..163a180a91 100644 --- a/go/go.sum +++ b/go/go.sum @@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y= github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168= -github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3 h1:i48WY1tLEbvbN4PWiozfC7CzuuOw9or4tZedIWwxWE0= -github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU= +github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb h1:AN5zMcStorSl9r87Wkvz1aFN57VBobiofbRzxnAqh5o= +github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU= github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514= github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto= github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ= diff --git a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go index aad59db37e..f43060c2a5 100644 --- a/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go +++ b/go/libraries/doltcore/sqle/dsess/autoincrement_tracker.go @@ -34,19 +34,28 @@ import ( "github.com/dolthub/dolt/go/store/types" ) +type LockMode int64 + +var ( + LockMode_Traditional LockMode = 0 + LockMode_Concurret LockMode = 1 + LockMode_Interleaved LockMode = 2 +) + type AutoIncrementTracker struct { dbName string sequences *sync.Map // map[string]uint64 mm *mutexmap.MutexMap + lockMode LockMode } -var _ globalstate.AutoIncrementTracker = AutoIncrementTracker{} +var _ globalstate.AutoIncrementTracker = &AutoIncrementTracker{} // NewAutoIncrementTracker returns a new autoincrement tracker for the roots given. All roots sets must be // considered because the auto increment value for a table is tracked globally, across all branches. // Roots provided should be the working sets when available, or the branches when they are not (e.g. for remote // branches that don't have a local working set) -func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (AutoIncrementTracker, error) { +func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (*AutoIncrementTracker, error) { ait := AutoIncrementTracker{ dbName: dbName, sequences: &sync.Map{}, @@ -56,7 +65,7 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb for _, root := range roots { root, err := root.ResolveRootValue(ctx) if err != nil { - return AutoIncrementTracker{}, err + return &AutoIncrementTracker{}, err } err = root.IterTables(ctx, func(tableName string, table *doltdb.Table, sch schema.Schema) (bool, error) { @@ -81,10 +90,11 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb }) if err != nil { - return AutoIncrementTracker{}, err + return &AutoIncrementTracker{}, err } } - return ait, nil + + return &ait, nil } func loadAutoIncValue(sequences *sync.Map, tableName string) uint64 { @@ -111,8 +121,10 @@ func (a AutoIncrementTracker) Next(tbl string, insertVal interface{}) (uint64, e return 0, err } - release := a.mm.Lock(tbl) - defer release() + if a.lockMode == LockMode_Interleaved { + release := a.mm.Lock(tbl) + defer release() + } curr := loadAutoIncValue(a.sequences, tbl) @@ -406,3 +418,13 @@ func (a AutoIncrementTracker) DropTable(ctx *sql.Context, tableName string, wses return nil } + +func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName string) (func(), error) { + _, i, _ := sql.SystemVariables.GetGlobal("innodb_autoinc_lock_mode") + lockMode := LockMode(i.(int64)) + if lockMode == LockMode_Interleaved { + panic("Attempted to acquire AutoInc lock for entire insert operation, but lock mode was set to Interleaved") + } + a.lockMode = lockMode + return a.mm.Lock(tableName), nil +} diff --git a/go/libraries/doltcore/sqle/dsess/globalstate.go b/go/libraries/doltcore/sqle/dsess/globalstate.go index ab6e07eefd..dd675d099b 100755 --- a/go/libraries/doltcore/sqle/dsess/globalstate.go +++ b/go/libraries/doltcore/sqle/dsess/globalstate.go @@ -83,7 +83,7 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol } type GlobalStateImpl struct { - aiTracker AutoIncrementTracker + aiTracker globalstate.AutoIncrementTracker mu *sync.Mutex } diff --git a/go/libraries/doltcore/sqle/dsess/mutexmap/mutexmap.go b/go/libraries/doltcore/sqle/dsess/mutexmap/mutexmap.go index 2bbc65a3ce..1fa0e49ef2 100644 --- a/go/libraries/doltcore/sqle/dsess/mutexmap/mutexmap.go +++ b/go/libraries/doltcore/sqle/dsess/mutexmap/mutexmap.go @@ -38,18 +38,26 @@ func NewMutexMap() *MutexMap { func (mm *MutexMap) Lock(key interface{}) func() { mm.mu.Lock() - defer mm.mu.Unlock() - keyedMutex, hasKey := mm.keyedMutexes[key] - if !hasKey { - keyedMutex = &mapMutex{parent: mm, key: key} - mm.keyedMutexes[key] = keyedMutex - } - keyedMutex.refcount++ + var keyedMutex *mapMutex + func() { + // We must release the parent lock before attempting to acquire the child lock, otherwise if the child lock + // is currently held it will never be released. + defer mm.mu.Unlock() + var hasKey bool + keyedMutex, hasKey = mm.keyedMutexes[key] + if !hasKey { + keyedMutex = &mapMutex{parent: mm, key: key} + mm.keyedMutexes[key] = keyedMutex + } + keyedMutex.refcount++ + }() keyedMutex.mu.Lock() - return func() { keyedMutex.Unlock() } + return func() { + keyedMutex.Unlock() + } } func (mm *mapMutex) Unlock() { diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go index 2d07b4b36f..a7c0fe764e 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_engine_test.go @@ -17,6 +17,7 @@ package enginetest import ( "context" "fmt" + "io" "os" "runtime" "sync" @@ -31,6 +32,7 @@ import ( "github.com/dolthub/go-mysql-server/sql/memo" "github.com/dolthub/go-mysql-server/sql/mysql_db" "github.com/dolthub/go-mysql-server/sql/plan" + "github.com/dolthub/go-mysql-server/sql/transform" gmstypes "github.com/dolthub/go-mysql-server/sql/types" "github.com/dolthub/vitess/go/mysql" "github.com/stretchr/testify/assert" @@ -195,6 +197,125 @@ func newUpdateResult(matched, updated int) gmstypes.OkResult { } } +func TestAutoIncrementTrackerLockMode(t *testing.T) { + for _, lockMode := range []int64{0, 1, 2} { + t.Run(fmt.Sprintf("lock mode %d", lockMode), func(t *testing.T) { + testAutoIncrementTrackerWithLockMode(t, lockMode) + }) + } +} + +// testAutoIncrementTrackerWithLockMode tests that interleaved inserts don't cause deadlocks, regardless of the value of innodb_autoinc_lock_mode. +// In a real use case, these interleaved operations would be happening in different sessions on different threads. +// In order to make the test behave predictably, we manually interleave the two iterators. +func testAutoIncrementTrackerWithLockMode(t *testing.T, lockMode int64) { + + err := sql.SystemVariables.AssignValues(map[string]interface{}{"innodb_autoinc_lock_mode": lockMode}) + require.NoError(t, err) + + setupScripts := []setup.SetupScript{[]string{ + "CREATE TABLE test1 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t1_c_index (c0));", + "CREATE TABLE test2 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t2_c_index (c0));", + "CREATE TABLE timestamps (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT, t int);", + "CREATE TRIGGER t1 AFTER INSERT ON test1 FOR EACH ROW INSERT INTO timestamps VALUES (0, 1);", + "CREATE TRIGGER t2 AFTER INSERT ON test2 FOR EACH ROW INSERT INTO timestamps VALUES (0, 2);", + "CREATE VIEW bin AS SELECT 0 AS v UNION ALL SELECT 1;", + "CREATE VIEW sequence5bit AS SELECT b1.v + 2*b2.v + 4*b3.v + 8*b4.v + 16*b5.v AS v from bin b1, bin b2, bin b3, bin b4, bin b5;", + }} + + harness := newDoltHarness(t) + defer harness.Close() + harness.Setup(setup.MydbData, setupScripts) + e := mustNewEngine(t, harness) + + defer e.Close() + ctx := enginetest.NewContext(harness) + + // Confirm that the system variable was correctly set. + _, iter, err := e.Query(ctx, "select @@innodb_autoinc_lock_mode") + require.NoError(t, err) + rows, err := sql.RowIterToRows(ctx, iter) + require.NoError(t, err) + assert.Equal(t, rows, []sql.Row{{lockMode}}) + + // Ordinarily QueryEngine.query manages transactions. + // Since we can't use that for this test, we manually start a new transaction. + ts := ctx.Session.(sql.TransactionSession) + tx, err := ts.StartTransaction(ctx, sql.ReadWrite) + require.NoError(t, err) + ctx.SetTransaction(tx) + + getTriggerIter := func(query string) sql.RowIter { + root, err := e.AnalyzeQuery(ctx, query) + require.NoError(t, err) + + var triggerNode *plan.TriggerExecutor + transform.Node(root, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) { + if triggerNode != nil { + return n, transform.SameTree, nil + } + if t, ok := n.(*plan.TriggerExecutor); ok { + triggerNode = t + } + return n, transform.NewTree, nil + }) + iter, err := e.EngineAnalyzer().ExecBuilder.Build(ctx, triggerNode, nil) + require.NoError(t, err) + return iter + } + + iter1 := getTriggerIter("INSERT INTO test1 (c0) select v from sequence5bit;") + iter2 := getTriggerIter("INSERT INTO test2 (c0) select v from sequence5bit;") + + // Alternate the iterators until they're exhausted. + var err1 error + var err2 error + for err1 != io.EOF || err2 != io.EOF { + if err1 != io.EOF { + var row1 sql.Row + require.NoError(t, err1) + row1, err1 = iter1.Next(ctx) + _ = row1 + } + if err2 != io.EOF { + require.NoError(t, err2) + _, err2 = iter2.Next(ctx) + } + } + err = iter1.Close(ctx) + require.NoError(t, err) + err = iter2.Close(ctx) + require.NoError(t, err) + + dsess.DSessFromSess(ctx.Session).CommitTransaction(ctx, ctx.GetTransaction()) + + // Verify that the inserts are seen by the engine. + { + _, iter, err := e.Query(ctx, "select count(*) from timestamps") + require.NoError(t, err) + rows, err := sql.RowIterToRows(ctx, iter) + require.NoError(t, err) + assert.Equal(t, rows, []sql.Row{{int64(64)}}) + } + + // Verify that the insert operations are actually interleaved by inspecting the order that values were added to `timestamps` + { + _, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 1) < (select max(pk) from timestamps where t = 2)") + require.NoError(t, err) + rows, err := sql.RowIterToRows(ctx, iter) + require.NoError(t, err) + assert.Equal(t, rows, []sql.Row{{true}}) + } + + { + _, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 2) < (select max(pk) from timestamps where t = 1)") + require.NoError(t, err) + rows, err := sql.RowIterToRows(ctx, iter) + require.NoError(t, err) + assert.Equal(t, rows, []sql.Row{{true}}) + } +} + // Convenience test for debugging a single query. Unskip and set to the desired query. func TestSingleMergeScript(t *testing.T) { t.Skip() diff --git a/go/libraries/doltcore/sqle/globalstate/auto_increment_tracker.go b/go/libraries/doltcore/sqle/globalstate/auto_increment_tracker.go index ce423e6a82..9687bb8000 100644 --- a/go/libraries/doltcore/sqle/globalstate/auto_increment_tracker.go +++ b/go/libraries/doltcore/sqle/globalstate/auto_increment_tracker.go @@ -38,4 +38,9 @@ type AutoIncrementTracker interface { // below the current value for this table. The table in the provided working set is assumed to already have the value // given, so the new global maximum is computed without regard for its value in that working set. Set(ctx *sql.Context, tableName string, table *doltdb.Table, ws ref.WorkingSetRef, newAutoIncVal uint64) (*doltdb.Table, error) + + // AcquireTableLock acquires the auto increment lock on a table, and reutrns a callback function to release the lock. + // Depending on the value of the `innodb_autoinc_lock_mode` system variable, the engine may need to acquire and hold + // the lock for the duration of an insert statement. + AcquireTableLock(ctx *sql.Context, tableName string) (func(), error) } diff --git a/go/libraries/doltcore/sqle/sqlutil/static_errors.go b/go/libraries/doltcore/sqle/sqlutil/static_errors.go index 926ea29c1d..e391bf2940 100644 --- a/go/libraries/doltcore/sqle/sqlutil/static_errors.go +++ b/go/libraries/doltcore/sqle/sqlutil/static_errors.go @@ -82,6 +82,10 @@ func (e *StaticErrorEditor) SetAutoIncrementValue(*sql.Context, uint64) error { return e.err } +func (e *StaticErrorEditor) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) { + return func() {}, nil +} + func (e *StaticErrorEditor) StatementBegin(ctx *sql.Context) {} func (e *StaticErrorEditor) DiscardChanges(ctx *sql.Context, errorEncountered error) error { diff --git a/go/libraries/doltcore/sqle/writer/noms_table_writer.go b/go/libraries/doltcore/sqle/writer/noms_table_writer.go index ef6ffae817..5dfb07a113 100644 --- a/go/libraries/doltcore/sqle/writer/noms_table_writer.go +++ b/go/libraries/doltcore/sqle/writer/noms_table_writer.go @@ -153,6 +153,10 @@ func (te *nomsTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) e return te.flush(ctx) } +func (te *nomsTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) { + return te.autoInc.AcquireTableLock(ctx, te.tableName) +} + func (te *nomsTableWriter) IndexedAccess(i sql.IndexLookup) sql.IndexedTable { idx := index.DoltIndexFromSqlIndex(i.Index) return &nomsFkIndexer{ diff --git a/go/libraries/doltcore/sqle/writer/prolly_table_writer.go b/go/libraries/doltcore/sqle/writer/prolly_table_writer.go index c2c75d66dd..cb7ab56343 100644 --- a/go/libraries/doltcore/sqle/writer/prolly_table_writer.go +++ b/go/libraries/doltcore/sqle/writer/prolly_table_writer.go @@ -206,7 +206,7 @@ func (w *prollyTableWriter) GetNextAutoIncrementValue(ctx *sql.Context, insertVa return w.aiTracker.Next(w.tableName, insertVal) } -// SetAutoIncrementValue implements TableWriter. +// SetAutoIncrementValue implements AutoIncrementSetter. func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) error { seq, err := w.aiTracker.CoerceAutoIncrementValue(val) if err != nil { @@ -220,6 +220,10 @@ func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) return w.flush(ctx) } +func (w *prollyTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) { + return w.aiTracker.AcquireTableLock(ctx, w.tableName) +} + // Close implements Closer func (w *prollyTableWriter) Close(ctx *sql.Context) error { // We discard data changes in DiscardChanges, but this doesn't include schema changes, which we don't want to flush diff --git a/integration-tests/bats/auto_increment_lock_modes.bats b/integration-tests/bats/auto_increment_lock_modes.bats new file mode 100644 index 0000000000..595b0b07e4 --- /dev/null +++ b/integration-tests/bats/auto_increment_lock_modes.bats @@ -0,0 +1,82 @@ +#!/usr/bin/env bats +load $BATS_TEST_DIRNAME/helper/common.bash + +setup() { + setup_common + + dolt sql < config.yml < config.yml <