[no-release-notes] implement cell-wise merges for new storage format (#3346)

* implement cell-wise merges and on the fly secondary index corrections

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* move old storage format fix to new PR

* pr feedback

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* pr feedback 2, no pointers to interfaces, and use val.OrdinalMapping

* copyright header

* need for loop around select, buffer cellWiseChan, pr edits

Co-authored-by: druvv <druvv@users.noreply.github.com>
This commit is contained in:
Dhruv Sringari
2022-05-05 11:03:06 -07:00
committed by GitHub
parent 3ce78dc3bc
commit 69bde477cf
7 changed files with 880 additions and 378 deletions

View File

@@ -33,13 +33,9 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
json2 "github.com/dolthub/dolt/go/libraries/doltcore/sqle/json"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation"
"github.com/dolthub/dolt/go/libraries/utils/valutil"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)
@@ -249,256 +245,6 @@ func (merger *Merger) MergeTable(ctx context.Context, tblName string, opts edito
return resultTbl, stats, nil
}
// mergeTableData three-way merges the primary row data of |tbl|, |mergeTbl|,
// and |ancTbl|, then merges their secondary indexes against the merged row
// data, storing both results in |tableToUpdate|. Returns the updated table
// and merge stats.
func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSchema, mergeSchema, ancSchema schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, *MergeStats, error) {
	// TODO (dhruv): update this function definition to return any conflicts
	updatedTable, mergedData, err := mergeProllyRowData(ctx, postMergeSchema, rootSchema, mergeSchema, ancSchema, tbl, mergeTbl, ancTbl, tableToUpdate)
	if err != nil {
		return nil, nil, err
	}
	// Secondary indexes are merged using |mergedData| so that any index that
	// cannot be three-way merged can be rebuilt from the merged primary rows.
	updatedTable, err = mergeProllySecondaryIndexes(ctx, vrw, postMergeSchema, rootSchema, mergeSchema, ancSchema, mergedData, tbl, mergeTbl, ancTbl, updatedTable)
	if err != nil {
		return nil, nil, err
	}
	// TODO (dhruv): populate merge stats
	return updatedTable, &MergeStats{Operation: TableModified}, nil
}
// mergeProllyRowData merges the primary row table indexes of |tbl|, |mergeTbl|,
// and |ancTbl|. It stores the merged row data into |tableToUpdate| and returns the new value along with the row data.
func mergeProllyRowData(ctx context.Context, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, durable.Index, error) {
	// Load the row data for all three sides of the three-way merge.
	rootR, err := tbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	mergeR, err := mergeTbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	ancR, err := ancTbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	rootRP := durable.ProllyMapFromIndex(rootR)
	mergeRP := durable.ProllyMapFromIndex(mergeR)
	ancRP := durable.ProllyMapFromIndex(ancR)
	//vMerger := newValueMerger(postMergeSchema, rootSch, mergeSch, ancSch)
	conflicted := false
	// Any row edited on both sides is currently flagged as a conflict; the
	// commented-out code below sketches the planned cell-wise merge path.
	mergedRP, err := prolly.MergeMaps(ctx, rootRP, mergeRP, ancRP, func(left, right tree.Diff) (tree.Diff, bool) {
		conflicted = true
		return tree.Diff{}, false
		//merged, ok := vMerger.tryMerge(val.Tuple(left.To), val.Tuple(right.To), val.Tuple(left.From))
		//if !ok {
		//	conflicted = true
		//	return tree.Diff{}, false
		//}
		//
		//return tree.Diff{
		//	Type: tree.ModifiedDiff,
		//	Key:  left.Key,
		//	From: left.From,
		//	To:   tree.NodeItem(merged),
		//}, true
	})
	if err != nil {
		return nil, nil, err
	}
	if conflicted {
		// Cell-wise merges are not implemented in this code path yet.
		return nil, nil, errors.New("row conflicts not supported yet")
	}
	updatedTbl, err := tableToUpdate.UpdateRows(ctx, durable.IndexFromProllyMap(mergedRP))
	if err != nil {
		return nil, nil, err
	}
	return updatedTbl, durable.IndexFromProllyMap(mergedRP), nil
}
var syncPool = pool.NewBuffPool()
//type valueMerger struct {
// numCols int
// vD, lVD, rVD, bVD val.TupleDesc
// leftMapping, rightMapping, baseMapping map[int]int
//}
//
//func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema) *valueMerger {
// n := merged.GetNonPKCols().Size()
// leftMapping := make(map[int]int, n)
// rightMapping := make(map[int]int, n)
// baseMapping := make(map[int]int, n)
//
// for i, tag := range merged.GetNonPKCols().Tags {
// if j, ok := leftSch.GetNonPKCols().TagToIdx[tag]; ok {
// leftMapping[i] = j
// }
// if j, ok := rightSch.GetNonPKCols().TagToIdx[tag]; ok {
// rightMapping[i] = j
// }
// if j, ok := baseSch.GetNonPKCols().TagToIdx[tag]; ok {
// baseMapping[i] = j
// }
// }
//
// return &valueMerger{
// n,
// prolly.ValueDescriptorFromSchema(merged),
// prolly.ValueDescriptorFromSchema(leftSch),
// prolly.ValueDescriptorFromSchema(rightSch),
// prolly.ValueDescriptorFromSchema(baseSch),
// leftMapping,
// rightMapping,
// baseMapping,
// }
//}
//
//// tryMerge performs a cell-wise merge given left, right, and base cell value
//// tuples. It returns the merged cell value tuple and an ok bool.
//func (m *valueMerger) tryMerge(left, right, base val.Tuple) (merged val.Tuple, ok bool) {
// processColumnFunc := func(i int) (resultVal []byte, isConflict bool) {
// var leftVal []byte
// if l, ok := m.leftMapping[i]; ok {
// leftVal = m.lVD.GetField(l, left)
// }
// var rightVal []byte
// if r, ok := m.rightMapping[i]; ok {
// rightVal = m.rVD.GetField(r, right)
// }
// var baseVal []byte
// if b, ok := m.baseMapping[i]; ok {
// baseVal = m.bVD.GetField(b, base)
// }
//
// // bytes.Equal?? What about nil vs empty slices??
// leftModified := m.vD.Comparator().CompareValues(leftVal, baseVal, m.vD.Types[i]) != 0
// rightModified := m.vD.Comparator().CompareValues(rightVal, baseVal, m.vD.Types[i]) != 0
//
// switch {
// case leftModified && rightModified:
// return nil, true
// case leftModified:
// return leftVal, false
// default:
// return rightVal, false
// }
// }
//
// mergedValues := make([][]byte, m.numCols)
// for i := 0; i < m.numCols; i++ {
// v, isConflict := processColumnFunc(i)
// if isConflict {
// return nil, false
// }
// mergedValues[i] = v
// }
//
// return val.NewTuple(syncPool, mergedValues...), true
//}
// mergeProllySecondaryIndexes merges the secondary indexes of the given |tbl|,
// |mergeTbl|, and |ancTbl|. It stores the merged indexes into |tableToUpdate|
// and returns its updated value.
func mergeProllySecondaryIndexes(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) {
	// Gather the index sets from all three sides of the merge.
	rootSet, err := tbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	mergeSet, err := mergeTbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	ancSet, err := ancTbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	// Three-way merge every index named in |postMergeSchema|.
	mergedSet, err := mergeProllyIndexSets(ctx, vrw, postMergeSchema, rootSch, mergeSch, ancSch, mergedData, rootSet, mergeSet, ancSet)
	if err != nil {
		return nil, err
	}
	updatedTbl, err := tableToUpdate.SetIndexSet(ctx, mergedSet)
	if err != nil {
		return nil, err
	}
	return updatedTbl, nil
}
// mergeProllyIndexSets merges the |root|, |merge|, and |anc| index sets based
// on the provided |postMergeSchema|. It returns the merged index set.
func mergeProllyIndexSets(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, root, merge, anc durable.IndexSet) (durable.IndexSet, error) {
	mergedIndexSet := durable.NewIndexSet(ctx, vrw)

	// tryGetIdx fetches |indexName| from |iS| when |sch| declares it;
	// ok is false when the schema does not contain the index.
	tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (idx durable.Index, ok bool, err error) {
		ok = sch.Indexes().Contains(indexName)
		if ok {
			idx, err = iS.GetIndex(ctx, sch, indexName)
			if err != nil {
				return nil, false, err
			}
			return idx, true, nil
		}
		return nil, false, nil
	}

	// Based on the indexes in the post merge schema, merge the root, merge,
	// and ancestor indexes.
	for _, index := range postMergeSchema.Indexes().AllIndexes() {
		rootI, rootOK, err := tryGetIdx(rootSch, root, index.Name())
		if err != nil {
			return nil, err
		}
		mergeI, mergeOK, err := tryGetIdx(mergeSch, merge, index.Name())
		if err != nil {
			return nil, err
		}
		ancI, ancOK, err := tryGetIdx(ancSch, anc, index.Name())
		if err != nil {
			return nil, err
		}
		mergedIndex, err := func() (durable.Index, error) {
			// If any side lacks the index, a three-way merge is impossible;
			// rebuild the index from the merged primary row data instead.
			if !rootOK || !mergeOK || !ancOK {
				mergedIndex, err := creation.BuildSecondaryProllyIndex(ctx, vrw, postMergeSchema, index, durable.ProllyMapFromIndex(mergedData))
				if err != nil {
					return nil, err
				}
				return mergedIndex, nil
			}
			left := durable.ProllyMapFromIndex(rootI)
			right := durable.ProllyMapFromIndex(mergeI)
			base := durable.ProllyMapFromIndex(ancI)
			var collision = false
			merged, err := prolly.MergeMaps(ctx, left, right, base, func(left, right tree.Diff) (tree.Diff, bool) {
				// Conflicting index edits are not handled yet; flag and bail below.
				collision = true
				return tree.Diff{}, true
			})
			if err != nil {
				return nil, err
			}
			if collision {
				return nil, errors.New("collisions not implemented")
			}
			return durable.IndexFromProllyMap(merged), nil
		}()
		if err != nil {
			return nil, err
		}
		mergedIndexSet, err = mergedIndexSet.PutIndex(ctx, index.Name(), mergedIndex)
		if err != nil {
			return nil, err
		}
	}
	return mergedIndexSet, nil
}
func getTableInfoFromRoot(ctx context.Context, tblName string, root *doltdb.RootValue) (
ok bool,
table *doltdb.Table,
@@ -1009,6 +755,7 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc
return rowMergeResult{}, err
}
var didMerge bool
processTagFunc := func(tag uint64) (resultVal types.Value, isConflict bool) {
baseVal, _ := baseVals.Get(tag)
val, _ := rowVals.Get(tag)
@@ -1023,8 +770,10 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc
case modified && mergeModified:
return nil, true
case modified:
didMerge = true
return val, false
default:
didMerge = true
return mergeVal, false
}
}
@@ -1057,7 +806,7 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc
return rowMergeResult{}, err
}
return rowMergeResult{v, true, false}, nil
return rowMergeResult{v, didMerge, false}, nil
}
func keylessRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Schema, val, mergeVal, ancVal types.Value) (rowMergeResult, error) {

View File

@@ -0,0 +1,475 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package merge
import (
"context"
"errors"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
// cellWiseMerge records a row whose left and right edits were reconciled
// column-by-column into a single merged diff.
type cellWiseMerge struct {
	leftDiff  tree.Diff // the left (root branch) side's diff for the row
	rightDiff tree.Diff // the right (merge branch) side's diff for the row
	merged    tree.Diff // the resulting cell-wise merged diff applied to primary row data
}
// mergeTableData three-way merges rows and indexes for a given table. First,
// the primary row data is merged, then secondary indexes are merged. In the
// process of merging the primary row data, we may need to perform cell-wise
// merges. Since a cell-wise merge result neither contains the values from the
// root branch or the merge branch we also need to update the secondary indexes
// prior to merging them.
//
// Each cell-wise merge reverts the corresponding index entries in the root
// branch, and modifies index entries in the merge branch. The merge branch's
// entries are set to values consistent with the cell-wise merge result. When
// the root and merge secondary indexes are merged, they will produce entries
// consistent with the primary row data.
func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSchema, mergeSchema, ancSchema schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, *MergeStats, error) {
	// Fetch the index sets before launching any goroutines. Previously this
	// happened after the producer goroutine started, so an error here
	// returned without group.Wait(), leaking a producer blocked on a
	// channel send.
	rootIndexSet, err := tbl.GetIndexSet(ctx)
	if err != nil {
		return nil, nil, err
	}
	mergeIndexSet, err := mergeTbl.GetIndexSet(ctx)
	if err != nil {
		return nil, nil, err
	}

	group, gCtx := errgroup.WithContext(ctx)

	// Buffered so the row-merge producer can make progress while the index
	// updater consumes.
	cellWiseMerges := make(chan cellWiseMerge, 128)

	var updatedTable *doltdb.Table
	var mergedData durable.Index
	group.Go(func() error {
		// Close unconditionally so the consumer's receive loop terminates
		// even when the row merge fails; the original only closed on success
		// and relied on context cancellation for the failure path.
		defer close(cellWiseMerges)
		var err error
		// TODO (dhruv): update this function definition to return any conflicts
		updatedTable, mergedData, err = mergeProllyRowData(gCtx, postMergeSchema, rootSchema, mergeSchema, ancSchema, tbl, mergeTbl, ancTbl, tableToUpdate, cellWiseMerges)
		return err
	})

	var updatedRootIndexSet durable.IndexSet
	var updatedMergeIndexSet durable.IndexSet
	group.Go(func() error {
		var err error
		updatedRootIndexSet, updatedMergeIndexSet, err = updateProllySecondaryIndexes(gCtx, cellWiseMerges, rootSchema, mergeSchema, tbl, mergeTbl, rootIndexSet, mergeIndexSet)
		return err
	})

	if err = group.Wait(); err != nil {
		return nil, nil, err
	}

	// Install the corrected index sets so the subsequent index merge sees
	// entries consistent with the cell-wise merge results.
	tbl, err = tbl.SetIndexSet(ctx, updatedRootIndexSet)
	if err != nil {
		return nil, nil, err
	}
	mergeTbl, err = mergeTbl.SetIndexSet(ctx, updatedMergeIndexSet)
	if err != nil {
		return nil, nil, err
	}

	updatedTable, err = mergeProllySecondaryIndexes(ctx, vrw, postMergeSchema, rootSchema, mergeSchema, ancSchema, mergedData, tbl, mergeTbl, ancTbl, updatedTable)
	if err != nil {
		return nil, nil, err
	}

	// TODO (dhruv): populate merge stats
	return updatedTable, &MergeStats{Operation: TableModified}, nil
}
// mergeProllyRowData merges the primary row table indexes of |tbl|, |mergeTbl|,
// and |ancTbl|. It stores the merged row data into |tableToUpdate| and returns
// the new value along with the row data. Every successful cell-wise merge is
// published on |cellWiseMerges| so the secondary indexes can be corrected.
func mergeProllyRowData(ctx context.Context, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table, cellWiseMerges chan cellWiseMerge) (*doltdb.Table, durable.Index, error) {
	// Load the row data for all three sides of the three-way merge.
	rootR, err := tbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	mergeR, err := mergeTbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	ancR, err := ancTbl.GetRowData(ctx)
	if err != nil {
		return nil, nil, err
	}
	rootRP := durable.ProllyMapFromIndex(rootR)
	mergeRP := durable.ProllyMapFromIndex(mergeR)
	ancRP := durable.ProllyMapFromIndex(ancR)

	// Reuse |rootRP|'s buffer pool for building merged tuples. The original
	// constructed a second, redundant prolly map from |rootR| solely to
	// obtain a pool.
	vMerger := newValueMerger(postMergeSchema, rootSch, mergeSch, ancSch, rootRP.Pool())

	conflicted := false
	mergedRP, err := prolly.MergeMaps(ctx, rootRP, mergeRP, ancRP, func(left, right tree.Diff) (tree.Diff, bool) {
		merged, isConflict := vMerger.tryMerge(val.Tuple(left.To), val.Tuple(right.To), val.Tuple(left.From))
		if isConflict {
			conflicted = true
			return tree.Diff{}, false
		}
		d := tree.Diff{
			Type: tree.ModifiedDiff,
			Key:  left.Key,
			From: left.From,
			To:   tree.Item(merged),
		}
		// Publish the cell-wise merge for the secondary index updater; give
		// up if the merge has been cancelled so we never block forever.
		select {
		case cellWiseMerges <- cellWiseMerge{left, right, d}:
		case <-ctx.Done():
			return tree.Diff{}, false
		}
		return d, true
	})
	if err != nil {
		return nil, nil, err
	}
	if conflicted {
		return nil, nil, errors.New("row conflicts not supported yet")
	}
	updatedTbl, err := tableToUpdate.UpdateRows(ctx, durable.IndexFromProllyMap(mergedRP))
	if err != nil {
		return nil, nil, err
	}
	return updatedTbl, durable.IndexFromProllyMap(mergedRP), nil
}
// valueMerger performs cell-wise (column-by-column) three-way merges of
// non-PK value tuples.
type valueMerger struct {
	numCols int           // number of non-PK columns in the merged schema
	vD      val.TupleDesc // value tuple descriptor of the merged schema
	// Mapping from merged-schema column ordinal to each branch's column
	// ordinal; -1 marks a column absent from that branch's schema.
	leftMapping, rightMapping, baseMapping val.OrdinalMapping
	syncPool                               pool.BuffPool // pool used when building merged tuples
}
// newValueMerger builds a valueMerger for the given merged schema, computing
// the ordinal mappings from the merged schema's non-PK columns into the left,
// right, and base schemas. Columns missing from a schema map to -1.
func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema, syncPool pool.BuffPool) *valueMerger {
	mergedCols := merged.GetNonPKCols()
	n := mergedCols.Size()

	leftMapping := make(val.OrdinalMapping, n)
	rightMapping := make(val.OrdinalMapping, n)
	baseMapping := make(val.OrdinalMapping, n)

	// ordinalFor resolves |tag| to its non-PK column ordinal within |sch|,
	// or -1 when the column does not exist there.
	ordinalFor := func(sch schema.Schema, tag uint64) int {
		if ord, ok := sch.GetNonPKCols().TagToIdx[tag]; ok {
			return ord
		}
		return -1
	}

	for i, tag := range mergedCols.Tags {
		leftMapping[i] = ordinalFor(leftSch, tag)
		rightMapping[i] = ordinalFor(rightSch, tag)
		baseMapping[i] = ordinalFor(baseSch, tag)
	}

	return &valueMerger{
		numCols:      n,
		vD:           prolly.ValueDescriptorFromSchema(merged),
		leftMapping:  leftMapping,
		rightMapping: rightMapping,
		baseMapping:  baseMapping,
		syncPool:     syncPool,
	}
}
// tryMerge performs a cell-wise merge given left, right, and base cell value
// tuples. It returns the merged cell value tuple and a bool indicating if a
// conflict occurred. tryMerge should only be called if left and right produce
// non-identical diffs against base.
func (m *valueMerger) tryMerge(left, right, base val.Tuple) (val.Tuple, bool) {
	// A delete on one side paired with a modification on the other is an
	// irreconcilable row-level conflict.
	if base != nil && (left == nil) != (right == nil) {
		return nil, true
	}

	// The caller guarantees non-identical diffs, so neither side can be nil
	// at this point.
	if left == nil || right == nil {
		panic("found nil left / right which should never occur")
	}

	mergedCols := make([][]byte, m.numCols)
	for col := range mergedCols {
		v, conflict := m.processColumn(col, left, right, base)
		if conflict {
			return nil, true
		}
		mergedCols[col] = v
	}
	return val.NewTuple(m.syncPool, mergedCols...), false
}
// processColumn returns the merged value of column |i| of the merged schema,
// based on the |left|, |right|, and |base| schema. The second return is true
// when the column's edits conflict.
func (m *valueMerger) processColumn(i int, left, right, base val.Tuple) ([]byte, bool) {
	// missing columns are coerced into NULL column values
	var leftCol []byte
	if l := m.leftMapping[i]; l != -1 {
		leftCol = left.GetField(l)
	}
	var rightCol []byte
	if r := m.rightMapping[i]; r != -1 {
		rightCol = right.GetField(r)
	}

	// Both sides hold the same value: no conflict regardless of base.
	if m.vD.Comparator().CompareValues(leftCol, rightCol, m.vD.Types[i]) == 0 {
		return leftCol, false
	}

	if base == nil {
		// Conflicting insert
		return nil, true
	}

	var baseVal []byte
	if b := m.baseMapping[i]; b != -1 {
		baseVal = base.GetField(b)
	}

	// Sides differ from each other (checked above); if both also differ
	// from base, the cell was modified divergently — conflict. Otherwise
	// take whichever side changed.
	leftModified := m.vD.Comparator().CompareValues(leftCol, baseVal, m.vD.Types[i]) != 0
	rightModified := m.vD.Comparator().CompareValues(rightCol, baseVal, m.vD.Types[i]) != 0
	switch {
	case leftModified && rightModified:
		return nil, true
	case leftModified:
		return leftCol, false
	default:
		return rightCol, false
	}
}
// Given cellWiseMerge's sent on |cellWiseChan|, update the secondary indexes in
// |rootIndexSet| and |mergeIndexSet| such that when the index sets are merged,
// they produce entries consistent with the cell-wise merges. The updated
// |rootIndexSet| and |mergeIndexSet| are returned.
func updateProllySecondaryIndexes(
	ctx context.Context,
	cellWiseChan chan cellWiseMerge,
	rootSchema, mergeSchema schema.Schema,
	tbl, mergeTbl *doltdb.Table,
	rootIndexSet, mergeIndexSet durable.IndexSet) (durable.IndexSet, durable.IndexSet, error) {

	rootIdxs, err := getMutableSecondaryIdxs(ctx, rootSchema, tbl)
	if err != nil {
		return nil, nil, err
	}
	mergeIdxs, err := getMutableSecondaryIdxs(ctx, mergeSchema, mergeTbl)
	if err != nil {
		return nil, nil, err
	}

	// Drain |cellWiseChan| until the producer closes it, applying each
	// cell-wise merge to both sides' mutable indexes; bail out if the
	// context is cancelled first.
OUTER:
	for {
		select {
		case m, ok := <-cellWiseChan:
			if !ok {
				break OUTER
			}
			for _, idx := range rootIdxs {
				// Revert corresponding idx entry in left
				err = idx.UpdateEntry(ctx, val.Tuple(m.leftDiff.Key), val.Tuple(m.leftDiff.To), val.Tuple(m.leftDiff.From))
				if err != nil {
					return nil, nil, err
				}
			}
			for _, idx := range mergeIdxs {
				// Update corresponding idx entry to merged value in right
				err = idx.UpdateEntry(ctx, val.Tuple(m.rightDiff.Key), val.Tuple(m.rightDiff.To), val.Tuple(m.merged.To))
				if err != nil {
					return nil, nil, err
				}
			}
		case <-ctx.Done():
			return nil, nil, ctx.Err()
		}
	}

	// persistIndexMuts finalizes each mutable index and writes the result
	// back into |indexSet| under the index's name.
	persistIndexMuts := func(indexSet durable.IndexSet, idxs []MutableSecondaryIdx) (durable.IndexSet, error) {
		for _, idx := range idxs {
			m, err := idx.Map(ctx)
			if err != nil {
				return nil, err
			}
			indexSet, err = indexSet.PutIndex(ctx, idx.Name, durable.IndexFromProllyMap(m))
			if err != nil {
				return nil, err
			}
		}
		return indexSet, nil
	}

	updatedRootIndexSet, err := persistIndexMuts(rootIndexSet, rootIdxs)
	if err != nil {
		return nil, nil, err
	}
	updatedMergeIndexSet, err := persistIndexMuts(mergeIndexSet, mergeIdxs)
	if err != nil {
		return nil, nil, err
	}
	return updatedRootIndexSet, updatedMergeIndexSet, nil
}
// getMutableSecondaryIdxs returns a MutableSecondaryIdx for each secondary
// index defined in |schema| and |tbl|.
func getMutableSecondaryIdxs(ctx context.Context, schema schema.Schema, tbl *doltdb.Table) ([]MutableSecondaryIdx, error) {
	indexSet, err := tbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	indexes := schema.Indexes().AllIndexes()
	mods := make([]MutableSecondaryIdx, len(indexes))
	for i, index := range indexes {
		durIdx, err := indexSet.GetIndex(ctx, schema, index.Name())
		if err != nil {
			return nil, err
		}
		idxMap := durable.ProllyMapFromIndex(durIdx)
		mods[i] = NewMutableSecondaryIdx(idxMap, schema, index, idxMap.Pool())
	}
	return mods, nil
}
// mergeProllySecondaryIndexes merges the secondary indexes of the given |tbl|,
// |mergeTbl|, and |ancTbl|. It stores the merged indexes into |tableToUpdate|
// and returns its updated value.
func mergeProllySecondaryIndexes(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) {
	// Collect the index sets from each side of the merge.
	rootIndexes, err := tbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	mergeIndexes, err := mergeTbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	ancIndexes, err := ancTbl.GetIndexSet(ctx)
	if err != nil {
		return nil, err
	}
	// Merge the three sets and store the result on |tableToUpdate|.
	merged, err := mergeProllyIndexSets(ctx, vrw, postMergeSchema, rootSch, mergeSch, ancSch, mergedData, rootIndexes, mergeIndexes, ancIndexes)
	if err != nil {
		return nil, err
	}
	return tableToUpdate.SetIndexSet(ctx, merged)
}
// mergeProllyIndexSets merges the |root|, |merge|, and |anc| index sets based
// on the provided |postMergeSchema|. It returns the merged index set.
func mergeProllyIndexSets(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, root, merge, anc durable.IndexSet) (durable.IndexSet, error) {
	mergedIndexSet := durable.NewIndexSet(ctx, vrw)

	// tryGetIdx fetches |indexName| from |iS| if |sch| declares that index;
	// ok reports whether the schema contains it.
	tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (idx durable.Index, ok bool, err error) {
		ok = sch.Indexes().Contains(indexName)
		if ok {
			idx, err = iS.GetIndex(ctx, sch, indexName)
			if err != nil {
				return nil, false, err
			}
			return idx, true, nil
		}
		return nil, false, nil
	}

	// Based on the indexes in the post merge schema, merge the root, merge,
	// and ancestor indexes.
	for _, index := range postMergeSchema.Indexes().AllIndexes() {
		rootI, rootOK, err := tryGetIdx(rootSch, root, index.Name())
		if err != nil {
			return nil, err
		}
		mergeI, mergeOK, err := tryGetIdx(mergeSch, merge, index.Name())
		if err != nil {
			return nil, err
		}
		ancI, ancOK, err := tryGetIdx(ancSch, anc, index.Name())
		if err != nil {
			return nil, err
		}
		mergedIndex, err := func() (durable.Index, error) {
			// When any side lacks this index a three-way merge is not
			// possible, so rebuild it from the merged primary row data.
			if !rootOK || !mergeOK || !ancOK {
				mergedIndex, err := creation.BuildSecondaryProllyIndex(ctx, vrw, postMergeSchema, index, durable.ProllyMapFromIndex(mergedData))
				if err != nil {
					return nil, err
				}
				return mergedIndex, nil
			}
			left := durable.ProllyMapFromIndex(rootI)
			right := durable.ProllyMapFromIndex(mergeI)
			base := durable.ProllyMapFromIndex(ancI)
			var collision = false
			merged, err := prolly.MergeMaps(ctx, left, right, base, func(left, right tree.Diff) (tree.Diff, bool) {
				// Conflicting index edits are unhandled; flag and error below.
				collision = true
				return tree.Diff{}, true
			})
			if err != nil {
				return nil, err
			}
			if collision {
				return nil, errors.New("collisions not implemented")
			}
			return durable.IndexFromProllyMap(merged), nil
		}()
		if err != nil {
			return nil, err
		}
		mergedIndexSet, err = mergedIndexSet.PutIndex(ctx, index.Name(), mergedIndex)
		if err != nil {
			return nil, err
		}
	}
	return mergedIndexSet, nil
}

