CopyChunksP

This commit is contained in:
Rafael Weinstein
2015-09-16 10:50:19 -07:00
parent 5158a9626a
commit 068d4d3878
7 changed files with 264 additions and 145 deletions
+20 -11
View File
@@ -3,6 +3,7 @@ package main
import (
"flag"
"log"
"runtime"
"github.com/attic-labs/noms/clients/util"
"github.com/attic-labs/noms/d"
@@ -12,16 +13,20 @@ import (
)
var (
localDsFlags = dataset.NewFlagsWithPrefix("local-")
remoteDsFlags = dataset.NewFlagsWithPrefix("remote-")
sinkDsFlags = dataset.NewFlagsWithPrefix("sink-")
sourceDsFlags = dataset.NewFlagsWithPrefix("source-")
p = flag.Uint("p", 512, "parallelism")
)
func main() {
cpuCount := runtime.NumCPU()
runtime.GOMAXPROCS(cpuCount)
flag.Parse()
source := remoteDsFlags.CreateDataset()
sink := localDsFlags.CreateDataset()
if source == nil || sink == nil {
source := sourceDsFlags.CreateDataset()
sink := sinkDsFlags.CreateDataset()
if source == nil || sink == nil || *p == 0 {
flag.Usage()
return
}
@@ -33,14 +38,18 @@ func main() {
defer util.StopCPUProfile()
}
newHeadRef := source.Head().Ref()
currentHeadRef := ref.Ref{}
sourceHeadRef := source.Head().Ref()
sinkHeadRef := ref.Ref{}
if currentHead, ok := sink.MaybeHead(); ok {
currentHeadRef = currentHead.Ref()
sinkHeadRef = currentHead.Ref()
}
refs := sync.DiffHeadsByRef(currentHeadRef, newHeadRef, source.Store())
sync.CopyChunks(refs, source.Store(), sink.Store())
for ok := false; !ok; *sink, ok = sync.SetNewHead(newHeadRef, *sink) {
if sourceHeadRef == sinkHeadRef {
return
}
sync.CopyReachableChunksP(sourceHeadRef, sinkHeadRef, source.Store(), sink.Store(), int(*p))
for ok := false; !ok; *sink, ok = sync.SetNewHead(sourceHeadRef, *sink) {
continue
}
+63 -27
View File
@@ -26,35 +26,71 @@ func validateRefAsCommit(r ref.Ref, cs chunks.ChunkSource) datas.Commit {
return datas.CommitFromVal(v)
}
// DiffHeadsByRef validates that |small| (if non-zero) and |big| both refer to
// Commits in |cs|, then returns the Refs reachable from |big| that cannot be
// reached from |small|.
func DiffHeadsByRef(small, big ref.Ref, cs chunks.ChunkSource) []ref.Ref {
	var zero ref.Ref
	if small != zero {
		validateRefAsCommit(small, cs)
	}
	validateRefAsCommit(big, cs)
	return walk.Difference(small, big, cs)
}
// CopyChunks reads each Ref in |refs| out of |src| and writes its bytes into
// |sink|. It panics (via d.Exp) if any ref is absent from |src| or a copy
// fails.
func CopyChunks(refs []ref.Ref, src chunks.ChunkSource, sink chunks.ChunkSink) {
	// copyOne is a separate func so each iteration's defers run promptly.
	copyOne := func(r ref.Ref) {
		reader := src.Get(r)
		// Check for nil BEFORE deferring Close: deferring a method call on a
		// nil reader would panic during unwind and mask the real failure.
		d.Exp.NotNil(reader, "Attempt to copy ref which wasn't found: %+v", r)
		defer reader.Close()
		writer := sink.Put()
		defer writer.Close()
		_, err := io.Copy(writer, reader)
		d.Exp.NoError(err)
	}
	for _, r := range refs {
		copyOne(r)
	}
}
// SetNewHead takes the Ref of the desired new Head of |ds|, the chunk for
// which should already exist in the Dataset. It validates that the Ref points
// to an existing chunk that decodes to the correct type of value and then
// commits it to |ds|, returning a new Dataset with newHeadRef set and ok set
// to true. If the commit fails, ok is false and a new up-to-date Dataset is
// returned WITHOUT newHeadRef in it; the caller should retry with it.
func SetNewHead(newHeadRef ref.Ref, ds dataset.Dataset) (dataset.Dataset, bool) {
	commit := validateRefAsCommit(newHeadRef, ds.Store())
	parents := datas.SetOfCommitFromVal(commit.Parents())
	return ds.CommitWithParents(commit.Value(), parents)
}
// CopyReachableChunksP copies to |sink| all chunks in |source| reachable from
// (and including) |r|, excluding all chunks reachable from (and including)
// |exclude|. Up to |concurrency| chunks are walked in parallel.
func CopyReachableChunksP(r, exclude ref.Ref, source chunks.ChunkSource, sink chunks.ChunkSink, concurrency int) {
	excluded := map[ref.Ref]bool{}
	if exclude != (ref.Ref{}) {
		// Collect the entire reachable set of |exclude| up front, so the map
		// is fully built (and read-only) before the parallel copy walk below
		// consults it.
		found := make(chan ref.Ref, 1024)
		go func() {
			walk.AllP(exclude, source, func(er ref.Ref) {
				found <- er
			}, concurrency)
			close(found)
		}()
		for er := range found {
			excluded[er] = true
		}
	}
	// Reading through the tee writes every fetched chunk into |sink|; the
	// walk skips (returns true for) anything in the excluded set.
	walk.SomeP(r, &teeChunkSource{source, sink}, func(cr ref.Ref) bool {
		return excluded[cr]
	}, concurrency)
}
// teeChunkSource just serves the purpose of writing to |sink| every chunk
// that is read from |source|. Only Get reads are mirrored; see the methods
// below.
type teeChunkSource struct {
	source chunks.ChunkSource // where chunks are read from
	sink   chunks.ChunkSink   // receives a copy of every chunk fetched via Get
}
// Get fetches |ref| from the underlying source. On a hit, the returned reader
// tees every byte read into the sink; closing it closes both the source
// reader and the sink writer. On a miss it returns nil without touching the
// sink.
func (trs *teeChunkSource) Get(ref ref.Ref) io.ReadCloser {
	reader := trs.source.Get(ref)
	if reader == nil {
		return nil
	}
	writer := trs.sink.Put()
	tee := io.TeeReader(reader, writer)
	return forwardCloser{tee, []io.Closer{reader, writer}}
}
// Has reports whether |ref| exists in the underlying source. Existence checks
// are not tee'd to the sink.
func (trs *teeChunkSource) Has(ref ref.Ref) bool {
	return trs.source.Has(ref)
}
// forwardCloser closes multiple io.Closer objects.
type forwardCloser struct {
io.Reader
cs []io.Closer
}
func (fc forwardCloser) Close() error {
for _, c := range fc.cs {
if err := c.Close(); err != nil {
return err
}
}
return nil
}
+23 -24
View File
@@ -26,70 +26,69 @@ func TestValidateRef(t *testing.T) {
func TestPull(t *testing.T) {
assert := assert.New(t)
puller := createTestDataset("puller")
pullee := createTestDataset("pullee")
sink := createTestDataset("sink")
source := createTestDataset("source")
// Give puller and pullee some initial shared context.
// Give sink and source some initial shared context.
initialValue := types.NewMap(
types.NewString("first"), types.NewList(),
types.NewString("second"), types.NewList(types.Int32(2)))
ok := false
pullee, ok = pullee.Commit(initialValue)
source, ok = source.Commit(initialValue)
assert.True(ok)
puller, ok = puller.Commit(initialValue)
sink, ok = sink.Commit(initialValue)
assert.True(ok)
// Add some new stuff to pullee.
// Add some new stuff to source.
updatedValue := initialValue.Set(
types.NewString("third"), types.NewList(types.Int32(3)))
pullee, ok = pullee.Commit(updatedValue)
source, ok = source.Commit(updatedValue)
assert.True(ok)
// Add some more stuff, so that pullee isn't directly ahead of puller.
// Add some more stuff, so that source isn't directly ahead of sink.
updatedValue = updatedValue.Set(
types.NewString("fourth"), types.NewList(types.Int32(4)))
pullee, ok = pullee.Commit(updatedValue)
source, ok = source.Commit(updatedValue)
assert.True(ok)
refs := DiffHeadsByRef(puller.Head().Ref(), pullee.Head().Ref(), pullee.Store())
CopyChunks(refs, pullee.Store(), puller.Store())
puller, ok = SetNewHead(pullee.Head().Ref(), puller)
CopyReachableChunksP(source.Head().Ref(), sink.Head().Ref(), source.Store(), sink.Store(), 1)
sink, ok = SetNewHead(source.Head().Ref(), sink)
assert.True(ok)
assert.True(pullee.Head().Equals(puller.Head()))
assert.True(source.Head().Equals(sink.Head()))
}
func TestPullFirstCommit(t *testing.T) {
assert := assert.New(t)
puller := createTestDataset("puller")
pullee := createTestDataset("pullee")
sink := createTestDataset("sink")
source := createTestDataset("source")
initialValue := types.NewMap(
types.NewString("first"), types.NewList(),
types.NewString("second"), types.NewList(types.Int32(2)))
pullee, ok := pullee.Commit(initialValue)
source, ok := source.Commit(initialValue)
assert.True(ok)
pullerHeadRef := func() ref.Ref {
head, ok := puller.MaybeHead()
sinkHeadRef := func() ref.Ref {
head, ok := sink.MaybeHead()
if ok {
return head.Ref()
}
return ref.Ref{}
}()
refs := DiffHeadsByRef(pullerHeadRef, pullee.Head().Ref(), pullee.Store())
CopyChunks(refs, pullee.Store(), puller.Store())
puller, ok = SetNewHead(pullee.Head().Ref(), puller)
CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1)
CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1)
sink, ok = SetNewHead(source.Head().Ref(), sink)
assert.True(ok)
assert.True(pullee.Head().Equals(puller.Head()))
assert.True(source.Head().Equals(sink.Head()))
}
func TestFailedCopyChunks(t *testing.T) {
cs := &chunks.NopStore{}
cs := &chunks.MemoryStore{}
r := ref.Parse("sha1-0000000000000000000000000000000000000000")
assert.Panics(t, func() { CopyChunks([]ref.Ref{r}, cs, cs) })
assert.Panics(t, func() { CopyReachableChunksP(r, ref.Ref{}, cs, cs, 1) })
}
-23
View File
@@ -1,23 +0,0 @@
package walk
import (
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/ref"
)
// Difference returns the refs of the chunks reachable from 'big' that cannot
// be reached from 'small'. Returns nil when there is no difference.
func Difference(small, big ref.Ref, cs chunks.ChunkSource) []ref.Ref {
	seen := map[ref.Ref]bool{}
	if small != (ref.Ref{}) {
		// Record everything reachable from the smaller value first.
		All(small, cs, func(r ref.Ref) {
			seen[r] = true
		})
	}
	var diff []ref.Ref
	// Walk the bigger value, pruning (skipping) any subtree already seen.
	Some(big, cs, func(r ref.Ref) bool {
		if seen[r] {
			return true
		}
		diff = append(diff, r)
		return false
	})
	return diff
}
-40
View File
@@ -1,40 +0,0 @@
package walk
import (
"testing"
"github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/assert"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
// TestDifference verifies that Difference returns exactly the refs reachable
// from the larger value but not from the smaller one.
func TestDifference(t *testing.T) {
	assert := assert.New(t)
	cs := &chunks.TestStore{}
	// storeAndRef persists |v| into the test store and returns its ref, so
	// Difference can walk it back out of |cs|.
	storeAndRef := func(v types.Value) ref.Ref {
		return types.WriteValue(v, cs)
	}
	// {"string": "string",
	// "map": {"nested": "string"}
	// "mtlist": []
	// }
	small := types.NewMap(
		types.NewString("string"), types.NewString("string"),
		types.NewString("map"), types.NewMap(types.NewString("nested"), types.NewString("string")),
		types.NewString("mtlist"), types.NewList())
	// big is small plus one extra entry; the added set is the only new chunk.
	setVal := types.NewSet(types.Int32(7))
	big := small.Set(types.NewString("set"), setVal)
	var hashes []string
	for _, r := range Difference(storeAndRef(small), storeAndRef(big), cs) {
		hashes = append(hashes, r.String())
	}
	// The added set must appear in the difference...
	assert.Contains(hashes, setVal.Ref().String())
	// ...and diffing a value against itself must yield nothing.
	assert.Empty(Difference(small.Ref(), small.Ref(), cs))
}
+152 -14
View File
@@ -1,7 +1,11 @@
package walk
import (
"fmt"
"sync"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
@@ -13,26 +17,160 @@ type SomeCallback func(r ref.Ref) bool
// AllCallback takes a ref and processes it.
type AllCallback func(r ref.Ref)
// Some recursively walks over all ref.Refs reachable from r and calls cb on them.
// If cb ever returns true, the walk will stop recursing on the current ref.
func Some(r ref.Ref, cs chunks.ChunkSource, cb SomeCallback) {
doTreeWalk(r, cs, cb)
// Some recursively walks over all ref.Refs reachable from r and calls cb on them. If cb ever returns true, the walk will stop recursing on the current ref. If |concurrency| > 1, it is the caller's responsibility to ensure that |cb| is threadsafe.
func SomeP(r ref.Ref, cs chunks.ChunkSource, cb SomeCallback, concurrency int) {
doTreeWalkP(r, cs, cb, concurrency)
}
// All recursively walks over all ref.Refs reachable from r and calls cb on them.
func All(r ref.Ref, cs chunks.ChunkSource, cb AllCallback) {
doTreeWalk(r, cs, func(r ref.Ref) (skip bool) {
// All recursively walks over all ref.Refs reachable from r and calls cb on them. If |concurrency| > 1, it is the caller's responsibility to ensure that |cb| is threadsafe.
func AllP(r ref.Ref, cs chunks.ChunkSource, cb AllCallback, concurrency int) {
doTreeWalkP(r, cs, func(r ref.Ref) (skip bool) {
cb(r)
return
})
}, concurrency)
}
func doTreeWalk(r ref.Ref, cs chunks.ChunkSource, cb SomeCallback) {
if cb(r) {
return
func doTreeWalkP(r ref.Ref, cs chunks.ChunkSource, cb SomeCallback, concurrency int) {
rq := newRefQueue()
f := newFailure()
visited := map[ref.Ref]bool{}
mu := sync.Mutex{}
wg := sync.WaitGroup{}
processRef := func(r ref.Ref) {
defer wg.Done()
mu.Lock()
skip := cb(r) || visited[r]
visited[r] = true
mu.Unlock()
if skip || f.didFail() {
return
}
v := types.ReadValue(r, cs)
if v == nil {
f.fail(fmt.Errorf("Attempt to copy absent ref:%s", r.String()))
return
}
for _, cf := range v.Chunks() {
wg.Add(1)
rq.tail() <- cf.Ref()
}
}
v := types.ReadValue(r, cs)
for _, cf := range v.Chunks() {
doTreeWalk(cf.Ref(), cs, cb)
iter := func() {
for r := range rq.head() {
processRef(r)
}
}
for i := 0; i < concurrency; i++ {
go iter()
}
wg.Add(1)
rq.tail() <- r
wg.Wait()
rq.close()
f.checkNotFailed()
}
// refQueue emulates a buffered channel of refs of unlimited size: sends to
// tail() never block on a full buffer, and receives from head() drain in FIFO
// order. close() shuts the queue down.
type refQueue struct {
	head  func() <-chan ref.Ref // receive end
	tail  func() chan<- ref.Ref // send end
	close func()                // shuts down the pump goroutine and head
}

// newRefQueue starts a pump goroutine that shuttles refs from tail to head,
// spilling into an in-memory slice whenever receivers fall behind.
func newRefQueue() refQueue {
	head := make(chan ref.Ref, 64)
	tail := make(chan ref.Ref, 64)
	done := make(chan struct{})
	buff := []ref.Ref{}
	// push/pop maintain FIFO order in the overflow slice; they are only ever
	// called from the single pump goroutine, so no locking is needed.
	push := func(r ref.Ref) {
		buff = append(buff, r)
	}
	pop := func() ref.Ref {
		d.Chk.True(len(buff) > 0)
		r := buff[0]
		buff = buff[1:]
		return r
	}
	go func() {
	loop:
		for {
			if len(buff) == 0 {
				// Nothing buffered: only accept new input or shutdown.
				select {
				case r := <-tail:
					push(r)
				case <-done:
					break loop
				}
			} else {
				// Buffered refs exist: also offer the oldest one to head.
				first := buff[0]
				select {
				case r := <-tail:
					push(r)
				case head <- first:
					r := pop()
					d.Chk.Equal(r, first)
				case <-done:
					break loop
				}
			}
		}
	}()
	return refQueue{
		func() <-chan ref.Ref {
			return head
		},
		func() chan<- ref.Ref {
			return tail
		},
		func() {
			// NOTE(review): head is closed before the pump goroutine is told
			// to stop; if the buffer were non-empty here, the pump could
			// select `head <- first` on a closed channel and panic. Looks
			// safe as used (caller waits for all refs to drain first) —
			// confirm callers always drain before close.
			close(head)
			done <- struct{}{}
		},
	}
}
// failure captures the first error reported by any of a set of concurrent
// workers. All methods are safe for concurrent use.
type failure struct {
	err error
	mu  *sync.Mutex
}

// newFailure returns a failure with no error recorded.
func newFailure() *failure {
	return &failure{mu: &sync.Mutex{}}
}

// fail records |err| unless an earlier error has already been captured; only
// the first error is kept.
func (f *failure) fail(err error) {
	f.mu.Lock()
	if f.err == nil {
		f.err = err
	}
	f.mu.Unlock()
}

// didFail reports whether any error has been recorded so far.
func (f *failure) didFail() bool {
	f.mu.Lock()
	failed := f.err != nil
	f.mu.Unlock()
	return failed
}

// checkNotFailed asserts (via d.Chk) that no error was ever recorded.
func (f *failure) checkNotFailed() {
	f.mu.Lock()
	defer f.mu.Unlock()
	d.Chk.NoError(f.err)
}
+6 -6
View File
@@ -25,9 +25,9 @@ func (suite *WalkAllTestSuite) SetupTest() {
func (suite *WalkAllTestSuite) walkWorker(r ref.Ref, expected int) {
actual := 0
All(r, suite.cs, func(r ref.Ref) {
AllP(r, suite.cs, func(r ref.Ref) {
actual++
})
}, 1)
suite.Equal(expected, actual)
}
@@ -85,19 +85,19 @@ func (suite *WalkTestSuite) SetupTest() {
func (suite *WalkTestSuite) TestStopWalkImmediately() {
actual := 0
Some(suite.storeAndRef(types.NewList(types.NewSet(), types.NewList())), suite.cs, func(r ref.Ref) bool {
SomeP(suite.storeAndRef(types.NewList(types.NewSet(), types.NewList())), suite.cs, func(r ref.Ref) bool {
actual++
return true
})
}, 1)
suite.Equal(1, actual)
}
func (suite *WalkTestSuite) skipWorker(composite types.Value) (reached []ref.Ref) {
Some(suite.storeAndRef(composite), suite.cs, func(r ref.Ref) bool {
SomeP(suite.storeAndRef(composite), suite.cs, func(r ref.Ref) bool {
suite.NotEqual(r, suite.deadValue.Ref(), "Should never have reached %+v", suite.deadValue)
reached = append(reached, r)
return r == suite.mustSkip.Ref()
})
}, 1)
return
}