From 111cbebec08d3433a534a841b72e9adcb2af91b8 Mon Sep 17 00:00:00 2001 From: Brian Hendriks Date: Tue, 9 Mar 2021 17:44:50 -0800 Subject: [PATCH] diff between a range of values (#1402) --- go/libraries/doltcore/diff/async_differ.go | 51 +++- .../doltcore/diff/async_differ_test.go | 217 ++++++++++++++++++ go/store/diff/diff.go | 37 ++- go/store/types/map.go | 22 +- go/store/types/map_iterator.go | 10 + go/store/types/ordered_sequences_diff.go | 40 +++- go/store/types/tuple.go | 19 ++ 7 files changed, 385 insertions(+), 11 deletions(-) create mode 100644 go/libraries/doltcore/diff/async_differ_test.go diff --git a/go/libraries/doltcore/diff/async_differ.go b/go/libraries/doltcore/diff/async_differ.go index 79a4db1888..c81a889a29 100644 --- a/go/libraries/doltcore/diff/async_differ.go +++ b/go/libraries/doltcore/diff/async_differ.go @@ -59,6 +59,8 @@ type AsyncDiffer struct { eg *errgroup.Group egCtx context.Context egCancel func() + + diffStats map[types.DiffChangeType]uint64 } var _ RowDiffer = &AsyncDiffer{} @@ -66,11 +68,11 @@ var _ RowDiffer = &AsyncDiffer{} // todo: make package private once dolthub is migrated func NewAsyncDiffer(bufferedDiffs int) *AsyncDiffer { return &AsyncDiffer{ - make(chan diff.Difference, bufferedDiffs), - bufferedDiffs, - nil, - context.Background(), - func() {}, + diffChan: make(chan diff.Difference, bufferedDiffs), + bufferSize: bufferedDiffs, + egCtx: context.Background(), + egCancel: func() {}, + diffStats: make(map[types.DiffChangeType]uint64), } } @@ -80,6 +82,18 @@ func tableDontDescendLists(v1, v2 types.Value) bool { } func (ad *AsyncDiffer) Start(ctx context.Context, from, to types.Map) { + ad.start(ctx, func() error { + return diff.Diff(ctx, from, to, ad.diffChan, true, tableDontDescendLists) + }) +} + +func (ad *AsyncDiffer) StartWithRange(ctx context.Context, from, to types.Map, start types.Value, inRange types.ValueInRange) { + ad.start(ctx, func() error { + return diff.DiffMapRange(ctx, from, to, start, inRange, 
ad.diffChan, true, tableDontDescendLists) + }) +} + +func (ad *AsyncDiffer) start(ctx context.Context, diffFunc func() error) { ad.eg, ad.egCtx = errgroup.WithContext(ctx) ad.egCancel = async.GoWithCancel(ad.egCtx, ad.eg, func(ctx context.Context) (err error) { defer close(ad.diffChan) @@ -88,7 +102,7 @@ func (ad *AsyncDiffer) Start(ctx context.Context, from, to types.Map) { err = fmt.Errorf("panic in diff.Diff: %v", r) } }() - return diff.Diff(ctx, from, to, ad.diffChan, true, tableDontDescendLists) + return diffFunc() }) } @@ -98,12 +112,17 @@ func (ad *AsyncDiffer) Close() error { } func (ad *AsyncDiffer) GetDiffs(numDiffs int, timeout time.Duration) ([]*diff.Difference, bool, error) { + if timeout < 0 { + return ad.GetDiffsWithoutTimeout(numDiffs) + } + diffs := make([]*diff.Difference, 0, ad.bufferSize) timeoutChan := time.After(timeout) for { select { case d, more := <-ad.diffChan: if more { + ad.diffStats[d.ChangeType]++ diffs = append(diffs, &d) if numDiffs != 0 && numDiffs == len(diffs) { return diffs, true, nil @@ -119,6 +138,26 @@ func (ad *AsyncDiffer) GetDiffs(numDiffs int, timeout time.Duration) ([]*diff.Di } } +func (ad *AsyncDiffer) GetDiffsWithoutTimeout(numDiffs int) ([]*diff.Difference, bool, error) { + diffs := make([]*diff.Difference, 0, ad.bufferSize) + for { + select { + case d, more := <-ad.diffChan: + if more { + ad.diffStats[d.ChangeType]++ + diffs = append(diffs, &d) + if numDiffs != 0 && numDiffs == len(diffs) { + return diffs, true, nil + } + } else { + return diffs, false, ad.eg.Wait() + } + case <-ad.egCtx.Done(): + return nil, false, ad.eg.Wait() + } + } +} + type keylessDiffer struct { *AsyncDiffer diff --git a/go/libraries/doltcore/diff/async_differ_test.go b/go/libraries/doltcore/diff/async_differ_test.go new file mode 100644 index 0000000000..61181dc947 --- /dev/null +++ b/go/libraries/doltcore/diff/async_differ_test.go @@ -0,0 +1,217 @@ +// Copyright 2021 Dolthub, Inc. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package diff + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/chunks" + "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/types" +) + +func TestAsyncDiffer(t *testing.T) { + ctx := context.Background() + storage := &chunks.MemoryStorage{} + db := datas.NewDatabase(storage.NewView()) + + vals := []types.Value{ + types.Uint(0), types.String("a"), + types.Uint(1), types.String("b"), + types.Uint(3), types.String("d"), + types.Uint(4), types.String("e"), + types.Uint(6), types.String("g"), + types.Uint(7), types.String("h"), + types.Uint(9), types.String("j"), + types.Uint(10), types.String("k"), + types.Uint(12), types.String("m"), + types.Uint(13), types.String("n"), + types.Uint(15), types.String("p"), + types.Uint(16), types.String("q"), + types.Uint(18), types.String("s"), + types.Uint(19), types.String("t"), + types.Uint(21), types.String("v"), + types.Uint(22), types.String("w"), + types.Uint(24), types.String("y"), + types.Uint(25), types.String("z"), + } + + m1, err := types.NewMap(ctx, db, vals...) 
+ require.NoError(t, err) + + vals = []types.Value{ + types.Uint(0), types.String("a"), // unchanged + //types.Uint(1), types.String("b"), // deleted + types.Uint(2), types.String("c"), // added + types.Uint(3), types.String("d"), // unchanged + //types.Uint(4), types.String("e"), // deleted + types.Uint(5), types.String("f"), // added + types.Uint(6), types.String("g"), // unchanged + //types.Uint(7), types.String("h"), // deleted + types.Uint(8), types.String("i"), // added + types.Uint(9), types.String("j"), // unchanged + //types.Uint(10), types.String("k"), // deleted + types.Uint(11), types.String("l"), // added + types.Uint(12), types.String("m2"), // changed + //types.Uint(13), types.String("n"), // deleted + types.Uint(14), types.String("o"), // added + types.Uint(15), types.String("p2"), // changed + //types.Uint(16), types.String("q"), // deleted + types.Uint(17), types.String("r"), // added + types.Uint(18), types.String("s2"), // changed + //types.Uint(19), types.String("t"), // deleted + types.Uint(20), types.String("u"), // added + types.Uint(21), types.String("v2"), // changed + //types.Uint(22), types.String("w"), // deleted + types.Uint(23), types.String("x"), // added + types.Uint(24), types.String("y2"), // changed + //types.Uint(25), types.String("z"), // deleted + } + m2, err := types.NewMap(ctx, db, vals...) 
+ require.NoError(t, err) + + tests := []struct { + name string + createdStarted func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer + expectedStats map[types.DiffChangeType]uint64 + }{ + { + name: "iter all", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + ad.Start(ctx, m1, m2) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 5, + types.DiffChangeAdded: 8, + types.DiffChangeRemoved: 9, + }, + }, + + { + name: "iter range starting with nil", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + ad.StartWithRange(ctx, m1, m2, nil, func(value types.Value) (bool, error) { + return true, nil + }) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 5, + types.DiffChangeAdded: 8, + types.DiffChangeRemoved: 9, + }, + }, + + { + name: "iter range starting with Null Value", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) { + return true, nil + }) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 5, + types.DiffChangeAdded: 8, + types.DiffChangeRemoved: 9, + }, + }, + + { + name: "iter range less than 27", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + end := types.Uint(27) + ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) { + return value.Less(m1.Format(), end) + }) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 5, + types.DiffChangeAdded: 8, + types.DiffChangeRemoved: 9, + }, + }, + + { + name: "iter range less than 15", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + end := types.Uint(15) +
ad.StartWithRange(ctx, m1, m2, types.NullValue, func(value types.Value) (bool, error) { + return value.Less(m1.Format(), end) + }) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 1, + types.DiffChangeAdded: 5, + types.DiffChangeRemoved: 5, + }, + }, + + { + name: "iter range 10 < 15", + createdStarted: func(ctx context.Context, m1, m2 types.Map) *AsyncDiffer { + ad := NewAsyncDiffer(4) + start := types.Uint(10) + end := types.Uint(15) + ad.StartWithRange(ctx, m1, m2, start, func(value types.Value) (bool, error) { + return value.Less(m1.Format(), end) + }) + return ad + }, + expectedStats: map[types.DiffChangeType]uint64{ + types.DiffChangeModified: 1, + types.DiffChangeAdded: 2, + types.DiffChangeRemoved: 2, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ctx := context.Background() + ad := test.createdStarted(ctx, m1, m2) + err := readAll(ad) + require.NoError(t, err) + require.Equal(t, test.expectedStats, ad.diffStats) + }) + } +} + +func readAll(ad *AsyncDiffer) error { + for { + _, more, err := ad.GetDiffs(10, -1) + + if err != nil { + return err + } + + if !more { + break + } + } + + return nil +} diff --git a/go/store/diff/diff.go b/go/store/diff/diff.go index 7f0b5e8430..10f9f05932 100644 --- a/go/store/diff/diff.go +++ b/go/store/diff/diff.go @@ -105,6 +105,27 @@ type differ struct { // // } func Diff(ctx context.Context, v1, v2 types.Value, dChan chan<- Difference, leftRight bool, descFunc ShouldDescFunc) error { + f := func(ctx context.Context, d differ, v1, v2 types.Value) error { + return d.diff(ctx, nil, v1, v2) + } + + return diff(ctx, f, v1, v2, dChan, leftRight, descFunc) +} + +func DiffMapRange(ctx context.Context, m1, m2 types.Map, start types.Value, inRange types.ValueInRange, dChan chan<- Difference, leftRight bool, descFunc ShouldDescFunc) error { + f := func(ctx context.Context, d differ, v1, v2 types.Value) error { + return d.diffMapsInRange(ctx, nil, m1, 
m2, start, inRange) + } + + return diff(ctx, f, m1, m2, dChan, leftRight, descFunc) +} + +func diff(ctx context.Context, + f func(ctx context.Context, d differ, v1, v2 types.Value) error, + v1, v2 types.Value, + dChan chan<- Difference, + leftRight bool, + descFunc ShouldDescFunc) error { if descFunc == nil { descFunc = ShouldDescend } @@ -123,7 +144,7 @@ func Diff(ctx context.Context, v1, v2 types.Value, dChan chan<- Difference, left return d.sendDiff(ctx, Difference{Path: nil, ChangeType: types.DiffChangeModified, OldValue: v1, NewValue: v2}) } else { d.GoCatchPanic(func() error { - return d.diff(ctx, nil, v1, v2) + return f(ctx, d, v1, v2) }) return d.Wait() } @@ -254,6 +275,14 @@ func (d differ) diffLists(ctx context.Context, p types.Path, v1, v2 types.List) } func (d differ) diffMaps(ctx context.Context, p types.Path, v1, v2 types.Map) error { + trueFunc := func(value types.Value) (bool, error) { + return true, nil + } + + return d.diffMapsInRange(ctx, p, v1, v2, nil, trueFunc) +} + +func (d differ) diffMapsInRange(ctx context.Context, p types.Path, v1, v2 types.Map, start types.Value, inRange types.ValueInRange) error { return d.diffOrdered(ctx, p, func(v types.Value) (types.PathPart, error) { if types.ValueCanBePathIndex(v) { @@ -270,8 +299,12 @@ func (d differ) diffMaps(ctx context.Context, p types.Path, v1, v2 types.Map) er }, func(ctx context.Context, cc chan<- types.ValueChanged) error { if d.leftRight { - return v2.DiffLeftRight(ctx, v1, cc) + return v2.DiffLeftRightInRange(ctx, v1, start, inRange, cc) } else { + if start != nil { + panic("not implemented") + } + return v2.DiffHybrid(ctx, v1, cc) } }, diff --git a/go/store/types/map.go b/go/store/types/map.go index bf90a79f69..f86d55c387 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -31,6 +31,8 @@ import ( "github.com/dolthub/dolt/go/store/d" ) +type ValueInRange func(Value) (bool, error) + var ErrKeysNotOrdered = errors.New("streaming map keys not ordered") var EmptyMap Map @@ 
-205,10 +207,28 @@ func (m Map) DiffHybrid(ctx context.Context, last Map, changes chan<- ValueChang // streaming approach, optimised for returning results early, but not // completing quickly. func (m Map) DiffLeftRight(ctx context.Context, last Map, changes chan<- ValueChanged) error { + trueFunc := func(Value) (bool, error) { + return true, nil + } + return m.DiffLeftRightInRange(ctx, last, nil, trueFunc, changes) +} + +func (m Map) DiffLeftRightInRange(ctx context.Context, last Map, start Value, inRange ValueInRange, changes chan<- ValueChanged) error { if m.Equals(last) { return nil } - return orderedSequenceDiffLeftRight(ctx, last.orderedSequence, m.orderedSequence, changes) + + startKey := emptyKey + if !IsNull(start) { + var err error + startKey, err = newOrderedKey(start, m.Format()) + + if err != nil { + return err + } + } + + return orderedSequenceDiffLeftRightInRange(ctx, last.orderedSequence, m.orderedSequence, startKey, inRange, changes) } // Collection interface diff --git a/go/store/types/map_iterator.go b/go/store/types/map_iterator.go index 1bb8ec035f..9eb3cdf4d3 100644 --- a/go/store/types/map_iterator.go +++ b/go/store/types/map_iterator.go @@ -39,6 +39,16 @@ type MapIterator interface { Next(ctx context.Context) (k, v Value, err error) } +type EmptyMapIterator struct{} + +func (mtItr EmptyMapIterator) Next(ctx context.Context) (k, v Value, err error) { + return nil, nil, nil +} + +func (mtItr EmptyMapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) { + return Tuple{}, Tuple{}, io.EOF +} + // mapIterator can efficiently iterate through a Noms Map. 
type mapIterator struct { sequenceIter sequenceIterator diff --git a/go/store/types/ordered_sequences_diff.go b/go/store/types/ordered_sequences_diff.go index e589394d13..ff04c8ba81 100644 --- a/go/store/types/ordered_sequences_diff.go +++ b/go/store/types/ordered_sequences_diff.go @@ -192,16 +192,24 @@ func orderedSequenceDiffInternalNodes(ctx context.Context, last orderedSequence, // Streams the diff from |last| to |current| into |changes|, using a left-right approach. // Left-right immediately descends to the first change and starts streaming changes, but compared to top-down it's serial and much slower to calculate the full diff. func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, current orderedSequence, changes chan<- ValueChanged) error { - lastCur, err := newCursorAt(ctx, last, emptyKey, false, false) + trueFunc := func(Value) (bool, error) { + return true, nil + } + return orderedSequenceDiffLeftRightInRange(ctx, last, current, emptyKey, trueFunc, changes) +} + +func orderedSequenceDiffLeftRightInRange(ctx context.Context, last orderedSequence, current orderedSequence, startKey orderedKey, inRange ValueInRange, changes chan<- ValueChanged) error { + lastCur, err := newCursorAt(ctx, last, startKey, false, false) if err != nil { return err } - currentCur, err := newCursorAt(ctx, current, emptyKey, false, false) + currentCur, err := newCursorAt(ctx, current, startKey, false, false) if err != nil { return err } +VALIDRANGES: for lastCur.valid() && currentCur.valid() { err := fastForward(ctx, lastCur, currentCur) if err != nil { @@ -231,6 +239,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur if isLess, err := currentKey.Less(last.format(), lastKey); err != nil { return err } else if isLess { + isInRange, err := inRange(currentKey.v) + if err != nil { + return err + } else if !isInRange { + break VALIDRANGES + } + mv, err := getMapValue(currentCur) if err != nil { return err @@ -245,6 +260,13 @@ func 
orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur return err } } else { + isInRange, err := inRange(lastKey.v) + if err != nil { + return err + } else if !isInRange { + break VALIDRANGES + } + if isLess, err := lastKey.Less(last.format(), currentKey); err != nil { return err } else if isLess { @@ -296,6 +318,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur return err } + isInRange, err := inRange(lastKey.v) + if err != nil { + return err + } else if !isInRange { + break + } + mv, err := getMapValue(lastCur) if err != nil { return err @@ -317,6 +346,13 @@ func orderedSequenceDiffLeftRight(ctx context.Context, last orderedSequence, cur return err } + isInRange, err := inRange(currKey.v) + if err != nil { + return err + } else if !isInRange { + break + } + mv, err := getMapValue(currentCur) if err != nil { return err diff --git a/go/store/types/tuple.go b/go/store/types/tuple.go index 4e0cf1b962..4b5a6abf99 100644 --- a/go/store/types/tuple.go +++ b/go/store/types/tuple.go @@ -296,6 +296,25 @@ func NewTuple(nbf *NomsBinFormat, values ...Value) (Tuple, error) { return Tuple{valueImpl{vrw, nbf, w.data(), nil}}, nil } +// CopyOf creates a copy of a tuple. This is necessary in cases where keeping a reference to the original tuple is +// preventing larger objects from being collected. +func (t Tuple) CopyOf(vrw ValueReadWriter) Tuple { + buff := make([]byte, len(t.buff)) + offsets := make([]uint32, len(t.offsets)) + + copy(buff, t.buff) + copy(offsets, t.offsets) + + return Tuple{ + valueImpl{ + buff: buff, + offsets: offsets, + vrw: vrw, + nbf: t.nbf, + }, + } +} + func (t Tuple) Empty() bool { return t.Len() == 0 }