Merge pull request #3812 from dolthub/andy/merge-panic

[no-release-notes] doltcore/merge: Fix race condition, add concurrency test
This commit is contained in:
AndyA
2022-07-13 17:48:08 -07:00
committed by GitHub
4 changed files with 174 additions and 14 deletions

View File

@@ -16,14 +16,21 @@ package merge_test
import (
"context"
"fmt"
"io"
"math/rand"
"strconv"
"strings"
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
cmd "github.com/dolthub/dolt/go/cmd/dolt/commands"
"github.com/dolthub/dolt/go/cmd/dolt/commands/cnfcmds"
"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
dtu "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
@@ -246,3 +253,147 @@ func TestMergeConflicts(t *testing.T) {
})
}
}
const (
concurrentScale = 10_000
concurrentIters = 100
concurrentThreads = 8
concurrentTable = "CREATE TABLE concurrent (" +
" id int NOT NULL," +
" c0 int NOT NULL," +
" c1 int NOT NULL," +
" PRIMARY KEY (id)," +
" KEY `idx0` (c0)," +
" KEY `idx1` (c1, c0)" +
");"
)
// TestMergeConcurrency runs current merges via
// concurrent SQL transactions.
func TestMergeConcurrency(t *testing.T) {
ctx := context.Background()
dEnv := setupConcurrencyTest(t, ctx)
eng := engineFromEnvironment(ctx, dEnv)
eg, ctx := errgroup.WithContext(ctx)
for i := 0; i < concurrentThreads; i++ {
seed := i
eg.Go(func() error {
return runConcurrentTxs(ctx, eng, seed)
})
}
assert.NoError(t, eg.Wait())
}
func runConcurrentTxs(ctx context.Context, eng *engine.SqlEngine, seed int) error {
sess, err := eng.NewDoltSession(ctx, sql.NewBaseSession())
if err != nil {
return err
}
sctx := sql.NewContext(ctx, sql.WithSession(sess))
sctx.SetCurrentDatabase("dolt")
sctx.Session.SetClient(sql.Client{User: "root", Address: "%"})
rnd := rand.New(rand.NewSource(int64(seed)))
zipf := rand.NewZipf(rnd, 1.1, 1.0, concurrentScale)
for i := 0; i < concurrentIters; i++ {
if err := executeQuery(sctx, eng, "BEGIN"); err != nil {
return err
}
id := zipf.Uint64()
sum := fmt.Sprintf("SELECT sum(c0), sum(c1) "+
"FROM concurrent WHERE id BETWEEN %d AND %d", id, id+10)
update := fmt.Sprintf("UPDATE concurrent "+
"SET c0 = c0 + %d, c1 = c1 + %d WHERE id = %d",
seed, seed, id)
if err := executeQuery(sctx, eng, sum); err != nil {
return err
}
if err := executeQuery(sctx, eng, update); err != nil {
return err
}
if err := executeQuery(sctx, eng, sum); err != nil {
return err
}
if err := executeQuery(sctx, eng, "COMMIT"); err != nil {
// allow serialization errors
if !sql.ErrLockDeadlock.Is(err) {
return err
}
}
}
return nil
}
func setupConcurrencyTest(t *testing.T, ctx context.Context) (dEnv *env.DoltEnv) {
dEnv = dtu.CreateTestEnv()
eng := engineFromEnvironment(ctx, dEnv)
sqlCtx, err := eng.NewContext(ctx)
require.NoError(t, err)
sqlCtx.Session.SetClient(sql.Client{
User: "root", Address: "%",
})
require.NoError(t, executeQuery(sqlCtx, eng, concurrentTable))
require.NoError(t, executeQuery(sqlCtx, eng, generateTestData()))
return
}
func engineFromEnvironment(ctx context.Context, dEnv *env.DoltEnv) (eng *engine.SqlEngine) {
mrEnv, err := env.DoltEnvAsMultiEnv(ctx, dEnv)
if err != nil {
panic(err)
}
eng, err = engine.NewSqlEngine(ctx, mrEnv, engine.FormatNull, &engine.SqlEngineConfig{
InitialDb: "dolt",
IsReadOnly: false,
PrivFilePath: "",
ServerUser: "root",
ServerPass: "",
Autocommit: true,
})
if err != nil {
panic(err)
}
return
}
func executeQuery(ctx *sql.Context, eng *engine.SqlEngine, query string) error {
_, iter, err := eng.Query(ctx, query)
if err != nil {
return err
}
for {
_, err = iter.Next(ctx)
if err == io.EOF {
break
}
if err != nil {
return err
}
}
return iter.Close(ctx) // tx commit
}
func generateTestData() string {
var sb strings.Builder
sb.WriteString("INSERT INTO concurrent VALUES ")
sb.WriteString("(0, 0, 0")
for i := 1; i < concurrentScale; i++ {
c0 := rand.Intn(concurrentScale)
c1 := rand.Intn(concurrentScale)
sb.WriteString("), (")
sb.WriteString(strconv.Itoa(i))
sb.WriteString(", ")
sb.WriteString(strconv.Itoa(c0))
sb.WriteString(", ")
sb.WriteString(strconv.Itoa(c1))
}
sb.WriteString(");")
return sb.String()
}

