diff --git a/go/libraries/doltcore/merge/integration_test.go b/go/libraries/doltcore/merge/integration_test.go index 0b37aa9610..8cbc56911e 100644 --- a/go/libraries/doltcore/merge/integration_test.go +++ b/go/libraries/doltcore/merge/integration_test.go @@ -16,14 +16,21 @@ package merge_test import ( "context" + "fmt" + "io" + "math/rand" + "strconv" + "strings" "testing" "github.com/dolthub/go-mysql-server/sql" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" cmd "github.com/dolthub/dolt/go/cmd/dolt/commands" "github.com/dolthub/dolt/go/cmd/dolt/commands/cnfcmds" + "github.com/dolthub/dolt/go/cmd/dolt/commands/engine" dtu "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" @@ -246,3 +253,147 @@ func TestMergeConflicts(t *testing.T) { }) } } + +const ( + concurrentScale = 10_000 + concurrentIters = 100 + concurrentThreads = 8 + concurrentTable = "CREATE TABLE concurrent (" + + " id int NOT NULL," + + " c0 int NOT NULL," + + " c1 int NOT NULL," + + " PRIMARY KEY (id)," + + " KEY `idx0` (c0)," + + " KEY `idx1` (c1, c0)" + + ");" +) + +// TestMergeConcurrency runs concurrent merges via +// concurrent SQL transactions. 
+func TestMergeConcurrency(t *testing.T) {
+	ctx := context.Background()
+	dEnv := setupConcurrencyTest(t, ctx)
+	eng := engineFromEnvironment(ctx, dEnv)
+
+	eg, ctx := errgroup.WithContext(ctx)
+	for i := 0; i < concurrentThreads; i++ {
+		seed := i // per-goroutine copy; also seeds that worker's RNG
+		eg.Go(func() error {
+			return runConcurrentTxs(ctx, eng, seed)
+		})
+	}
+	assert.NoError(t, eg.Wait())
+}
+
+// runConcurrentTxs opens its own session on eng and runs concurrentIters
+// transactions (BEGIN, SELECT, UPDATE, SELECT, COMMIT) against Zipf-distributed
+// ids. A COMMIT failing with sql.ErrLockDeadlock is tolerated; other errors abort.
+func runConcurrentTxs(ctx context.Context, eng *engine.SqlEngine, seed int) error {
+	sess, err := eng.NewDoltSession(ctx, sql.NewBaseSession())
+	if err != nil {
+		return err
+	}
+	sctx := sql.NewContext(ctx, sql.WithSession(sess))
+	sctx.SetCurrentDatabase("dolt")
+	sctx.Session.SetClient(sql.Client{User: "root", Address: "%"})
+
+	rnd := rand.New(rand.NewSource(int64(seed)))
+	// NewZipf yields values in [0, imax]; the table only has ids below concurrentScale.
+	zipf := rand.NewZipf(rnd, 1.1, 1.0, concurrentScale-1)
+	delta := seed + 1 // non-zero even for seed 0, so every worker's UPDATE changes its row
+
+	for i := 0; i < concurrentIters; i++ {
+		if err := executeQuery(sctx, eng, "BEGIN"); err != nil {
+			return err
+		}
+
+		id := zipf.Uint64()
+		sum := fmt.Sprintf("SELECT sum(c0), sum(c1) "+
+			"FROM concurrent WHERE id BETWEEN %d AND %d", id, id+10)
+		update := fmt.Sprintf("UPDATE concurrent "+
+			"SET c0 = c0 + %d, c1 = c1 + %d WHERE id = %d", delta, delta, id)
+
+		if err := executeQuery(sctx, eng, sum); err != nil {
+			return err
+		}
+		if err := executeQuery(sctx, eng, update); err != nil {
+			return err
+		}
+		if err := executeQuery(sctx, eng, sum); err != nil {
+			return err
+		}
+		// allow serialization errors
+		if err := executeQuery(sctx, eng, "COMMIT"); err != nil && !sql.ErrLockDeadlock.Is(err) {
+			return err
+		}
+	}
+	return nil
+}
+
+// setupConcurrencyTest creates a test environment and, through a throwaway
+// engine, creates and populates the `concurrent` table.
+func setupConcurrencyTest(t *testing.T, ctx context.Context) (dEnv *env.DoltEnv) {
+	dEnv = dtu.CreateTestEnv()
+	eng := engineFromEnvironment(ctx, dEnv)
+	sqlCtx, err := eng.NewContext(ctx)
+	require.NoError(t, err)
+	sqlCtx.Session.SetClient(sql.Client{User: "root", Address: "%"})
+	require.NoError(t, executeQuery(sqlCtx, eng, concurrentTable))
+	require.NoError(t, executeQuery(sqlCtx, eng, generateTestData()))
+	return dEnv
+}
+
+// engineFromEnvironment builds a SqlEngine over dEnv, panicking on any error.
+func engineFromEnvironment(ctx context.Context, dEnv *env.DoltEnv) (eng *engine.SqlEngine) {
+	mrEnv, err := env.DoltEnvAsMultiEnv(ctx, dEnv)
+	if err != nil {
+		panic(err)
+	}
+	eng, err = engine.NewSqlEngine(ctx, mrEnv, engine.FormatNull, &engine.SqlEngineConfig{
+		InitialDb:    "dolt",
+		IsReadOnly:   false,
+		PrivFilePath: "",
+		ServerUser:   "root",
+		ServerPass:   "",
+		Autocommit:   true,
+	})
+	if err != nil {
+		panic(err)
+	}
+	return eng
+}
+
+// executeQuery runs query on eng and drains its row iterator, discarding rows.
+// The iterator is closed on every path; a failure from Next takes precedence.
+func executeQuery(ctx *sql.Context, eng *engine.SqlEngine, query string) error {
+	_, iter, err := eng.Query(ctx, query)
+	if err != nil {
+		return err
+	}
+	for {
+		_, err = iter.Next(ctx)
+		if err == io.EOF {
+			return iter.Close(ctx) // tx commit
+		}
+		if err != nil {
+			_ = iter.Close(ctx) // release the iterator; the Next error is the one reported
+			return err
+		}
+	}
+}
+
+// generateTestData builds one INSERT statement covering ids [0, concurrentScale).
+func generateTestData() string {
+	var sb strings.Builder
+	sb.WriteString("INSERT INTO concurrent VALUES (0, 0, 0")
+	for i := 1; i < concurrentScale; i++ {
+		sb.WriteString("), (")
+		sb.WriteString(strconv.Itoa(i))
+		sb.WriteString(", ")
+		sb.WriteString(strconv.Itoa(rand.Intn(concurrentScale))) // c0
+		sb.WriteString(", ")
+		sb.WriteString(strconv.Itoa(rand.Intn(concurrentScale))) // c1
+	}
+	sb.WriteString(");")
+	return sb.String()
+}
diff --git a/go/libraries/doltcore/merge/merge_prolly.go b/go/libraries/doltcore/merge/merge_prolly.go index 7076f86150..d22f16a991 100644 --- a/go/libraries/doltcore/merge/merge_prolly.go +++ b/go/libraries/doltcore/merge/merge_prolly.go @@ -60,13 +60,22 @@ func mergeTableData( ) (*doltdb.Table, *MergeStats, error) { group, gCtx := errgroup.WithContext(ctx) - indexEdits := make(chan indexEdit, 128) - conflicts := make(chan confVals, 128) - var mergedData durable.Index + var ( + finalTbl *doltdb.Table + finalRows durable.Index + + updatedRootIndexSet durable.IndexSet + updatedMergeIndexSet durable.IndexSet + + p conflictProcessor + + indexEdits = make(chan indexEdit, 128) + conflicts = make(chan confVals, 128) + ) group.Go(func() error { var err error - 
updatedTbl, mergedData, err = mergeProllyRowData( + finalTbl, finalRows, err = mergeProllyRowData( gCtx, postMergeSchema, rootSchema, mergeSchema, ancSchema, tbl, mergeTbl, updatedTbl, @@ -90,8 +99,6 @@ func mergeTableData( return nil, nil, err } - var updatedRootIndexSet durable.IndexSet - var updatedMergeIndexSet durable.IndexSet group.Go(func() error { var err error updatedRootIndexSet, updatedMergeIndexSet, err = updateProllySecondaryIndexes(gCtx, indexEdits, rootSchema, mergeSchema, tbl, mergeTbl, rootIndexSet, mergeIndexSet) @@ -105,7 +112,6 @@ func mergeTableData( artM := durable.ProllyMapFromArtifactIndex(artIdx) artifactEditor := artM.Editor() - var p conflictProcessor if can, err := isNewConflictsCompatible(ctx, tbl, tblName, ancSchema, rootSchema, mergeSchema); err != nil { return nil, nil, err } else if can { @@ -135,13 +141,13 @@ func mergeTableData( return nil, nil, err } - updatedTbl, err = mergeProllySecondaryIndexes( + finalTbl, err = mergeProllySecondaryIndexes( ctx, vrw, ns, postMergeSchema, rootSchema, mergeSchema, ancSchema, - mergedData, - tbl, mergeTbl, updatedTbl, + finalRows, + tbl, mergeTbl, finalTbl, ancIndexSet, artifactEditor, mergeRootIsh, @@ -156,14 +162,15 @@ func mergeTableData( } artIdx = durable.ArtifactIndexFromProllyMap(artifactMap) - updatedTbl, err = updatedTbl.SetArtifacts(ctx, artIdx) + finalTbl, err = finalTbl.SetArtifacts(ctx, artIdx) if err != nil { return nil, nil, err } // TODO (dhruv): populate Adds, Deletes, Modifications stats := &MergeStats{Operation: TableModified} - return updatedTbl, stats, nil + + return finalTbl, stats, nil } func mergeTableArtifacts(ctx context.Context, tblName string, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) { diff --git a/go/libraries/doltcore/sqle/dsess/transactions.go b/go/libraries/doltcore/sqle/dsess/transactions.go index fc5331e85a..58279452ee 100644 --- a/go/libraries/doltcore/sqle/dsess/transactions.go +++ 
b/go/libraries/doltcore/sqle/dsess/transactions.go @@ -37,7 +37,7 @@ const ( maxTxCommitRetries = 5 ) -var ErrRetryTransaction = errors.New("this transaction conflicts with a committed transaction from another client, please retry") +var ErrRetryTransaction = errors.New("this transaction conflicts with a committed transaction from another client") var ErrUnresolvedConflictsCommit = errors.New("Merge conflict detected, transaction rolled back. Merge conflicts must be resolved using the dolt_conflicts tables before committing a transaction. To commit transactions with merge conflicts, set @@dolt_allow_commit_conflicts = 1") var ErrUnresolvedConstraintViolationsCommit = errors.New("Committing this transaction resulted in a working set with constraint violations, transaction rolled back. " + "This constraint violation may be the result of a previous merge or the result of transaction sequencing. " + diff --git a/go/store/prolly/ordered_tree.go b/go/store/prolly/ordered_tree.go index 7a25c7f78a..9682feef8e 100644 --- a/go/store/prolly/ordered_tree.go +++ b/go/store/prolly/ordered_tree.go @@ -73,7 +73,9 @@ func mergeOrderedTrees[K, V ~[]byte, O ordering[K], S message.Serializer]( serializer S, valDesc val.TupleDesc, ) (orderedTree[K, V, O], error) { - cfn := base.compareItems + cfn := func(left, right tree.Item) int { + return base.order.Compare(K(left), K(right)) + } root, err := tree.ThreeWayMerge(ctx, base.ns, l.root, r.root, base.root, cfn, cb, serializer, valDesc) if err != nil { return orderedTree[K, V, O]{}, err