Merge pull request #7044 from dolthub/nicktobey/idea

Track whether or not rows actually need to be remapped to the result schema.
This commit is contained in:
Nick Tobey
2023-11-22 19:18:29 -08:00
committed by GitHub
4 changed files with 138 additions and 109 deletions

View File

@@ -51,7 +51,7 @@ var ErrUnableToMergeColumnDefaultValue = errorkinds.NewKind("unable to automatic
// table's primary index will also be rewritten. This function merges the table's artifacts (e.g. recorded
// conflicts), migrates any existing table data to the specified |mergedSch|, and merges table data from both
// sides of the merge together.
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, rewriteRows bool) (*doltdb.Table, *MergeStats, error) {
func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Schema, mergeInfo MergeInfo) (*doltdb.Table, *MergeStats, error) {
mergeTbl, err := mergeTableArtifacts(ctx, tm, tm.leftTbl)
if err != nil {
return nil, nil, err
@@ -69,6 +69,14 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
leftRows := durable.ProllyMapFromIndex(lr)
valueMerger := newValueMerger(mergedSch, tm.leftSch, tm.rightSch, tm.ancSch, leftRows.Pool(), tm.ns)
if !valueMerger.leftMapping.IsIdentityMapping() {
mergeInfo.LeftNeedsRewrite = true
}
if !valueMerger.rightMapping.IsIdentityMapping() {
mergeInfo.RightNeedsRewrite = true
}
// We need a sql.Context to apply column default values in merges; if we don't have one already,
// create one, since this code also gets called from the CLI merge code path.
sqlCtx, ok := ctx.(*sql.Context)
@@ -76,12 +84,8 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
sqlCtx = sql.NewContext(ctx)
}
schemasDifferentSize := len(tm.leftSch.GetAllCols().GetColumns()) != len(mergedSch.GetAllCols().GetColumns())
rebuildPrimaryIndex := rewriteRows || schemasDifferentSize || !valueMerger.leftMapping.IsIdentityMapping()
rebuidSecondaryIndexes := rewriteRows
var stats *MergeStats
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, rebuildPrimaryIndex, rebuidSecondaryIndexes)
mergeTbl, stats, err = mergeProllyTableData(sqlCtx, tm, mergedSch, mergeTbl, valueMerger, mergeInfo)
if err != nil {
return nil, nil, err
}
@@ -106,12 +110,8 @@ func mergeProllyTable(ctx context.Context, tm *TableMerger, mergedSch schema.Sch
// to the right-side, we apply it to the left-side by merging it into the left-side's primary index
// as well as any secondary indexes, and also checking for unique constraints incrementally. When
// conflicts are detected, this function attempts to resolve them automatically if possible, and
// if not, they are recorded as conflicts in the table's artifacts. If |rebuildPrimaryIndex| is set to
// true, then every row in the primary index will be recomputed. This is usually because the right side
// introduced a schema change. If |rebuildSecondaryIndexes| is true, then the seconary indexes will be
// rebuilt instead of being incrementally merged together. This is less efficient, but safer, especially
// when type changes have been applied to a table's schema.
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, rebuildPrimaryIndex, rebuildSecondaryIndexes bool) (*doltdb.Table, *MergeStats, error) {
// if not, they are recorded as conflicts in the table's artifacts.
func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Schema, mergeTbl *doltdb.Table, valueMerger *valueMerger, mergeInfo MergeInfo) (*doltdb.Table, *MergeStats, error) {
iter, err := threeWayDiffer(ctx, tm, valueMerger)
if err != nil {
return nil, nil, err
@@ -131,11 +131,11 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
keyless := schema.IsKeyless(tm.leftSch)
pri, err := newPrimaryMerger(leftEditor, tm, valueMerger, finalSch)
pri, err := newPrimaryMerger(leftEditor, tm, valueMerger, finalSch, mergeInfo)
if err != nil {
return nil, nil, err
}
sec, err := newSecondaryMerger(ctx, tm, valueMerger, finalSch)
sec, err := newSecondaryMerger(ctx, tm, valueMerger, finalSch, mergeInfo)
if err != nil {
return nil, nil, err
}
@@ -196,11 +196,9 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
// In the event that the right side introduced a schema change, account for it here.
// We still have to migrate when the diff is `tree.DiffOpLeftModify` because of the corner case where
// the right side contains a schema change but the changed column is null, so row bytes don't change.
if rebuildPrimaryIndex {
err = pri.merge(ctx, diff, tm.leftSch)
if err != nil {
return nil, nil, err
}
err = pri.merge(ctx, diff, tm.leftSch)
if err != nil {
return nil, nil, err
}
case tree.DiffOpDivergentModifyConflict, tree.DiffOpDivergentDeleteConflict:
@@ -288,7 +286,7 @@ func mergeProllyTableData(ctx *sql.Context, tm *TableMerger, finalSch schema.Sch
return nil, nil, err
}
finalIdxs, err := mergeProllySecondaryIndexes(ctx, tm, leftIdxs, rightIdxs, finalSch, finalRows, conflicts.ae, rebuildSecondaryIndexes)
finalIdxs, err := mergeProllySecondaryIndexes(ctx, tm, leftIdxs, rightIdxs, finalSch, finalRows, conflicts.ae, mergeInfo.InvalidateSecondaryIndexes)
if err != nil {
return nil, nil, err
}
@@ -1014,14 +1012,16 @@ type primaryMerger struct {
valueMerger *valueMerger
tableMerger *TableMerger
finalSch schema.Schema
mergeInfo MergeInfo
}
func newPrimaryMerger(leftEditor *prolly.MutableMap, tableMerger *TableMerger, valueMerger *valueMerger, finalSch schema.Schema) (*primaryMerger, error) {
func newPrimaryMerger(leftEditor *prolly.MutableMap, tableMerger *TableMerger, valueMerger *valueMerger, finalSch schema.Schema, mergeInfo MergeInfo) (*primaryMerger, error) {
return &primaryMerger{
mut: leftEditor,
valueMerger: valueMerger,
tableMerger: tableMerger,
finalSch: finalSch,
mergeInfo: mergeInfo,
}, nil
}
@@ -1042,28 +1042,31 @@ func (m *primaryMerger) merge(ctx *sql.Context, diff tree.ThreeWayDiff, sourceSc
return fmt.Errorf("cannot merge keyless tables with reordered columns")
}
} else {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.finalSch, m.tableMerger.rightSch)
if err != nil {
return err
}
// Remapping when there's no schema change is harmless, but slow.
if m.mergeInfo.RightNeedsRewrite {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.finalSch, m.tableMerger.rightSch)
if err != nil {
return err
}
tempTupleValue, err := remapTupleWithColumnDefaults(
ctx,
diff.Key,
diff.Right,
sourceSch.GetValueDescriptor(),
m.valueMerger.rightMapping,
m.tableMerger,
m.tableMerger.rightSch,
m.finalSch,
defaults,
m.valueMerger.syncPool,
true,
)
if err != nil {
return err
tempTupleValue, err := remapTupleWithColumnDefaults(
ctx,
diff.Key,
diff.Right,
sourceSch.GetValueDescriptor(),
m.valueMerger.rightMapping,
m.tableMerger,
m.tableMerger.rightSch,
m.finalSch,
defaults,
m.valueMerger.syncPool,
true,
)
if err != nil {
return err
}
newTupleValue = tempTupleValue
}
newTupleValue = tempTupleValue
}
return m.mut.Put(ctx, diff.Key, newTupleValue)
case tree.DiffOpRightDelete:
@@ -1100,6 +1103,10 @@ func (m *primaryMerger) merge(ctx *sql.Context, diff tree.ThreeWayDiff, sourceSc
return m.mut.Put(ctx, diff.Key, merged)
case tree.DiffOpLeftAdd, tree.DiffOpLeftModify, tree.DiffOpDivergentModifyConflict, tree.DiffOpDivergentDeleteConflict:
// Remapping when there's no schema change is harmless, but slow.
if !m.mergeInfo.LeftNeedsRewrite {
return nil
}
// If the right side has a schema change, then newly added rows from the left must be migrated to the new schema.
// Rows with unresolvable conflicts must also be migrated to the new schema so that they can be resolved manually.
if diff.Left == nil {
@@ -1218,11 +1225,12 @@ type secondaryMerger struct {
valueMerger *valueMerger
mergedSchema schema.Schema
tableMerger *TableMerger
mergeInfo MergeInfo
}
const secondaryMergerPendingSize = 650_000
func newSecondaryMerger(ctx *sql.Context, tm *TableMerger, valueMerger *valueMerger, mergedSchema schema.Schema) (*secondaryMerger, error) {
func newSecondaryMerger(ctx *sql.Context, tm *TableMerger, valueMerger *valueMerger, mergedSchema schema.Schema, mergeInfo MergeInfo) (*secondaryMerger, error) {
ls, err := tm.leftTbl.GetIndexSet(ctx)
if err != nil {
return nil, err
@@ -1246,11 +1254,15 @@ func newSecondaryMerger(ctx *sql.Context, tm *TableMerger, valueMerger *valueMer
valueMerger: valueMerger,
mergedSchema: mergedSchema,
tableMerger: tm,
mergeInfo: mergeInfo,
}, nil
}
func (m *secondaryMerger) merge(ctx *sql.Context, diff tree.ThreeWayDiff, leftSchema, rightSchema schema.Schema, tm *TableMerger, finalSchema schema.Schema) error {
var err error
if m.mergeInfo.InvalidateSecondaryIndexes {
return nil
}
for _, idx := range m.leftIdxes {
switch diff.Op {
case tree.DiffOpDivergentModifyResolved:
@@ -1264,57 +1276,59 @@ func (m *secondaryMerger) merge(ctx *sql.Context, diff tree.ThreeWayDiff, leftSc
newTupleValue := diff.Right
baseTupleValue := diff.Base
if schema.IsKeyless(rightSchema) {
if m.valueMerger.rightMapping.IsIdentityMapping() == false {
return fmt.Errorf("cannot merge keyless tables with reordered columns")
}
} else {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.mergedSchema, m.tableMerger.rightSch)
if err != nil {
return err
}
// Convert right value to result schema
tempTupleValue, err := remapTupleWithColumnDefaults(
ctx,
diff.Key,
diff.Right,
m.valueMerger.rightSchema.GetValueDescriptor(),
m.valueMerger.rightMapping,
m.tableMerger,
m.tableMerger.rightSch,
m.mergedSchema,
defaults,
m.valueMerger.syncPool,
true,
)
if err != nil {
return err
}
newTupleValue = tempTupleValue
if diff.Base != nil {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.mergedSchema, m.tableMerger.ancSch)
if m.mergeInfo.RightNeedsRewrite {
if schema.IsKeyless(rightSchema) {
if m.valueMerger.rightMapping.IsIdentityMapping() == false {
return fmt.Errorf("cannot merge keyless tables with reordered columns")
}
} else {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.mergedSchema, m.tableMerger.rightSch)
if err != nil {
return err
}
// Convert base value to result schema
baseTupleValue, err = remapTupleWithColumnDefaults(
// Convert right value to result schema
tempTupleValue, err := remapTupleWithColumnDefaults(
ctx,
diff.Key,
diff.Base,
// Only the right side was modified, so the base schema must be the same as the left schema
leftSchema.GetValueDescriptor(),
m.valueMerger.baseMapping,
tm,
m.tableMerger.ancSch,
finalSchema,
diff.Right,
m.valueMerger.rightSchema.GetValueDescriptor(),
m.valueMerger.rightMapping,
m.tableMerger,
m.tableMerger.rightSch,
m.mergedSchema,
defaults,
m.valueMerger.syncPool,
false)
true,
)
if err != nil {
return err
}
newTupleValue = tempTupleValue
if diff.Base != nil {
defaults, err := resolveDefaults(ctx, m.tableMerger.name, m.mergedSchema, m.tableMerger.ancSch)
if err != nil {
return err
}
// Convert base value to result schema
baseTupleValue, err = remapTupleWithColumnDefaults(
ctx,
diff.Key,
diff.Base,
// Only the right side was modified, so the base schema must be the same as the left schema
leftSchema.GetValueDescriptor(),
m.valueMerger.baseMapping,
tm,
m.tableMerger.ancSch,
finalSchema,
defaults,
m.valueMerger.syncPool,
false)
if err != nil {
return err
}
}
}
}

View File

@@ -126,7 +126,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O
}
// Calculate a merge of the schemas, but don't apply it yet
mergeSch, schConflicts, tableRewrite, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
mergeSch, schConflicts, mergeInfo, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
if err != nil {
return nil, nil, err
}
@@ -148,7 +148,7 @@ func (rm *RootMerger) MergeTable(ctx *sql.Context, tblName string, opts editor.O
var tbl *doltdb.Table
if types.IsFormat_DOLT(tm.vrw.Format()) {
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, tableRewrite)
tbl, stats, err = mergeProllyTable(ctx, tm, mergeSch, mergeInfo)
} else {
tbl, stats, err = mergeNomsTable(ctx, tm, mergeSch, rm.vrw, opts)
}

View File

@@ -155,7 +155,7 @@ var ErrMergeWithDifferentPks = errors.New("error: cannot merge two tables with d
// SchemaMerge performs a three-way merge of |ourSch|, |theirSch|, and |ancSch|, and returns: the merged schema,
// any schema conflicts identified, a MergeInfo recording whether rows from the left and/or right side need to be
// rewritten to the merged schema and whether secondary indexes are invalidated, and any unexpected error
// encountered while merging the schemas.
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, tableRewrite bool, err error) {
func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch, theirSch, ancSch schema.Schema, tblName string) (sch schema.Schema, sc SchemaConflict, mergeInfo MergeInfo, err error) {
// (sch - ancSch) (mergeSch - ancSch) (sch ∩ mergeSch)
sc = SchemaConflict{
TableName: tblName,
@@ -164,38 +164,38 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
// TODO: We'll remove this once it's possible to get diff and merge on different primary key sets
// TODO: decide how to merge different orders of PKS
if !schema.ArePrimaryKeySetsDiffable(format, ourSch, theirSch) || !schema.ArePrimaryKeySetsDiffable(format, ourSch, ancSch) {
return nil, SchemaConflict{}, false, ErrMergeWithDifferentPks
return nil, SchemaConflict{}, mergeInfo, ErrMergeWithDifferentPks
}
var mergedCC *schema.ColCollection
mergedCC, sc.ColConflicts, tableRewrite, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
mergedCC, sc.ColConflicts, mergeInfo, err = mergeColumns(tblName, format, ourSch.GetAllCols(), theirSch.GetAllCols(), ancSch.GetAllCols())
if err != nil {
return nil, SchemaConflict{}, false, err
return nil, SchemaConflict{}, mergeInfo, err
}
if len(sc.ColConflicts) > 0 {
return nil, sc, tableRewrite, nil
return nil, sc, mergeInfo, nil
}
var mergedIdxs schema.IndexCollection
mergedIdxs, sc.IdxConflicts = mergeIndexes(mergedCC, ourSch, theirSch, ancSch)
if len(sc.IdxConflicts) > 0 {
return nil, sc, tableRewrite, nil
return nil, sc, mergeInfo, nil
}
sch, err = schema.SchemaFromCols(mergedCC)
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
sch, err = mergeTableCollation(ctx, tblName, ancSch, ourSch, theirSch, sch)
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
// TODO: Merge conflict should have blocked any primary key ordinal changes
err = sch.SetPkOrdinals(ourSch.GetPkOrdinals())
if err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
}
_ = mergedIdxs.Iter(func(index schema.Index) (stop bool, err error) {
@@ -207,17 +207,17 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
var mergedChks []schema.Check
mergedChks, sc.ChkConflicts, err = mergeChecks(ctx, ourSch.Checks(), theirSch.Checks(), ancSch.Checks())
if err != nil {
return nil, SchemaConflict{}, false, err
return nil, SchemaConflict{}, mergeInfo, err
}
if len(sc.ChkConflicts) > 0 {
return nil, sc, false, nil
return nil, sc, mergeInfo, nil
}
// Look for invalid CHECKs
for _, chk := range mergedChks {
// CONFLICT: a CHECK now references a column that no longer exists in schema
if ok, err := isCheckReferenced(sch, chk); err != nil {
return nil, sc, false, err
return nil, sc, mergeInfo, err
} else if !ok {
// Append to conflicts
sc.ChkConflicts = append(sc.ChkConflicts, ChkConflict{
@@ -232,7 +232,7 @@ func SchemaMerge(ctx context.Context, format *storetypes.NomsBinFormat, ourSch,
sch.Checks().AddCheck(chk.Name(), chk.Expression(), chk.Enforced())
}
return sch, sc, tableRewrite, nil
return sch, sc, mergeInfo, nil
}
// ForeignKeysMerge performs a three-way merge of (ourRoot, theirRoot, ancRoot) and using mergeRoot to validate FKs.
@@ -361,6 +361,12 @@ func checkUnmergeableNewColumns(tblName string, columnMappings columnMappings) e
return nil
}
// MergeInfo records what the schema merge learned about how existing row data relates to the merged schema,
// so that the table data merge can skip remapping work that is not needed.
type MergeInfo struct {
// LeftNeedsRewrite is true when rows from the left side must be remapped to the merged schema. It is set by
// mergeColumns when the right side added, deleted, or changed a column, and by mergeProllyTable when the
// left column mapping is not an identity mapping. When false, primaryMerger.merge leaves left-side rows as-is.
LeftNeedsRewrite bool
// RightNeedsRewrite is true when rows from the right side must be remapped to the merged schema. It is set by
// mergeColumns when the left side added, deleted, or changed a column, and by mergeProllyTable when the
// right column mapping is not an identity mapping. When false, right-side tuples are written without remapping.
RightNeedsRewrite bool
// InvalidateSecondaryIndexes is true when a column type change was reported as requiring a rewrite by the
// type compatibility checker. When true, secondaryMerger.merge does no incremental work and the flag is
// passed on to mergeProllySecondaryIndexes (presumably so the indexes are rebuilt — confirm against that function).
InvalidateSecondaryIndexes bool
}
// mergeColumns merges the columns from |ourCC|, |theirCC| into a single column collection, using the ancestor column
// definitions in |ancCC| to determine on which side a column has changed. If merging is not possible because of
// conflicting changes to the columns in |ourCC| and |theirCC|, then a set of ColConflict instances are returned
@@ -369,26 +375,25 @@ func checkUnmergeableNewColumns(tblName string, columnMappings columnMappings) e
// compatible with the current stored format. Returns the merged columns, any column conflicts, and a MergeInfo
// recording which sides need their rows rewritten to align with the new, merged schema and whether secondary
// indexes are invalidated. If any unexpected error occurs, then that error is returned and the other response
// fields should be ignored.
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, bool, error) {
func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, theirCC, ancCC *schema.ColCollection) (*schema.ColCollection, []ColConflict, MergeInfo, error) {
mergeInfo := MergeInfo{}
columnMappings, err := mapColumns(ourCC, theirCC, ancCC)
if err != nil {
return nil, nil, false, err
return nil, nil, mergeInfo, err
}
conflicts, err := checkSchemaConflicts(columnMappings)
if err != nil {
return nil, nil, false, err
return nil, nil, mergeInfo, err
}
err = checkUnmergeableNewColumns(tblName, columnMappings)
if err != nil {
return nil, nil, false, err
return nil, nil, mergeInfo, err
}
compatChecker := newTypeCompatabilityCheckerForStorageFormat(format)
tableRewrite := false
// After we've checked for schema conflicts, merge the columns together
// TODO: We don't currently preserve all column position changes; the returned merged columns are always based on
// their position in |ourCC|, with any new columns from |theirCC| added at the end of the column collection.
@@ -402,11 +407,19 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
case anc == nil && ours == nil && theirs != nil:
// if an ancestor does not exist, and the column exists only on one side, use that side
// (if an ancestor DOES exist, this means the column was deleted, so it's a no-op)
mergeInfo.LeftNeedsRewrite = true
mergedColumns = append(mergedColumns, *theirs)
case anc == nil && ours != nil && theirs == nil:
// if an ancestor does not exist, and the column exists only on one side, use that side
// (if an ancestor DOES exist, this means the column was deleted, so it's a no-op)
mergeInfo.RightNeedsRewrite = true
mergedColumns = append(mergedColumns, *ours)
case anc != nil && ours == nil && theirs != nil:
// column was deleted on our side
mergeInfo.RightNeedsRewrite = true
case anc != nil && ours != nil && theirs == nil:
// column was deleted on their side
mergeInfo.LeftNeedsRewrite = true
case ours == nil && theirs == nil:
// if the column is deleted on both sides... just let it fall out
case ours != nil && theirs != nil:
@@ -423,9 +436,10 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
} else if theirsChanged {
// In this case, only theirsChanged, so we need to check if moving from ours->theirs
// is valid, otherwise it's a conflict
mergeInfo.LeftNeedsRewrite = true
compatible, rewrite := compatChecker.IsTypeChangeCompatible(ours.TypeInfo, theirs.TypeInfo)
if rewrite {
tableRewrite = true
mergeInfo.InvalidateSecondaryIndexes = true
}
if compatible {
mergedColumns = append(mergedColumns, *theirs)
@@ -439,9 +453,10 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
} else if oursChanged {
// In this case, only oursChanged, so we need to check if moving from theirs->ours
// is valid, otherwise it's a conflict
mergeInfo.RightNeedsRewrite = true
compatible, rewrite := compatChecker.IsTypeChangeCompatible(theirs.TypeInfo, ours.TypeInfo)
if rewrite {
tableRewrite = true
mergeInfo.InvalidateSecondaryIndexes = true
}
if compatible {
mergedColumns = append(mergedColumns, *ours)
@@ -469,10 +484,10 @@ func mergeColumns(tblName string, format *storetypes.NomsBinFormat, ourCC, their
// Check that there are no duplicate column names or tags in the merged column set
conflicts = append(conflicts, checkForColumnConflicts(mergedColumns)...)
if conflicts != nil {
return nil, conflicts, false, nil
return nil, conflicts, mergeInfo, nil
}
return schema.NewColCollection(mergedColumns...), nil, tableRewrite, nil
return schema.NewColCollection(mergedColumns...), nil, mergeInfo, nil
}
// checkForColumnConflicts iterates over |mergedColumns|, checks for duplicate column names or column tags, and returns

View File

@@ -635,8 +635,8 @@ func testMergeSchemasWithConflicts(t *testing.T, test mergeSchemaConflictTest) {
otherSch := getSchema(t, dEnv)
_, actConflicts, requiresTableRewrite, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
assert.False(t, requiresTableRewrite)
_, actConflicts, mergeInfo, err := merge.SchemaMerge(context.Background(), types.Format_Default, mainSch, otherSch, ancSch, "test")
assert.False(t, mergeInfo.InvalidateSecondaryIndexes)
if test.expectedErr != nil {
assert.True(t, errors.Is(err, test.expectedErr))
return