orderedSequenceDiff more like indexedSequenceDiff - parallel (#1961)

orderedSequenceDiff more like indexedSequenceDiff - parallel and working top-down rather than sequential
This commit is contained in:
Mike Gray
2016-07-12 17:57:26 -07:00
committed by GitHub
parent d811fc1aac
commit 4dd9415a1a
7 changed files with 107 additions and 40 deletions

View File

@@ -34,8 +34,6 @@ func Diff(w io.Writer, v1, v2 types.Value) (err error) {
err = d.Try(func() {
for di, ok := dq.PopFront(); ok; di, ok = dq.PopFront() {
p, key, v1, v2 := di.path, di.key, di.v1, di.v2
v1.Type().Kind()
if v1 == nil && v2 != nil {
line(w, addPrefix, key, v2)
}
@@ -132,8 +130,8 @@ func diffMaps(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Map) {
dq.PushBack(diffInfo{path: p1, key: change.V, v1: c1, v2: c2})
} else {
wroteHeader = writeHeader(w, wroteHeader, p)
line(w, subPrefix, change.V, v1.Get(change.V))
line(w, addPrefix, change.V, v2.Get(change.V))
line(w, subPrefix, change.V, c1)
line(w, addPrefix, change.V, c2)
}
default:
panic("unknown change type")

View File

@@ -16,12 +16,12 @@ func (e ChangeChannelClosedError) Error() string { return e.msg }
func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64, current indexedSequence, currentHeight int, currentOffset uint64, changes chan<- Splice, closeChan <-chan struct{}, maxSpliceMatrixSize uint64) error {
if lastHeight > currentHeight {
lastChild := last.(indexedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen()))
lastChild := last.(indexedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen())).(indexedSequence)
return indexedSequenceDiff(lastChild, lastHeight-1, lastOffset, current, currentHeight, currentOffset, changes, closeChan, maxSpliceMatrixSize)
}
if currentHeight > lastHeight {
currentChild := current.(indexedMetaSequence).getCompositeChildSequence(0, uint64(current.seqLen()))
currentChild := current.(indexedMetaSequence).getCompositeChildSequence(0, uint64(current.seqLen())).(indexedSequence)
return indexedSequenceDiff(last, lastHeight, lastOffset, currentChild, currentHeight-1, currentOffset, changes, closeChan, maxSpliceMatrixSize)
}
@@ -43,8 +43,8 @@ func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64
}
} else {
lastChild := last.(indexedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved)
currentChild := current.(indexedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded)
lastChild := last.(indexedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved).(indexedSequence)
currentChild := current.(indexedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded).(indexedSequence)
lastChildOffset := lastOffset
if splice.SpAt > 0 {
lastChildOffset += last.getOffset(int(splice.SpAt)-1) + 1

View File

@@ -60,7 +60,10 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
go func() {
orderedSequenceDiff(last.sequence().(orderedSequence), m.sequence().(orderedSequence), changes, closeChan)
err := orderedSequenceDiff(last.sequence().(orderedSequence), m.sequence().(orderedSequence), changes, closeChan)
if err == nil {
close(changes)
}
}()
}

View File

@@ -138,13 +138,7 @@ func (ms metaSequenceObject) getChildSequence(idx int) sequence {
return mt.ref.TargetValue(ms.vr).(Collection).sequence()
}
// Returns the sequences pointed to by all items[i], s.t. start <= i < end, and returns the
// concatentation as one long composite sequence
func (ms metaSequenceObject) getCompositeChildSequence(start uint64, length uint64) indexedSequence {
childIsMeta := false
metaItems := []metaTuple{}
valueItems := []Value{}
func (ms metaSequenceObject) beginFetchingChildSequences(start, length uint64) chan interface{} {
input := make(chan interface{})
output := orderedparallel.New(input, func(item interface{}) interface{} {
i := item.(int)
@@ -158,32 +152,60 @@ func (ms metaSequenceObject) getCompositeChildSequence(start uint64, length uint
close(input)
}()
return output
}
i := uint64(start)
for item := range output {
seq := item.(sequence)
if i == start {
if idxSeq, ok := seq.(indexedSequence); ok {
childIsMeta = isMetaSequence(idxSeq)
}
}
if childIsMeta {
childMs, _ := seq.(indexedMetaSequence)
metaItems = append(metaItems, childMs.metaSequenceObject.tuples...)
continue
}
// Returns the sequences pointed to by all items[i], s.t. start <= i < end, and returns the
// concatentation as one long composite sequence
func (ms metaSequenceObject) getCompositeChildSequence(start uint64, length uint64) sequence {
metaItems := []metaTuple{}
mapItems := []mapEntry{}
valueItems := []Value{}
if ll, ok := seq.(listLeafSequence); ok {
valueItems = append(valueItems, ll.values...)
}
i++
childIsMeta := false
isIndexedSequence := false
if ListKind == ms.Type().Kind() {
isIndexedSequence = true
}
if childIsMeta {
return newIndexedMetaSequence(metaItems, ms.Type(), ms.vr)
output := ms.beginFetchingChildSequences(start, length)
for item := range output {
seq := item.(sequence)
switch t := seq.(type) {
case indexedMetaSequence:
childIsMeta = true
metaItems = append(metaItems, t.metaSequenceObject.tuples...)
case orderedMetaSequence:
childIsMeta = true
metaItems = append(metaItems, t.metaSequenceObject.tuples...)
case mapLeafSequence:
mapItems = append(mapItems, t.data...)
case setLeafSequence:
valueItems = append(valueItems, t.data...)
case listLeafSequence:
valueItems = append(valueItems, t.values...)
default:
panic("unreachable")
}
}
if isIndexedSequence {
if childIsMeta {
return newIndexedMetaSequence(metaItems, ms.Type(), ms.vr)
} else {
return newListLeafSequence(ms.vr, valueItems...)
}
} else {
return newListLeafSequence(ms.vr, valueItems...)
if childIsMeta {
return newOrderedMetaSequence(metaItems, ms.Type(), ms.vr)
} else {
if MapKind == ms.Type().Kind() {
return newMapLeafSequence(ms.vr, mapItems...)
} else {
return newSetLeafSequence(ms.vr, valueItems...)
}
}
}
}

View File

@@ -31,9 +31,46 @@ func sendChange(changes chan<- ValueChanged, closeChan <-chan struct{}, change V
return nil
}
// TODO - something other than the literal edit-distance, which is way too much cpu work for this case - https://github.com/attic-labs/noms/issues/2027
func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
lastCur := newCursorAt(last, emptyKey, false, false)
currentCur := newCursorAt(current, emptyKey, false, false)
lastHeight := lastCur.depth()
currentHeight := currentCur.depth()
if lastHeight > currentHeight {
lastChild := last.(orderedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen())).(orderedSequence)
return orderedSequenceDiff(lastChild, current, changes, closeChan)
}
if currentHeight > lastHeight {
currentChild := current.(orderedMetaSequence).getCompositeChildSequence(0, uint64(current.seqLen())).(orderedSequence)
return orderedSequenceDiff(last, currentChild, changes, closeChan)
}
if !isMetaSequence(last) && !isMetaSequence(current) {
return orderedSequenceDiffLeafItems(last, current, changes, closeChan)
} else {
compareFn := last.getCompareFn(current)
initialSplices := calcSplices(uint64(last.seqLen()), uint64(current.seqLen()), DEFAULT_MAX_SPLICE_MATRIX_SIZE,
func(i uint64, j uint64) bool { return compareFn(int(i), int(j)) })
for _, splice := range initialSplices {
lastChild := last.(orderedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved).(orderedSequence)
currentChild := current.(orderedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded).(orderedSequence)
err := orderedSequenceDiff(lastChild, currentChild, changes, closeChan)
if err != nil {
return err
}
}
}
return nil
}
func orderedSequenceDiffLeafItems(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
lastCur := newCursorAt(last, emptyKey, false, false)
currentCur := newCursorAt(current, emptyKey, false, false)
for lastCur.valid() && currentCur.valid() {
fastForward(lastCur, currentCur)
@@ -74,7 +111,6 @@ func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes
}
currentCur.advance()
}
close(changes)
return nil
}

View File

@@ -40,7 +40,12 @@ func accumulateOrderedSequenceDiffChanges(o1, o2 orderedSequence) (added []Value
changes := make(chan ValueChanged)
closeChan := make(chan struct{})
go orderedSequenceDiff(o1, o2, changes, closeChan)
go func() {
err := orderedSequenceDiffLeafItems(o1, o2, changes, closeChan)
if err == nil {
close(changes)
}
}()
for change := range changes {
if change.ChangeType == DiffChangeAdded {
added = append(added, change.V)

View File

@@ -38,7 +38,10 @@ func NewSet(v ...Value) Set {
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
go func() {
orderedSequenceDiff(last.sequence().(orderedSequence), s.sequence().(orderedSequence), changes, closeChan)
err := orderedSequenceDiff(last.sequence().(orderedSequence), s.sequence().(orderedSequence), changes, closeChan)
if err == nil {
close(changes)
}
}()
}