View File

@@ -16,6 +16,7 @@ package merge
import (
"context"
"sort"
"testing"
"github.com/stretchr/testify/assert"
@@ -32,6 +33,7 @@ import (
filesys2 "github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/valutil"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
@@ -69,6 +71,7 @@ type rowV struct {
var vD = prolly.ValueDescriptorFromSchema(sch)
var vB = val.NewTupleBuilder(vD)
var syncPool = pool.NewBuffPool()
func (v rowV) value() val.Tuple {
vB.PutInt64(0, int64(v.col1))
@@ -110,6 +113,10 @@ type testRow struct {
// non-conflicting updates. => +2
//
// For (insert, insert) there are identical inserts and conflicting inserts => +1
//
// A modification of a primary key is the combination of the two base cases:
// First, a (delete, delete), then an (insert, insert). We omit tests for these
// and instead defer to the base cases.
var testRows = []testRow{
// Ancestor exists
@@ -186,6 +193,17 @@ var testRows = []testRow{
false,
&rowV{-6, -6},
},
// Non-conflicting update 2
{
62,
&rowV{62, 62},
UpdateAction,
UpdateAction,
&rowV{-62, 62},
&rowV{62, -62},
false,
&rowV{-62, -62},
},
{
7,
&rowV{7, 7},
@@ -358,10 +376,10 @@ func TestNomsMergeCommits(t *testing.T) {
assert.NoError(t, err)
if !mergedRows.Equals(expectedRows) {
t.Error(mustString(types.EncodedValue(context.Background(), mergedRows)), "\n!=\n", mustString(types.EncodedValue(context.Background(), expectedRows)))
t.Error(mustString(types.EncodedValue(context.Background(), expectedRows)), "\n!=\n", mustString(types.EncodedValue(context.Background(), mergedRows)))
}
if !conflicts.Equals(expectedConflicts) {
t.Error(mustString(types.EncodedValue(context.Background(), conflicts)), "\n!=\n", mustString(types.EncodedValue(context.Background(), expectedConflicts)))
t.Error(mustString(types.EncodedValue(context.Background(), expectedConflicts)), "\n!=\n", mustString(types.EncodedValue(context.Background(), conflicts)))
}
for _, index := range sch.Indexes().AllIndexes() {
@@ -383,9 +401,16 @@ func TestNomsMergeCommits(t *testing.T) {
assert.Equal(t, eh.String(), h.String(), "table hashes do not equal")
}
// sortTests orders the test rows by ascending key so generated key/value
// pairs are produced in map key order.
func sortTests(t []testRow) {
	sort.Slice(t, func(i, j int) bool {
		return t[i].key < t[j].key
	})
}
func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *doltdb.RootValue, *doltdb.RootValue, durable.Index) {
ddb := mustMakeEmptyRepo(t)
vrw := ddb.ValueReadWriter()
sortTests(testRows)
var initialKVs []val.Tuple
var expectedKVs []val.Tuple
@@ -396,11 +421,6 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do
continue
}
// Skip cell-wise merges
if testCase.leftAction == UpdateAction && testCase.rightAction == UpdateAction {
continue
}
if testCase.initialValue != nil {
initialKVs = append(initialKVs, key(testCase.key), testCase.initialValue.value())
}
@@ -422,11 +442,6 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do
continue
}
// Skip cell-wise merges
if testCase.leftAction == UpdateAction && testCase.rightAction == UpdateAction {
continue
}
switch testCase.leftAction {
case NoopAction:
break
@@ -478,6 +493,7 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do
func setupNomsMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *doltdb.RootValue, *doltdb.RootValue, types.Map, types.Map, *MergeStats) {
ddb := mustMakeEmptyRepo(t)
vrw := ddb.ValueReadWriter()
sortTests(testRows)
var initalKVs []types.Value
var expectedKVs []types.Value

View File

@@ -0,0 +1,88 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package merge
import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/val"
)
// MutableSecondaryIdx wraps a prolly.MutableMap of a secondary table index. It
// provides an UpdateEntry function which can be used to update an entry in the
// secondary index given a modification to a row.
type MutableSecondaryIdx struct {
	Name     string             // name of the secondary index
	mut      prolly.MutableMap  // in-progress edits to the index data
	keyMap   val.OrdinalMapping // index-key ordinal -> primary key/value ordinal
	pkLen    int                // number of primary key columns in the table schema
	keyBld   *val.TupleBuilder  // scratch builder for index entry keys
	syncPool pool.BuffPool      // buffer pool used when building key tuples
}
// NewMutableSecondaryIdx returns a MutableSecondaryIdx. |m| is the secondary idx data.
func NewMutableSecondaryIdx(m prolly.Map, sch schema.Schema, index schema.Index, syncPool pool.BuffPool) MutableSecondaryIdx {
	kD, _ := m.Descriptors()
	// Keyed composite literal: the original used positional initialization,
	// which silently breaks if struct fields are added or reordered.
	return MutableSecondaryIdx{
		Name:     index.Name(),
		mut:      m.Mutate(),
		keyMap:   creation.GetIndexKeyMapping(sch, index),
		pkLen:    sch.GetPKCols().Size(),
		keyBld:   val.NewTupleBuilder(kD),
		syncPool: syncPool,
	}
}
// UpdateEntry modifies the corresponding secondary index entry given the key
// and curr/new values of the primary row. It deletes the entry derived from
// the current row state and inserts the entry derived from the new state.
func (m MutableSecondaryIdx) UpdateEntry(ctx context.Context, key, currValue, newValue val.Tuple) error {
	currKey := m.mapKeyValue(key, currValue)
	newKey := m.mapKeyValue(key, newValue)

	// BUG FIX: the original returned nil on both error paths
	// (`if err != nil { return nil }`), silently swallowing Delete/Put
	// failures and leaving the index inconsistent with the row data.
	if err := m.mut.Delete(ctx, currKey); err != nil {
		return err
	}
	return m.mut.Put(ctx, newKey, val.EmptyTuple)
}
// Map returns the finalized prolly.Map of the underlying prolly.MutableMap.
// Presumably this flushes any pending edits accumulated via UpdateEntry —
// NOTE(review): confirm against prolly.MutableMap.Map semantics.
func (m MutableSecondaryIdx) Map(ctx context.Context) (prolly.Map, error) {
	return m.mut.Map(ctx)
}
// mapKeyValue returns the secondary index entry key given the key and value of
// the corresponding primary row.
func (m MutableSecondaryIdx) mapKeyValue(k, v val.Tuple) val.Tuple {
	for to := range m.keyMap {
		from := m.keyMap.MapOrdinal(to)
		if from < m.pkLen {
			// Ordinals below pkLen are read from the primary key tuple.
			m.keyBld.PutRaw(to, k.GetField(from))
		} else {
			// Remaining ordinals are read from the value tuple, rebased
			// past the primary key columns.
			from -= m.pkLen
			m.keyBld.PutRaw(to, v.GetField(from))
		}
	}
	return m.keyBld.Build(m.syncPool)
}

View File

@@ -19,14 +19,15 @@ import (
"strconv"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
type RowMergeTest struct {
type nomsRowMergeTest struct {
name string
row, mergeRow, ancRow types.Value
sch schema.Schema
@@ -35,98 +36,208 @@ type RowMergeTest struct {
expectConflict bool
}
// rowMergeTest describes a three-way row merge case over val.Tuple rows in
// the new storage format, with distinct schemas per side to model schema
// changes.
type rowMergeTest struct {
	name                                  string
	row, mergeRow, ancRow                 val.Tuple // left / right / ancestor rows; nil = row absent
	mergedSch, leftSch, rightSch, baseSch schema.Schema
	expectedResult                        val.Tuple
	expectCellMerge                       bool // whether a cell-wise merge is expected to occur
	expectConflict                        bool
}
// testCase is a schema-agnostic three-way row merge scenario built from
// integer slices; see build for the nil/NULL encoding.
type testCase struct {
	name                     string
	row, mergeRow, ancRow    []*int // nil slice = row absent; nil element = NULL cell
	rowCnt, mRowCnt, aRowCnt int    // column counts for left / merge / ancestor sides
	expectedResult           []*int
	expectCellMerge          bool // whether a cell-wise merge is expected
	expectConflict           bool
}
// build converts |ints| into a slice of *int test cells: a 0 input becomes a
// nil entry (modeling a NULL cell) and any negative input panics, since only
// non-negative values are valid.
func build(ints ...int) []*int {
	cells := make([]*int, len(ints))
	for idx := range ints {
		switch n := ints[idx]; {
		case n < 0:
			panic("invalid")
		case n == 0:
			// leave cells[idx] nil to represent NULL
		default:
			v := n
			cells[idx] = &v
		}
	}
	return cells
}
var convergentEditCases = []testCase{
{
"add same row",
build(1, 2),
build(1, 2),
nil,
2, 2, 2,
build(1, 2),
false,
false,
},
{
"both delete row",
nil,
nil,
build(1, 2),
2, 2, 2,
nil,
false,
false,
},
{
"modify row to equal value",
build(2, 2),
build(2, 2),
build(1, 1),
2, 2, 2,
build(2, 2),
false,
false,
},
}
// testCases holds three-way merge scenarios with divergent edits; some
// resolve via cell-wise merging and others are expected to conflict.
// Unset fields take their zero values (nil rows, false flags).
var testCases = []testCase{
	{
		name:           "insert different rows",
		row:            build(1, 2),
		mergeRow:       build(2, 3),
		rowCnt:         2,
		mRowCnt:        2,
		aRowCnt:        2,
		expectConflict: true,
	},
	{
		name:           "delete a row in one, and modify it in other",
		mergeRow:       build(1, 3),
		ancRow:         build(1, 2),
		rowCnt:         2,
		mRowCnt:        2,
		aRowCnt:        2,
		expectConflict: true,
	},
	{
		name:            "modify rows without overlap",
		row:             build(2, 1),
		mergeRow:        build(1, 2),
		ancRow:          build(1, 1),
		rowCnt:          2,
		mRowCnt:        2,
		aRowCnt:         2,
		expectedResult:  build(2, 2),
		expectCellMerge: true,
	},
	{
		name:            "modify rows with equal overlapping changes",
		row:             build(2, 2, 255),
		mergeRow:        build(2, 3, 255),
		ancRow:          build(1, 2, 0),
		rowCnt:          3,
		mRowCnt:         3,
		aRowCnt:         3,
		expectedResult:  build(2, 3, 255),
		expectCellMerge: true,
	},
	{
		name:           "modify rows with differing overlapping changes",
		row:            build(2, 2, 128),
		mergeRow:       build(1, 3, 255),
		ancRow:         build(1, 2, 0),
		rowCnt:         3,
		mRowCnt:        3,
		aRowCnt:        3,
		expectConflict: true,
	},
	{
		name:            "modify rows where one adds a column",
		row:             build(2, 2),
		mergeRow:        build(1, 3, 255),
		ancRow:          build(1, 2),
		rowCnt:          2,
		mRowCnt:         3,
		aRowCnt:         2,
		expectedResult:  build(2, 3, 255),
		expectCellMerge: true,
	},
	{
		name:            "modify rows where one drops a column",
		row:             build(1, 2, 1),
		mergeRow:        build(2, 1),
		ancRow:          build(1, 1, 1),
		rowCnt:          3,
		mRowCnt:         2,
		aRowCnt:         3,
		expectedResult:  build(2, 2),
		expectCellMerge: true,
	},
	{
		name:            "dropping a column should be equivalent to setting a column to null",
		row:             build(1, 2, 0),
		mergeRow:        build(2, 1),
		ancRow:          build(1, 1, 1),
		rowCnt:          3,
		mRowCnt:         2,
		aRowCnt:         3,
		expectedResult:  build(2, 2),
		expectCellMerge: true,
	},
	// TODO (dhruv): Fix this bug in the old storage format
	//{
	//	"add rows but one holds a new column",
	//	build(1, 1),
	//	build(1, 1, 1),
	//	nil,
	//	2, 3, 2,
	//	nil,
	//	false,
	//	true,
	//},
	{
		name:           "Delete a row in one, set all null in the other",
		row:            build(0, 0, 0), // build translates zeros into NULL values
		ancRow:         build(1, 1, 1),
		rowCnt:         3,
		mRowCnt:        3,
		aRowCnt:        3,
		expectConflict: true,
	},
}
func TestRowMerge(t *testing.T) {
tests := []RowMergeTest{
createRowMergeStruct(
"add same row",
[]types.Value{types.String("one"), types.Int(2)},
[]types.Value{types.String("one"), types.Int(2)},
nil,
[]types.Value{types.String("one"), types.Int(2)},
false,
false,
),
createRowMergeStruct(
"add diff row",
[]types.Value{types.String("one"), types.String("two")},
[]types.Value{types.String("one"), types.String("three")},
nil,
nil,
false,
true,
),
createRowMergeStruct(
"both delete row",
nil,
nil,
[]types.Value{types.String("one"), types.Uint(2)},
nil,
false,
false,
),
createRowMergeStruct(
"one delete one modify",
nil,
[]types.Value{types.String("two"), types.Uint(2)},
[]types.Value{types.String("one"), types.Uint(2)},
nil,
false,
true,
),
createRowMergeStruct(
"modify rows without overlap",
[]types.Value{types.String("two"), types.Uint(2)},
[]types.Value{types.String("one"), types.Uint(3)},
[]types.Value{types.String("one"), types.Uint(2)},
[]types.Value{types.String("two"), types.Uint(3)},
true,
false,
),
createRowMergeStruct(
"modify rows with equal overlapping changes",
[]types.Value{types.String("two"), types.Uint(2), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
[]types.Value{types.String("one"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
[]types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("00000000-0000-0000-0000-000000000000"))},
[]types.Value{types.String("two"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
true,
false,
),
createRowMergeStruct(
"modify rows with differing overlapping changes",
[]types.Value{types.String("two"), types.Uint(2), types.UUID(uuid.MustParse("99999999-9999-9999-9999-999999999999"))},
[]types.Value{types.String("one"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
[]types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("00000000-0000-0000-0000-000000000000"))},
nil,
false,
true,
),
createRowMergeStruct(
"modify rows where one adds a column",
[]types.Value{types.String("two"), types.Uint(2)},
[]types.Value{types.String("one"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
[]types.Value{types.String("one"), types.Uint(2)},
[]types.Value{types.String("two"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
true,
false,
),
createRowMergeStruct(
"modify row where values added in different columns",
[]types.Value{types.String("one"), types.Uint(2), types.String(""), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))},
[]types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff")), types.String("")},
[]types.Value{types.String("one"), types.Uint(2), types.NullValue, types.NullValue},
nil,
false,
true,
),
createRowMergeStruct(
"modify row where initial value wasn't given",
[]types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.String("a")))},
[]types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.String("b")))},
[]types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.NullValue))},
nil,
false,
true,
),
if types.Format_Default != types.Format_DOLT_1 {
t.Skip()
}
tests := make([]rowMergeTest, len(testCases))
for i, t := range testCases {
tests[i] = createRowMergeStruct(t)
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
v := newValueMerger(test.mergedSch, test.leftSch, test.rightSch, test.baseSch, syncPool)
merged, isConflict := v.tryMerge(test.row, test.mergeRow, test.ancRow)
assert.Equal(t, test.expectConflict, isConflict)
vD := prolly.ValueDescriptorFromSchema(test.mergedSch)
assert.Equal(t, vD.Format(test.expectedResult), vD.Format(merged))
})
}
}
func TestNomsRowMerge(t *testing.T) {
if types.Format_Default == types.Format_DOLT_1 {
t.Skip()
}
testCases := append(testCases, convergentEditCases...)
tests := make([]nomsRowMergeTest, len(testCases))
for i, t := range testCases {
tests[i] = createNomsRowMergeStruct(t)
}
for _, test := range tests {
@@ -171,31 +282,89 @@ func valsToTestTuple(vals []types.Value, includePrimaryKeys bool) types.Value {
return mustTuple(types.NewTuple(types.Format_Default, tplVals...))
}
func createRowMergeStruct(name string, vals, mergeVals, ancVals, expected []types.Value, expectCellMrg bool, expectCnf bool) RowMergeTest {
longest := vals
func createRowMergeStruct(t testCase) rowMergeTest {
mergedSch := calcMergedSchema(t)
leftSch := calcSchema(t.rowCnt)
rightSch := calcSchema(t.mRowCnt)
baseSch := calcSchema(t.aRowCnt)
if len(mergeVals) > len(longest) {
longest = mergeVals
tpl := buildTup(leftSch, t.row)
mergeTpl := buildTup(rightSch, t.mergeRow)
ancTpl := buildTup(baseSch, t.ancRow)
expectedTpl := buildTup(mergedSch, t.expectedResult)
return rowMergeTest{
t.name,
tpl, mergeTpl, ancTpl,
mergedSch, leftSch, rightSch, baseSch,
expectedTpl,
t.expectCellMerge,
t.expectConflict}
}
func createNomsRowMergeStruct(t testCase) nomsRowMergeTest {
sch := calcMergedSchema(t)
tpl := valsToTestTupleWithPks(toVals(t.row))
mergeTpl := valsToTestTupleWithPks(toVals(t.mergeRow))
ancTpl := valsToTestTupleWithPks(toVals(t.ancRow))
expectedTpl := valsToTestTupleWithPks(toVals(t.expectedResult))
return nomsRowMergeTest{t.name, tpl, mergeTpl, ancTpl, sch, expectedTpl, t.expectCellMerge, t.expectConflict}
}
func calcMergedSchema(t testCase) schema.Schema {
longest := t.rowCnt
if t.mRowCnt > longest {
longest = t.mRowCnt
}
if t.aRowCnt > longest {
longest = t.aRowCnt
}
if len(ancVals) > len(longest) {
longest = ancVals
}
return calcSchema(longest)
}
cols := make([]schema.Column, len(longest)+1)
func calcSchema(nCols int) schema.Schema {
cols := make([]schema.Column, nCols+1)
// Schema needs a primary key to be valid, but all the logic being tested works only on the non-key columns.
cols[0] = schema.NewColumn("primaryKey", 0, types.IntKind, true)
for i, val := range longest {
for i := 0; i < nCols; i++ {
tag := i + 1
cols[tag] = schema.NewColumn(strconv.FormatInt(int64(tag), 10), uint64(tag), val.Kind(), false)
cols[tag] = schema.NewColumn(strconv.FormatInt(int64(tag), 10), uint64(tag), types.IntKind, false)
}
colColl := schema.NewColCollection(cols...)
sch := schema.MustSchemaFromCols(colColl)
tpl := valsToTestTupleWithPks(vals)
mergeTpl := valsToTestTupleWithPks(mergeVals)
ancTpl := valsToTestTupleWithPks(ancVals)
expectedTpl := valsToTestTupleWithPks(expected)
return RowMergeTest{name, tpl, mergeTpl, ancTpl, sch, expectedTpl, expectCellMrg, expectCnf}
return sch
}
// buildTup encodes a row of optional ints into a val.Tuple using the value
// descriptor derived from sch. A nil row yields a nil tuple; nil entries
// are left unset in the tuple (NULL).
func buildTup(sch schema.Schema, r []*int) val.Tuple {
	if r == nil {
		return nil
	}
	desc := prolly.ValueDescriptorFromSchema(sch)
	bld := val.NewTupleBuilder(desc)
	for idx, cell := range r {
		if cell == nil {
			continue
		}
		bld.PutInt64(idx, int64(*cell))
	}
	return bld.Build(syncPool)
}
// toVals converts a row of optional ints into noms values, mapping nil
// entries to types.NullValue. A nil input slice yields a nil result.
func toVals(ints []*int) []types.Value {
	if ints == nil {
		return nil
	}
	out := make([]types.Value, len(ints))
	for i := range ints {
		if p := ints[i]; p != nil {
			out[i] = types.Int(*p)
		} else {
			out[i] = types.NullValue
		}
	}
	return out
}

View File

@@ -182,7 +182,7 @@ func BuildSecondaryProllyIndex(ctx context.Context, vrw types.ValueReadWriter, s
// create a key builder for index key tuples
kd, _ := secondary.Descriptors()
keyBld := val.NewTupleBuilder(kd)
keyMap := getIndexKeyMapping(sch, idx)
keyMap := GetIndexKeyMapping(sch, idx)
mut := secondary.Mutate()
for {
@@ -222,7 +222,7 @@ func BuildSecondaryProllyIndex(ctx context.Context, vrw types.ValueReadWriter, s
return durable.IndexFromProllyMap(secondary), nil
}
func getIndexKeyMapping(sch schema.Schema, idx schema.Index) (m val.OrdinalMapping) {
func GetIndexKeyMapping(sch schema.Schema, idx schema.Index) (m val.OrdinalMapping) {
m = make(val.OrdinalMapping, len(idx.AllTags()))
for i, tag := range idx.AllTags() {

View File

@@ -166,6 +166,11 @@ func (m Map) IterRange(ctx context.Context, rng Range) (MapIter, error) {
}
}
// Pool returns the pool.BuffPool of the tree.NodeStore that backs this
// Map's tuples.
func (m Map) Pool() pool.BuffPool {
	return m.tuples.ns.Pool()
}
func (m Map) pointLookupFromRange(ctx context.Context, rng Range) (*pointLookup, error) {
search := pointLookupSearchFn(rng)
cur, err := tree.NewCursorFromSearchFn(ctx, m.tuples.ns, m.tuples.root, search)