Change sequence diff functions to be synchronous (#2098)

Currently they return immediately, but run internal goroutines which
stream results back on a channel. Now they must be run on their own
goroutine from outside the function.

This makes the code easier to reason about, and a future patch to write
a top-down and left-right multiplexing diff easier to write.
This commit is contained in:
Ben Kalman
2016-07-20 13:20:17 -07:00
committed by GitHub
parent b5cd065de1
commit 74200673d4
12 changed files with 143 additions and 116 deletions
+12 -3
View File
@@ -69,7 +69,10 @@ func diffLists(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.List) {
splices := make(chan types.Splice)
closeChan := make(chan struct{})
v2.Diff(v1, splices, closeChan)
go func() {
v2.Diff(v1, splices, closeChan)
close(splices)
}()
err := d.Try(func() {
for splice := range splices {
@@ -110,7 +113,10 @@ func diffMaps(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Map) {
changes := make(chan types.ValueChanged)
closeChan := make(chan struct{})
v2.Diff(v1, changes, closeChan)
go func() {
v2.Diff(v1, changes, closeChan)
close(changes)
}()
err := d.Try(func() {
for change := range changes {
@@ -166,7 +172,10 @@ func diffSets(dq *diffQueue, w io.Writer, p types.Path, v1, v2 types.Set) {
changes := make(chan types.ValueChanged)
closeChan := make(chan struct{})
v2.Diff(v1, changes, closeChan)
go func() {
v2.Diff(v1, changes, closeChan)
close(changes)
}()
err := d.Try(func() {
for change := range changes {
+5 -16
View File
@@ -4,17 +4,7 @@
package types
var (
ChangeChanClosedErr = ChangeChannelClosedError{"Change channel closed"}
)
type ChangeChannelClosedError struct {
msg string
}
func (e ChangeChannelClosedError) Error() string { return e.msg }
func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64, current indexedSequence, currentHeight int, currentOffset uint64, changes chan<- Splice, closeChan <-chan struct{}, maxSpliceMatrixSize uint64) error {
func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64, current indexedSequence, currentHeight int, currentOffset uint64, changes chan<- Splice, closeChan <-chan struct{}, maxSpliceMatrixSize uint64) bool {
if lastHeight > currentHeight {
lastChild := last.(indexedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen())).(indexedSequence)
return indexedSequenceDiff(lastChild, lastHeight-1, lastOffset, current, currentHeight, currentOffset, changes, closeChan, maxSpliceMatrixSize)
@@ -39,7 +29,7 @@ func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64
select {
case changes <- splice:
case <-closeChan:
return ChangeChanClosedErr
return false
}
} else {
@@ -53,12 +43,11 @@ func indexedSequenceDiff(last indexedSequence, lastHeight int, lastOffset uint64
if splice.SpFrom > 0 {
currentChildOffset += current.getOffset(int(splice.SpFrom)-1) + 1
}
err := indexedSequenceDiff(lastChild, lastHeight-1, lastChildOffset, currentChild, currentHeight-1, currentChildOffset, changes, closeChan, maxSpliceMatrixSize)
if err != nil {
return err
if ok := indexedSequenceDiff(lastChild, lastHeight-1, lastChildOffset, currentChild, currentHeight-1, currentChildOffset, changes, closeChan, maxSpliceMatrixSize); !ok {
return false
}
}
}
return nil
return true
}
+15 -21
View File
@@ -196,28 +196,22 @@ func (l List) Diff(last List, changes chan<- Splice, closeChan <-chan struct{})
}
func (l List) DiffWithLimit(last List, changes chan<- Splice, closeChan <-chan struct{}, maxSpliceMatrixSize uint64) {
go func() {
if l.Equals(last) {
close(changes) // nothing changed
return
}
lLen, lastLen := l.Len(), last.Len()
if lLen == 0 {
changes <- Splice{0, lastLen, 0, 0} // everything removed
close(changes)
return
}
if lastLen == 0 {
changes <- Splice{0, 0, lLen, 0} // everything added
close(changes)
return
}
if l.Equals(last) {
return
}
lLen, lastLen := l.Len(), last.Len()
if lLen == 0 {
changes <- Splice{0, lastLen, 0, 0} // everything removed
return
}
if lastLen == 0 {
changes <- Splice{0, 0, lLen, 0} // everything added
return
}
lastCur := newCursorAtIndex(last.seq, 0)
lCur := newCursorAtIndex(l.seq, 0)
indexedSequenceDiff(last.seq, lastCur.depth(), 0, l.seq, lCur.depth(), 0, changes, closeChan, maxSpliceMatrixSize)
close(changes)
}()
lastCur := newCursorAtIndex(last.seq, 0)
lCur := newCursorAtIndex(l.seq, 0)
indexedSequenceDiff(last.seq, lastCur.depth(), 0, l.seq, lCur.depth(), 0, changes, closeChan, maxSpliceMatrixSize)
}
func newListLeafBoundaryChecker() boundaryChecker {
+8 -2
View File
@@ -627,7 +627,10 @@ func TestListModifyAfterRead(t *testing.T) {
func accumulateDiffSplices(l1, l2 List) (diff []Splice) {
diffChan := make(chan Splice)
l1.Diff(l2, diffChan, nil)
go func() {
l1.Diff(l2, diffChan, nil)
close(diffChan)
}()
for splice := range diffChan {
diff = append(diff, splice)
}
@@ -636,7 +639,10 @@ func accumulateDiffSplices(l1, l2 List) (diff []Splice) {
func accumulateDiffSplicesWithLimit(l1, l2 List, maxSpliceMatrixSize uint64) (diff []Splice) {
diffChan := make(chan Splice)
l1.DiffWithLimit(l2, diffChan, nil, maxSpliceMatrixSize)
go func() {
l1.DiffWithLimit(l2, diffChan, nil, maxSpliceMatrixSize)
close(diffChan)
}()
for splice := range diffChan {
diff = append(diff, splice)
}
+1 -6
View File
@@ -59,12 +59,7 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
}
func (m Map) Diff(last Map, changes chan<- ValueChanged, closeChan <-chan struct{}) {
go func() {
err := orderedSequenceDiff(last.sequence().(orderedSequence), m.sequence().(orderedSequence), changes, closeChan)
if err == nil {
close(changes)
}
}()
orderedSequenceDiffTopDown(last.sequence().(orderedSequence), m.sequence().(orderedSequence), changes, closeChan)
}
// Collection interface
+4 -1
View File
@@ -275,7 +275,10 @@ func getTestRefToValueOrderMap(scale int, vw ValueWriter) testMap {
func accumulateMapDiffChanges(m1, m2 Map) (added []Value, removed []Value, modified []Value) {
changes := make(chan ValueChanged)
m1.Diff(m2, changes, nil)
go func() {
m1.Diff(m2, changes, nil)
close(changes)
}()
for change := range changes {
if change.ChangeType == DiffChangeAdded {
added = append(added, change.V)
+45 -52
View File
@@ -5,9 +5,8 @@
package types
import (
"sync"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/util/functions"
)
type DiffChangeType uint8
@@ -23,33 +22,28 @@ type ValueChanged struct {
V Value
}
func sendChange(changes chan<- ValueChanged, closeChan <-chan struct{}, change ValueChanged) error {
func sendChange(changes chan<- ValueChanged, closeChan <-chan struct{}, change ValueChanged) bool {
select {
case changes <- change:
return true
case <-closeChan:
close(changes)
return ChangeChanClosedErr
return false
}
return nil
}
// TODO - something other than the literal edit-distance, which is way too much cpu work for this case - https://github.com/attic-labs/noms/issues/2027
func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
// Streams the diff from |last| to |current| into |changes|, using a top-down approach.
// Top-down is parallel and efficiently returns the complete diff, but compared to left-right it's slow to start streaming changes.
func orderedSequenceDiffTopDown(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) bool {
var lastHeight, currentHeight int
wg := &sync.WaitGroup{}
wg.Add(2)
getHeight := func(seq orderedSequence, out *int) {
cur := newCursorAt(seq, emptyKey, false, false)
*out = cur.depth()
wg.Done()
}
go getHeight(last, &lastHeight)
go getHeight(current, &currentHeight)
wg.Wait()
functions.All(
func() { lastHeight = newCursorAt(last, emptyKey, false, false).depth() },
func() { currentHeight = newCursorAt(current, emptyKey, false, false).depth() },
)
return orderedSequenceDiffInternalNodes(last, current, changes, closeChan, lastHeight, currentHeight)
}
func orderedSequenceDiffInternalNodes(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}, lastHeight, currentHeight int) error {
// TODO - something other than the literal edit-distance, which is way too much cpu work for this case - https://github.com/attic-labs/noms/issues/2027
func orderedSequenceDiffInternalNodes(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}, lastHeight, currentHeight int) bool {
if lastHeight > currentHeight {
lastChild := last.(orderedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen())).(orderedSequence)
return orderedSequenceDiffInternalNodes(lastChild, current, changes, closeChan, lastHeight-1, currentHeight)
@@ -61,36 +55,34 @@ func orderedSequenceDiffInternalNodes(last orderedSequence, current orderedSeque
}
if !isMetaSequence(last) && !isMetaSequence(current) {
return orderedSequenceDiffLeafItems(last, current, changes, closeChan)
} else {
compareFn := last.getCompareFn(current)
initialSplices := calcSplices(uint64(last.seqLen()), uint64(current.seqLen()), DEFAULT_MAX_SPLICE_MATRIX_SIZE,
func(i uint64, j uint64) bool { return compareFn(int(i), int(j)) })
return orderedSequenceDiffLeftRight(last, current, changes, closeChan)
}
for _, splice := range initialSplices {
var lastChild, currentChild orderedSequence
wg := &sync.WaitGroup{}
wg.Add(2)
go func() {
compareFn := last.getCompareFn(current)
initialSplices := calcSplices(uint64(last.seqLen()), uint64(current.seqLen()), DEFAULT_MAX_SPLICE_MATRIX_SIZE,
func(i uint64, j uint64) bool { return compareFn(int(i), int(j)) })
for _, splice := range initialSplices {
var lastChild, currentChild orderedSequence
functions.All(
func() {
lastChild = last.(orderedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved).(orderedSequence)
wg.Done()
}()
go func() {
},
func() {
currentChild = current.(orderedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded).(orderedSequence)
wg.Done()
}()
wg.Wait()
err := orderedSequenceDiffInternalNodes(lastChild, currentChild, changes, closeChan, lastHeight-1, currentHeight-1)
if err != nil {
return err
}
},
)
if ok := orderedSequenceDiffInternalNodes(lastChild, currentChild, changes, closeChan, lastHeight-1, currentHeight-1); !ok {
return false
}
}
return nil
return true
}
func orderedSequenceDiffLeafItems(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
// Streams the diff from |last| to |current| into |changes|, using a left-right approach.
// Left-right immediately descends to the first change and starts streaming changes, but compared to top-down it's serial and much slower to calculate the full diff.
func orderedSequenceDiffLeftRight(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) bool {
lastCur := newCursorAt(last, emptyKey, false, false)
currentCur := newCursorAt(current, emptyKey, false, false)
@@ -102,18 +94,18 @@ func orderedSequenceDiffLeafItems(last orderedSequence, current orderedSequence,
lastKey := getCurrentKey(lastCur)
currentKey := getCurrentKey(currentCur)
if currentKey.Less(lastKey) {
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, currentKey.v}); err != nil {
return err
if ok := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, currentKey.v}); !ok {
return false
}
currentCur.advance()
} else if lastKey.Less(currentKey) {
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, lastKey.v}); err != nil {
return err
if ok := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, lastKey.v}); !ok {
return false
}
lastCur.advance()
} else {
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeModified, lastKey.v}); err != nil {
return err
if ok := sendChange(changes, closeChan, ValueChanged{DiffChangeModified, lastKey.v}); !ok {
return false
}
lastCur.advance()
currentCur.advance()
@@ -122,18 +114,19 @@ func orderedSequenceDiffLeafItems(last orderedSequence, current orderedSequence,
}
for lastCur.valid() {
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, getCurrentKey(lastCur).v}); err != nil {
return err
if ok := sendChange(changes, closeChan, ValueChanged{DiffChangeRemoved, getCurrentKey(lastCur).v}); !ok {
return false
}
lastCur.advance()
}
for currentCur.valid() {
if err := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, getCurrentKey(currentCur).v}); err != nil {
return err
if ok := sendChange(changes, closeChan, ValueChanged{DiffChangeAdded, getCurrentKey(currentCur).v}); !ok {
return false
}
currentCur.advance()
}
return nil
return true
}
/**
+5 -8
View File
@@ -14,7 +14,7 @@ const (
lengthOfNumbersTest = 1000
)
type diffFn func(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error
type diffFn func(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) bool
type diffTestSuite struct {
suite.Suite
@@ -41,12 +41,9 @@ func newDiffTestSuite(from1, to1, by1, from2, to2, by2, numAddsExpected, numRemo
func accumulateOrderedSequenceDiffChanges(o1, o2 orderedSequence, df diffFn) (added []Value, removed []Value, modified []Value) {
changes := make(chan ValueChanged)
closeChan := make(chan struct{})
go func() {
err := df(o1, o2, changes, closeChan)
if err == nil {
close(changes)
}
df(o1, o2, changes, closeChan)
close(changes)
}()
for change := range changes {
if change.ChangeType == DiffChangeAdded {
@@ -89,8 +86,8 @@ func (suite *diffTestSuite) TestDiff() {
}
runTest := func(name string, vf valFn, cf colFn) {
runTestDf(name, vf, cf, orderedSequenceDiff)
runTestDf(name, vf, cf, orderedSequenceDiffLeafItems)
runTestDf(name, vf, cf, orderedSequenceDiffTopDown)
runTestDf(name, vf, cf, orderedSequenceDiffLeftRight)
}
newSetAsCol := func(vs []Value) Collection { return NewSet(vs...) }
+1 -6
View File
@@ -37,12 +37,7 @@ func NewSet(v ...Value) Set {
}
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
go func() {
err := orderedSequenceDiff(last.sequence().(orderedSequence), s.sequence().(orderedSequence), changes, closeChan)
if err == nil {
close(changes)
}
}()
orderedSequenceDiffTopDown(last.sequence().(orderedSequence), s.sequence().(orderedSequence), changes, closeChan)
}
// Collection interface
+4 -1
View File
@@ -194,7 +194,10 @@ func getTestRefToValueOrderSet(scale int, vw ValueWriter) testSet {
func accumulateSetDiffChanges(s1, s2 Set) (added []Value, removed []Value) {
changes := make(chan ValueChanged)
s1.Diff(s2, changes, nil)
go func() {
s1.Diff(s2, changes, nil)
close(changes)
}()
for change := range changes {
if change.ChangeType == DiffChangeAdded {
added = append(added, change.V)
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package functions
import "sync"
// Runs all functions in |fs| in parallel, and returns when all functions have returned.
func All(fs ...func()) {
wg := &sync.WaitGroup{}
wg.Add(len(fs))
for _, f_ := range fs {
f := f_
go func() {
f()
wg.Done()
}()
}
wg.Wait()
}
+22
View File
@@ -0,0 +1,22 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package functions
import (
"testing"
"github.com/attic-labs/testify/assert"
)
func TestAll(t *testing.T) {
assert := assert.New(t)
// Set |res| via |ch| to test it's running in parallel - if not, they'll deadlock.
var res int
ch := make(chan int)
All(func() { ch <- 42 }, func() { res = <-ch })
assert.Equal(42, res)
}