mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-09 18:59:12 -06:00
Parallelise ordered diff left/right initial fetches, other improvements (#2092)
I also changed the ordered diff test to test both the edit-distance and streaming versions of the algorithm.
This commit is contained in:
@@ -5,6 +5,8 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
)
|
||||
|
||||
@@ -33,19 +35,29 @@ func sendChange(changes chan<- ValueChanged, closeChan <-chan struct{}, change V
|
||||
|
||||
// TODO - something other than the literal edit-distance, which is way too much cpu work for this case - https://github.com/attic-labs/noms/issues/2027
|
||||
func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error {
|
||||
lastCur := newCursorAt(last, emptyKey, false, false)
|
||||
currentCur := newCursorAt(current, emptyKey, false, false)
|
||||
lastHeight := lastCur.depth()
|
||||
currentHeight := currentCur.depth()
|
||||
var lastHeight, currentHeight int
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
getHeight := func(seq orderedSequence, out *int) {
|
||||
cur := newCursorAt(seq, emptyKey, false, false)
|
||||
*out = cur.depth()
|
||||
wg.Done()
|
||||
}
|
||||
go getHeight(last, &lastHeight)
|
||||
go getHeight(current, ¤tHeight)
|
||||
wg.Wait()
|
||||
return orderedSequenceDiffInternalNodes(last, current, changes, closeChan, lastHeight, currentHeight)
|
||||
}
|
||||
|
||||
func orderedSequenceDiffInternalNodes(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}, lastHeight, currentHeight int) error {
|
||||
if lastHeight > currentHeight {
|
||||
lastChild := last.(orderedMetaSequence).getCompositeChildSequence(0, uint64(last.seqLen())).(orderedSequence)
|
||||
return orderedSequenceDiff(lastChild, current, changes, closeChan)
|
||||
return orderedSequenceDiffInternalNodes(lastChild, current, changes, closeChan, lastHeight-1, currentHeight)
|
||||
}
|
||||
|
||||
if currentHeight > lastHeight {
|
||||
currentChild := current.(orderedMetaSequence).getCompositeChildSequence(0, uint64(current.seqLen())).(orderedSequence)
|
||||
return orderedSequenceDiff(last, currentChild, changes, closeChan)
|
||||
return orderedSequenceDiffInternalNodes(last, currentChild, changes, closeChan, lastHeight, currentHeight-1)
|
||||
}
|
||||
|
||||
if !isMetaSequence(last) && !isMetaSequence(current) {
|
||||
@@ -56,9 +68,19 @@ func orderedSequenceDiff(last orderedSequence, current orderedSequence, changes
|
||||
func(i uint64, j uint64) bool { return compareFn(int(i), int(j)) })
|
||||
|
||||
for _, splice := range initialSplices {
|
||||
lastChild := last.(orderedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved).(orderedSequence)
|
||||
currentChild := current.(orderedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded).(orderedSequence)
|
||||
err := orderedSequenceDiff(lastChild, currentChild, changes, closeChan)
|
||||
var lastChild, currentChild orderedSequence
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
lastChild = last.(orderedMetaSequence).getCompositeChildSequence(splice.SpAt, splice.SpRemoved).(orderedSequence)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
currentChild = current.(orderedMetaSequence).getCompositeChildSequence(splice.SpFrom, splice.SpAdded).(orderedSequence)
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
err := orderedSequenceDiffInternalNodes(lastChild, currentChild, changes, closeChan, lastHeight-1, currentHeight-1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@ const (
|
||||
lengthOfNumbersTest = 1000
|
||||
)
|
||||
|
||||
type diffFn func(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, closeChan <-chan struct{}) error
|
||||
|
||||
type diffTestSuite struct {
|
||||
suite.Suite
|
||||
from1, to1, by1 int
|
||||
@@ -36,12 +38,12 @@ func newDiffTestSuite(from1, to1, by1, from2, to2, by2, numAddsExpected, numRemo
|
||||
}
|
||||
}
|
||||
|
||||
func accumulateOrderedSequenceDiffChanges(o1, o2 orderedSequence) (added []Value, removed []Value, modified []Value) {
|
||||
func accumulateOrderedSequenceDiffChanges(o1, o2 orderedSequence, df diffFn) (added []Value, removed []Value, modified []Value) {
|
||||
changes := make(chan ValueChanged)
|
||||
closeChan := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := orderedSequenceDiffLeafItems(o1, o2, changes, closeChan)
|
||||
err := df(o1, o2, changes, closeChan)
|
||||
if err == nil {
|
||||
close(changes)
|
||||
}
|
||||
@@ -71,12 +73,13 @@ func (suite *diffTestSuite) TestDiff() {
|
||||
return true
|
||||
}
|
||||
|
||||
runTest := func(name string, vf valFn, cf colFn) {
|
||||
runTestDf := func(name string, vf valFn, cf colFn, df diffFn) {
|
||||
col1 := cf(vf(suite.from1, suite.to1, suite.by1))
|
||||
col2 := cf(vf(suite.from2, suite.to2, suite.by2))
|
||||
suite.added, suite.removed, suite.modified = accumulateOrderedSequenceDiffChanges(
|
||||
col1.sequence().(orderedSequence),
|
||||
col2.sequence().(orderedSequence))
|
||||
col2.sequence().(orderedSequence),
|
||||
df)
|
||||
suite.Equal(suite.numAddsExpected, len(suite.added), "test %s: num added is not as expected", name)
|
||||
suite.Equal(suite.numRemovesExpected, len(suite.removed), "test %s: num removed is not as expected", name)
|
||||
suite.Equal(suite.numModifiedExpected, len(suite.modified), "test %s: num modified is not as expected", name)
|
||||
@@ -85,6 +88,11 @@ func (suite *diffTestSuite) TestDiff() {
|
||||
suite.True(notNil(suite.modified), "test %s: modified has nil values", name)
|
||||
}
|
||||
|
||||
runTest := func(name string, vf valFn, cf colFn) {
|
||||
runTestDf(name, vf, cf, orderedSequenceDiff)
|
||||
runTestDf(name, vf, cf, orderedSequenceDiffLeafItems)
|
||||
}
|
||||
|
||||
newSetAsCol := func(vs []Value) Collection { return NewSet(vs...) }
|
||||
newMapAsCol := func(vs []Value) Collection { return NewMap(vs...) }
|
||||
|
||||
|
||||
Reference in New Issue
Block a user