mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-27 07:26:31 -05:00
channels for orderedSequenceDiff (#1926)
This commit is contained in:
+52
-33
@@ -110,27 +110,38 @@ func diffLists(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.List) {
|
||||
func diffMaps(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Map) {
|
||||
wroteHeader := false
|
||||
|
||||
added, removed, modified := v2.Diff(v1)
|
||||
for _, k := range added {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, addPrefix, k, v2.Get(k))
|
||||
}
|
||||
for _, k := range removed {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, k, v1.Get(k))
|
||||
}
|
||||
for _, k := range modified {
|
||||
c1, c2 := v1.Get(k), v2.Get(k)
|
||||
if canCompare(c1, c2) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
d.PanicIfError(types.WriteEncodedValueWithTags(buf, k))
|
||||
p1 := p.AddField(buf.String())
|
||||
dq.PushBack(diffInfo{path: p1, key: k, v1: c1, v2: c2})
|
||||
} else {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, k, v1.Get(k))
|
||||
line(w, addPrefix, k, v2.Get(k))
|
||||
changes := make(chan types.ValueChanged)
|
||||
closeChan := make(chan struct{})
|
||||
v2.Diff(v1, changes, closeChan)
|
||||
|
||||
err := d.Try(func() {
|
||||
for change := range changes {
|
||||
switch change.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, addPrefix, change.V, v2.Get(change.V))
|
||||
case types.DiffChangeRemoved:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, change.V, v1.Get(change.V))
|
||||
case types.DiffChangeModified:
|
||||
c1, c2 := v1.Get(change.V), v2.Get(change.V)
|
||||
if canCompare(c1, c2) {
|
||||
buf := bytes.NewBuffer(nil)
|
||||
d.PanicIfError(types.WriteEncodedValueWithTags(buf, change.V))
|
||||
p1 := p.AddField(buf.String())
|
||||
dq.PushBack(diffInfo{path: p1, key: change.V, v1: c1, v2: c2})
|
||||
} else {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, change.V, v1.Get(change.V))
|
||||
line(w, addPrefix, change.V, v2.Get(change.V))
|
||||
}
|
||||
default:
|
||||
panic("unknown change type")
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
closeChan <- struct{}{}
|
||||
}
|
||||
writeFooter(w, wroteHeader)
|
||||
}
|
||||
@@ -154,22 +165,30 @@ func diffStructs(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Struct)
|
||||
|
||||
func diffSets(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Set) {
|
||||
wroteHeader := false
|
||||
added, removed := v2.Diff(v1)
|
||||
if len(added) == 1 && len(removed) == 1 && canCompare(added[0], removed[0]) {
|
||||
p1 := p.AddField(added[0].Hash().String())
|
||||
dq.PushBack(diffInfo{path: p1, key: types.String(""), v1: removed[0], v2: added[0]})
|
||||
} else {
|
||||
for _, value := range removed {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, nil, value)
|
||||
}
|
||||
for _, value := range added {
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, addPrefix, nil, value)
|
||||
|
||||
changes := make(chan types.ValueChanged)
|
||||
closeChan := make(chan struct{})
|
||||
v2.Diff(v1, changes, closeChan)
|
||||
|
||||
err := d.Try(func() {
|
||||
for change := range changes {
|
||||
switch change.ChangeType {
|
||||
case types.DiffChangeAdded:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, addPrefix, nil, change.V)
|
||||
case types.DiffChangeRemoved:
|
||||
wroteHeader = writeHeader(w, wroteHeader, p)
|
||||
line(w, subPrefix, nil, change.V)
|
||||
default:
|
||||
// sets should not have any DiffChangeModified or unknown change types
|
||||
panic("unknown change type")
|
||||
}
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
closeChan <- struct{}{}
|
||||
}
|
||||
writeFooter(w, wroteHeader)
|
||||
return
|
||||
}
|
||||
|
||||
type prefixWriter struct {
|
||||
|
||||
@@ -89,7 +89,7 @@ func TestNomsSetDiff(t *testing.T) {
|
||||
Diff(buf, s1, s2)
|
||||
assert.Equal(expected, buf.String())
|
||||
|
||||
expected = "./.sha1-c26be7ea6e815f747c1552fe402a773ad466be88 {\n- \"m3\": \"m-three\"\n+ \"m3\": \"m-three-diff\"\n }\n./.sha1-c26be7ea6e815f747c1552fe402a773ad466be88.\"m4\" {\n- \"a1\": \"a-one\"\n+ \"a1\": \"a-one-diff\"\n }\n"
|
||||
expected = "./ {\n- {\n- \"m1\": \"m-one\",\n- \"m3\": \"m-three\",\n- \"m4\": {\n- \"a1\": \"a-one\",\n- \"a2\": \"a-two\",\n- \"a3\": \"a-three\",\n- \"a4\": \"a-four\",\n- },\n- \"v2\": \"m-two\",\n- }\n+ {\n+ \"m1\": \"m-one\",\n+ \"m3\": \"m-three-diff\",\n+ \"m4\": {\n+ \"a1\": \"a-one-diff\",\n+ \"a2\": \"a-two\",\n+ \"a3\": \"a-three\",\n+ \"a4\": \"a-four\",\n+ },\n+ \"v2\": \"m-two\",\n+ }\n }\n"
|
||||
s1 = createSet(mm1, mm2, mm3, mm4)
|
||||
s2 = createSet(mm1, mm2, mm3x, mm4)
|
||||
buf = util.NewBuffer(nil)
|
||||
|
||||
@@ -625,18 +625,16 @@ func TestListModifyAfterRead(t *testing.T) {
|
||||
assert.Equal(llen, list.Len())
|
||||
}
|
||||
|
||||
func accumulateDiffSplices(l1, l2 List) []Splice {
|
||||
var diff []Splice
|
||||
func accumulateDiffSplices(l1, l2 List) (diff []Splice) {
|
||||
diffChan := make(chan Splice)
|
||||
l1.Diff(l2, diffChan, nil)
|
||||
for splice := range diffChan {
|
||||
diff = append(diff, splice)
|
||||
}
|
||||
return diff
|
||||
return
|
||||
}
|
||||
|
||||
func accumulateDiffSplicesWithLimit(l1, l2 List, maxSpliceMatrixSize uint64) []Splice {
|
||||
var diff []Splice
|
||||
func accumulateDiffSplicesWithLimit(l1, l2 List, maxSpliceMatrixSize uint64) (diff []Splice) {
|
||||
diffChan := make(chan Splice)
|
||||
l1.DiffWithLimit(l2, diffChan, nil, maxSpliceMatrixSize)
|
||||
for splice := range diffChan {
|
||||
|
||||
+4
-2
@@ -59,8 +59,10 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
|
||||
return outChan
|
||||
}
|
||||
|
||||
func (m Map) Diff(last Map) (added []Value, removed []Value, modified []Value) {
|
||||
return orderedSequenceDiff(last.sequence().(orderedSequence), m.sequence().(orderedSequence))
|
||||
func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
go func() {
|
||||
orderedSequenceDiff(last.sequence().(orderedSequence), m.sequence().(orderedSequence), changes, closeChan)
|
||||
}()
|
||||
}
|
||||
|
||||
// Collection interface
|
||||
|
||||
+18
-6
@@ -49,10 +49,6 @@ func (tm testMap) MaybeGet(key Value) (v Value, ok bool) {
|
||||
func (tm testMap) Diff(last testMap) (added []Value, removed []Value, modified []Value) {
|
||||
// Note: this could be use tm.toMap/last.toMap and then tmMap.Diff(lastMap) but the
|
||||
// purpose of this method is to be redundant.
|
||||
added = make([]Value, 0)
|
||||
removed = make([]Value, 0)
|
||||
modified = make([]Value, 0)
|
||||
|
||||
if len(tm.entries) == 0 && len(last.entries) == 0 {
|
||||
return // nothing changed
|
||||
}
|
||||
@@ -276,8 +272,23 @@ func getTestRefToValueOrderMap(scale int, vw ValueWriter) testMap {
|
||||
})
|
||||
}
|
||||
|
||||
func accumulateMapDiffChanges(m1, m2 Map) (added []Value, removed []Value, modified []Value) {
|
||||
changes := make(chan ValueChanged)
|
||||
m1.Diff(m2, changes, nil)
|
||||
for change := range changes {
|
||||
if change.ChangeType == DiffChangeAdded {
|
||||
added = append(added, change.V)
|
||||
} else if change.ChangeType == DiffChangeRemoved {
|
||||
removed = append(removed, change.V)
|
||||
} else {
|
||||
modified = append(modified, change.V)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func diffMapTest(assert *assert.Assertions, m1 Map, m2 Map, numAddsExpected int, numRemovesExpected int, numModifiedExpected int) (added []Value, removed []Value, modified []Value) {
|
||||
added, removed, modified = m1.Diff(m2)
|
||||
added, removed, modified = accumulateMapDiffChanges(m1, m2)
|
||||
assert.Equal(numAddsExpected, len(added), "num added is not as expected")
|
||||
assert.Equal(numRemovesExpected, len(removed), "num removed is not as expected")
|
||||
assert.Equal(numModifiedExpected, len(modified), "num modified is not as expected")
|
||||
@@ -302,7 +313,8 @@ func TestMapDiff(t *testing.T) {
|
||||
testMapAdded, testMapRemoved, testMapModified := testMap1.Diff(testMap2)
|
||||
map1 := testMap1.toMap()
|
||||
map2 := testMap2.toMap()
|
||||
mapDiffAdded, mapDiffRemoved, mapDiffModified := map1.Diff(map2)
|
||||
|
||||
mapDiffAdded, mapDiffRemoved, mapDiffModified := accumulateMapDiffChanges(map1, map2)
|
||||
assert.Equal(t, testMapAdded, mapDiffAdded, "testMap.diff != map.diff")
|
||||
assert.Equal(t, testMapRemoved, mapDiffRemoved, "testMap.diff != map.diff")
|
||||
assert.Equal(t, testMapModified, mapDiffModified, "testMap.diff != map.diff")
|
||||
|
||||
@@ -8,13 +8,30 @@ import (
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
)
|
||||
|
||||
//
|
||||
// Returns a 3-tuple [added, removed, modified] sorted keys.
|
||||
//
|
||||
func orderedSequenceDiff(last orderedSequence, current orderedSequence) (added []Value, removed []Value, modified []Value) {
|
||||
added = make([]Value, 0)
|
||||
removed = make([]Value, 0)
|
||||
modified = make([]Value, 0)
|
||||
type DiffChangeType uint8
|
||||
|
||||
const (
|
||||
DiffChangeAdded DiffChangeType = iota
|
||||
DiffChangeRemoved
|
||||
DiffChangeModified
|
||||
)
|
||||
|
||||
type ValueChanged struct {
|
||||
ChangeType DiffChangeType
|
||||
V Value
|
||||
}
|
||||
|
||||
func sendChange(changes chan<- ValueChanged, closeChan <-chan struct{}, change ValueChanged) error {
|
||||
select {
|
||||
case changes <- change:
|
||||
case <-closeChan:
|
||||
close(changes)
|
||||
return ChangeChanClosedErr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
|
||||
lastCur := newCursorAt(last, emptyKey, false, false)
|
||||
currentCur := newCursorAt(current, emptyKey, false, false)
|
||||
|
||||
@@ -26,13 +43,19 @@ func orderedSequenceDiff(last orderedSequence, current orderedSequence) (added [
|
||||
lastKey := getCurrentKey(lastCur)
|
||||
currentKey := getCurrentKey(currentCur)
|
||||
if currentKey.Less(lastKey) {
|
||||
added = append(added, currentKey.v)
|
||||
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, currentKey.v}); err != nil {
|
||||
return err
|
||||
}
|
||||
currentCur.advance()
|
||||
} else if lastKey.Less(currentKey) {
|
||||
removed = append(removed, lastKey.v)
|
||||
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, lastKey.v}); err != nil {
|
||||
return err
|
||||
}
|
||||
lastCur.advance()
|
||||
} else {
|
||||
modified = append(modified, lastKey.v)
|
||||
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeModified, lastKey.v}); err != nil {
|
||||
return err
|
||||
}
|
||||
lastCur.advance()
|
||||
currentCur.advance()
|
||||
}
|
||||
@@ -40,15 +63,19 @@ func orderedSequenceDiff(last orderedSequence, current orderedSequence) (added [
|
||||
}
|
||||
|
||||
for lastCur.valid() {
|
||||
removed = append(removed, getCurrentKey(lastCur).v)
|
||||
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, getCurrentKey(lastCur).v}); err != nil {
|
||||
return err
|
||||
}
|
||||
lastCur.advance()
|
||||
}
|
||||
for currentCur.valid() {
|
||||
added = append(added, getCurrentKey(currentCur).v)
|
||||
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, getCurrentKey(currentCur).v}); err != nil {
|
||||
return err
|
||||
}
|
||||
currentCur.advance()
|
||||
}
|
||||
|
||||
return added, removed, modified
|
||||
close(changes)
|
||||
return nil
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -36,6 +36,23 @@ func newDiffTestSuite(from1, to1, by1, from2, to2, by2, numAddsExpected, numRemo
|
||||
}
|
||||
}
|
||||
|
||||
func accumulateOrderedSequenceDiffChanges(o1, o2 orderedSequence) (added []Value, removed []Value, modified []Value) {
|
||||
changes := make(chan ValueChanged)
|
||||
closeChan := make(chan struct{})
|
||||
|
||||
go orderedSequenceDiff(o1, o2, changes, closeChan)
|
||||
for change := range changes {
|
||||
if change.ChangeType == DiffChangeAdded {
|
||||
added = append(added, change.V)
|
||||
} else if change.ChangeType == DiffChangeRemoved {
|
||||
removed = append(removed, change.V)
|
||||
} else {
|
||||
modified = append(modified, change.V)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (suite *diffTestSuite) TestDiff() {
|
||||
type valFn func(int, int, int) []Value
|
||||
type colFn func([]Value) Collection
|
||||
@@ -52,7 +69,7 @@ func (suite *diffTestSuite) TestDiff() {
|
||||
runTest := func(name string, vf valFn, cf colFn) {
|
||||
col1 := cf(vf(suite.from1, suite.to1, suite.by1))
|
||||
col2 := cf(vf(suite.from2, suite.to2, suite.by2))
|
||||
suite.added, suite.removed, suite.modified = orderedSequenceDiff(
|
||||
suite.added, suite.removed, suite.modified = accumulateOrderedSequenceDiffChanges(
|
||||
col1.sequence().(orderedSequence),
|
||||
col2.sequence().(orderedSequence))
|
||||
suite.Equal(suite.numAddsExpected, len(suite.added), "test %s: num added is not as expected", name)
|
||||
|
||||
+4
-5
@@ -37,11 +37,10 @@ func NewSet(v ...Value) Set {
|
||||
return newSet(seq.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
func (s Set) Diff(last Set) (added []Value, removed []Value) {
|
||||
// Set diff shouldn't return modified since it's not possible a value in a set of "changes".
|
||||
// Elements can only enter and exit a set
|
||||
added, removed, _ = orderedSequenceDiff(last.sequence().(orderedSequence), s.sequence().(orderedSequence))
|
||||
return
|
||||
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
go func() {
|
||||
orderedSequenceDiff(last.sequence().(orderedSequence), s.sequence().(orderedSequence), changes, closeChan)
|
||||
}()
|
||||
}
|
||||
|
||||
// Collection interface
|
||||
|
||||
+14
-3
@@ -37,8 +37,6 @@ func (ts testSet) Has(key Value) bool {
|
||||
func (ts testSet) Diff(last testSet) (added []Value, removed []Value) {
|
||||
// Note: this could be use ts.toSet/last.toSet and then tsSet.Diff(lastSet) but the
|
||||
// purpose of this method is to be redundant.
|
||||
added = make([]Value, 0)
|
||||
removed = make([]Value, 0)
|
||||
if len(ts) == 0 && len(last) == 0 {
|
||||
return // nothing changed
|
||||
}
|
||||
@@ -194,8 +192,21 @@ func getTestRefToValueOrderSet(scale int, vw ValueWriter) testSet {
|
||||
})
|
||||
}
|
||||
|
||||
func accumulateSetDiffChanges(s1, s2 Set) (added []Value, removed []Value) {
|
||||
changes := make(chan ValueChanged)
|
||||
s1.Diff(s2, changes, nil)
|
||||
for change := range changes {
|
||||
if change.ChangeType == DiffChangeAdded {
|
||||
added = append(added, change.V)
|
||||
} else if change.ChangeType == DiffChangeRemoved {
|
||||
removed = append(removed, change.V)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func diffSetTest(assert *assert.Assertions, s1 Set, s2 Set, numAddsExpected int, numRemovesExpected int) (added []Value, removed []Value) {
|
||||
added, removed = s1.Diff(s2)
|
||||
added, removed = accumulateSetDiffChanges(s1, s2)
|
||||
assert.Equal(numAddsExpected, len(added), "num added is not as expected")
|
||||
assert.Equal(numRemovesExpected, len(removed), "num removed is not as expected")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user