Merge pull request #1237 from rafael-atticlabs/treeWalkRef

tree walks now operate over types.RefBase
This commit is contained in:
Rafael Weinstein
2016-04-13 15:36:43 -07:00
7 changed files with 69 additions and 70 deletions

View File

@@ -77,9 +77,13 @@ func main() {
go func() {
ds := dataset.Store()
walk.SomeP(ds.ReadValue(inputRef), ds, func(v types.Value) (stop bool) {
walk.SomeP(ds.ReadValue(inputRef), ds, func(v types.Value, r types.RefBase) (stop bool) {
var g common.Geoposition
// Only consider RefOfStruct
if r == nil {
return
}
switch v := v.(type) {
case hasGeoposition:
g = v.Geoposition()
@@ -94,11 +98,6 @@ func main() {
return
}
// TODO: This check is mega bummer. We really only want to consider RefOfStruct, but it's complicated to filter the case of an inline struct out.
/*if !ds.Has(v.Ref()) {
return
}*/
stop = true
nodeDef := &common.NodeDef{Geoposition: g.Def(), Reference: v.Ref()}

View File

@@ -52,7 +52,7 @@ func main() {
d.Exp.False(sourceRef.IsEmpty(), "Unknown source object: %s", *sourceObject)
var err error
*sink, err = sink.Pull(sourceStore, sourceRef, int(*p))
*sink, err = sink.Pull(sourceStore, datas.NewRefOfCommit(sourceRef), int(*p))
util.MaybeWriteMemProfile()
d.Exp.NoError(err)

View File

@@ -9,22 +9,22 @@ import (
)
// CopyMissingChunksP copies to |sink| all chunks in source that are reachable from (and including) |r|, skipping chunks that |sink| already has
func CopyMissingChunksP(source DataStore, sink *LocalDataStore, sourceRef ref.Ref, concurrency int) {
copyCallback := func(r ref.Ref) bool {
return sink.has(r)
func CopyMissingChunksP(source DataStore, sink *LocalDataStore, sourceRef types.RefBase, concurrency int) {
copyCallback := func(r types.RefBase) bool {
return sink.has(r.TargetRef())
}
copyWorker(source, sink, sourceRef, copyCallback, concurrency)
}
// CopyReachableChunksP copies to |sink| all chunks reachable from (and including) |r|, but that are not in the subtree rooted at |exclude|
func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude ref.Ref, concurrency int) {
func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude types.RefBase, concurrency int) {
excludeRefs := map[ref.Ref]bool{}
if !exclude.IsEmpty() {
if !exclude.TargetRef().IsEmpty() {
mu := sync.Mutex{}
excludeCallback := func(r ref.Ref) bool {
excludeCallback := func(r types.RefBase) bool {
mu.Lock()
excludeRefs[r] = true
excludeRefs[r.TargetRef()] = true
mu.Unlock()
return false
}
@@ -32,13 +32,13 @@ func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude ref.Ref, co
walk.SomeChunksP(exclude, source, excludeCallback, concurrency)
}
copyCallback := func(r ref.Ref) bool {
return excludeRefs[r]
copyCallback := func(r types.RefBase) bool {
return excludeRefs[r.TargetRef()]
}
copyWorker(source, sink, sourceRef, copyCallback, concurrency)
}
func copyWorker(source DataStore, sink DataStore, sourceRef ref.Ref, stopFn walk.SomeChunksCallback, concurrency int) {
func copyWorker(source DataStore, sink DataStore, sourceRef types.RefBase, stopFn walk.SomeChunksCallback, concurrency int) {
hcs := sink.hintedChunkSink()
walk.SomeChunksP(sourceRef, newTeeDataSource(source.hintedChunkStore(), hcs), stopFn, concurrency)

View File

@@ -57,17 +57,17 @@ func (ds *Dataset) CommitWithParents(v types.Value, p datas.SetOfRefOfCommit) (D
return Dataset{store, ds.id}, err
}
func (ds *Dataset) Pull(sourceStore datas.DataStore, sourceRef ref.Ref, concurrency int) (Dataset, error) {
func (ds *Dataset) Pull(sourceStore datas.DataStore, sourceRef datas.RefOfCommit, concurrency int) (Dataset, error) {
_, topDown := ds.Store().(*datas.LocalDataStore)
return ds.pull(sourceStore, sourceRef, concurrency, topDown)
}
func (ds *Dataset) pull(source datas.DataStore, sourceRef ref.Ref, concurrency int, topDown bool) (Dataset, error) {
func (ds *Dataset) pull(source datas.DataStore, sourceRef datas.RefOfCommit, concurrency int, topDown bool) (Dataset, error) {
sink := *ds
sinkHeadRef := ref.Ref{}
sinkHeadRef := datas.NewRefOfCommit(ref.Ref{})
if currentHead, ok := sink.MaybeHead(); ok {
sinkHeadRef = currentHead.Ref()
sinkHeadRef = datas.NewRefOfCommit(currentHead.Ref())
}
if sourceRef == sinkHeadRef {
@@ -87,8 +87,8 @@ func (ds *Dataset) pull(source datas.DataStore, sourceRef ref.Ref, concurrency i
return sink, err
}
func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit {
v := ds.store.ReadValue(r)
func (ds *Dataset) validateRefAsCommit(r datas.RefOfCommit) datas.Commit {
v := ds.store.ReadValue(r.TargetRef())
d.Exp.NotNil(v, "%v cannot be found", r)
d.Exp.True(v.Type().Equals(datas.NewCommit().Type()), "Not a Commit: %+v", v)
@@ -96,7 +96,7 @@ func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit {
}
// setNewHead takes the Ref of the desired new Head of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should take any necessary corrective action and try again using this new Dataset.
func (ds *Dataset) setNewHead(newHeadRef ref.Ref) (Dataset, error) {
func (ds *Dataset) setNewHead(newHeadRef datas.RefOfCommit) (Dataset, error) {
commit := ds.validateRefAsCommit(newHeadRef)
return ds.CommitWithParents(commit.Value(), commit.Parents())
}

View File

@@ -15,9 +15,9 @@ func createTestDataset(name string) Dataset {
func TestValidateRef(t *testing.T) {
ds := createTestDataset("test")
r := ds.Store().WriteValue(types.Bool(true)).TargetRef()
r := ds.Store().WriteValue(types.Bool(true))
assert.Panics(t, func() { ds.validateRefAsCommit(r) })
assert.Panics(t, func() { ds.validateRefAsCommit(datas.NewRefOfCommit(r.TargetRef())) })
}
func NewList(ds Dataset, vs ...types.Value) types.RefBase {
@@ -67,7 +67,7 @@ func pullTest(t *testing.T, topdown bool) {
source, err = source.Commit(updatedValue)
assert.NoError(err)
sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown)
sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -96,7 +96,7 @@ func pullFirstCommit(t *testing.T, topdown bool) {
source, err := source.Commit(sourceInitialValue)
assert.NoError(err)
sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown)
sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -123,7 +123,7 @@ func pullDeepRef(t *testing.T, topdown bool) {
source, err := source.Commit(sourceInitialValue)
assert.NoError(err)
sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown)
sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}

View File

@@ -9,12 +9,11 @@ import (
"github.com/attic-labs/noms/types"
)
// SomeCallback takes a types.Value and returns a bool indicating whether
// the current walk should skip the tree descending from value.
type SomeCallback func(v types.Value) bool
// SomeCallback takes a types.Value and returns a bool indicating whether the current walk should skip the tree descending from value. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil).
type SomeCallback func(v types.Value, r types.RefBase) bool
// AllCallback takes a types.Value and processes it.
type AllCallback func(v types.Value)
// AllCallback takes a types.Value and processes it. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil).
type AllCallback func(v types.Value, r types.RefBase)
// SomeP recursively walks over all types.Values reachable from r and calls cb on them. If cb ever returns true, the walk will stop recursing on the current ref. If |concurrency| > 1, it is the callers responsibility to make ensure that |cb| is threadsafe.
func SomeP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int) {
@@ -23,8 +22,8 @@ func SomeP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int
// AllP recursively walks over all types.Values reachable from r and calls cb on them. If |concurrency| > 1, it is the callers responsibility to make ensure that |cb| is threadsafe.
func AllP(v types.Value, vr types.ValueReader, cb AllCallback, concurrency int) {
doTreeWalkP(v, vr, func(v types.Value) (skip bool) {
cb(v)
doTreeWalkP(v, vr, func(v types.Value, r types.RefBase) (skip bool) {
cb(v, r)
return
}, concurrency)
}
@@ -37,40 +36,41 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren
mu := sync.Mutex{}
wg := sync.WaitGroup{}
var processVal func(v types.Value)
processVal = func(v types.Value) {
if cb(v) {
var processVal func(v types.Value, r types.RefBase)
processVal = func(v types.Value, r types.RefBase) {
if cb(v, r) {
return
}
if r, ok := v.(types.RefBase); ok {
if sr, ok := v.(types.RefBase); ok {
wg.Add(1)
rq.tail() <- r.TargetRef()
rq.tail() <- sr
} else {
for _, c := range v.ChildValues() {
processVal(c)
processVal(c, nil)
}
}
}
processRef := func(r ref.Ref) {
processRef := func(r types.RefBase) {
defer wg.Done()
mu.Lock()
skip := visited[r]
visited[r] = true
skip := visited[r.TargetRef()]
visited[r.TargetRef()] = true
mu.Unlock()
if skip || f.didFail() {
return
}
v := vr.ReadValue(r)
target := r.TargetRef()
v := vr.ReadValue(target)
if v == nil {
f.fail(fmt.Errorf("Attempt to copy absent ref:%s", r.String()))
f.fail(fmt.Errorf("Attempt to copy absent ref:%s", target.String()))
return
}
processVal(v)
processVal(v, r)
}
iter := func() {
@@ -83,7 +83,7 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren
go iter()
}
processVal(v)
processVal(v, nil)
wg.Wait()
rq.close()
@@ -91,37 +91,37 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren
f.checkNotFailed()
}
// SomeChunksCallback takes a ref.Ref and returns a bool indicating whether
// SomeChunksCallback takes a types.RefBase and returns a bool indicating whether
// the current walk should skip the tree descending from value.
type SomeChunksCallback func(r ref.Ref) bool
type SomeChunksCallback func(r types.RefBase) bool
// SomeChunksP Invokes callback on all chunks reachable from |r| in top-down order. |callback| is invoked only once for each chunk regardless of how many times the chunk appears
func SomeChunksP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback, concurrency int) {
func SomeChunksP(r types.RefBase, vr types.ValueReader, callback SomeChunksCallback, concurrency int) {
doChunkWalkP(r, vr, callback, concurrency)
}
func doChunkWalkP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback, concurrency int) {
func doChunkWalkP(r types.RefBase, vr types.ValueReader, callback SomeChunksCallback, concurrency int) {
rq := newRefQueue()
wg := sync.WaitGroup{}
mu := sync.Mutex{}
visitedRefs := map[ref.Ref]bool{}
walkChunk := func(r ref.Ref) {
walkChunk := func(r types.RefBase) {
defer wg.Done()
mu.Lock()
visited := visitedRefs[r]
visitedRefs[r] = true
visited := visitedRefs[r.TargetRef()]
visitedRefs[r.TargetRef()] = true
mu.Unlock()
if visited || callback(r) {
return
}
v := vr.ReadValue(r)
v := vr.ReadValue(r.TargetRef())
for _, r1 := range v.Chunks() {
wg.Add(1)
rq.tail() <- r1.TargetRef()
rq.tail() <- r1
}
}
@@ -143,22 +143,22 @@ func doChunkWalkP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback,
// refQueue emulates a buffered channel of refs of unlimited size.
type refQueue struct {
head func() <-chan ref.Ref
tail func() chan<- ref.Ref
head func() <-chan types.RefBase
tail func() chan<- types.RefBase
close func()
}
func newRefQueue() refQueue {
head := make(chan ref.Ref, 64)
tail := make(chan ref.Ref, 64)
head := make(chan types.RefBase, 64)
tail := make(chan types.RefBase, 64)
done := make(chan struct{})
buff := []ref.Ref{}
buff := []types.RefBase{}
push := func(r ref.Ref) {
push := func(r types.RefBase) {
buff = append(buff, r)
}
pop := func() ref.Ref {
pop := func() types.RefBase {
d.Chk.True(len(buff) > 0)
r := buff[0]
buff = buff[1:]
@@ -191,10 +191,10 @@ func newRefQueue() refQueue {
}()
return refQueue{
func() <-chan ref.Ref {
func() <-chan types.RefBase {
return head
},
func() chan<- ref.Ref {
func() chan<- types.RefBase {
return tail
},
func() {

View File

@@ -24,7 +24,7 @@ func (suite *WalkAllTestSuite) SetupTest() {
func (suite *WalkAllTestSuite) walkWorker(r types.RefBase, expected int) {
actual := 0
AllP(r, suite.vs, func(c types.Value) {
AllP(r, suite.vs, func(c types.Value, r types.RefBase) {
actual++
}, 1)
suite.Equal(expected, actual)
@@ -103,7 +103,7 @@ func (suite *WalkTestSuite) SetupTest() {
func (suite *WalkTestSuite) TestStopWalkImmediately() {
actual := 0
SomeP(types.NewList(types.NewSet(), types.NewList()), suite.vs, func(v types.Value) bool {
SomeP(types.NewList(types.NewSet(), types.NewList()), suite.vs, func(v types.Value, r types.RefBase) bool {
actual++
return true
}, 1)
@@ -111,7 +111,7 @@ func (suite *WalkTestSuite) TestStopWalkImmediately() {
}
func (suite *WalkTestSuite) skipWorker(composite types.Value) (reached []types.Value) {
SomeP(composite, suite.vs, func(v types.Value) bool {
SomeP(composite, suite.vs, func(v types.Value, r types.RefBase) bool {
suite.False(v.Equals(suite.deadValue), "Should never have reached %+v", suite.deadValue)
reached = append(reached, v)
return v.Equals(suite.mustSkip)