Merge pull request #7704 from dolthub/nicktobey/autoinclock

Implement `traditional` auto-increment lock mode, holding the lock for the duration of the insert iterator.
This commit is contained in:
Nick Tobey
2024-04-11 14:46:58 -07:00
committed by GitHub
12 changed files with 270 additions and 33 deletions

View File

@@ -57,7 +57,7 @@ require (
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb
github.com/dolthub/swiss v0.1.0
github.com/goccy/go-json v0.10.2
github.com/google/go-github/v57 v57.0.0

View File

@@ -183,8 +183,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e h1:kPsT4a47cw1+y/N5SSCkma7FhAPw7KeGmD6c9PBZW9Y=
github.com/dolthub/go-icu-regex v0.0.0-20230524105445-af7e7991c97e/go.mod h1:KPUcpx070QOfJK1gNe0zx4pA5sicIK1GMikIGLKC168=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3 h1:i48WY1tLEbvbN4PWiozfC7CzuuOw9or4tZedIWwxWE0=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410074202-09d57d9841d3/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb h1:AN5zMcStorSl9r87Wkvz1aFN57VBobiofbRzxnAqh5o=
github.com/dolthub/go-mysql-server v0.18.2-0.20240410155534-b53360bab9bb/go.mod h1:uTqIti1oPqKEUS9N1zcT43a/QQ8n0PuBdZUMZziBaGU=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.2-0.20240227200619-19675ab05c71 h1:bMGS25NWAGTEtT5tOBsCuCrlYnLRKpbJVJkDbrTRhwQ=

View File

@@ -34,19 +34,28 @@ import (
"github.com/dolthub/dolt/go/store/types"
)
// LockMode mirrors the MySQL system variable `innodb_autoinc_lock_mode` and
// selects how long the auto-increment lock is held during insert statements.
type LockMode int64

var (
	// LockMode_Traditional (innodb_autoinc_lock_mode = 0) holds the table's
	// auto-increment lock for the duration of the insert statement.
	LockMode_Traditional LockMode = 0
	// LockMode_Concurrent corresponds to innodb_autoinc_lock_mode = 1.
	LockMode_Concurrent LockMode = 1
	// LockMode_Concurret is a misspelling retained so existing callers keep
	// compiling.
	//
	// Deprecated: use LockMode_Concurrent.
	LockMode_Concurret LockMode = LockMode_Concurrent
	// LockMode_Interleaved (innodb_autoinc_lock_mode = 2) acquires and
	// releases the lock per generated value.
	LockMode_Interleaved LockMode = 2
)
// AutoIncrementTracker tracks the next auto-increment value for each table in
// a database. The value is tracked globally, across all branches.
type AutoIncrementTracker struct {
	// dbName is the database whose tables this tracker covers.
	dbName string
	// sequences maps table name to the next auto-increment value
	// (map[string]uint64).
	sequences *sync.Map // map[string]uint64
	// mm provides a per-table mutex used to serialize sequence updates.
	mm *mutexmap.MutexMap
	// lockMode selects the locking behavior; written by AcquireTableLock from
	// the innodb_autoinc_lock_mode system variable.
	lockMode LockMode
}
var _ globalstate.AutoIncrementTracker = AutoIncrementTracker{}
var _ globalstate.AutoIncrementTracker = &AutoIncrementTracker{}
// NOTE(review): this rendering appears to interleave the pre-change and
// post-change versions of this function (two signatures and duplicated return
// lines from a diff with its +/- markers stripped). Only the pointer-returning
// variant should remain in the actual source.
// NewAutoIncrementTracker returns a new autoincrement tracker for the roots given. All roots sets must be
// considered because the auto increment value for a table is tracked globally, across all branches.
// Roots provided should be the working sets when available, or the branches when they are not (e.g. for remote
// branches that don't have a local working set)
func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (AutoIncrementTracker, error) {
func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb.Rootish) (*AutoIncrementTracker, error) {
ait := AutoIncrementTracker{
dbName: dbName,
sequences: &sync.Map{},
@@ -56,7 +65,7 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
for _, root := range roots {
root, err := root.ResolveRootValue(ctx)
if err != nil {
return AutoIncrementTracker{}, err
return &AutoIncrementTracker{}, err
}
err = root.IterTables(ctx, func(tableName string, table *doltdb.Table, sch schema.Schema) (bool, error) {
@@ -81,10 +90,11 @@ func NewAutoIncrementTracker(ctx context.Context, dbName string, roots ...doltdb
})
if err != nil {
return AutoIncrementTracker{}, err
return &AutoIncrementTracker{}, err
}
}
return ait, nil
return &ait, nil
}
func loadAutoIncValue(sequences *sync.Map, tableName string) uint64 {
@@ -111,8 +121,10 @@ func (a AutoIncrementTracker) Next(tbl string, insertVal interface{}) (uint64, e
return 0, err
}
release := a.mm.Lock(tbl)
defer release()
if a.lockMode == LockMode_Interleaved {
release := a.mm.Lock(tbl)
defer release()
}
curr := loadAutoIncValue(a.sequences, tbl)
@@ -406,3 +418,13 @@ func (a AutoIncrementTracker) DropTable(ctx *sql.Context, tableName string, wses
return nil
}
// AcquireTableLock acquires the auto-increment lock for tableName and returns
// a callback that releases it. It is used when the configured lock mode holds
// the lock for the duration of an insert statement rather than per generated
// value (Next only takes the per-value lock in Interleaved mode).
func (a *AutoIncrementTracker) AcquireTableLock(ctx *sql.Context, tableName string) (func(), error) {
	// NOTE(review): the unchecked type assertion below panics with an opaque
	// message if the system variable is missing or not an int64; a comma-ok
	// assertion returning an error would fail more gracefully.
	_, i, _ := sql.SystemVariables.GetGlobal("innodb_autoinc_lock_mode")
	lockMode := LockMode(i.(int64))
	if lockMode == LockMode_Interleaved {
		// Interleaved mode never holds the lock across a whole statement, so
		// reaching this point indicates a caller bug.
		panic("Attempted to acquire AutoInc lock for entire insert operation, but lock mode was set to Interleaved")
	}
	// NOTE(review): this write to a.lockMode is not synchronized with readers
	// such as Next; confirm callers serialize access, or consider an atomic.
	a.lockMode = lockMode
	return a.mm.Lock(tableName), nil
}

View File

@@ -83,7 +83,7 @@ func NewGlobalStateStoreForDb(ctx context.Context, dbName string, db *doltdb.Dol
}
type GlobalStateImpl struct {
aiTracker AutoIncrementTracker
aiTracker globalstate.AutoIncrementTracker
mu *sync.Mutex
}

View File

@@ -38,18 +38,26 @@ func NewMutexMap() *MutexMap {
// Lock locks the mutex associated with key (creating it on first use) and
// returns a callback that unlocks it.
// NOTE(review): this rendering appears to interleave the pre-change and
// post-change bodies of this function: `keyedMutex` is declared twice, and the
// parent mutex is unlocked both by the top-level defer and by the defer inside
// the closure. Only the closure-based variant (which releases the parent lock
// before blocking on the child lock) should remain in the actual source.
func (mm *MutexMap) Lock(key interface{}) func() {
mm.mu.Lock()
defer mm.mu.Unlock()
keyedMutex, hasKey := mm.keyedMutexes[key]
if !hasKey {
keyedMutex = &mapMutex{parent: mm, key: key}
mm.keyedMutexes[key] = keyedMutex
}
keyedMutex.refcount++
var keyedMutex *mapMutex
func() {
// We must release the parent lock before attempting to acquire the child lock, otherwise if the child lock
// is currently held it will never be released.
defer mm.mu.Unlock()
var hasKey bool
keyedMutex, hasKey = mm.keyedMutexes[key]
if !hasKey {
keyedMutex = &mapMutex{parent: mm, key: key}
mm.keyedMutexes[key] = keyedMutex
}
keyedMutex.refcount++
}()
keyedMutex.mu.Lock()
return func() { keyedMutex.Unlock() }
return func() {
keyedMutex.Unlock()
}
}
func (mm *mapMutex) Unlock() {

View File

@@ -17,6 +17,7 @@ package enginetest
import (
"context"
"fmt"
"io"
"os"
"runtime"
"sync"
@@ -31,6 +32,7 @@ import (
"github.com/dolthub/go-mysql-server/sql/memo"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/go-mysql-server/sql/transform"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/vitess/go/mysql"
"github.com/stretchr/testify/assert"
@@ -195,6 +197,125 @@ func newUpdateResult(matched, updated int) gmstypes.OkResult {
}
}
// TestAutoIncrementTrackerLockMode runs the auto-increment tracker test once
// for each supported value of innodb_autoinc_lock_mode (0, 1, and 2).
func TestAutoIncrementTrackerLockMode(t *testing.T) {
	lockModes := []int64{0, 1, 2}
	for _, lockMode := range lockModes {
		lockMode := lockMode
		name := fmt.Sprintf("lock mode %d", lockMode)
		t.Run(name, func(t *testing.T) {
			testAutoIncrementTrackerWithLockMode(t, lockMode)
		})
	}
}
// testAutoIncrementTrackerWithLockMode tests that interleaved inserts don't cause deadlocks, regardless of the value of innodb_autoinc_lock_mode.
// In a real use case, these interleaved operations would be happening in different sessions on different threads.
// In order to make the test behave predictably, we manually interleave the two iterators.
func testAutoIncrementTrackerWithLockMode(t *testing.T, lockMode int64) {
	err := sql.SystemVariables.AssignValues(map[string]interface{}{"innodb_autoinc_lock_mode": lockMode})
	require.NoError(t, err)

	setupScripts := []setup.SetupScript{[]string{
		"CREATE TABLE test1 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t1_c_index (c0));",
		"CREATE TABLE test2 (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,c0 int,index t2_c_index (c0));",
		"CREATE TABLE timestamps (pk int NOT NULL PRIMARY KEY AUTO_INCREMENT, t int);",
		"CREATE TRIGGER t1 AFTER INSERT ON test1 FOR EACH ROW INSERT INTO timestamps VALUES (0, 1);",
		"CREATE TRIGGER t2 AFTER INSERT ON test2 FOR EACH ROW INSERT INTO timestamps VALUES (0, 2);",
		"CREATE VIEW bin AS SELECT 0 AS v UNION ALL SELECT 1;",
		"CREATE VIEW sequence5bit AS SELECT b1.v + 2*b2.v + 4*b3.v + 8*b4.v + 16*b5.v AS v from bin b1, bin b2, bin b3, bin b4, bin b5;",
	}}

	harness := newDoltHarness(t)
	defer harness.Close()
	harness.Setup(setup.MydbData, setupScripts)
	e := mustNewEngine(t, harness)
	defer e.Close()
	ctx := enginetest.NewContext(harness)

	// Confirm that the system variable was correctly set.
	// (assert.Equal takes expected before actual.)
	_, iter, err := e.Query(ctx, "select @@innodb_autoinc_lock_mode")
	require.NoError(t, err)
	rows, err := sql.RowIterToRows(ctx, iter)
	require.NoError(t, err)
	assert.Equal(t, []sql.Row{{lockMode}}, rows)

	// Ordinarily QueryEngine.query manages transactions.
	// Since we can't use that for this test, we manually start a new transaction.
	ts := ctx.Session.(sql.TransactionSession)
	tx, err := ts.StartTransaction(ctx, sql.ReadWrite)
	require.NoError(t, err)
	ctx.SetTransaction(tx)

	// getTriggerIter analyzes the query and returns a row iterator built from
	// the first TriggerExecutor node found in the plan.
	getTriggerIter := func(query string) sql.RowIter {
		root, err := e.AnalyzeQuery(ctx, query)
		require.NoError(t, err)
		var triggerNode *plan.TriggerExecutor
		transform.Node(root, func(n sql.Node) (sql.Node, transform.TreeIdentity, error) {
			if triggerNode != nil {
				return n, transform.SameTree, nil
			}
			// Renamed from `t` to avoid shadowing the *testing.T parameter.
			if tn, ok := n.(*plan.TriggerExecutor); ok {
				triggerNode = tn
			}
			return n, transform.NewTree, nil
		})
		// Fail with a clear message instead of a nil dereference in Build if
		// the plan shape ever changes.
		require.NotNil(t, triggerNode)
		iter, err := e.EngineAnalyzer().ExecBuilder.Build(ctx, triggerNode, nil)
		require.NoError(t, err)
		return iter
	}

	iter1 := getTriggerIter("INSERT INTO test1 (c0) select v from sequence5bit;")
	iter2 := getTriggerIter("INSERT INTO test2 (c0) select v from sequence5bit;")

	// Alternate the iterators until they're exhausted.
	var err1 error
	var err2 error
	for err1 != io.EOF || err2 != io.EOF {
		if err1 != io.EOF {
			require.NoError(t, err1)
			_, err1 = iter1.Next(ctx)
		}
		if err2 != io.EOF {
			require.NoError(t, err2)
			_, err2 = iter2.Next(ctx)
		}
	}
	err = iter1.Close(ctx)
	require.NoError(t, err)
	err = iter2.Close(ctx)
	require.NoError(t, err)
	// Previously this error was silently dropped.
	err = dsess.DSessFromSess(ctx.Session).CommitTransaction(ctx, ctx.GetTransaction())
	require.NoError(t, err)

	// Verify that the inserts are seen by the engine.
	{
		_, iter, err := e.Query(ctx, "select count(*) from timestamps")
		require.NoError(t, err)
		rows, err := sql.RowIterToRows(ctx, iter)
		require.NoError(t, err)
		assert.Equal(t, []sql.Row{{int64(64)}}, rows)
	}

	// Verify that the insert operations are actually interleaved by inspecting the order that values were added to `timestamps`
	{
		_, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 1) < (select max(pk) from timestamps where t = 2)")
		require.NoError(t, err)
		rows, err := sql.RowIterToRows(ctx, iter)
		require.NoError(t, err)
		assert.Equal(t, []sql.Row{{true}}, rows)
	}
	{
		_, iter, err := e.Query(ctx, "select (select min(pk) from timestamps where t = 2) < (select max(pk) from timestamps where t = 1)")
		require.NoError(t, err)
		rows, err := sql.RowIterToRows(ctx, iter)
		require.NoError(t, err)
		assert.Equal(t, []sql.Row{{true}}, rows)
	}
}
// Convenience test for debugging a single query. Unskip and set to the desired query.
func TestSingleMergeScript(t *testing.T) {
t.Skip()

View File

@@ -38,4 +38,9 @@ type AutoIncrementTracker interface {
// below the current value for this table. The table in the provided working set is assumed to already have the value
// given, so the new global maximum is computed without regard for its value in that working set.
Set(ctx *sql.Context, tableName string, table *doltdb.Table, ws ref.WorkingSetRef, newAutoIncVal uint64) (*doltdb.Table, error)
// AcquireTableLock acquires the auto increment lock on a table, and returns a callback function to release the lock.
// Depending on the value of the `innodb_autoinc_lock_mode` system variable, the engine may need to acquire and hold
// the lock for the duration of an insert statement.
AcquireTableLock(ctx *sql.Context, tableName string) (func(), error)
}

View File

@@ -82,6 +82,10 @@ func (e *StaticErrorEditor) SetAutoIncrementValue(*sql.Context, uint64) error {
return e.err
}
// AcquireAutoIncrementLock returns a no-op release callback and never fails;
// this editor performs no auto-increment locking.
func (e *StaticErrorEditor) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
	return func() {}, nil
}
func (e *StaticErrorEditor) StatementBegin(ctx *sql.Context) {}
func (e *StaticErrorEditor) DiscardChanges(ctx *sql.Context, errorEncountered error) error {

View File

@@ -153,6 +153,10 @@ func (te *nomsTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) e
return te.flush(ctx)
}
// AcquireAutoIncrementLock delegates to the writer's auto-increment tracker,
// acquiring the lock for this writer's table and returning its release
// callback.
func (te *nomsTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
	return te.autoInc.AcquireTableLock(ctx, te.tableName)
}
func (te *nomsTableWriter) IndexedAccess(i sql.IndexLookup) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i.Index)
return &nomsFkIndexer{

View File

@@ -206,7 +206,7 @@ func (w *prollyTableWriter) GetNextAutoIncrementValue(ctx *sql.Context, insertVa
return w.aiTracker.Next(w.tableName, insertVal)
}
// SetAutoIncrementValue implements TableWriter.
// SetAutoIncrementValue implements AutoIncrementSetter.
func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) error {
seq, err := w.aiTracker.CoerceAutoIncrementValue(val)
if err != nil {
@@ -220,6 +220,10 @@ func (w *prollyTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64)
return w.flush(ctx)
}
// AcquireAutoIncrementLock delegates to the writer's auto-increment tracker,
// acquiring the lock for this writer's table and returning its release
// callback.
func (w *prollyTableWriter) AcquireAutoIncrementLock(ctx *sql.Context) (func(), error) {
	return w.aiTracker.AcquireTableLock(ctx, w.tableName)
}
// Close implements Closer
func (w *prollyTableWriter) Close(ctx *sql.Context) error {
// We discard data changes in DiscardChanges, but this doesn't include schema changes, which we don't want to flush

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env bats
load $BATS_TEST_DIRNAME/helper/common.bash
# setup builds the shared fixture for every test in this file:
#  - test1/test2: auto-increment tables targeted by the concurrent inserts
#  - ranges: records LAST_INSERT_ID()/ROW_COUNT() per insert so tests can
#    check that generated id ranges are contiguous
#  - timestamps: populated by triggers on test1/test2 so tests can observe
#    interleaving of concurrent writes
#  - sequence5bit/10bit/15bit: views producing 32/1024/32768 rows respectively
setup() {
setup_common
dolt sql <<SQL
CREATE TABLE test1 (
pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,
c0 int,
index t1_c_index (c0)
);
CREATE TABLE test2 (
pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,
c0 int,
index t2_c_index (c0)
);
-- We use this table to store the first id and row count of both insert operations, which helps us prove
-- that the generated id ranges are contiguous for each transaction.
CREATE TABLE ranges (
pk int NOT NULL PRIMARY KEY,
firstId int,
rowCount int
);
-- We use this table to demonstrate that concurrent writes to different tables can and are interleaved
CREATE TABLE timestamps (
pk int NOT NULL PRIMARY KEY AUTO_INCREMENT,
t int
);
delimiter |
CREATE TRIGGER t1 AFTER INSERT ON test1
FOR EACH ROW
BEGIN
INSERT INTO timestamps VALUES (0, 1);
END|
CREATE TRIGGER t2 AFTER INSERT ON test2
FOR EACH ROW
BEGIN
INSERT INTO timestamps VALUES (0, 2);
END|
delimiter ;
CREATE VIEW bin AS SELECT 0 AS v UNION ALL SELECT 1;
CREATE VIEW sequence5bit AS SELECT b1.v + 2*b2.v + 4*b3.v + 8*b4.v + 16*b5.v AS v from bin b1, bin b2, bin b3, bin b4, bin b5;
CREATE VIEW sequence10bit AS SELECT b1.v + 32*b2.v AS v from sequence5bit b1, sequence5bit b2;
CREATE VIEW sequence15bit AS SELECT b1.v + 32*b2.v + 32*32*b3.v AS v from sequence5bit b1, sequence5bit b2, sequence5bit b3;
SQL
}
# teardown runs after each test: verifies the repository feature version, then
# performs the common cleanup.
teardown() {
assert_feature_version
teardown_common
}
# Verifies that with innodb_autoinc_lock_mode = 0 each insert statement gets a
# contiguous, non-interleaved block of auto-increment ids even when two 1024-row
# inserts into the same table run concurrently.
@test "auto_increment_lock_modes: multiple inserts to the same table aren't interleaved when innodb_autoinc_lock_mode = 0" {
# NOTE(review): the heredoc below appears to have lost its indentation in this
# rendering; innodb_autoinc_lock_mode should be nested under system_variables
# in valid YAML — confirm against the original file.
cat > config.yml <<EOF
system_variables:
innodb_autoinc_lock_mode: 0
EOF
if [ "$SQL_ENGINE" = "remote-engine" ]; then
skip "This test tests remote connections directly, SQL_ENGINE is not needed."
fi
start_sql_server_with_config "" config.yml
# Run two inserts concurrently (first in the background); each records its
# LAST_INSERT_ID() and ROW_COUNT() into `ranges` for the checks below.
dolt sql -q "INSERT INTO test1 (c0) select 0 from sequence10bit; INSERT INTO ranges VALUES (0, LAST_INSERT_ID(), ROW_COUNT()); COMMIT;" &
dolt sql -q "INSERT INTO test1 (c0) select 1 from sequence10bit; INSERT INTO ranges VALUES (1, LAST_INSERT_ID(), ROW_COUNT()); COMMIT;"
wait $!
stop_sql_server
# For each insert, its ids must form one contiguous block: min(pk) equals the
# recorded first id, 1024 rows were inserted, and max(pk) = firstId + rowCount - 1.
run dolt sql -r csv -q "select
c0,
min(pk) = firstId,
rowCount = 1024,
max(pk) = firstId + rowCount -1
from test1 join ranges on test1.c0 = ranges.pk group by c0"
[ "$status" -eq 0 ]
[[ "$output" =~ "0,true,true,true" ]] || false
[[ "$output" =~ "1,true,true,true" ]] || false
}

View File

@@ -43,19 +43,6 @@ EOF
run dolt sql-server --config ./config.yml
[ $status -eq 1 ]
[[ "$output" =~ "Variable 'innodb_autoinc_lock_mode' can't be set to the value of '100'" ]] || false
cat > config.yml <<EOF
user:
name: dolt
listener:
host: "0.0.0.0"
port: $PORT
system_variables:
innodb_autoinc_lock_mode: 1
EOF
run dolt sql-server --config ./config.yml
[ $status -eq 1 ]
[[ "$output" =~ "Variable 'innodb_autoinc_lock_mode' can't be set to the value of '1'" ]] || false
}
@test "sql-server: sanity check" {