View File

@@ -60,13 +60,22 @@ func mergeTableData(
) (*doltdb.Table, *MergeStats, error) {
group, gCtx := errgroup.WithContext(ctx)
indexEdits := make(chan indexEdit, 128)
conflicts := make(chan confVals, 128)
var mergedData durable.Index
var (
finalTbl *doltdb.Table
finalRows durable.Index
updatedRootIndexSet durable.IndexSet
updatedMergeIndexSet durable.IndexSet
p conflictProcessor
indexEdits = make(chan indexEdit, 128)
conflicts = make(chan confVals, 128)
)
group.Go(func() error {
var err error
updatedTbl, mergedData, err = mergeProllyRowData(
finalTbl, finalRows, err = mergeProllyRowData(
gCtx,
postMergeSchema, rootSchema, mergeSchema, ancSchema,
tbl, mergeTbl, updatedTbl,
@@ -90,8 +99,6 @@ func mergeTableData(
return nil, nil, err
}
var updatedRootIndexSet durable.IndexSet
var updatedMergeIndexSet durable.IndexSet
group.Go(func() error {
var err error
updatedRootIndexSet, updatedMergeIndexSet, err = updateProllySecondaryIndexes(gCtx, indexEdits, rootSchema, mergeSchema, tbl, mergeTbl, rootIndexSet, mergeIndexSet)
@@ -105,7 +112,6 @@ func mergeTableData(
artM := durable.ProllyMapFromArtifactIndex(artIdx)
artifactEditor := artM.Editor()
var p conflictProcessor
if can, err := isNewConflictsCompatible(ctx, tbl, tblName, ancSchema, rootSchema, mergeSchema); err != nil {
return nil, nil, err
} else if can {
@@ -135,13 +141,13 @@ func mergeTableData(
return nil, nil, err
}
updatedTbl, err = mergeProllySecondaryIndexes(
finalTbl, err = mergeProllySecondaryIndexes(
ctx,
vrw,
ns,
postMergeSchema, rootSchema, mergeSchema, ancSchema,
mergedData,
tbl, mergeTbl, updatedTbl,
finalRows,
tbl, mergeTbl, finalTbl,
ancIndexSet,
artifactEditor,
mergeRootIsh,
@@ -156,14 +162,15 @@ func mergeTableData(
}
artIdx = durable.ArtifactIndexFromProllyMap(artifactMap)
updatedTbl, err = updatedTbl.SetArtifacts(ctx, artIdx)
finalTbl, err = finalTbl.SetArtifacts(ctx, artIdx)
if err != nil {
return nil, nil, err
}
// TODO (dhruv): populate Adds, Deletes, Modifications
stats := &MergeStats{Operation: TableModified}
return updatedTbl, stats, nil
return finalTbl, stats, nil
}
func mergeTableArtifacts(ctx context.Context, tblName string, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) {

View File

@@ -37,7 +37,7 @@ const (
maxTxCommitRetries = 5
)
var ErrRetryTransaction = errors.New("this transaction conflicts with a committed transaction from another client, please retry")
var ErrRetryTransaction = errors.New("this transaction conflicts with a committed transaction from another client")
var ErrUnresolvedConflictsCommit = errors.New("Merge conflict detected, transaction rolled back. Merge conflicts must be resolved using the dolt_conflicts tables before committing a transaction. To commit transactions with merge conflicts, set @@dolt_allow_commit_conflicts = 1")
var ErrUnresolvedConstraintViolationsCommit = errors.New("Committing this transaction resulted in a working set with constraint violations, transaction rolled back. " +
"This constraint violation may be the result of a previous merge or the result of transaction sequencing. " +

View File

@@ -73,7 +73,9 @@ func mergeOrderedTrees[K, V ~[]byte, O ordering[K], S message.Serializer](
serializer S,
valDesc val.TupleDesc,
) (orderedTree[K, V, O], error) {
cfn := base.compareItems
cfn := func(left, right tree.Item) int {
return base.order.Compare(K(left), K(right))
}
root, err := tree.ThreeWayMerge(ctx, base.ns, l.root, r.root, base.root, cfn, cb, serializer, valDesc)
if err != nil {
return orderedTree[K, V, O]{}, err