Create and propagate MergeInfo struct.

This commit is contained in:
Nick Tobey
2023-11-21 14:35:11 -08:00
parent 887ed8c990
commit b6232a5b5e
6 changed files with 54 additions and 40 deletions

View File

@@ -51,7 +51,7 @@ var ErrUnableToMergeColumnDefaultValue = errorkinds.NewKind("unable to automatic
// table's primary index will also be rewritten. This function merges the table's artifacts (e.g. recorded
// conflicts), migrates any existing table data to the specified |mergedSch|, and merges table data from both
// sides of the merge together.
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, rewriteRows bool) (*doltdb.Table, *MergeStats, error) {
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, mergeInfo MergeInfo) (*doltdb.Table, *MergeStats, error) {
mergeTbl, err := mergeTableArtifacts(ctx, tm, tm.leftTbl)
if err != nil {
return nil, nil, err
@@ -77,11 +77,10 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
}
schemasDifferentSize := len(tm.leftSch.GetAllCols().GetColumns()) != len(mergedSch.GetAllCols().GetColumns())
rebuildPrimaryIndex := rewriteRows || schemasDifferentSize || !valueMerger.leftMapping.IsIdentityMapping()
rebuidSecondaryIndexes := rewriteRows
rebuildPrimaryIndex := mergeInfo.TableRewrite || schemasDifferentSize || !valueMerger.leftMapping.IsIdentityMapping()
var stats *MergeStats
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, rebuildPrimaryIndex, rebuidSecondaryIndexes)
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, mergeInfo, rebuildPrimaryIndex)
if err != nil {
return nil, nil, err
}
@@ -106,13 +105,11 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
// to the right-side, we apply it to the left-side by merging it into the left-side's primary index
// as well as any secondary indexes, and also checking for unique constraints incrementally. When
// conflicts are detected, this function attempts to resolve them automatically if possible, and
// if not, they are recorded as conflicts in the table's artifacts. If |rebuildPrimaryIndex| is set to
// true, then every row in the primary index will be recomputed. This is usually because the right side
// introduced a schema change. If |rebuildSecondaryIndexes| is true, then the seconary indexes will be
// rebuilt instead of being incrementally merged together. This is less efficient, but safer, especially
// when type changes have been applied to a table's schema.
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, rebuildPrimaryIndex, rebuildSecondaryIndexes bool) (*doltdb.Table, *MergeStats, error) {
iter, err := threeWayDiffer(ctx, tm, valueMerger)
// if not, they are recorded as conflicts in the table's artifacts. If |rebuildIndexes| is set to
// true, then secondary indexes will be rebuilt, instead of being incrementally merged together. This
// is less efficient, but safer, especially when type changes have been applied to a table's schema.
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, mergeInfo MergeInfo, rebuildPrimaryIndex bool) (*doltdb.Table, *MergeStats, error) {
iter, err := threeWayDiffer(ctx, tm, valueMerger, mergeInfo)
if err != nil {
return nil, nil, err
}
@@ -288,7 +285,7 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
return nil, nil, err
}
finalIdxs, err := mergeProllySecondaryIndexes(ctx, tm, leftIdxs, rightIdxs, finalSch, finalRows, conflicts.ae, rebuildSecondaryIndexes)
finalIdxs, err := mergeProllySecondaryIndexes(ctx, tm, leftIdxs, rightIdxs, finalSch, finalRows, conflicts.ae, mergeInfo.TableRewrite)
if err != nil {
return nil, nil, err
}
@@ -314,7 +311,7 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
return finalTbl, s, nil
}
func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerger) (*tree.ThreeWayDiffer[val.Tuple, val.TupleDesc], error) {
func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerger, mergeInfo MergeInfo) (*tree.ThreeWayDiffer[val.Tuple, val.TupleDesc], error) {
lr, err := tm.leftTbl.GetRowData(ctx)
if err != nil {
return nil, err
@@ -337,7 +334,9 @@ func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerg
ctx,
leftRows.NodeStore(),
leftRows.Tuples(),
mergeInfo.LeftSchemaChange,
rightRows.Tuples(),
mergeInfo.RightSchemaChange,
ancRows.Tuples(),
valueMerger.tryMerge,
valueMerger.keyless,

View File

@@ -126,7 +126,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O
}
// Calculate a merge of the schemas, but don't apply it yet
mergeSch, schConflicts, tableRewrite, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
mergeSch, schConflicts, mergeInfo, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
if err != nil {
return nil, nil, err
}
@@ -148,7 +148,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O
var tbl *doltdb.Table
if types.IsFormat_DOLT(tm.vrw.Format()) {
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, tableRewrite)
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, mergeInfo)
} else {
tbl, stats, err = mergeNomsTable(ctx, tm, mergeSch, rm.vrw, opts)
}

