diff --git a/clients/quad_tree/main.go b/clients/quad_tree/main.go index fbe3379aed..f363f06f5a 100644 --- a/clients/quad_tree/main.go +++ b/clients/quad_tree/main.go @@ -77,9 +77,13 @@ func main() { go func() { ds := dataset.Store() - walk.SomeP(ds.ReadValue(inputRef), ds, func(v types.Value) (stop bool) { + walk.SomeP(ds.ReadValue(inputRef), ds, func(v types.Value, r types.RefBase) (stop bool) { var g common.Geoposition + // Only consider RefOfStruct + if r == nil { + return + } switch v := v.(type) { case hasGeoposition: g = v.Geoposition() @@ -94,11 +98,6 @@ func main() { return } - // TODO: This check is mega bummer. We really only want to consider RefOfStruct, but it's complicated to filter the case of an inline struct out. - /*if !ds.Has(v.Ref()) { - return - }*/ - stop = true nodeDef := &common.NodeDef{Geoposition: g.Def(), Reference: v.Ref()} diff --git a/clients/shove/shove.go b/clients/shove/shove.go index de3221b6e2..4a9d3fef5c 100644 --- a/clients/shove/shove.go +++ b/clients/shove/shove.go @@ -52,7 +52,7 @@ func main() { d.Exp.False(sourceRef.IsEmpty(), "Unknown source object: %s", *sourceObject) var err error - *sink, err = sink.Pull(sourceStore, sourceRef, int(*p)) + *sink, err = sink.Pull(sourceStore, datas.NewRefOfCommit(sourceRef), int(*p)) util.MaybeWriteMemProfile() d.Exp.NoError(err) diff --git a/datas/pull.go b/datas/pull.go index 2ce7aae9ca..c8f36d67ed 100644 --- a/datas/pull.go +++ b/datas/pull.go @@ -9,22 +9,22 @@ import ( ) // CopyMissingChunksP copies to |sink| all chunks in source that are reachable from (and including) |r|, skipping chunks that |sink| already has -func CopyMissingChunksP(source DataStore, sink *LocalDataStore, sourceRef ref.Ref, concurrency int) { - copyCallback := func(r ref.Ref) bool { - return sink.has(r) +func CopyMissingChunksP(source DataStore, sink *LocalDataStore, sourceRef types.RefBase, concurrency int) { + copyCallback := func(r types.RefBase) bool { + return sink.has(r.TargetRef()) } copyWorker(source, sink, sourceRef, copyCallback, concurrency) } // CopyReachableChunksP copies to |sink| all chunks reachable from (and including) |r|, but that are not in the subtree rooted at |exclude| -func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude ref.Ref, concurrency int) { +func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude types.RefBase, concurrency int) { excludeRefs := map[ref.Ref]bool{} - if !exclude.IsEmpty() { + if !exclude.TargetRef().IsEmpty() { mu := sync.Mutex{} - excludeCallback := func(r ref.Ref) bool { + excludeCallback := func(r types.RefBase) bool { mu.Lock() - excludeRefs[r] = true + excludeRefs[r.TargetRef()] = true mu.Unlock() return false } @@ -32,13 +32,13 @@ func CopyReachableChunksP(source, sink DataStore, sourceRef, exclude ref.Ref, co walk.SomeChunksP(exclude, source, excludeCallback, concurrency) } - copyCallback := func(r ref.Ref) bool { - return excludeRefs[r] + copyCallback := func(r types.RefBase) bool { + return excludeRefs[r.TargetRef()] } copyWorker(source, sink, sourceRef, copyCallback, concurrency) } -func copyWorker(source DataStore, sink DataStore, sourceRef ref.Ref, stopFn walk.SomeChunksCallback, concurrency int) { +func copyWorker(source DataStore, sink DataStore, sourceRef types.RefBase, stopFn walk.SomeChunksCallback, concurrency int) { hcs := sink.hintedChunkSink() walk.SomeChunksP(sourceRef, newTeeDataSource(source.hintedChunkStore(), hcs), stopFn, concurrency) diff --git a/dataset/dataset.go b/dataset/dataset.go index 91a5fd675f..870f836871 100644 --- a/dataset/dataset.go +++ b/dataset/dataset.go @@ -57,17 +57,17 @@ func (ds *Dataset) CommitWithParents(v types.Value, p datas.SetOfRefOfCommit) (D return Dataset{store, ds.id}, err } -func (ds *Dataset) Pull(sourceStore datas.DataStore, sourceRef ref.Ref, concurrency int) (Dataset, error) { +func (ds *Dataset) Pull(sourceStore datas.DataStore, sourceRef datas.RefOfCommit, concurrency int) (Dataset, error) { _, topDown := ds.Store().(*datas.LocalDataStore) return ds.pull(sourceStore, sourceRef, concurrency, topDown) } -func (ds *Dataset) pull(source datas.DataStore, sourceRef ref.Ref, concurrency int, topDown bool) (Dataset, error) { +func (ds *Dataset) pull(source datas.DataStore, sourceRef datas.RefOfCommit, concurrency int, topDown bool) (Dataset, error) { sink := *ds - sinkHeadRef := ref.Ref{} + sinkHeadRef := datas.NewRefOfCommit(ref.Ref{}) if currentHead, ok := sink.MaybeHead(); ok { - sinkHeadRef = currentHead.Ref() + sinkHeadRef = datas.NewRefOfCommit(currentHead.Ref()) } if sourceRef == sinkHeadRef { @@ -87,8 +87,8 @@ func (ds *Dataset) pull(source datas.DataStore, sourceRef ref.Ref, concurrency i return sink, err } -func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit { - v := ds.store.ReadValue(r) +func (ds *Dataset) validateRefAsCommit(r datas.RefOfCommit) datas.Commit { + v := ds.store.ReadValue(r.TargetRef()) d.Exp.NotNil(v, "%v cannot be found", r) d.Exp.True(v.Type().Equals(datas.NewCommit().Type()), "Not a Commit: %+v", v) @@ -96,7 +96,7 @@ func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit { } // setNewHead takes the Ref of the desired new Head of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should take any necessary corrective action and try again using this new Dataset. -func (ds *Dataset) setNewHead(newHeadRef ref.Ref) (Dataset, error) { +func (ds *Dataset) setNewHead(newHeadRef datas.RefOfCommit) (Dataset, error) { commit := ds.validateRefAsCommit(newHeadRef) return ds.CommitWithParents(commit.Value(), commit.Parents()) } diff --git a/dataset/pull_test.go b/dataset/pull_test.go index f07f1fe541..4f4030cdd4 100644 --- a/dataset/pull_test.go +++ b/dataset/pull_test.go @@ -15,9 +15,9 @@ func createTestDataset(name string) Dataset { func TestValidateRef(t *testing.T) { ds := createTestDataset("test") - r := ds.Store().WriteValue(types.Bool(true)).TargetRef() + r := ds.Store().WriteValue(types.Bool(true)) - assert.Panics(t, func() { ds.validateRefAsCommit(r) }) + assert.Panics(t, func() { ds.validateRefAsCommit(datas.NewRefOfCommit(r.TargetRef())) }) } func NewList(ds Dataset, vs ...types.Value) types.RefBase { @@ -67,7 +67,7 @@ func pullTest(t *testing.T, topdown bool) { source, err = source.Commit(updatedValue) assert.NoError(err) - sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown) + sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } @@ -96,7 +96,7 @@ func pullFirstCommit(t *testing.T, topdown bool) { source, err := source.Commit(sourceInitialValue) assert.NoError(err) - sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown) + sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } @@ -123,7 +123,7 @@ func pullDeepRef(t *testing.T, topdown bool) { source, err := source.Commit(sourceInitialValue) assert.NoError(err) - sink, err = sink.pull(source.Store(), source.Head().Ref(), 1, topdown) + sink, err = sink.pull(source.Store(), datas.NewRefOfCommit(source.Head().Ref()), 1, topdown) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } diff --git a/walk/walk.go b/walk/walk.go index f8a2e3fa3e..47dc1e66ee 100644 --- a/walk/walk.go +++ b/walk/walk.go @@ -9,12 +9,11 @@ import ( "github.com/attic-labs/noms/types" ) -// SomeCallback takes a types.Value and returns a bool indicating whether -// the current walk should skip the tree descending from value. -type SomeCallback func(v types.Value) bool +// SomeCallback takes a types.Value and returns a bool indicating whether the current walk should skip the tree descending from value. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil). +type SomeCallback func(v types.Value, r types.RefBase) bool -// AllCallback takes a types.Value and processes it. -type AllCallback func(v types.Value) +// AllCallback takes a types.Value and processes it. If |v| is a top-level value in a Chunk, then |r| will be the Ref which referenced it (otherwise |r| is nil). +type AllCallback func(v types.Value, r types.RefBase) // SomeP recursively walks over all types.Values reachable from r and calls cb on them. If cb ever returns true, the walk will stop recursing on the current ref. If |concurrency| > 1, it is the callers responsibility to make ensure that |cb| is threadsafe. func SomeP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int) { @@ -23,8 +22,8 @@ func SomeP(v types.Value, vr types.ValueReader, cb SomeCallback, concurrency int // AllP recursively walks over all types.Values reachable from r and calls cb on them. If |concurrency| > 1, it is the callers responsibility to make ensure that |cb| is threadsafe. func AllP(v types.Value, vr types.ValueReader, cb AllCallback, concurrency int) { - doTreeWalkP(v, vr, func(v types.Value) (skip bool) { - cb(v) + doTreeWalkP(v, vr, func(v types.Value, r types.RefBase) (skip bool) { + cb(v, r) return }, concurrency) } @@ -37,40 +36,41 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren mu := sync.Mutex{} wg := sync.WaitGroup{} - var processVal func(v types.Value) - processVal = func(v types.Value) { - if cb(v) { + var processVal func(v types.Value, r types.RefBase) + processVal = func(v types.Value, r types.RefBase) { + if cb(v, r) { return } - if r, ok := v.(types.RefBase); ok { + if sr, ok := v.(types.RefBase); ok { wg.Add(1) - rq.tail() <- r.TargetRef() + rq.tail() <- sr } else { for _, c := range v.ChildValues() { - processVal(c) + processVal(c, nil) } } } - processRef := func(r ref.Ref) { + processRef := func(r types.RefBase) { defer wg.Done() mu.Lock() - skip := visited[r] - visited[r] = true + skip := visited[r.TargetRef()] + visited[r.TargetRef()] = true mu.Unlock() if skip || f.didFail() { return } - v := vr.ReadValue(r) + target := r.TargetRef() + v := vr.ReadValue(target) if v == nil { - f.fail(fmt.Errorf("Attempt to copy absent ref:%s", r.String())) + f.fail(fmt.Errorf("Attempt to copy absent ref:%s", target.String())) return } - processVal(v) + processVal(v, r) } iter := func() { @@ -83,7 +83,7 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren go iter() } - processVal(v) + processVal(v, nil) wg.Wait() rq.close() @@ -91,37 +91,37 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren f.checkNotFailed() } -// SomeChunksCallback takes a ref.Ref and returns a bool indicating whether +// SomeChunksCallback takes a types.RefBase and returns a bool indicating whether // the current walk should skip the tree descending from value. -type SomeChunksCallback func(r ref.Ref) bool +type SomeChunksCallback func(r types.RefBase) bool // SomeChunksP Invokes callback on all chunks reachable from |r| in top-down order. |callback| is invoked only once for each chunk regardless of how many times the chunk appears -func SomeChunksP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback, concurrency int) { +func SomeChunksP(r types.RefBase, vr types.ValueReader, callback SomeChunksCallback, concurrency int) { doChunkWalkP(r, vr, callback, concurrency) } -func doChunkWalkP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback, concurrency int) { +func doChunkWalkP(r types.RefBase, vr types.ValueReader, callback SomeChunksCallback, concurrency int) { rq := newRefQueue() wg := sync.WaitGroup{} mu := sync.Mutex{} visitedRefs := map[ref.Ref]bool{} - walkChunk := func(r ref.Ref) { + walkChunk := func(r types.RefBase) { defer wg.Done() mu.Lock() - visited := visitedRefs[r] - visitedRefs[r] = true + visited := visitedRefs[r.TargetRef()] + visitedRefs[r.TargetRef()] = true mu.Unlock() if visited || callback(r) { return } - v := vr.ReadValue(r) + v := vr.ReadValue(r.TargetRef()) for _, r1 := range v.Chunks() { wg.Add(1) - rq.tail() <- r1.TargetRef() + rq.tail() <- r1 } } @@ -143,22 +143,22 @@ func doChunkWalkP(r ref.Ref, vr types.ValueReader, callback SomeChunksCallback, // refQueue emulates a buffered channel of refs of unlimited size. type refQueue struct { - head func() <-chan ref.Ref - tail func() chan<- ref.Ref + head func() <-chan types.RefBase + tail func() chan<- types.RefBase close func() } func newRefQueue() refQueue { - head := make(chan ref.Ref, 64) - tail := make(chan ref.Ref, 64) + head := make(chan types.RefBase, 64) + tail := make(chan types.RefBase, 64) done := make(chan struct{}) - buff := []ref.Ref{} + buff := []types.RefBase{} - push := func(r ref.Ref) { + push := func(r types.RefBase) { buff = append(buff, r) } - pop := func() ref.Ref { + pop := func() types.RefBase { d.Chk.True(len(buff) > 0) r := buff[0] buff = buff[1:] @@ -191,10 +191,10 @@ func newRefQueue() refQueue { }() return refQueue{ - func() <-chan ref.Ref { + func() <-chan types.RefBase { return head }, - func() chan<- ref.Ref { + func() chan<- types.RefBase { return tail }, func() { diff --git a/walk/walk_test.go b/walk/walk_test.go index 5193232966..8653a5f669 100644 --- a/walk/walk_test.go +++ b/walk/walk_test.go @@ -24,7 +24,7 @@ func (suite *WalkAllTestSuite) SetupTest() { func (suite *WalkAllTestSuite) walkWorker(r types.RefBase, expected int) { actual := 0 - AllP(r, suite.vs, func(c types.Value) { + AllP(r, suite.vs, func(c types.Value, r types.RefBase) { actual++ }, 1) suite.Equal(expected, actual) @@ -103,7 +103,7 @@ func (suite *WalkTestSuite) SetupTest() { func (suite *WalkTestSuite) TestStopWalkImmediately() { actual := 0 - SomeP(types.NewList(types.NewSet(), types.NewList()), suite.vs, func(v types.Value) bool { + SomeP(types.NewList(types.NewSet(), types.NewList()), suite.vs, func(v types.Value, r types.RefBase) bool { actual++ return true }, 1) @@ -111,7 +111,7 @@ func (suite *WalkTestSuite) TestStopWalkImmediately() { } func (suite *WalkTestSuite) skipWorker(composite types.Value) (reached []types.Value) { - SomeP(composite, suite.vs, func(v types.Value) bool { + SomeP(composite, suite.vs, func(v types.Value, r types.RefBase) bool { suite.False(v.Equals(suite.deadValue), "Should never have reached %+v", suite.deadValue) reached = append(reached, v) return v.Equals(suite.mustSkip)