diff between a range of values (#1402)

This commit is contained in:
Brian Hendriks
2021-03-09 17:44:50 -08:00
committed by GitHub
parent b5be24a06f
commit 111cbebec0
7 changed files with 385 additions and 11 deletions

View File

@@ -59,6 +59,8 @@ type AsyncDiffer struct {
eg *errgroup.Group
egCtx context.Context
egCancel func()
diffStats map[types.DiffChangeType]uint64
}
var _ RowDiffer = &AsyncDiffer{}
@@ -66,11 +68,11 @@ var _ RowDiffer = &AsyncDiffer{}
// todo: make package private once dolthub is migrated
func NewAsyncDiffer(bufferedDiffs int) *AsyncDiffer {
return &AsyncDiffer{
make(chan diff.Difference, bufferedDiffs),
bufferedDiffs,
nil,
context.Background(),
func() {},
diffChan: make(chan diff.Difference, bufferedDiffs),
bufferSize: bufferedDiffs,
egCtx: context.Background(),
egCancel: func() {},
diffStats: make(map[types.DiffChangeType]uint64),
}
}
@@ -80,6 +82,18 @@ func tableDontDescendLists(v1, v2 types.Value) bool {
}
func (ad *AsyncDiffer) Start(ctx context.Context, from, to types.Map) {
ad.start(ctx, func() error {
return diff.Diff(ctx, from, to, ad.diffChan, true, tableDontDescendLists)
})
}
func (ad *AsyncDiffer) StartWithRange(ctx context.Context, from, to types.Map, start types.Value, inRange types.ValueInRange) {
ad.start(ctx, func() error {
return diff.DiffMapRange(ctx, from, to, start, inRange, ad.diffChan, true, tableDontDescendLists)
})
}
func (ad *AsyncDiffer) start(ctx context.Context, diffFunc func() error) {
ad.eg, ad.egCtx = errgroup.WithContext(ctx)
ad.egCancel = async.GoWithCancel(ad.egCtx, ad.eg, func(ctx context.Context) (err error) {
defer close(ad.diffChan)
@@ -88,7 +102,7 @@ func (ad *AsyncDiffer) Start(ctx context.Context, from, to types.Map) {
err = fmt.Errorf("panic in diff.Diff: %v", r)
}
}()
return diff.Diff(ctx, from, to, ad.diffChan, true, tableDontDescendLists)
return diffFunc()
})
}
@@ -98,12 +112,17 @@ func (ad *AsyncDiffer) Close() error {
}
func (ad *AsyncDiffer) GetDiffs(numDiffs int, timeout time.Duration) ([]*diff.Difference, bool, error) {
if timeout < 0 {
return ad.GetDiffsWithoutTimeout(numDiffs)
}
diffs := make([]*diff.Difference, 0, ad.bufferSize)
timeoutChan := time.After(timeout)
for {
select {
case d, more := <-ad.diffChan:
if more {
ad.diffStats[d.ChangeType]++
diffs = append(diffs, &d)
if numDiffs != 0 && numDiffs == len(diffs) {
return diffs, true, nil
@@ -119,6 +138,26 @@ func (ad *AsyncDiffer) GetDiffs(numDiffs int, timeout time.Duration) ([]*diff.Di
}
}
func (ad *AsyncDiffer) GetDiffsWithoutTimeout(numDiffs int) ([]*diff.Difference, bool, error) {
diffs := make([]*diff.Difference, 0, ad.bufferSize)
for {
select {
case d, more := <-ad.diffChan:
if more {
ad.diffStats[d.ChangeType]++
diffs = append(diffs, &d)
if numDiffs != 0 && numDiffs == len(diffs) {
return diffs, true, nil
}
} else {
return diffs, false, ad.eg.Wait()
}
case <-ad.egCtx.Done():
return nil, false, ad.eg.Wait()
}
}
}
type keylessDiffer struct {
*AsyncDiffer

View File

@@ -0,0 +1,217 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diff
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
func TestAsyncDiffer(t *testing.T) {
ctx := context.Background()
storage := &chunks.MemoryStorage{}
db := datas.NewDatabase(storage.NewView())
vals := []types.Value{
types.Uint(0), types.String("a"),
types.Uint(1), types.String("b"),
types.Uint(3), types.String("d"),
types.Uint(4), types.String("e"),
types.Uint(6), types.String("g"),
types.Uint(7), types.String("h"),
types.Uint(9), types.String("j"),
types.Uint(10), types.String("k"),
types.Uint(12), types.String("m"),
types.Uint(13), types.String("n"),
types.Uint(15), types.String("p"),
types.Uint(16), types.String("q"),
types.Uint(18), types.String("s"),
types.Uint(19), types.String("t"),
types.Uint(21), types.String("v"),
types.Uint(22), types.String("w"),
types.Uint(24), types.String("y"),
types.Uint(25), types.String("z"),
}
m1, err := types.NewMap(ctx, db, vals...)
require.NoError(t, err)
vals = []types.Value{
types.Uint(0), types.String("a"), // unchanged
//types.Uint(1), types.String("b"), // deleted
types.Uint(2), types.String("c"), // added
types.Uint(3), types.String("d"), // unchanged
//types.Uint(4), types.String("e"), // deleted
types.Uint(5), types.String("f"), // added
types.Uint(6), types.String("g"), // unchanged
//types.Uint(7), types.String("h"), // deleted
types.Uint(8), types.String("i"), // added
types.Uint(9), types.String("j"), // unchanged
//types.Uint(10), types.String("k"), // deleted
types.Uint(11), types.String("l"), // added
types.Uint(12), types.String("m2"), // changed
//types.Uint(13), types.String("n"), // deleted
types.Uint(14), types.String("o"), // added
types.Uint(15), types.String("p2"), // changed
//types.Uint(16), types.String("q"), // deleted
types.Uint(17), types.String("r"), // added
types.Uint(18), types.String("s2"), // changed
//types.Uint(19), types.String("t"), // deleted
types.Uint(20), types.String("u"), // added
types.Uint(21), types.String("v2"), // changed
//types.Uint(22), types.String("w"), // deleted
types.Uint(23), types.String("x"), // added
types.Uint(24), types.String("y2"), // changed
//types.Uint(25), types.String("z"), // deleted
}
m2, err := types.NewMap(ctx, db, vals...)
require.NoError(t, err)
tests := []struct {
name string
createdStarted func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer
expectedStats map[types.DiffChangeType]uint64
}{
{
name: "iter all",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
ad.Start(ctx, m1, m2)
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 5,
types.DiffChangeAdded: 8,
types.DiffChangeRemoved: 9,
},
},
{
name: "iter range starting with nil",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
ad.StartWithRange(ctx, m1, m2, nil, func(value types.Value) (bool, error) {
return true, nil
})
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 5,
types.DiffChangeAdded: 8,
types.DiffChangeRemoved: 9,
},
},
{
name: "iter range staring with Null Value",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) {
return true, nil
})
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 5,
types.DiffChangeAdded: 8,
types.DiffChangeRemoved: 9,
},
},
{
name: "iter range less than 17",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
end := types.Uint(27)
ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) {
return value.Less(m1.Format(), end)
})
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 5,
types.DiffChangeAdded: 8,
types.DiffChangeRemoved: 9,
},
},
{
name: "iter range less than 15",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
end := types.Uint(15)
ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) {
return value.Less(m1.Format(), end)
})
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 1,
types.DiffChangeAdded: 5,
types.DiffChangeRemoved: 5,
},
},
{
name: "iter range 10 < 15",
createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer {
ad := NewAsyncDiffer(4)
start := types.Uint(10)
end := types.Uint(15)
ad.StartWithRange(ctx, m1, m2, start, func(value types.Value) (bool, error) {
return value.Less(m1.Format(), end)
})
return ad
},
expectedStats: map[types.DiffChangeType]uint64{
types.DiffChangeModified: 1,
types.DiffChangeAdded: 2,
types.DiffChangeRemoved: 2,
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ad := test.createdStarted(ctx, m1, m2)
err := readAll(ad)
require.NoError(t, err)
require.Equal(t, test.expectedStats, ad.diffStats)
})
}
}
func readAll(ad *AsyncDiffer) error {
for {
_, more, err := ad.GetDiffs(10, -1)
if err != nil {
return err
}
if !more {
break
}
}
return nil
}