View File

@@ -152,10 +152,16 @@ func (c ChkConflict) String() string {
var ErrMergeWithDifferentPks = errors.New("error: cannot merge two tables with different primary keys")
type MergeInfo struct {
TableRewrite bool
LeftSchemaChange bool
RightSchemaChange bool
}
// SchemaMerge performs a three-way merge of |ourSch|, |theirSch|, and |ancSch|, and returns: the merged schema,
// any schema conflicts identified, whether moving to the new schema requires a full table rewrite, and any
// unexpected error encountered while merging the schemas.
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, tableRewrite bool, err error) {
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, mergeInfo MergeInfo, err error) {
// (sch - ancSch) (mergeSch - ancSch) (sch ∩ mergeSch)
sc = SchemaConflict{
TableName: tblName,
@@ -164,38 +170,38 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
// TODO: We'll remove this once it's possible to get diff and merge on different primary key sets
// TODO: decide how to merge different orders of PKS
if !schema.ArePrimaryKeySetsDiffable(format, ourSch, theirSch) || !schema.ArePrimaryKeySetsDiffable(format, ourSch, ancSch) {
return nil, SchemaConflict{}, false, ErrMergeWithDifferentPks
return nil, SchemaConflict{}, mergeInfo, ErrMergeWithDifferentPks
}
var mergedCC *schema.ColCollection
mergedCC, sc.ColConflicts, tableRewrite, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
mergedCC, sc.ColConflicts, mergeInfo, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
if err != nil {
return nil, SchemaConflict{}, false, err
return nil, SchemaConflict{}, mergeInfo, err
}
if len(sc.ColConflicts) > 0 {
return nil, sc, tableRewrite, nil
return nil, sc, mergeInfo, nil
}
var mergedIdxs schema.IndexCollection
mergedIdxs, sc.IdxConflicts = mergeIndexes(mergedCC, ourSch, theirSch, ancSch)
if len(sc.IdxConflicts) > 0 {
return nil, sc, tableRewrite, nil
return nil, sc, mergeInfo, nil
}
sch, err = schema.SchemaFromCols(mergedCC)
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
sch, err = mergeTableCollation(ctx, tblName, ancSch, ourSch, theirSch, sch)
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
// TODO: Merge conflict should have blocked any primary key ordinal changes
err = sch.SetPkOrdinals(ourSch.GetPkOrdinals())
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
_ = mergedIdxs.Iter(func(index schema.Index) (stop bool, err error) {
@@ -207,17 +213,17 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
var mergedChks []schema.Check
mergedChks, sc.ChkConflicts, err = mergeChecks(ctx, ourSch.Checks(), theirSch.Checks(), ancSch.Checks())
if err != nil {
return nil, SchemaConflict{}, false, err
return nil, SchemaConflict{}, mergeInfo, err
}
if len(sc.ChkConflicts) > 0 {
return nil, sc, false, nil
return nil, sc, mergeInfo, nil
}
// Look for invalid CHECKs
for _, chk := range mergedChks {
// CONFLICT: a CHECK now references a column that no longer exists in schema
if ok, err := isCheckReferenced(sch, chk); err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
} else if !ok {
// Append to conflicts
sc.ChkConflicts = append(sc.ChkConflicts, ChkConflict{
@@ -232,7 +238,7 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
sch.Checks().AddCheck(chk.Name(), chk.Expression(), chk.Enforced())
}
return sch, sc, tableRewrite, nil
return sch, sc, mergeInfo, nil
}
// ForeignKeysMerge performs a three-way merge of (ourRoot, theirRoot, ancRoot) and using mergeRoot to validate FKs.
@@ -369,20 +375,20 @@ func checkUnmergeableNewColumns(tblName string, columnMappings columnMappings) e
// compatible with the current stored format. The merged columns, any column conflicts, and a boolean value stating if
// a full table rewrite is needed to align the existing table rows with the new, merged schema. If any unexpected error
// occurs, then that error is returned and the other response fields should be ignored.
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, bool, error) {
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, MergeInfo, error) {
columnMappings, err := mapColumns(ourCC, theirCC, ancCC)
if err != nil {
return nil, nil, false, err
return nil, nil, MergeInfo{}, err
}
conflicts, err := checkSchemaConflicts(columnMappings)
if err != nil {
return nil, nil, false, err
return nil, nil, MergeInfo{}, err
}
err = checkUnmergeableNewColumns(tblName, columnMappings)
if err != nil {
return nil, nil, false, err
return nil, nil, MergeInfo{}, err
}
compatChecker := newTypeCompatabilityCheckerForStorageFormat(format)
@@ -486,10 +492,15 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
// Check that there are no duplicate column names or tags in the merged column set
conflicts = append(conflicts, checkForColumnConflicts(mergedColumns)...)
if conflicts != nil {
return nil, conflicts, false, nil
return nil, conflicts, MergeInfo{}, nil
}
return schema.NewColCollection(mergedColumns...), nil, tableRewrite, nil
mergeInfo := MergeInfo{
TableRewrite: tableRewrite,
LeftSchemaChange: ourSchemaChanged,
RightSchemaChange: theirSchemaChanged,
}
return schema.NewColCollection(mergedColumns...), nil, mergeInfo, nil
}
// checkForColumnConflicts iterates over |mergedColumns|, checks for duplicate column names or column tags, and returns

View File

@@ -635,8 +635,8 @@ func testMergeSchemasWithConflicts(t *testing.T, test mergeSchemaConflictTest) {
otherSch := getSchema(t, dEnv)
_, actConflicts, requiresTableRewrite, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
assert.False(t, requiresTableRewrite)
_, actConflicts, mergeInfo, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
assert.False(t, mergeInfo.TableRewrite)
if test.expectedErr != nil {
assert.True(t, errors.Is(err, test.expectedErr))
return

View File

@@ -43,18 +43,22 @@ type resolveCb func(context.Context, val.Tuple, val.Tuple, val.Tuple) (val.Tuple
func NewThreeWayDiffer[K, V ~[]byte, O Ordering[K]](
ctx context.Context,
ns NodeStore,
left, right, base StaticMap[K, V, O],
left StaticMap[K, V, O],
leftSchemaChanged bool,
right StaticMap[K, V, O],
rightSchemaChanged bool,
base StaticMap[K, V, O],
resolveCb resolveCb,
keyless bool,
order O,
) (*ThreeWayDiffer[K, O], error) {
// probably compute each of these separately
ld, err := DifferFromRoots[K](ctx, ns, ns, base.Root, left.Root, order, false)
ld, err := DifferFromRoots[K](ctx, ns, ns, base.Root, left.Root, order, leftSchemaChanged)
if err != nil {
return nil, err
}
rd, err := DifferFromRoots[K](ctx, ns, ns, base.Root, right.Root, order, false)
rd, err := DifferFromRoots[K](ctx, ns, ns, base.Root, right.Root, order, rightSchemaChanged)
if err != nil {
return nil, err
}

View File

@@ -192,7 +192,7 @@ func TestThreeWayDiffer(t *testing.T) {
left := newTestMap(t, ctx, tt.left, ns, valDesc)
right := newTestMap(t, ctx, tt.right, ns, valDesc)
iter, err := NewThreeWayDiffer(ctx, ns, left, right, base, testResolver(t, ns, valDesc, val.NewTupleBuilder(valDesc)), false, keyDesc)
iter, err := NewThreeWayDiffer(ctx, ns, left, false, right, false, base, testResolver(t, ns, valDesc, val.NewTupleBuilder(valDesc)), false, keyDesc)
require.NoError(t, err)
var cmp []testDiff