From 69bde477cf032a2dbe740e53a41f1580890afed3 Mon Sep 17 00:00:00 2001 From: Dhruv Sringari Date: Thu, 5 May 2022 11:03:06 -0700 Subject: [PATCH] [no-release-notes] implement cell-wise merges for new storage format (#3346) * implement cell-wise merges and on the fly secondary index corrections * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * move old storage format fix to new PR * pr feedback * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * pr feedback 2, no pointers to interfaces, and use val.OrdinalMapping * copyright header * need for loop around select, buffer cellWiseChan, pr edits Co-authored-by: druvv --- go/libraries/doltcore/merge/merge.go | 259 +--------- go/libraries/doltcore/merge/merge_prolly.go | 475 ++++++++++++++++++ go/libraries/doltcore/merge/merge_test.go | 40 +- .../doltcore/merge/mutable_secondary_index.go | 88 ++++ go/libraries/doltcore/merge/row_merge_test.go | 387 ++++++++++---- .../doltcore/table/editor/creation/index.go | 4 +- go/store/prolly/map.go | 5 + 7 files changed, 880 insertions(+), 378 deletions(-) create mode 100644 go/libraries/doltcore/merge/merge_prolly.go create mode 100644 go/libraries/doltcore/merge/mutable_secondary_index.go diff --git a/go/libraries/doltcore/merge/merge.go b/go/libraries/doltcore/merge/merge.go index 34d7d2dde5..344ab91770 100644 --- a/go/libraries/doltcore/merge/merge.go +++ b/go/libraries/doltcore/merge/merge.go @@ -33,13 +33,9 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/schema" json2 "github.com/dolthub/dolt/go/libraries/doltcore/sqle/json" "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" - "github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation" "github.com/dolthub/dolt/go/libraries/utils/valutil" "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/pool" - "github.com/dolthub/dolt/go/store/prolly" - 
"github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" ) @@ -249,256 +245,6 @@ func (merger *Merger) MergeTable(ctx context.Context, tblName string, opts edito return resultTbl, stats, nil } -func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSchema, mergeSchema, ancSchema schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, *MergeStats, error) { - // TODO (dhruv): update this function definition to return any conflicts - updatedTable, mergedData, err := mergeProllyRowData(ctx, postMergeSchema, rootSchema, mergeSchema, ancSchema, tbl, mergeTbl, ancTbl, tableToUpdate) - if err != nil { - return nil, nil, err - } - - updatedTable, err = mergeProllySecondaryIndexes(ctx, vrw, postMergeSchema, rootSchema, mergeSchema, ancSchema, mergedData, tbl, mergeTbl, ancTbl, updatedTable) - if err != nil { - return nil, nil, err - } - - // TODO (dhruv): populate merge stats - return updatedTable, &MergeStats{Operation: TableModified}, nil -} - -// mergeProllyRowData merges the primary row table indexes of |tbl|, |mergeTbl|, -// and |ancTbl|. It stores the merged row data into |tableToUpdate| and returns the new value along with the row data. 
-func mergeProllyRowData(ctx context.Context, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, durable.Index, error) { - rootR, err := tbl.GetRowData(ctx) - if err != nil { - return nil, nil, err - } - mergeR, err := mergeTbl.GetRowData(ctx) - if err != nil { - return nil, nil, err - } - ancR, err := ancTbl.GetRowData(ctx) - if err != nil { - return nil, nil, err - } - rootRP := durable.ProllyMapFromIndex(rootR) - mergeRP := durable.ProllyMapFromIndex(mergeR) - ancRP := durable.ProllyMapFromIndex(ancR) - - //vMerger := newValueMerger(postMergeSchema, rootSch, mergeSch, ancSch) - - conflicted := false - mergedRP, err := prolly.MergeMaps(ctx, rootRP, mergeRP, ancRP, func(left, right tree.Diff) (tree.Diff, bool) { - conflicted = true - return tree.Diff{}, false - //merged, ok := vMerger.tryMerge(val.Tuple(left.To), val.Tuple(right.To), val.Tuple(left.From)) - //if !ok { - // conflicted = true - // return tree.Diff{}, false - //} - // - //return tree.Diff{ - // Type: tree.ModifiedDiff, - // Key: left.Key, - // From: left.From, - // To: tree.NodeItem(merged), - //}, true - }) - if err != nil { - return nil, nil, err - } - if conflicted { - return nil, nil, errors.New("row conflicts not supported yet") - } - - updatedTbl, err := tableToUpdate.UpdateRows(ctx, durable.IndexFromProllyMap(mergedRP)) - if err != nil { - return nil, nil, err - } - - return updatedTbl, durable.IndexFromProllyMap(mergedRP), nil -} - -var syncPool = pool.NewBuffPool() - -//type valueMerger struct { -// numCols int -// vD, lVD, rVD, bVD val.TupleDesc -// leftMapping, rightMapping, baseMapping map[int]int -//} -// -//func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema) *valueMerger { -// n := merged.GetNonPKCols().Size() -// leftMapping := make(map[int]int, n) -// rightMapping := make(map[int]int, n) -// baseMapping := make(map[int]int, n) -// -// for i, tag := range merged.GetNonPKCols().Tags { -// if j, 
ok := leftSch.GetNonPKCols().TagToIdx[tag]; ok { -// leftMapping[i] = j -// } -// if j, ok := rightSch.GetNonPKCols().TagToIdx[tag]; ok { -// rightMapping[i] = j -// } -// if j, ok := baseSch.GetNonPKCols().TagToIdx[tag]; ok { -// baseMapping[i] = j -// } -// } -// -// return &valueMerger{ -// n, -// prolly.ValueDescriptorFromSchema(merged), -// prolly.ValueDescriptorFromSchema(leftSch), -// prolly.ValueDescriptorFromSchema(rightSch), -// prolly.ValueDescriptorFromSchema(baseSch), -// leftMapping, -// rightMapping, -// baseMapping, -// } -//} -// -//// tryMerge performs a cell-wise merge given left, right, and base cell value -//// tuples. It returns the merged cell value taple and an ok bool. -//func (m *valueMerger) tryMerge(left, right, base val.Tuple) (merged val.Tuple, ok bool) { -// processColumnFunc := func(i int) (resultVal []byte, isConflict bool) { -// var leftVal []byte -// if l, ok := m.leftMapping[i]; ok { -// leftVal = m.lVD.GetField(l, left) -// } -// var rightVal []byte -// if r, ok := m.rightMapping[i]; ok { -// rightVal = m.rVD.GetField(r, right) -// } -// var baseVal []byte -// if b, ok := m.baseMapping[i]; ok { -// baseVal = m.bVD.GetField(b, base) -// } -// -// // bytes.Equal?? What about nil vs empty slices?? -// leftModified := m.vD.Comparator().CompareValues(leftVal, baseVal, m.vD.Types[i]) != 0 -// rightModified := m.vD.Comparator().CompareValues(rightVal, baseVal, m.vD.Types[i]) != 0 -// -// switch { -// case leftModified && rightModified: -// return nil, true -// case leftModified: -// return leftVal, false -// default: -// return rightVal, false -// } -// } -// -// mergedValues := make([][]byte, m.numCols) -// for i := 0; i < m.numCols; i++ { -// v, isConflict := processColumnFunc(i) -// if isConflict { -// return nil, false -// } -// mergedValues[i] = v -// } -// -// return val.NewTuple(syncPool, mergedValues...), true -//} - -// mergeProllySecondaryIndexes merges the secondary indexes of the given |tbl|, -// |mergeTbl|, and |ancTbl|. 
It stores the merged indexes into |tableToUpdate| -// and returns its updated value. -func mergeProllySecondaryIndexes(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) { - rootSet, err := tbl.GetIndexSet(ctx) - if err != nil { - return nil, err - } - mergeSet, err := mergeTbl.GetIndexSet(ctx) - if err != nil { - return nil, err - } - ancSet, err := ancTbl.GetIndexSet(ctx) - if err != nil { - return nil, err - } - mergedSet, err := mergeProllyIndexSets(ctx, vrw, postMergeSchema, rootSch, mergeSch, ancSch, mergedData, rootSet, mergeSet, ancSet) - if err != nil { - return nil, err - } - updatedTbl, err := tableToUpdate.SetIndexSet(ctx, mergedSet) - if err != nil { - return nil, err - } - return updatedTbl, nil -} - -// mergeProllyIndexSets merges the |root|, |merge|, and |anc| index sets based -// on the provided |postMergeSchema|. It returns the merged index set. -func mergeProllyIndexSets(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, root, merge, anc durable.IndexSet) (durable.IndexSet, error) { - mergedIndexSet := durable.NewIndexSet(ctx, vrw) - - tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (idx durable.Index, ok bool, err error) { - ok = sch.Indexes().Contains(indexName) - if ok { - idx, err = iS.GetIndex(ctx, sch, indexName) - if err != nil { - return nil, false, err - } - return idx, true, nil - } - return nil, false, nil - } - - // Based on the indexes in the post merge schema, merge the root, merge, - // and ancestor indexes. 
- for _, index := range postMergeSchema.Indexes().AllIndexes() { - - rootI, rootOK, err := tryGetIdx(rootSch, root, index.Name()) - if err != nil { - return nil, err - } - mergeI, mergeOK, err := tryGetIdx(mergeSch, merge, index.Name()) - if err != nil { - return nil, err - } - ancI, ancOK, err := tryGetIdx(ancSch, anc, index.Name()) - if err != nil { - return nil, err - } - - mergedIndex, err := func() (durable.Index, error) { - if !rootOK || !mergeOK || !ancOK { - mergedIndex, err := creation.BuildSecondaryProllyIndex(ctx, vrw, postMergeSchema, index, durable.ProllyMapFromIndex(mergedData)) - if err != nil { - return nil, err - } - return mergedIndex, nil - } - - left := durable.ProllyMapFromIndex(rootI) - right := durable.ProllyMapFromIndex(mergeI) - base := durable.ProllyMapFromIndex(ancI) - - var collision = false - merged, err := prolly.MergeMaps(ctx, left, right, base, func(left, right tree.Diff) (tree.Diff, bool) { - collision = true - return tree.Diff{}, true - }) - if err != nil { - return nil, err - } - if collision { - return nil, errors.New("collisions not implemented") - } - return durable.IndexFromProllyMap(merged), nil - }() - if err != nil { - return nil, err - } - - mergedIndexSet, err = mergedIndexSet.PutIndex(ctx, index.Name(), mergedIndex) - if err != nil { - return nil, err - } - } - - return mergedIndexSet, nil -} - func getTableInfoFromRoot(ctx context.Context, tblName string, root *doltdb.RootValue) ( ok bool, table *doltdb.Table, @@ -1009,6 +755,7 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc return rowMergeResult{}, err } + var didMerge bool processTagFunc := func(tag uint64) (resultVal types.Value, isConflict bool) { baseVal, _ := baseVals.Get(tag) val, _ := rowVals.Get(tag) @@ -1023,8 +770,10 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc case modified && mergeModified: return nil, true case modified: + didMerge = true return val, false default: + didMerge = true 
return mergeVal, false } } @@ -1057,7 +806,7 @@ func nomsPkRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Sc return rowMergeResult{}, err } - return rowMergeResult{v, true, false}, nil + return rowMergeResult{v, didMerge, false}, nil } func keylessRowMerge(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Schema, val, mergeVal, ancVal types.Value) (rowMergeResult, error) { diff --git a/go/libraries/doltcore/merge/merge_prolly.go b/go/libraries/doltcore/merge/merge_prolly.go new file mode 100644 index 0000000000..64a9ab11ab --- /dev/null +++ b/go/libraries/doltcore/merge/merge_prolly.go @@ -0,0 +1,475 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "errors" + + "golang.org/x/sync/errgroup" + + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation" + "github.com/dolthub/dolt/go/store/pool" + "github.com/dolthub/dolt/go/store/prolly" + "github.com/dolthub/dolt/go/store/prolly/tree" + "github.com/dolthub/dolt/go/store/types" + "github.com/dolthub/dolt/go/store/val" +) + +type cellWiseMerge struct { + leftDiff tree.Diff + rightDiff tree.Diff + merged tree.Diff +} + +// mergeTableData three-way merges rows and indexes for a given table. 
First,
+// the primary row data is merged, then secondary indexes are merged. In the
+// process of merging the primary row data, we may need to perform cell-wise
+// merges. Since a cell-wise merge result contains neither the values from the
+// root branch nor the values from the merge branch, we also need to update the
+// secondary indexes prior to merging them.
+//
+// Each cell-wise merge reverts the corresponding index entries in the root
+// branch, and modifies index entries in the merge branch. The merge branch's
+// entries are set to values consistent with the cell-wise merge result. When
+// the root and merge secondary indexes are merged, they will produce entries
+// consistent with the primary row data.
+//
+// The row merge and the index corrections run concurrently in an errgroup and
+// hand cell-wise merges over through a buffered channel. It returns the
+// updated |tableToUpdate| and merge stats, or the first error encountered.
+func mergeTableData(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSchema, mergeSchema, ancSchema schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, *MergeStats, error) {
+	// Load both index sets before any goroutine is started. If this were done
+	// after launching the row merge, an error here would return without
+	// waiting on the group, abandoning a producer that can block forever once
+	// the channel buffer fills up.
+	rootIndexSet, err := tbl.GetIndexSet(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	mergeIndexSet, err := mergeTbl.GetIndexSet(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	group, gCtx := errgroup.WithContext(ctx)
+
+	cellWiseMerges := make(chan cellWiseMerge, 128)
+	var updatedTable *doltdb.Table
+	var mergedData durable.Index
+
+	// Producer: merges the primary row data and emits every cell-wise merge.
+	group.Go(func() error {
+		var err error
+		// TODO (dhruv): update this function definition to return any conflicts
+		updatedTable, mergedData, err = mergeProllyRowData(gCtx, postMergeSchema, rootSchema, mergeSchema, ancSchema, tbl, mergeTbl, ancTbl, tableToUpdate, cellWiseMerges)
+		if err != nil {
+			// The channel stays open on failure: returning an error cancels
+			// gCtx, and the consumer exits through its ctx.Done() case.
+			return err
+		}
+		// Success: tell the consumer that no more cell-wise merges will come.
+		close(cellWiseMerges)
+		return nil
+	})
+
+	// Consumer: applies the cell-wise merges to both sides' secondary indexes.
+	var updatedRootIndexSet durable.IndexSet
+	var updatedMergeIndexSet durable.IndexSet
+	group.Go(func() error {
+		var err error
+		updatedRootIndexSet, updatedMergeIndexSet, err = updateProllySecondaryIndexes(gCtx, cellWiseMerges, rootSchema, mergeSchema, tbl, mergeTbl, rootIndexSet, mergeIndexSet)
+		return err
+	})
+
+	err = group.Wait()
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// Swap the corrected index sets into the (local) table values so that the
+	// three-way index merge below sees them.
+	tbl, err = tbl.SetIndexSet(ctx, updatedRootIndexSet)
+	if err != nil {
+		return nil, nil, err
+	}
+	mergeTbl, err = mergeTbl.SetIndexSet(ctx, updatedMergeIndexSet)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	updatedTable, err = mergeProllySecondaryIndexes(ctx, vrw, postMergeSchema, rootSchema, mergeSchema, ancSchema, mergedData, tbl, mergeTbl, ancTbl, updatedTable)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// TODO (dhruv): populate merge stats
+	return updatedTable, &MergeStats{Operation: TableModified}, nil
+}
+
+// mergeProllyRowData merges the primary row table indexes of |tbl|, |mergeTbl|,
+// and |ancTbl|. It stores the merged row data into |tableToUpdate| and returns
+// the new table value along with the merged row data.
+//
+// Every row that both sides changed is resolved cell-wise; each successful
+// cell-wise merge is also sent on |cellWiseMerges|. This function never closes
+// |cellWiseMerges|. A row that cannot be merged cell-wise makes the whole call
+// fail, since row conflicts are not supported yet.
+func mergeProllyRowData(ctx context.Context, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table, cellWiseMerges chan cellWiseMerge) (*doltdb.Table, durable.Index, error) {
+	rootR, err := tbl.GetRowData(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	mergeR, err := mergeTbl.GetRowData(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	ancR, err := ancTbl.GetRowData(ctx)
+	if err != nil {
+		return nil, nil, err
+	}
+	rootRP := durable.ProllyMapFromIndex(rootR)
+	mergeRP := durable.ProllyMapFromIndex(mergeR)
+	ancRP := durable.ProllyMapFromIndex(ancR)
+
+	// Merged tuples are allocated from the root map's buffer pool.
+	vMerger := newValueMerger(postMergeSchema, rootSch, mergeSch, ancSch, rootRP.Pool())
+
+	conflicted := false
+	mergedRP, err := prolly.MergeMaps(ctx, rootRP, mergeRP, ancRP, func(left, right tree.Diff) (tree.Diff, bool) {
+		merged, isConflict := vMerger.tryMerge(val.Tuple(left.To), val.Tuple(right.To), val.Tuple(left.From))
+		if isConflict {
+			conflicted = true
+			return tree.Diff{}, false
+		}
+
+		d := tree.Diff{
+			Type: tree.ModifiedDiff,
+			Key:  left.Key,
+			From: left.From,
+			To:   tree.Item(merged),
+		}
+
+		// Hand the merge to the index updater, but do not block forever if
+		// the context is cancelled while the channel is full.
+		select {
+		case cellWiseMerges <- cellWiseMerge{left, right, d}:
+		case <-ctx.Done():
+			return tree.Diff{}, false
+		}
+
+		return d, true
+	})
+	if err != nil {
+		return nil, nil, err
+	}
+	if cErr := ctx.Err(); cErr != nil {
+		// The callback reports ok=false once the context is cancelled, so
+		// |mergedRP| may be missing cell-wise merges; do not persist it.
+		return nil, nil, cErr
+	}
+	if conflicted {
+		return nil, nil, errors.New("row conflicts not supported yet")
+	}
+
+	updatedTbl, err := tableToUpdate.UpdateRows(ctx, durable.IndexFromProllyMap(mergedRP))
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return updatedTbl, durable.IndexFromProllyMap(mergedRP), nil
+}
+
+// valueMerger performs cell-wise merges of row value tuples.
+type valueMerger struct {
+	// numCols is the number of non-primary-key columns in the merged schema.
+	numCols int
+
+	// vD is the value tuple descriptor of the merged schema.
+	vD val.TupleDesc
+
+	// leftMapping, rightMapping and baseMapping give, for each merged column,
+	// its position in the corresponding side's value tuple, or -1 when that
+	// side's schema does not have the column.
+	leftMapping, rightMapping, baseMapping val.OrdinalMapping
+
+	// syncPool is the buffer pool merged tuples are allocated from.
+	syncPool pool.BuffPool
+}
+
+// newValueMerger builds a valueMerger for the |merged| schema. Non-primary-key
+// columns of |leftSch|, |rightSch| and |baseSch| are matched to the merged
+// columns by tag; merged tuples are allocated from |syncPool|.
+func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema, syncPool pool.BuffPool) *valueMerger {
+	n := merged.GetNonPKCols().Size()
+	leftMapping := make(val.OrdinalMapping, n)
+	rightMapping := make(val.OrdinalMapping, n)
+	baseMapping := make(val.OrdinalMapping, n)
+
+	for i, tag := range merged.GetNonPKCols().Tags {
+		// -1 marks a column that does not exist on that side.
+		if j, ok := leftSch.GetNonPKCols().TagToIdx[tag]; ok {
+			leftMapping[i] = j
+		} else {
+			leftMapping[i] = -1
+		}
+		if j, ok := rightSch.GetNonPKCols().TagToIdx[tag]; ok {
+			rightMapping[i] = j
+		} else {
+			rightMapping[i] = -1
+		}
+		if j, ok := baseSch.GetNonPKCols().TagToIdx[tag]; ok {
+			baseMapping[i] = j
+		} else {
+			baseMapping[i] = -1
+		}
+	}
+
+	return &valueMerger{
+		numCols:      n,
+		vD:           prolly.ValueDescriptorFromSchema(merged),
+		leftMapping:  leftMapping,
+		rightMapping: rightMapping,
+		baseMapping:  baseMapping,
+		syncPool:     syncPool,
+	}
+}
+
+// tryMerge performs a cell-wise merge given left, right, and base cell value
+// tuples. It returns the merged cell value tuple and a bool indicating if a
+// conflict occurred. tryMerge should only be called if left and right produce
+// non-identical diffs against base.
+func (m *valueMerger) tryMerge(left, right, base val.Tuple) (val.Tuple, bool) {
+	leftMissing, rightMissing := left == nil, right == nil
+
+	// The row exists in base and exactly one side has no value for it: one
+	// side deleted the row while the other modified it.
+	if base != nil && leftMissing != rightMissing {
+		return nil, true
+	}
+
+	// Callers only pass non-identical diffs, so past this point both sides
+	// are expected to carry a value.
+	if leftMissing || rightMissing {
+		panic("found nil left / right which should never occur")
+	}
+
+	cols := make([][]byte, m.numCols)
+	for i := range cols {
+		col, conflict := m.processColumn(i, left, right, base)
+		if conflict {
+			return nil, true
+		}
+		cols[i] = col
+	}
+
+	return val.NewTuple(m.syncPool, cols...), false
+}
+
+// processColumn merges column |i| of the merged schema from the |left|,
+// |right| and |base| tuples. It returns the merged field and a bool that is
+// true when the column is in conflict.
+func (m *valueMerger) processColumn(i int, left, right, base val.Tuple) ([]byte, bool) {
+	// A column that a side's schema lacks (mapping of -1) stays nil, i.e. it
+	// is treated as a NULL value.
+	var leftCol, rightCol []byte
+	if ord := m.leftMapping[i]; ord != -1 {
+		leftCol = left.GetField(ord)
+	}
+	if ord := m.rightMapping[i]; ord != -1 {
+		rightCol = right.GetField(ord)
+	}
+
+	comparator, typ := m.vD.Comparator(), m.vD.Types[i]
+
+	// Both sides agree on the value, so there is nothing to reconcile.
+	if comparator.CompareValues(leftCol, rightCol, typ) == 0 {
+		return leftCol, false
+	}
+
+	// The sides differ and there is no ancestor row: conflicting insert.
+	if base == nil {
+		return nil, true
+	}
+
+	var baseCol []byte
+	if ord := m.baseMapping[i]; ord != -1 {
+		baseCol = base.GetField(ord)
+	}
+
+	leftChanged := comparator.CompareValues(leftCol, baseCol, typ) != 0
+	rightChanged := comparator.CompareValues(rightCol, baseCol, typ) != 0
+
+	if leftChanged && rightChanged {
+		// Both sides changed the cell to different values.
+		return nil, true
+	}
+	if leftChanged {
+		return leftCol, false
+	}
+	return rightCol, false
+}
+
+// Given cellWiseMerge's sent on |cellWiseChan|, update the secondary indexes in
+// |rootIndexSet| and |mergeIndexSet| such that when the index sets are merged,
+// they produce entries consistent with the cell-wise merges.
The updated
+// |rootIndexSet| and |mergeIndexSet| are returned.
+func updateProllySecondaryIndexes(
+	ctx context.Context,
+	cellWiseChan chan cellWiseMerge,
+	rootSchema, mergeSchema schema.Schema,
+	tbl, mergeTbl *doltdb.Table,
+	rootIndexSet, mergeIndexSet durable.IndexSet) (durable.IndexSet, durable.IndexSet, error) {
+
+	rootIdxs, err := getMutableSecondaryIdxs(ctx, rootSchema, tbl)
+	if err != nil {
+		return nil, nil, err
+	}
+	mergeIdxs, err := getMutableSecondaryIdxs(ctx, mergeSchema, mergeTbl)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// applyMerge rewrites the index entries touched by one cell-wise merge.
+	applyMerge := func(cwm cellWiseMerge) error {
+		// Root side: put the entry back to the value it had before the
+		// root's change (To -> From).
+		for _, idx := range rootIdxs {
+			if uErr := idx.UpdateEntry(ctx, val.Tuple(cwm.leftDiff.Key), val.Tuple(cwm.leftDiff.To), val.Tuple(cwm.leftDiff.From)); uErr != nil {
+				return uErr
+			}
+		}
+		// Merge side: replace the branch's value with the merged value.
+		for _, idx := range mergeIdxs {
+			if uErr := idx.UpdateEntry(ctx, val.Tuple(cwm.rightDiff.Key), val.Tuple(cwm.rightDiff.To), val.Tuple(cwm.merged.To)); uErr != nil {
+				return uErr
+			}
+		}
+		return nil
+	}
+
+	// Consume cell-wise merges until the channel is closed or the context is
+	// cancelled.
+	receiving := true
+	for receiving {
+		select {
+		case cwm, ok := <-cellWiseChan:
+			if !ok {
+				receiving = false
+				continue
+			}
+			if err = applyMerge(cwm); err != nil {
+				return nil, nil, err
+			}
+		case <-ctx.Done():
+			return nil, nil, ctx.Err()
+		}
+	}
+
+	// flush materializes every mutable index and stores it in |set| under the
+	// index's name.
+	flush := func(set durable.IndexSet, idxs []MutableSecondaryIdx) (durable.IndexSet, error) {
+		for _, idx := range idxs {
+			built, fErr := idx.Map(ctx)
+			if fErr != nil {
+				return nil, fErr
+			}
+			set, fErr = set.PutIndex(ctx, idx.Name, durable.IndexFromProllyMap(built))
+			if fErr != nil {
+				return nil, fErr
+			}
+		}
+
+		return set, nil
+	}
+
+	newRootSet, err := flush(rootIndexSet, rootIdxs)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	newMergeSet, err := flush(mergeIndexSet, mergeIdxs)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	return newRootSet, newMergeSet, nil
+}
+
+// getMutableSecondaryIdxs returns a MutableSecondaryIdx for each secondary index
+// defined in |schema| and |tbl|.
+func getMutableSecondaryIdxs(ctx context.Context, schema schema.Schema, tbl *doltdb.Table) ([]MutableSecondaryIdx, error) {
+	set, err := tbl.GetIndexSet(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	// One slot per index the schema reports.
+	idxs := make([]MutableSecondaryIdx, schema.Indexes().Count())
+	for pos, def := range schema.Indexes().AllIndexes() {
+		stored, gErr := set.GetIndex(ctx, schema, def.Name())
+		if gErr != nil {
+			return nil, gErr
+		}
+
+		// The mutable wrapper allocates from the pool of the map it wraps.
+		pm := durable.ProllyMapFromIndex(stored)
+		idxs[pos] = NewMutableSecondaryIdx(pm, schema, def, pm.Pool())
+	}
+
+	return idxs, nil
+}
+
+// mergeProllySecondaryIndexes three-way merges the secondary index sets of
+// |tbl|, |mergeTbl| and |ancTbl|, writes the merged index set into
+// |tableToUpdate|, and returns the resulting table.
+func mergeProllySecondaryIndexes(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, tbl, mergeTbl, ancTbl, tableToUpdate *doltdb.Table) (*doltdb.Table, error) {
+	// Load the index sets in root, merge, ancestor order, stopping at the
+	// first failure.
+	var sets [3]durable.IndexSet
+	for i, t := range [...]*doltdb.Table{tbl, mergeTbl, ancTbl} {
+		s, err := t.GetIndexSet(ctx)
+		if err != nil {
+			return nil, err
+		}
+		sets[i] = s
+	}
+	rootSet, mergeSet, ancSet := sets[0], sets[1], sets[2]
+
+	mergedSet, err := mergeProllyIndexSets(ctx, vrw, postMergeSchema, rootSch, mergeSch, ancSch, mergedData, rootSet, mergeSet, ancSet)
+	if err != nil {
+		return nil, err
+	}
+
+	result, err := tableToUpdate.SetIndexSet(ctx, mergedSet)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// mergeProllyIndexSets merges the |root|, |merge|, and |anc| index sets based
+// on the provided |postMergeSchema|. It returns the merged index set.
+func mergeProllyIndexSets(ctx context.Context, vrw types.ValueReadWriter, postMergeSchema, rootSch, mergeSch, ancSch schema.Schema, mergedData durable.Index, root, merge, anc durable.IndexSet) (durable.IndexSet, error) { + mergedIndexSet := durable.NewIndexSet(ctx, vrw) + + tryGetIdx := func(sch schema.Schema, iS durable.IndexSet, indexName string) (idx durable.Index, ok bool, err error) { + ok = sch.Indexes().Contains(indexName) + if ok { + idx, err = iS.GetIndex(ctx, sch, indexName) + if err != nil { + return nil, false, err + } + return idx, true, nil + } + return nil, false, nil + } + + // Based on the indexes in the post merge schema, merge the root, merge, + // and ancestor indexes. + for _, index := range postMergeSchema.Indexes().AllIndexes() { + + rootI, rootOK, err := tryGetIdx(rootSch, root, index.Name()) + if err != nil { + return nil, err + } + mergeI, mergeOK, err := tryGetIdx(mergeSch, merge, index.Name()) + if err != nil { + return nil, err + } + ancI, ancOK, err := tryGetIdx(ancSch, anc, index.Name()) + if err != nil { + return nil, err + } + + mergedIndex, err := func() (durable.Index, error) { + if !rootOK || !mergeOK || !ancOK { + mergedIndex, err := creation.BuildSecondaryProllyIndex(ctx, vrw, postMergeSchema, index, durable.ProllyMapFromIndex(mergedData)) + if err != nil { + return nil, err + } + return mergedIndex, nil + } + + left := durable.ProllyMapFromIndex(rootI) + right := durable.ProllyMapFromIndex(mergeI) + base := durable.ProllyMapFromIndex(ancI) + + var collision = false + merged, err := prolly.MergeMaps(ctx, left, right, base, func(left, right tree.Diff) (tree.Diff, bool) { + collision = true + return tree.Diff{}, true + }) + if err != nil { + return nil, err + } + if collision { + return nil, errors.New("collisions not implemented") + } + return durable.IndexFromProllyMap(merged), nil + }() + if err != nil { + return nil, err + } + + mergedIndexSet, err = mergedIndexSet.PutIndex(ctx, index.Name(), mergedIndex) + if err != nil { + 
return nil, err + } + } + + return mergedIndexSet, nil +} diff --git a/go/libraries/doltcore/merge/merge_test.go b/go/libraries/doltcore/merge/merge_test.go index 8edf5f977f..08cdedb7b9 100644 --- a/go/libraries/doltcore/merge/merge_test.go +++ b/go/libraries/doltcore/merge/merge_test.go @@ -16,6 +16,7 @@ package merge import ( "context" + "sort" "testing" "github.com/stretchr/testify/assert" @@ -32,6 +33,7 @@ import ( filesys2 "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/libraries/utils/valutil" "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/pool" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" @@ -69,6 +71,7 @@ type rowV struct { var vD = prolly.ValueDescriptorFromSchema(sch) var vB = val.NewTupleBuilder(vD) +var syncPool = pool.NewBuffPool() func (v rowV) value() val.Tuple { vB.PutInt64(0, int64(v.col1)) @@ -110,6 +113,10 @@ type testRow struct { // non-conflicting updates. => +2 // // For (insert, insert) there are identical inserts and conflicting inserts => +1 +// +// A modification of a primary key is the combination of the two base cases: +// First, a (delete, delete), then an (insert, insert). We omit tests for these +// and instead defer to the base cases. 
var testRows = []testRow{ // Ancestor exists @@ -186,6 +193,17 @@ var testRows = []testRow{ false, &rowV{-6, -6}, }, + // Non-conflicting update 2 + { + 62, + &rowV{62, 62}, + UpdateAction, + UpdateAction, + &rowV{-62, 62}, + &rowV{62, -62}, + false, + &rowV{-62, -62}, + }, { 7, &rowV{7, 7}, @@ -358,10 +376,10 @@ func TestNomsMergeCommits(t *testing.T) { assert.NoError(t, err) if !mergedRows.Equals(expectedRows) { - t.Error(mustString(types.EncodedValue(context.Background(), mergedRows)), "\n!=\n", mustString(types.EncodedValue(context.Background(), expectedRows))) + t.Error(mustString(types.EncodedValue(context.Background(), expectedRows)), "\n!=\n", mustString(types.EncodedValue(context.Background(), mergedRows))) } if !conflicts.Equals(expectedConflicts) { - t.Error(mustString(types.EncodedValue(context.Background(), conflicts)), "\n!=\n", mustString(types.EncodedValue(context.Background(), expectedConflicts))) + t.Error(mustString(types.EncodedValue(context.Background(), expectedConflicts)), "\n!=\n", mustString(types.EncodedValue(context.Background(), conflicts))) } for _, index := range sch.Indexes().AllIndexes() { @@ -383,9 +401,16 @@ func TestNomsMergeCommits(t *testing.T) { assert.Equal(t, eh.String(), h.String(), "table hashes do not equal") } +func sortTests(t []testRow) { + sort.Slice(t, func(i, j int) bool { + return t[i].key < t[j].key + }) +} + func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *doltdb.RootValue, *doltdb.RootValue, durable.Index) { ddb := mustMakeEmptyRepo(t) vrw := ddb.ValueReadWriter() + sortTests(testRows) var initialKVs []val.Tuple var expectedKVs []val.Tuple @@ -396,11 +421,6 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do continue } - // Skip cell-wise merges - if testCase.leftAction == UpdateAction && testCase.rightAction == UpdateAction { - continue - } - if testCase.initialValue != nil { initialKVs = append(initialKVs, key(testCase.key), 
testCase.initialValue.value()) } @@ -422,11 +442,6 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do continue } - // Skip cell-wise merges - if testCase.leftAction == UpdateAction && testCase.rightAction == UpdateAction { - continue - } - switch testCase.leftAction { case NoopAction: break @@ -478,6 +493,7 @@ func setupMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *do func setupNomsMergeTest(t *testing.T) (types.ValueReadWriter, *doltdb.RootValue, *doltdb.RootValue, *doltdb.RootValue, types.Map, types.Map, *MergeStats) { ddb := mustMakeEmptyRepo(t) vrw := ddb.ValueReadWriter() + sortTests(testRows) var initalKVs []types.Value var expectedKVs []types.Value diff --git a/go/libraries/doltcore/merge/mutable_secondary_index.go b/go/libraries/doltcore/merge/mutable_secondary_index.go new file mode 100644 index 0000000000..818650465c --- /dev/null +++ b/go/libraries/doltcore/merge/mutable_secondary_index.go @@ -0,0 +1,88 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + + "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/table/editor/creation" + "github.com/dolthub/dolt/go/store/pool" + "github.com/dolthub/dolt/go/store/prolly" + "github.com/dolthub/dolt/go/store/val" +) + +// MutableSecondaryIdx wraps a prolly.MutableMap of a secondary table index. 
It
+// provides an UpdateEntry function which can be used to update an entry in the
+// secondary index given a modification to a row.
+type MutableSecondaryIdx struct {
+	Name     string
+	mut      prolly.MutableMap
+	keyMap   val.OrdinalMapping
+	pkLen    int
+	keyBld   *val.TupleBuilder
+	syncPool pool.BuffPool
+}
+
+// NewMutableSecondaryIdx returns a MutableSecondaryIdx. |m| is the secondary idx data.
+func NewMutableSecondaryIdx(m prolly.Map, sch schema.Schema, index schema.Index, syncPool pool.BuffPool) MutableSecondaryIdx {
+	kD, _ := m.Descriptors()
+	return MutableSecondaryIdx{
+		index.Name(),
+		m.Mutate(),
+		creation.GetIndexKeyMapping(sch, index),
+		sch.GetPKCols().Size(),
+		val.NewTupleBuilder(kD),
+		syncPool,
+	}
+}
+
+// UpdateEntry replaces the index entry built from |key| and |currValue| with the
+// one built from |key| and |newValue|. Delete/Put errors are returned as-is.
+func (m MutableSecondaryIdx) UpdateEntry(ctx context.Context, key, currValue, newValue val.Tuple) error {
+	currKey := m.mapKeyValue(key, currValue)
+	newKey := m.mapKeyValue(key, newValue)
+
+	err := m.mut.Delete(ctx, currKey)
+	if err != nil {
+		return err
+	}
+	err = m.mut.Put(ctx, newKey, val.EmptyTuple)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Map returns the finalized prolly.Map of the underlying prolly.MutableMap.
+func (m MutableSecondaryIdx) Map(ctx context.Context) (prolly.Map, error) {
+	return m.mut.Map(ctx)
+}
+
+// mapKeyValue returns the secondary index entry key given the key and value of
+// the corresponding primary row.
+func (m MutableSecondaryIdx) mapKeyValue(k, v val.Tuple) val.Tuple { + for to := range m.keyMap { + from := m.keyMap.MapOrdinal(to) + if from < m.pkLen { + m.keyBld.PutRaw(to, k.GetField(from)) + } else { + from -= m.pkLen + m.keyBld.PutRaw(to, v.GetField(from)) + } + } + return m.keyBld.Build(m.syncPool) +} diff --git a/go/libraries/doltcore/merge/row_merge_test.go b/go/libraries/doltcore/merge/row_merge_test.go index da344dad67..115110d0b2 100644 --- a/go/libraries/doltcore/merge/row_merge_test.go +++ b/go/libraries/doltcore/merge/row_merge_test.go @@ -19,14 +19,15 @@ import ( "strconv" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/types" + "github.com/dolthub/dolt/go/store/val" ) -type RowMergeTest struct { +type nomsRowMergeTest struct { name string row, mergeRow, ancRow types.Value sch schema.Schema @@ -35,98 +36,208 @@ type RowMergeTest struct { expectConflict bool } +type rowMergeTest struct { + name string + row, mergeRow, ancRow val.Tuple + mergedSch, leftSch, rightSch, baseSch schema.Schema + expectedResult val.Tuple + expectCellMerge bool + expectConflict bool +} + +type testCase struct { + name string + row, mergeRow, ancRow []*int + rowCnt, mRowCnt, aRowCnt int + expectedResult []*int + expectCellMerge bool + expectConflict bool +} + +// 0 is nil, negative value is invalid +func build(ints ...int) []*int { + out := make([]*int, len(ints)) + for i, v := range ints { + if v < 0 { + panic("invalid") + } + if v == 0 { + continue + } + t := v + out[i] = &t + } + return out +} + +var convergentEditCases = []testCase{ + { + "add same row", + build(1, 2), + build(1, 2), + nil, + 2, 2, 2, + build(1, 2), + false, + false, + }, + { + "both delete row", + nil, + nil, + build(1, 2), + 2, 2, 2, + nil, + false, + false, + }, + { + "modify row to equal value", + build(2, 2), + build(2, 2), + build(1, 1), + 2, 
2, 2, + build(2, 2), + false, + false, + }, +} + +var testCases = []testCase{ + { + "insert different rows", + build(1, 2), + build(2, 3), + nil, + 2, 2, 2, + nil, + false, + true, + }, + { + "delete a row in one, and modify it in other", + nil, + build(1, 3), + build(1, 2), + 2, 2, 2, + nil, + false, + true, + }, + { + "modify rows without overlap", + build(2, 1), + build(1, 2), + build(1, 1), + 2, 2, 2, + build(2, 2), + true, + false, + }, + { + "modify rows with equal overlapping changes", + build(2, 2, 255), + build(2, 3, 255), + build(1, 2, 0), + 3, 3, 3, + build(2, 3, 255), + true, + false, + }, + { + "modify rows with differing overlapping changes", + build(2, 2, 128), + build(1, 3, 255), + build(1, 2, 0), + 3, 3, 3, + nil, + false, + true, + }, + { + "modify rows where one adds a column", + build(2, 2), + build(1, 3, 255), + build(1, 2), + 2, 3, 2, + build(2, 3, 255), + true, + false, + }, + { + "modify rows where one drops a column", + build(1, 2, 1), + build(2, 1), + build(1, 1, 1), + 3, 2, 3, + build(2, 2), + true, + false, + }, + { + "dropping a column should be equivalent to setting a column to null", + build(1, 2, 0), + build(2, 1), + build(1, 1, 1), + 3, 2, 3, + build(2, 2), + true, + false, + }, + // TODO (dhruv): Fix this bug in the old storage format + //{ + // "add rows but one holds a new column", + // build(1, 1), + // build(1, 1, 1), + // nil, + // 2, 3, 2, + // nil, + // false, + // true, + //}, + { + "Delete a row in one, set all null in the other", + build(0, 0, 0), // build translates zeros into NULL values + nil, + build(1, 1, 1), + 3, 3, 3, + nil, + false, + true, + }, +} + func TestRowMerge(t *testing.T) { - tests := []RowMergeTest{ - createRowMergeStruct( - "add same row", - []types.Value{types.String("one"), types.Int(2)}, - []types.Value{types.String("one"), types.Int(2)}, - nil, - []types.Value{types.String("one"), types.Int(2)}, - false, - false, - ), - createRowMergeStruct( - "add diff row", - []types.Value{types.String("one"), 
types.String("two")}, - []types.Value{types.String("one"), types.String("three")}, - nil, - nil, - false, - true, - ), - createRowMergeStruct( - "both delete row", - nil, - nil, - []types.Value{types.String("one"), types.Uint(2)}, - nil, - false, - false, - ), - createRowMergeStruct( - "one delete one modify", - nil, - []types.Value{types.String("two"), types.Uint(2)}, - []types.Value{types.String("one"), types.Uint(2)}, - nil, - false, - true, - ), - createRowMergeStruct( - "modify rows without overlap", - []types.Value{types.String("two"), types.Uint(2)}, - []types.Value{types.String("one"), types.Uint(3)}, - []types.Value{types.String("one"), types.Uint(2)}, - []types.Value{types.String("two"), types.Uint(3)}, - true, - false, - ), - createRowMergeStruct( - "modify rows with equal overlapping changes", - []types.Value{types.String("two"), types.Uint(2), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - []types.Value{types.String("one"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - []types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("00000000-0000-0000-0000-000000000000"))}, - []types.Value{types.String("two"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - true, - false, - ), - createRowMergeStruct( - "modify rows with differing overlapping changes", - []types.Value{types.String("two"), types.Uint(2), types.UUID(uuid.MustParse("99999999-9999-9999-9999-999999999999"))}, - []types.Value{types.String("one"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - []types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("00000000-0000-0000-0000-000000000000"))}, - nil, - false, - true, - ), - createRowMergeStruct( - "modify rows where one adds a column", - []types.Value{types.String("two"), types.Uint(2)}, - []types.Value{types.String("one"), types.Uint(3), 
types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - []types.Value{types.String("one"), types.Uint(2)}, - []types.Value{types.String("two"), types.Uint(3), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - true, - false, - ), - createRowMergeStruct( - "modify row where values added in different columns", - []types.Value{types.String("one"), types.Uint(2), types.String(""), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff"))}, - []types.Value{types.String("one"), types.Uint(2), types.UUID(uuid.MustParse("ffffffff-ffff-ffff-ffff-ffffffffffff")), types.String("")}, - []types.Value{types.String("one"), types.Uint(2), types.NullValue, types.NullValue}, - nil, - false, - true, - ), - createRowMergeStruct( - "modify row where initial value wasn't given", - []types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.String("a")))}, - []types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.String("b")))}, - []types.Value{mustTuple(types.NewTuple(types.Format_Default, types.String("one"), types.Uint(2), types.NullValue))}, - nil, - false, - true, - ), + if types.Format_Default != types.Format_DOLT_1 { + t.Skip() + } + + tests := make([]rowMergeTest, len(testCases)) + for i, t := range testCases { + tests[i] = createRowMergeStruct(t) + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + v := newValueMerger(test.mergedSch, test.leftSch, test.rightSch, test.baseSch, syncPool) + + merged, isConflict := v.tryMerge(test.row, test.mergeRow, test.ancRow) + assert.Equal(t, test.expectConflict, isConflict) + vD := prolly.ValueDescriptorFromSchema(test.mergedSch) + assert.Equal(t, vD.Format(test.expectedResult), vD.Format(merged)) + }) + } +} + +func TestNomsRowMerge(t *testing.T) { + if types.Format_Default == types.Format_DOLT_1 { + t.Skip() + } + + testCases := append(testCases, convergentEditCases...) 
+ tests := make([]nomsRowMergeTest, len(testCases)) + for i, t := range testCases { + tests[i] = createNomsRowMergeStruct(t) } for _, test := range tests { @@ -171,31 +282,89 @@ func valsToTestTuple(vals []types.Value, includePrimaryKeys bool) types.Value { return mustTuple(types.NewTuple(types.Format_Default, tplVals...)) } -func createRowMergeStruct(name string, vals, mergeVals, ancVals, expected []types.Value, expectCellMrg bool, expectCnf bool) RowMergeTest { - longest := vals +func createRowMergeStruct(t testCase) rowMergeTest { + mergedSch := calcMergedSchema(t) + leftSch := calcSchema(t.rowCnt) + rightSch := calcSchema(t.mRowCnt) + baseSch := calcSchema(t.aRowCnt) - if len(mergeVals) > len(longest) { - longest = mergeVals + tpl := buildTup(leftSch, t.row) + mergeTpl := buildTup(rightSch, t.mergeRow) + ancTpl := buildTup(baseSch, t.ancRow) + expectedTpl := buildTup(mergedSch, t.expectedResult) + return rowMergeTest{ + t.name, + tpl, mergeTpl, ancTpl, + mergedSch, leftSch, rightSch, baseSch, + expectedTpl, + t.expectCellMerge, + t.expectConflict} +} + +func createNomsRowMergeStruct(t testCase) nomsRowMergeTest { + sch := calcMergedSchema(t) + + tpl := valsToTestTupleWithPks(toVals(t.row)) + mergeTpl := valsToTestTupleWithPks(toVals(t.mergeRow)) + ancTpl := valsToTestTupleWithPks(toVals(t.ancRow)) + expectedTpl := valsToTestTupleWithPks(toVals(t.expectedResult)) + return nomsRowMergeTest{t.name, tpl, mergeTpl, ancTpl, sch, expectedTpl, t.expectCellMerge, t.expectConflict} +} + +func calcMergedSchema(t testCase) schema.Schema { + longest := t.rowCnt + if t.mRowCnt > longest { + longest = t.mRowCnt + } + if t.aRowCnt > longest { + longest = t.aRowCnt } - if len(ancVals) > len(longest) { - longest = ancVals - } + return calcSchema(longest) +} - cols := make([]schema.Column, len(longest)+1) +func calcSchema(nCols int) schema.Schema { + cols := make([]schema.Column, nCols+1) // Schema needs a primary key to be valid, but all the logic being tested works only on the 
non-key columns. cols[0] = schema.NewColumn("primaryKey", 0, types.IntKind, true) - for i, val := range longest { + for i := 0; i < nCols; i++ { tag := i + 1 - cols[tag] = schema.NewColumn(strconv.FormatInt(int64(tag), 10), uint64(tag), val.Kind(), false) + cols[tag] = schema.NewColumn(strconv.FormatInt(int64(tag), 10), uint64(tag), types.IntKind, false) } colColl := schema.NewColCollection(cols...) sch := schema.MustSchemaFromCols(colColl) - - tpl := valsToTestTupleWithPks(vals) - mergeTpl := valsToTestTupleWithPks(mergeVals) - ancTpl := valsToTestTupleWithPks(ancVals) - expectedTpl := valsToTestTupleWithPks(expected) - return RowMergeTest{name, tpl, mergeTpl, ancTpl, sch, expectedTpl, expectCellMrg, expectCnf} + return sch +} + +func buildTup(sch schema.Schema, r []*int) val.Tuple { + if r == nil { + return nil + } + + vD := prolly.ValueDescriptorFromSchema(sch) + vB := val.NewTupleBuilder(vD) + for i, v := range r { + if v != nil { + vB.PutInt64(i, int64(*v)) + } + } + return vB.Build(syncPool) +} + +func toVals(ints []*int) []types.Value { + if ints == nil { + return nil + } + + v := make([]types.Value, len(ints)) + for i, d := range ints { + if d == nil { + v[i] = types.NullValue + continue + } + + v[i] = types.Int(*d) + } + return v } diff --git a/go/libraries/doltcore/table/editor/creation/index.go b/go/libraries/doltcore/table/editor/creation/index.go index 696dd2fb50..44f798207e 100644 --- a/go/libraries/doltcore/table/editor/creation/index.go +++ b/go/libraries/doltcore/table/editor/creation/index.go @@ -182,7 +182,7 @@ func BuildSecondaryProllyIndex(ctx context.Context, vrw types.ValueReadWriter, s // create a key builder for index key tuples kd, _ := secondary.Descriptors() keyBld := val.NewTupleBuilder(kd) - keyMap := getIndexKeyMapping(sch, idx) + keyMap := GetIndexKeyMapping(sch, idx) mut := secondary.Mutate() for { @@ -222,7 +222,7 @@ func BuildSecondaryProllyIndex(ctx context.Context, vrw types.ValueReadWriter, s return 
durable.IndexFromProllyMap(secondary), nil } -func getIndexKeyMapping(sch schema.Schema, idx schema.Index) (m val.OrdinalMapping) { +func GetIndexKeyMapping(sch schema.Schema, idx schema.Index) (m val.OrdinalMapping) { m = make(val.OrdinalMapping, len(idx.AllTags())) for i, tag := range idx.AllTags() { diff --git a/go/store/prolly/map.go b/go/store/prolly/map.go index 67679ff578..bd839e7134 100644 --- a/go/store/prolly/map.go +++ b/go/store/prolly/map.go @@ -166,6 +166,11 @@ func (m Map) IterRange(ctx context.Context, rng Range) (MapIter, error) { } } +// Pool returns the pool.BuffPool of the underlying tuples' tree.NodeStore +func (m Map) Pool() pool.BuffPool { + return m.tuples.ns.Pool() +} + func (m Map) pointLookupFromRange(ctx context.Context, rng Range) (*pointLookup, error) { search := pointLookupSearchFn(rng) cur, err := tree.NewCursorFromSearchFn(ctx, m.tuples.ns, m.tuples.root, search)