View File

@@ -105,6 +105,27 @@ type differ struct {
// <some code>
// }
func Diff(ctx context.Context, v1, v2 types.Value, dChan chan<- Difference, leftRight bool, descFunc ShouldDescFunc) error {
f := func(ctx context.Context, d differ, v1, v2 types.Value) error {
return d.diff(ctx, nil, v1, v2)
}
return diff(ctx, f, v1, v2, dChan, leftRight, descFunc)
}
func DiffMapRange(ctx context.Context, m1, m2 types.Map, start types.Value, inRange types.ValueInRange, dChan chan<- Difference, leftRight bool, descFunc ShouldDescFunc) error {
f := func(ctx context.Context, d differ, v1, v2 types.Value) error {
return d.diffMapsInRange(ctx, nil, m1, m2, start, inRange)
}
return diff(ctx, f, m1, m2, dChan, leftRight, descFunc)
}
func diff(ctx context.Context,
f func(ctx context.Context, d differ, v1, v2 types.Value) error,
v1, v2 types.Value,
dChan chan<- Difference,
leftRight bool,
descFunc ShouldDescFunc) error {
if descFunc == nil {
descFunc = ShouldDescend
}
@@ -123,7 +144,7 @@ func Diff(ctx context.Context, v1, v2 types.Value, dChan chan<- Difference, left
return d.sendDiff(ctx, Difference{Path: nil, ChangeType: types.DiffChangeModified, OldValue: v1, NewValue: v2})
} else {
d.GoCatchPanic(func() error {
return d.diff(ctx, nil, v1, v2)
return f(ctx, d, v1, v2)
})
return d.Wait()
}
@@ -254,6 +275,14 @@ func (d differ) diffLists(ctx context.Context, p types.Path, v1, v2 types.List)
}
func (d differ) diffMaps(ctx context.Context, p types.Path, v1, v2 types.Map) error {
trueFunc := func(value types.Value) (bool, error) {
return true, nil
}
return d.diffMapsInRange(ctx, p, v1, v2, nil, trueFunc)
}
func (d differ) diffMapsInRange(ctx context.Context, p types.Path, v1, v2 types.Map, start types.Value, inRange types.ValueInRange) error {
return d.diffOrdered(ctx, p,
func(v types.Value) (types.PathPart, error) {
if types.ValueCanBePathIndex(v) {
@@ -270,8 +299,12 @@ func (d differ) diffMaps(ctx context.Context, p types.Path, v1, v2 types.Map) er
},
func(ctx context.Context, cc chan<- types.ValueChanged) error {
if d.leftRight {
return v2.DiffLeftRight(ctx, v1, cc)
return v2.DiffLeftRightInRange(ctx, v1, start, inRange, cc)
} else {
if start != nil {
panic("not implemented")
}
return v2.DiffHybrid(ctx, v1, cc)
}
},

View File

@@ -31,6 +31,8 @@ import (
"github.com/dolthub/dolt/go/store/d"
)
type ValueInRange func(Value) (bool, error)
var ErrKeysNotOrdered = errors.New("streaming map keys not ordered")
var EmptyMap Map
@@ -205,10 +207,28 @@ func (m Map) DiffHybrid(ctx context.Context, last Map, changes chan<- ValueChang
// streaming approach, optimised for returning results early, but not
// completing quickly.
func (m Map) DiffLeftRight(ctx context.Context, last Map, changes chan<- ValueChanged) error {
trueFunc := func(Value) (bool, error) {
return true, nil
}
return m.DiffLeftRightInRange(ctx, last, nil, trueFunc, changes)
}
func (m Map) DiffLeftRightInRange(ctx context.Context, last Map, start Value, inRange ValueInRange, changes chan<- ValueChanged) error {
if m.Equals(last) {
return nil
}
return orderedSequenceDiffLeftRight(ctx, last.orderedSequence, m.orderedSequence, changes)
startKey := emptyKey
if !IsNull(start) {
var err error
startKey, err = newOrderedKey(start, m.Format())
if err != nil {
return err
}
}
return orderedSequenceDiffLeftRightInRange(ctx, last.orderedSequence, m.orderedSequence, startKey, inRange, changes)
}
// Collection interface

View File

@@ -39,6 +39,16 @@ type MapIterator interface {
Next(ctx context.Context) (k, v Value, err error)
}
type EmptyMapIterator struct{}
func (mtItr EmptyMapIterator) Next(ctx context.Context) (k, v Value, err error) {
return nil, nil, nil
}
func (mtItr EmptyMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) {
return Tuple{}, Tuple{}, io.EOF
}
// mapIterator can efficiently iterate through a Noms Map.
type mapIterator struct {
sequenceIter sequenceIterator

View File

@@ -192,16 +192,24 @@ func orderedSequenceDiffInternalNodes(ctx context.Context, last orderedSequence,
// Streams the diff from |last| to |current| into |changes|, using a left-right approach.
// Left-right immediately descends to the first change and starts streaming changes, but compared to top-down it's serial and much slower to calculate the full diff.
func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, current orderedSequence, changes chan<- ValueChanged) error {
lastCur, err := newCursorAt(ctx, last, emptyKey, false, false)
trueFunc := func(Value) (bool, error) {
return true, nil
}
return orderedSequenceDiffLeftRightInRange(ctx, last, current, emptyKey, trueFunc, changes)
}
func orderedSequenceDiffLeftRightInRange(ctx context.Context, last orderedSequence, current orderedSequence, startKey orderedKey, inRange ValueInRange, changes chan<- ValueChanged) error {
lastCur, err := newCursorAt(ctx, last, startKey, false, false)
if err != nil {
return err
}
currentCur, err := newCursorAt(ctx, current, emptyKey, false, false)
currentCur, err := newCursorAt(ctx, current, startKey, false, false)
if err != nil {
return err
}
VALIDRANGES:
for lastCur.valid() && currentCur.valid() {
err := fastForward(ctx, lastCur, currentCur)
if err != nil {
@@ -231,6 +239,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur
if isLess, err := currentKey.Less(last.format(), lastKey); err != nil {
return err
} else if isLess {
isInRange, err := inRange(currentKey.v)
if err != nil {
return err
} else if !isInRange {
break VALIDRANGES
}
mv, err := getMapValue(currentCur)
if err != nil {
return err
@@ -245,6 +260,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur
return err
}
} else {
isInRange, err := inRange(lastKey.v)
if !isInRange {
return err
} else if !isInRange {
break VALIDRANGES
}
if isLess, err := lastKey.Less(last.format(), currentKey); err != nil {
return err
} else if isLess {
@@ -296,6 +318,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur
return err
}
isInRange, err := inRange(lastKey.v)
if err != nil {
return err
} else if !isInRange {
break
}
mv, err := getMapValue(lastCur)
if err != nil {
return err
@@ -317,6 +346,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur
return err
}
isInRange, err := inRange(currKey.v)
if err != nil {
return err
} else if !isInRange {
break
}
mv, err := getMapValue(currentCur)
if err != nil {
return err

View File

@@ -296,6 +296,25 @@ func NewTuple(nbf *NomsBinFormat, values ...Value) (Tuple, error) {
return Tuple{valueImpl{vrw, nbf, w.data(), nil}}, nil
}
// CopyOf creates a copy of a tuple. This is necessary in cases where keeping a reference to the original tuple is
// preventing larger objects from being collected.
func (t Tuple) CopyOf(vrw ValueReadWriter) Tuple {
buff := make([]byte, len(t.buff))
offsets := make([]uint32, len(t.offsets))
copy(buff, t.buff)
copy(offsets, t.offsets)
return Tuple{
valueImpl{
buff: buff,
offsets: offsets,
vrw: vrw,
nbf: t.nbf,
},
}
}
func (t Tuple) Empty() bool {
return t.Len() == 0
}