Refactor Chunks and ChildValues API to work iteratively (#2599)

* Refactors the Chunks and ChildValues APIs to be iterative; also
exposes WalkValues, which replaces SomeP and AllP.
This commit is contained in:
zcstarr
2016-09-30 16:53:00 -07:00
committed by GitHub
parent c04f623375
commit 40b28f94e5
31 changed files with 193 additions and 412 deletions
+9 -1
View File
@@ -193,6 +193,14 @@ func (dbc *databaseCommon) validateRefAsCommit(r types.Ref) types.Struct {
return v.(types.Struct)
}
// getNumValues returns the number of immediate child values of v, as
// reported by WalkValues. Used to detect an "empty" meta struct, since
// types.Struct is not comparable in Go (it contains a slice).
func getNumValues(v types.Value) (count int) {
	// count is a named result and therefore already zero-initialized;
	// an explicit "count = 0" would be redundant.
	v.WalkValues(func(v types.Value) {
		count++
	})
	return
}
func buildNewCommit(ds Dataset, v types.Value, opts CommitOptions) types.Struct {
parents := opts.Parents
if (parents == types.Set{}) {
@@ -205,7 +213,7 @@ func buildNewCommit(ds Dataset, v types.Value, opts CommitOptions) types.Struct
meta := opts.Meta
// Ideally, would like to do 'if meta == types.Struct{}' but types.Struct is not comparable in Go
// since it contains a slice.
if meta.Type() == nil && len(meta.ChildValues()) == 0 {
if meta.Type() == nil && getNumValues(meta) == 0 {
meta = types.EmptyStruct
}
return NewCommit(v, parents, meta)
+10 -3
View File
@@ -237,6 +237,13 @@ func sendWork(ch chan<- types.Ref, refs types.RefSlice) {
type hintCache map[hash.Hash]hash.Hash
// getChunks materializes the refs visited by v.WalkRefs into a slice,
// preserving visit order.
func getChunks(v types.Value) []types.Ref {
	var collected []types.Ref
	v.WalkRefs(func(r types.Ref) {
		collected = append(collected, r)
	})
	return collected
}
func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritten bool) traverseSourceResult {
h := srcRef.TargetHash()
if !sinkDB.has(h) {
@@ -251,7 +258,7 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritt
// write size is implementation specific.
bytesWritten = len(snappy.Encode(nil, c.Data()))
}
ts := traverseSourceResult{traverseResult{h, v.Chunks(), len(c.Data())}, bytesWritten}
ts := traverseSourceResult{traverseResult{h, getChunks(v), len(c.Data())}, bytesWritten}
return ts
}
return traverseSourceResult{}
@@ -259,7 +266,7 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritt
func traverseSink(sinkRef types.Ref, db Database) traverseResult {
if sinkRef.Height() > 1 {
return traverseResult{sinkRef.TargetHash(), sinkRef.TargetValue(db).Chunks(), 0}
return traverseResult{sinkRef.TargetHash(), getChunks(sinkRef.TargetValue(db)), 0}
}
return traverseResult{}
}
@@ -272,7 +279,7 @@ func traverseCommon(comRef, sinkHead types.Ref, db Database) traverseResult {
if comRef.Equals(sinkHead) {
exclusionSet = commit.Get(ParentsField).(types.Set)
}
chunks := types.RefSlice(commit.Chunks())
chunks := types.RefSlice(getChunks(commit))
for i := 0; i < len(chunks); {
if exclusionSet.Has(chunks[i]) {
end := len(chunks) - 1
+3 -4
View File
@@ -103,12 +103,11 @@ func (b Blob) Hash() hash.Hash {
return *b.h
}
func (b Blob) ChildValues() []Value {
return []Value{}
func (b Blob) WalkValues(cb ValueCallback) {
}
func (b Blob) Chunks() []Ref {
return b.seq.Chunks()
func (b Blob) WalkRefs(cb RefCallback) {
b.seq.WalkRefs(cb)
}
func (b Blob) Type() *Type {
+1 -2
View File
@@ -26,6 +26,5 @@ func (bl blobLeafSequence) getItem(idx int) sequenceItem {
return bl.data[idx]
}
func (bl blobLeafSequence) Chunks() []Ref {
return []Ref{}
func (bl blobLeafSequence) WalkRefs(cb RefCallback) {
}
+2 -4
View File
@@ -27,12 +27,10 @@ func (v Bool) Hash() hash.Hash {
return getHash(v)
}
func (v Bool) ChildValues() []Value {
return nil
func (v Bool) WalkValues(cb ValueCallback) {
}
func (v Bool) Chunks() []Ref {
return nil
func (v Bool) WalkRefs(cb RefCallback) {
}
func (v Bool) Type() *Type {
+3 -3
View File
@@ -43,7 +43,7 @@ func (suite *collectionTestSuite) TestEquals() {
}
func (suite *collectionTestSuite) TestChunkCountAndType() {
chunks := suite.col.Chunks()
chunks := getChunks(suite.col)
suite.Equal(suite.expectChunkCount, len(chunks))
refType := MakeRefType(suite.expectType)
for _, r := range chunks {
@@ -62,12 +62,12 @@ func (suite *collectionTestSuite) TestRoundTripAndValidate() {
func (suite *collectionTestSuite) TestPrependChunkDiff() {
v2 := suite.prependOne()
suite.Equal(suite.expectPrependChunkDiff, chunkDiffCount(suite.col.Chunks(), v2.Chunks()))
suite.Equal(suite.expectPrependChunkDiff, chunkDiffCount(getChunks(suite.col), getChunks(v2)))
}
func (suite *collectionTestSuite) TestAppendChunkDiff() {
v2 := suite.appendOne()
suite.Equal(suite.expectAppendChunkDiff, chunkDiffCount(suite.col.Chunks(), v2.Chunks()))
suite.Equal(suite.expectAppendChunkDiff, chunkDiffCount(getChunks(suite.col), getChunks(v2)))
}
func deriveCollectionHeight(c Collection) uint64 {
+5 -5
View File
@@ -589,11 +589,11 @@ func TestWriteEmptyUnionList(t *testing.T) {
type bogusType int
func (bg bogusType) Equals(other Value) bool { return false }
func (bg bogusType) Less(other Value) bool { return false }
func (bg bogusType) Hash() hash.Hash { return hash.Hash{} }
func (bg bogusType) ChildValues() []Value { return ValueSlice{} }
func (bg bogusType) Chunks() []Ref { return RefSlice{} }
func (bg bogusType) Equals(other Value) bool { return false }
func (bg bogusType) Less(other Value) bool { return false }
func (bg bogusType) Hash() hash.Hash { return hash.Hash{} }
func (bg bogusType) WalkValues(cb ValueCallback) {}
func (bg bogusType) WalkRefs(cb RefCallback) {}
func (bg bogusType) Type() *Type {
return MakeCycleType(0)
}
+4 -6
View File
@@ -91,16 +91,14 @@ func (l List) Hash() hash.Hash {
return *l.h
}
func (l List) ChildValues() []Value {
values := make([]Value, l.Len())
func (l List) WalkValues(cb ValueCallback) {
l.IterAll(func(v Value, idx uint64) {
values[idx] = v
cb(v)
})
return values
}
func (l List) Chunks() []Ref {
return l.seq.Chunks()
func (l List) WalkRefs(cb RefCallback) {
l.seq.WalkRefs(cb)
}
func (l List) Type() *Type {
+2 -3
View File
@@ -31,9 +31,8 @@ func (ll listLeafSequence) getItem(idx int) sequenceItem {
return ll.values[idx]
}
func (ll listLeafSequence) Chunks() (chunks []Ref) {
func (ll listLeafSequence) WalkRefs(cb RefCallback) {
for _, v := range ll.values {
chunks = append(chunks, v.Chunks()...)
v.WalkRefs(cb)
}
return
}
+5 -4
View File
@@ -107,15 +107,16 @@ func (m Map) Hash() hash.Hash {
return *m.h
}
func (m Map) ChildValues() (values []Value) {
func (m Map) WalkValues(cb ValueCallback) {
m.IterAll(func(k, v Value) {
values = append(values, k, v)
cb(k)
cb(v)
})
return
}
func (m Map) Chunks() []Ref {
return m.seq.Chunks()
func (m Map) WalkRefs(cb RefCallback) {
m.seq.WalkRefs(cb)
}
func (m Map) Type() *Type {
+3 -4
View File
@@ -50,12 +50,11 @@ func (ml mapLeafSequence) getItem(idx int) sequenceItem {
return ml.data[idx]
}
func (ml mapLeafSequence) Chunks() (chunks []Ref) {
func (ml mapLeafSequence) WalkRefs(cb RefCallback) {
for _, entry := range ml.data {
chunks = append(chunks, entry.key.Chunks()...)
chunks = append(chunks, entry.value.Chunks()...)
entry.key.WalkRefs(cb)
entry.value.WalkRefs(cb)
}
return
}
func (ml mapLeafSequence) getCompareFn(other sequence) compareFn {
+3 -3
View File
@@ -1046,15 +1046,15 @@ func TestMapChunks(t *testing.T) {
assert := assert.New(t)
l1 := NewMap(Number(0), Number(1))
c1 := l1.Chunks()
c1 := getChunks(l1)
assert.Len(c1, 0)
l2 := NewMap(NewRef(Number(0)), Number(1))
c2 := l2.Chunks()
c2 := getChunks(l2)
assert.Len(c2, 1)
l3 := NewMap(Number(0), NewRef(Number(1)))
c3 := l3.Chunks()
c3 := getChunks(l3)
assert.Len(c3, 1)
}
+4 -7
View File
@@ -126,12 +126,10 @@ func (ms metaSequence) valueReader() ValueReader {
return ms.vr
}
func (ms metaSequence) Chunks() []Ref {
chunks := make([]Ref, len(ms.tuples))
for i, tuple := range ms.tuples {
chunks[i] = tuple.ref
func (ms metaSequence) WalkRefs(cb RefCallback) {
for _, tuple := range ms.tuples {
cb(tuple.ref)
}
return chunks
}
func (ms metaSequence) Type() *Type {
@@ -263,8 +261,7 @@ func (es emptySequence) valueReader() ValueReader {
return nil
}
func (es emptySequence) Chunks() (chunks []Ref) {
return
func (es emptySequence) WalkRefs(cb RefCallback) {
}
func (es emptySequence) Type() *Type {
+2 -4
View File
@@ -27,12 +27,10 @@ func (v Number) Hash() hash.Hash {
return getHash(v)
}
func (v Number) ChildValues() []Value {
return nil
func (v Number) WalkValues(cb ValueCallback) {
}
func (v Number) Chunks() []Ref {
return nil
func (v Number) WalkRefs(cb RefCallback) {
}
func (v Number) Type() *Type {
+7 -10
View File
@@ -28,13 +28,11 @@ func constructRef(t *Type, target hash.Hash, height uint64) Ref {
}
func maxChunkHeight(v Value) (max uint64) {
if chunks := v.Chunks(); chunks != nil {
for _, r := range chunks {
if height := r.Height(); height > max {
max = height
}
v.WalkRefs(func(r Ref) {
if height := r.Height(); height > max {
max = height
}
}
})
return
}
@@ -67,12 +65,11 @@ func (r Ref) Hash() hash.Hash {
return *r.h
}
func (r Ref) ChildValues() []Value {
return nil
func (r Ref) WalkValues(cb ValueCallback) {
}
func (r Ref) Chunks() (chunks []Ref) {
return append(chunks, r)
func (r Ref) WalkRefs(cb RefCallback) {
cb(r)
}
func (r Ref) Type() *Type {
+2 -2
View File
@@ -48,6 +48,6 @@ func TestRefChunks(t *testing.T) {
l := NewList()
r := NewRef(l)
assert.Len(r.Chunks(), 1)
assert.Equal(r, r.Chunks()[0])
assert.Len(getChunks(r), 1)
assert.Equal(r, getChunks(r)[0])
}
+1 -1
View File
@@ -13,7 +13,7 @@ type sequence interface {
seqLen() int
numLeaves() uint64
valueReader() ValueReader
Chunks() []Ref
WalkRefs(cb RefCallback)
Type() *Type
getCompareFn(other sequence) compareFn
getChildSequence(idx int) sequence
+2 -2
View File
@@ -57,11 +57,11 @@ func (ts testSequence) Hash() hash.Hash {
panic("not reached")
}
func (ts testSequence) ChildValues() []Value {
func (ts testSequence) WalkValues(cb ValueCallback) {
panic("not reached")
}
func (ts testSequence) Chunks() []Ref {
func (ts testSequence) WalkRefs(cb RefCallback) {
panic("not reached")
}
+4 -5
View File
@@ -92,15 +92,14 @@ func (s Set) Hash() hash.Hash {
return *s.h
}
func (s Set) ChildValues() (values []Value) {
func (s Set) WalkValues(cb ValueCallback) {
s.IterAll(func(v Value) {
values = append(values, v)
cb(v)
})
return
}
func (s Set) Chunks() []Ref {
return s.seq.Chunks()
func (s Set) WalkRefs(cb RefCallback) {
s.seq.WalkRefs(cb)
}
func (s Set) Type() *Type {
+2 -3
View File
@@ -24,11 +24,10 @@ func (sl setLeafSequence) getItem(idx int) sequenceItem {
return sl.data[idx]
}
func (sl setLeafSequence) Chunks() (chunks []Ref) {
func (sl setLeafSequence) WalkRefs(cb RefCallback) {
for _, v := range sl.data {
chunks = append(chunks, v.Chunks()...)
v.WalkRefs(cb)
}
return
}
func (sl setLeafSequence) getCompareFn(other sequence) compareFn {
+4 -4
View File
@@ -817,11 +817,11 @@ func TestSetChunks(t *testing.T) {
assert := assert.New(t)
l1 := NewSet(Number(0))
c1 := l1.Chunks()
c1 := getChunks(l1)
assert.Len(c1, 0)
l2 := NewSet(NewRef(Number(0)))
c2 := l2.Chunks()
c2 := getChunks(l2)
assert.Len(c2, 1)
}
@@ -834,8 +834,8 @@ func TestSetChunks2(t *testing.T) {
vs := NewTestValueStore()
doTest := func(ts testSet) {
set := ts.toSet()
set2chunks := vs.ReadValue(vs.WriteValue(set).TargetHash()).Chunks()
for i, r := range set.Chunks() {
set2chunks := getChunks(vs.ReadValue(vs.WriteValue(set).TargetHash()))
for i, r := range getChunks(set) {
assert.True(r.Type().Equals(set2chunks[i].Type()), "%s != %s", r.Type().Describe(), set2chunks[i].Type().Describe())
}
}
+2 -4
View File
@@ -25,12 +25,10 @@ func (s String) Hash() hash.Hash {
return getHash(s)
}
func (s String) ChildValues() []Value {
return nil
func (s String) WalkValues(cb ValueCallback) {
}
func (s String) Chunks() []Ref {
return nil
func (s String) WalkRefs(cb RefCallback) {
}
func (s String) Type() *Type {
+6 -6
View File
@@ -76,17 +76,17 @@ func (s Struct) Hash() hash.Hash {
return *s.h
}
func (s Struct) ChildValues() []Value {
return s.values
func (s Struct) WalkValues(cb ValueCallback) {
for _, v := range s.values {
cb(v)
}
}
func (s Struct) Chunks() (chunks []Ref) {
chunks = append(chunks, s.t.Chunks()...)
func (s Struct) WalkRefs(cb RefCallback) {
for _, v := range s.values {
chunks = append(chunks, v.Chunks()...)
v.WalkRefs(cb)
}
return
}
func (s Struct) Type() *Type {
+9 -2
View File
@@ -10,6 +10,13 @@ import (
"github.com/attic-labs/testify/assert"
)
// getChunks gathers every Ref reported by v.WalkRefs into a slice so
// tests can make len/index assertions against it.
func getChunks(v Value) []Ref {
	var refs []Ref
	v.WalkRefs(func(r Ref) {
		refs = append(refs, r)
	})
	return refs
}
func TestGenericStructEquals(t *testing.T) {
assert := assert.New(t)
@@ -34,8 +41,8 @@ func TestGenericStructChunks(t *testing.T) {
s1 := NewStructWithType(typ, ValueSlice{NewRef(b)})
assert.Len(s1.Chunks(), 1)
assert.Equal(b.Hash(), s1.Chunks()[0].TargetHash())
assert.Len(getChunks(s1), 1)
assert.Equal(b.Hash(), getChunks(s1)[0].TargetHash())
}
func TestGenericStructNew(t *testing.T) {
+4 -4
View File
@@ -83,15 +83,15 @@ func (t *Type) Hash() hash.Hash {
return *t.h
}
func (t *Type) ChildValues() (res []Value) {
func (t *Type) WalkValues(cb ValueCallback) {
switch desc := t.Desc.(type) {
case CompoundDesc:
for _, t := range desc.ElemTypes {
res = append(res, t)
cb(t)
}
case StructDesc:
desc.IterFields(func(name string, t *Type) {
res = append(res, t)
cb(t)
})
case PrimitiveDesc:
// Nothing, these have no child values
@@ -101,7 +101,7 @@ func (t *Type) ChildValues() (res []Value) {
return
}
func (t *Type) Chunks() (chunks []Ref) {
func (t *Type) WalkRefs(cb RefCallback) {
return
}
+8 -5
View File
@@ -8,6 +8,9 @@ import (
"github.com/attic-labs/noms/go/hash"
)
type ValueCallback func(v Value)
type RefCallback func(ref Ref)
// Value is the interface all Noms values implement.
type Value interface {
// Equals determines if two different Noms values represents the same underlying value.
@@ -24,13 +27,13 @@ type Value interface {
// same hash they must be equal.
Hash() hash.Hash
// ChildValues returns the immediate children of this value in the DAG, if any, not including
// Type().
ChildValues() []Value
// WalkValues iterates over the immediate children of this value in the DAG, if any, not including
// Type()
WalkValues(ValueCallback)
// Chunks returns the refs to the underlying chunks. If this value is a collection that has been
// WalkRefs iterates over the refs to the underlying chunks. If this value is a collection that has been
// chunked then this will return the refs of the subtrees of the prolly-tree.
Chunks() []Ref
WalkRefs(RefCallback)
// Type returns the type of the Noms value. All Noms values carry their runtime Noms type.
Type() *Type
+4 -3
View File
@@ -138,12 +138,12 @@ func (lvs *ValueStore) Close() error {
// cacheChunks looks at the Chunks reachable from v and, for each one checks if there's a hint in the cache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
func (lvs *ValueStore) cacheChunks(v Value, r hash.Hash) {
for _, reachable := range v.Chunks() {
v.WalkRefs(func(reachable Ref) {
hash := reachable.TargetHash()
if cur := lvs.check(hash); cur == nil || cur.Hint().IsEmpty() || cur.Hint() == hash {
lvs.set(hash, hintedChunk{getTargetType(reachable), r})
}
}
})
}
func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) {
@@ -188,7 +188,7 @@ func (lvs *ValueStore) opCache() opCache {
func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
hints := map[hash.Hash]struct{}{}
for _, reachable := range v.Chunks() {
collectHints := func(reachable Ref) {
// First, check the type cache to see if reachable is already known to be valid.
targetHash := reachable.TargetHash()
entry := lvs.check(targetHash)
@@ -212,6 +212,7 @@ func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
targetType := getTargetType(reachable)
d.PanicIfTrue(!entry.Type().Equals(targetType), "Value to write contains ref %s, which points to a value of a different type: %+v != %+v", reachable.TargetHash(), entry.Type(), targetType)
}
v.WalkRefs(collectHints)
return hints
}
+17 -243
View File
@@ -6,272 +6,46 @@
package walk
import (
"fmt"
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
)
// SomeCallback takes a types.Value and returns a bool indicating whether the current walk should skip the tree descending from value. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil).
type SomeCallback func(v types.Value, r *types.Ref) bool
// AllCallback takes a types.Value and processes it. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil).
type AllCallback func(v types.Value, r *types.Ref)
// SomeP recursively walks over all types.Values reachable from r and calls cb on them. If cb ever returns true, the walk will stop recursing on the current ref. If |concurrency| > 1, it is the caller's responsibility to ensure that |cb| is threadsafe.
func SomeP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int) {
doTreeWalkP(v, vr, cb, concurrency)
}
// AllP recursively walks over all types.Values reachable from r and calls cb on them. If |concurrency| > 1, it is the caller's responsibility to ensure that |cb| is threadsafe.
func AllP(v types.Value, vr types.ValueReader, cb AllCallback, concurrency int) {
doTreeWalkP(v, vr, func(v types.Value, r *types.Ref) (skip bool) {
cb(v, r)
return
}, concurrency)
}
func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int) {
rq := newRefQueue()
f := newFailure()
type SkipValueCallback func(v types.Value) bool
// WalkValues recursively walks over all types.Values reachable from target and calls cb on them.
func WalkValues(target types.Value, vr types.ValueReader, cb SkipValueCallback) {
var processRef func(r types.Ref)
var processVal func(v types.Value)
visited := map[hash.Hash]bool{}
mu := sync.Mutex{}
wg := sync.WaitGroup{}
var processVal func(v types.Value, r *types.Ref)
processVal = func(v types.Value, r *types.Ref) {
if cb(v, r) {
processVal = func(v types.Value) {
if cb(v) {
return
}
if sr, ok := v.(types.Ref); ok {
wg.Add(1)
rq.tail() <- sr
processRef(sr)
} else {
switch coll := v.(type) {
case types.List:
coll.IterAll(func(c types.Value, index uint64) {
processVal(c, nil)
})
case types.Set:
coll.IterAll(func(c types.Value) {
processVal(c, nil)
})
case types.Map:
coll.IterAll(func(k, c types.Value) {
processVal(k, nil)
processVal(c, nil)
})
default:
for _, c := range v.ChildValues() {
processVal(c, nil)
}
}
v.WalkValues(processVal)
}
}
processRef := func(r types.Ref) {
defer wg.Done()
mu.Lock()
skip := visited[r.TargetHash()]
visited[r.TargetHash()] = true
mu.Unlock()
if skip || f.didFail() {
processRef = func(r types.Ref) {
target := r.TargetHash()
if visited[target] {
return
}
target := r.TargetHash()
visited[target] = true
v := vr.ReadValue(target)
if v == nil {
f.fail(fmt.Errorf("Attempt to visit absent ref:%s", target.String()))
return
}
processVal(v, &r)
}
iter := func() {
for r := range rq.head() {
processRef(r)
}
}
for i := 0; i < concurrency; i++ {
go iter()
}
processVal(v, nil)
wg.Wait()
rq.close()
f.checkNotFailed()
}
// SomeChunksStopCallback is called for every unique types.Ref |r|. Return true to stop walking beyond |r|.
type SomeChunksStopCallback func(r types.Ref) bool
// SomeChunksChunkCallback is called for every unique chunks.Chunk |c| which wasn't stopped from SomeChunksStopCallback. |r| is a types.Ref referring to |c|.
type SomeChunksChunkCallback func(r types.Ref, c chunks.Chunk)
// SomeChunksP invokes callbacks on every unique chunk reachable from |r| in top-down order. Callbacks are invoked only once for each chunk regardless of how many times the chunk appears.
//
// |stopCb| is invoked for the types.Ref of every chunk. It can return true to stop SomeChunksP from descending any further.
// |chunkCb| is optional, invoked with the chunks.Chunk referenced by |stopCb| if it didn't return true.
func SomeChunksP(r types.Ref, bs types.BatchStore, stopCb SomeChunksStopCallback, chunkCb SomeChunksChunkCallback, concurrency int) {
rq := newRefQueue()
wg := sync.WaitGroup{}
mu := sync.Mutex{}
visitedRefs := map[hash.Hash]bool{}
walkChunk := func(r types.Ref) {
defer wg.Done()
tr := r.TargetHash()
mu.Lock()
visited := visitedRefs[tr]
visitedRefs[tr] = true
mu.Unlock()
if visited || stopCb(r) {
d.Chk.Fail("Attempt to visit absent ref:%s", target.String())
return
}
// Try to avoid the cost of reading |c|. It's only necessary if the caller wants to know about every chunk, or if we need to descend below |c| (ref height > 1).
var c chunks.Chunk
if chunkCb != nil || r.Height() > 1 {
c = bs.Get(tr)
d.PanicIfTrue(c.IsEmpty())
if chunkCb != nil {
chunkCb(r, c)
}
}
if r.Height() == 1 {
return
}
v := types.DecodeValue(c, nil)
for _, r1 := range v.Chunks() {
wg.Add(1)
rq.tail() <- r1
}
processVal(v)
}
iter := func() {
for r := range rq.head() {
walkChunk(r)
}
}
//Process initial value
processVal(target)
for i := 0; i < concurrency; i++ {
go iter()
}
wg.Add(1)
rq.tail() <- r
wg.Wait()
rq.close()
}
// refQueue emulates a buffered channel of refs of unlimited size.
type refQueue struct {
head func() <-chan types.Ref
tail func() chan<- types.Ref
close func()
}
func newRefQueue() refQueue {
head := make(chan types.Ref, 64)
tail := make(chan types.Ref, 64)
done := make(chan struct{})
buff := []types.Ref{}
push := func(r types.Ref) {
buff = append(buff, r)
}
pop := func() types.Ref {
d.PanicIfFalse(len(buff) > 0)
r := buff[0]
buff = buff[1:]
return r
}
go func() {
loop:
for {
if len(buff) == 0 {
select {
case r := <-tail:
push(r)
case <-done:
break loop
}
} else {
first := buff[0]
select {
case r := <-tail:
push(r)
case head <- first:
r := pop()
d.PanicIfFalse(r == first)
case <-done:
break loop
}
}
}
}()
return refQueue{
func() <-chan types.Ref {
return head
},
func() chan<- types.Ref {
return tail
},
func() {
close(head)
done <- struct{}{}
},
}
}
type failure struct {
err error
mu *sync.Mutex
}
func newFailure() *failure {
return &failure{
mu: &sync.Mutex{},
}
}
func (f *failure) fail(err error) {
f.mu.Lock()
defer f.mu.Unlock()
if f.err == nil { // only capture first error
f.err = err
}
}
func (f *failure) didFail() bool {
f.mu.Lock()
defer f.mu.Unlock()
return f.err != nil
}
func (f *failure) checkNotFailed() {
f.mu.Lock()
defer f.mu.Unlock()
d.Chk.NoError(f.err)
}
+60 -60
View File
@@ -28,14 +28,23 @@ func (suite *WalkAllTestSuite) SetupTest() {
suite.vs = types.NewTestValueStore()
}
func (suite *WalkAllTestSuite) walkWorker(r types.Ref, expected int) {
func (suite *WalkAllTestSuite) walkWorker(v types.Value, expected int) {
actual := 0
AllP(r, suite.vs, func(c types.Value, r *types.Ref) {
WalkValues(v, suite.vs, func(c types.Value) bool {
actual++
}, 1)
return false
})
suite.Equal(expected, actual)
}
func (suite *WalkAllTestSuite) TestWalkValuesDuplicates() {
dup := suite.NewList(types.Number(9), types.Number(10), types.Number(11), types.Number(12), types.Number(13))
l := suite.NewList(types.Number(8), dup, dup)
suite.walkWorker(l, 11)
}
func (suite *WalkAllTestSuite) TestWalkPrimitives() {
suite.walkWorker(suite.vs.WriteValue(types.Number(0.0)), 2)
suite.walkWorker(suite.vs.WriteValue(types.String("hello")), 2)
@@ -50,6 +59,54 @@ func (suite *WalkAllTestSuite) TestWalkComposites() {
suite.walkWorker(suite.NewMap(types.Number(8), types.Bool(true), types.Number(0), types.Bool(false)), 6)
}
func (suite *WalkTestSuite) skipWorker(composite types.Value) (reached []types.Value) {
WalkValues(composite, suite.vs, func(v types.Value) bool {
suite.False(v.Equals(suite.deadValue), "Should never have reached %+v", suite.deadValue)
reached = append(reached, v)
return v.Equals(suite.mustSkip)
})
return
}
// Skipping a sub-tree must allow other items in the list to be processed.
func (suite *WalkTestSuite) TestSkipListElement() {
wholeList := types.NewList(suite.mustSkip, suite.shouldSee, suite.shouldSee)
reached := suite.skipWorker(wholeList)
for _, v := range []types.Value{wholeList, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 6)
}
func (suite *WalkTestSuite) TestSkipSetElement() {
wholeSet := types.NewSet(suite.mustSkip, suite.shouldSee).Insert(suite.shouldSee)
reached := suite.skipWorker(wholeSet)
for _, v := range []types.Value{wholeSet, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 4)
}
func (suite *WalkTestSuite) TestSkipMapValue() {
shouldAlsoSeeItem := types.String("Also good")
shouldAlsoSee := types.NewSet(shouldAlsoSeeItem)
wholeMap := types.NewMap(suite.shouldSee, suite.mustSkip, shouldAlsoSee, suite.shouldSee)
reached := suite.skipWorker(wholeMap)
for _, v := range []types.Value{wholeMap, suite.shouldSee, suite.shouldSeeItem, suite.mustSkip, shouldAlsoSee, shouldAlsoSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 8)
}
func (suite *WalkTestSuite) TestSkipMapKey() {
wholeMap := types.NewMap(suite.mustSkip, suite.shouldSee, suite.shouldSee, suite.shouldSee)
reached := suite.skipWorker(wholeMap)
for _, v := range []types.Value{wholeMap, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 8)
}
func (suite *WalkAllTestSuite) NewList(vs ...types.Value) types.Ref {
v := types.NewList(vs...)
return suite.vs.WriteValue(v)
@@ -101,60 +158,3 @@ func (suite *WalkTestSuite) SetupTest() {
suite.deadValue = types.Number(0xDEADBEEF)
suite.mustSkip = types.NewList(suite.deadValue)
}
func (suite *WalkTestSuite) TestStopWalkImmediately() {
actual := 0
SomeP(types.NewList(types.NewSet(), types.NewList()), suite.vs, func(v types.Value, r *types.Ref) bool {
actual++
return true
}, 1)
suite.Equal(1, actual)
}
func (suite *WalkTestSuite) skipWorker(composite types.Value) (reached []types.Value) {
SomeP(composite, suite.vs, func(v types.Value, r *types.Ref) bool {
suite.False(v.Equals(suite.deadValue), "Should never have reached %+v", suite.deadValue)
reached = append(reached, v)
return v.Equals(suite.mustSkip)
}, 1)
return
}
// Skipping a sub-tree must allow other items in the list to be processed.
func (suite *WalkTestSuite) SkipTestSkipListElement() {
wholeList := types.NewList(suite.mustSkip, suite.shouldSee, suite.shouldSee)
reached := suite.skipWorker(wholeList)
for _, v := range []types.Value{wholeList, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 6)
}
func (suite *WalkTestSuite) SkipTestSkipSetElement() {
wholeSet := types.NewSet(suite.mustSkip, suite.shouldSee).Insert(suite.shouldSee)
reached := suite.skipWorker(wholeSet)
for _, v := range []types.Value{wholeSet, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 4)
}
func (suite *WalkTestSuite) SkipTestSkipMapValue() {
shouldAlsoSeeItem := types.String("Also good")
shouldAlsoSee := types.NewSet(shouldAlsoSeeItem)
wholeMap := types.NewMap(suite.shouldSee, suite.mustSkip, shouldAlsoSee, suite.shouldSee)
reached := suite.skipWorker(wholeMap)
for _, v := range []types.Value{wholeMap, suite.shouldSee, suite.shouldSeeItem, suite.mustSkip, shouldAlsoSee, shouldAlsoSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 8)
}
func (suite *WalkTestSuite) SkipTestSkipMapKey() {
wholeMap := types.NewMap(suite.mustSkip, suite.shouldSee, suite.shouldSee, suite.shouldSee)
reached := suite.skipWorker(wholeMap)
for _, v := range []types.Value{wholeMap, suite.mustSkip, suite.shouldSee, suite.shouldSeeItem} {
suite.Contains(reached, v, "Doesn't contain %+v", v)
}
suite.Len(reached, 8)
}
+3 -2
View File
@@ -164,7 +164,7 @@ func addElementsToGraphBuilder(gb *types.GraphBuilder, db datas.Database, rootOb
}
index := Index{m: IndexMap{}}
walk.AllP(rootObject, db, func(v types.Value, r *types.Ref) {
walk.WalkValues(rootObject, db, func(v types.Value) bool {
typ := v.Type()
typeCacheMutex.Lock()
hasPath, ok := typeCache[typ]
@@ -182,7 +182,8 @@ func addElementsToGraphBuilder(gb *types.GraphBuilder, db datas.Database, rootOb
typeCacheMutex.Unlock()
}
}
}, 4)
return false
})
status.Done()
}
+2 -3
View File
@@ -26,7 +26,6 @@ func main() {
func index() (win bool) {
var dbStr = flag.String("db", "", "input database spec")
var outDSStr = flag.String("out-ds", "", "output dataset to write to - if empty, defaults to input dataset")
var parallelism = flag.Int("parallelism", 16, "number of parallel goroutines to search")
flag.Usage = usage
flag.Parse(false)
@@ -96,7 +95,7 @@ func index() (win bool) {
byFace := types.NewGraphBuilder(db, types.MapKind, true)
for _, v := range inputs {
walk.SomeP(v, db, func(cv types.Value, _ *types.Ref) (stop bool) {
walk.WalkValues(v, db, func(cv types.Value) (stop bool) {
if types.IsSubtype(photoType, cv.Type()) {
s := cv.(types.Struct)
@@ -130,7 +129,7 @@ func index() (win bool) {
stop = true
}
return
}, *parallelism)
})
}
outDS, err = db.CommitValue(outDS, types.NewStruct("", types.StructData{