Add pull chunk progress to noms sync (#1950)

This commit is contained in:
Ben Kalman
2016-07-06 15:01:32 -07:00
committed by GitHub
parent 4e4cbd323c
commit 53b9ce3c1e
7 changed files with 132 additions and 23 deletions
+38 -2
View File
@@ -6,12 +6,16 @@ package main
import (
"flag"
"fmt"
"log"
"time"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/datas"
"github.com/attic-labs/noms/go/spec"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/noms/go/util/status"
)
var (
@@ -45,16 +49,48 @@ func runSync(args []string) int {
d.CheckError(err)
defer sinkDataset.Database().Close()
progressCh := make(chan datas.PullProgress)
lastProgressCh := make(chan datas.PullProgress)
go func() {
var last datas.PullProgress
for info := range progressCh {
if info.KnownCount == 1 {
// It's better to print "up to date" than "0% (0/1); 100% (1/1)".
continue
}
last = info
if status.WillPrint() {
pct := 100.0 * float64(info.DoneCount) / float64(info.KnownCount)
status.Printf("Syncing - %.2f%% (%d/%d chunks)", pct, info.DoneCount, info.KnownCount)
}
}
lastProgressCh <- last
}()
start := time.Now()
err = d.Try(func() {
defer profile.MaybeStartProfile().Stop()
var err error
sinkDataset, err = sinkDataset.Pull(sourceStore, types.NewRef(sourceObj), p)
sinkDataset, err = sinkDataset.Pull(sourceStore, types.NewRef(sourceObj), p, progressCh)
d.PanicIfError(err)
})
if err != nil {
log.Fatal(err)
}
close(progressCh)
if last := <-lastProgressCh; last.DoneCount > 0 {
status.Printf("Done - Synced %d chunks in %s", last.DoneCount, time.Since(start).String())
status.Done()
} else {
fmt.Println(flag.Arg(1), "is up to date.")
}
return 0
}
+2 -4
View File
@@ -37,16 +37,14 @@ func (s *nomsSyncTestSuite) TestSync() {
sourceSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "#"+source1HeadRef.String())
ldb2dir := path.Join(s.TempDir, "ldb2")
sinkDatasetSpec := spec.CreateValueSpecString("ldb", ldb2dir, "bar")
out, _ := s.Run(main, []string{"sync", sourceSpec, sinkDatasetSpec})
s.Equal("", out)
s.Run(main, []string{"sync", sourceSpec, sinkDatasetSpec})
dest := dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false)), "bar")
s.True(types.Number(42).Equals(dest.HeadValue()))
dest.Database().Close()
sourceDataset := spec.CreateValueSpecString("ldb", s.LdbDir, "foo")
out, _ = s.Run(main, []string{"sync", sourceDataset, sinkDatasetSpec})
s.Equal("", out)
s.Run(main, []string{"sync", sourceDataset, sinkDatasetSpec})
dest = dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false)), "bar")
s.True(types.Number(43).Equals(dest.HeadValue()))
+20 -2
View File
@@ -13,9 +13,12 @@ import (
"github.com/attic-labs/noms/go/types"
)
type PullProgress struct {
DoneCount, KnownCount uint64
}
// Pull objects that descends from sourceRef from srcDB to sinkDB. sinkHeadRef should point to a Commit (in sinkDB) that's an ancestor of sourceRef. This allows the algorithm to figure out which portions of data are already present in sinkDB and skip copying them.
// TODO: Figure out how to add concurrency.
func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency int) {
func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency int, progressCh chan PullProgress) {
srcQ, sinkQ := types.RefHeap{sourceRef}, types.RefHeap{sinkHeadRef}
heap.Init(&srcQ)
heap.Init(&sinkQ)
@@ -74,12 +77,24 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
traverseWorker()
}
var doneCount, knownCount uint64
updateProgress := func(moreDone, moreKnown uint64) {
if progressCh == nil {
return
}
doneCount, knownCount = doneCount+moreDone, knownCount+moreKnown
progressCh <- PullProgress{doneCount, knownCount + uint64(len(srcQ))}
}
// hc and reachableChunks aren't goroutine-safe, so only write them here.
hc := hintCache{}
reachableChunks := hashSet{}
for !srcQ.Empty() {
srcRefs, sinkRefs, comRefs := planWork(&srcQ, &sinkQ)
srcWork, sinkWork, comWork := len(srcRefs), len(sinkRefs), len(comRefs)
if srcWork+comWork > 0 {
updateProgress(0, uint64(srcWork+comWork))
}
// These goroutines send work to traverseWorkers, blocking when all are busy. They self-terminate when they've sent all they have.
go sendWork(srcChan, srcRefs)
@@ -98,6 +113,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
reachableChunks.Remove(res.readHash)
}
srcWork--
updateProgress(1, 0)
case res := <-sinkResChan:
for _, reachable := range res.reachables {
heap.Push(&sinkQ, reachable)
@@ -114,9 +130,11 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
hc[reachable.TargetHash()] = res.readHash
}
comWork--
updateProgress(1, 0)
}
}
}
hints := types.Hints{}
for hash := range reachableChunks {
if hint, present := hc[hash]; present {
+57 -4
View File
@@ -99,6 +99,48 @@ func (suite *PullSuite) TearDownTest() {
suite.sourceCS.Close()
}
type progressTracker struct {
Ch chan PullProgress
doneCh chan []PullProgress
}
func startProgressTracker() *progressTracker {
pt := &progressTracker{make(chan PullProgress), make(chan []PullProgress)}
go func() {
progress := []PullProgress{}
for info := range pt.Ch {
progress = append(progress, info)
}
pt.doneCh <- progress
}()
return pt
}
func (pt *progressTracker) Validate(suite *PullSuite) {
close(pt.Ch)
progress := <-pt.doneCh
// Expecting exact progress would be unreliable and not necessary meaningful. Instead, just validate that it's useful and consistent.
suite.NotEmpty(progress)
first := progress[0]
suite.Zero(first.DoneCount)
suite.True(first.KnownCount > 0)
last := progress[len(progress)-1]
suite.True(last.DoneCount > 0)
suite.Equal(last.DoneCount, last.KnownCount)
for i, prog := range progress {
suite.True(prog.KnownCount >= prog.DoneCount)
if i > 0 {
prev := progress[i-1]
suite.True(prog.DoneCount >= prev.DoneCount)
suite.True(prog.KnownCount >= prev.KnownCount)
}
}
}
// Source: -3-> C(L2) -1-> N
// \ -2-> L1 -1-> N
// \ -1-> L0
@@ -107,9 +149,11 @@ func (suite *PullSuite) TearDownTest() {
func (suite *PullSuite) TestPullEverything() {
l := buildListOfHeight(2, suite.source)
sourceRef := suite.commitToSource(l, types.NewSet())
pt := startProgressTracker()
Pull(suite.source, suite.sink, sourceRef, types.Ref{}, 2)
Pull(suite.source, suite.sink, sourceRef, types.Ref{}, 2, pt.Ch)
suite.Equal(0, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.batchStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
@@ -148,12 +192,15 @@ func (suite *PullSuite) TestPullMultiGeneration() {
srcL = buildListOfHeight(5, suite.source)
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2)
pt := startProgressTracker()
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2, pt.Ch)
if suite.sinkIsLocal() {
// C1 gets read from most-local DB
expectedReads++
}
suite.Equal(expectedReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.batchStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
@@ -195,10 +242,13 @@ func (suite *PullSuite) TestPullDivergentHistory() {
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
preReads := suite.sinkCS.Reads
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2)
pt := startProgressTracker()
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2, pt.Ch)
// No objects read from sink, since sink Head is not an ancestor of source HEAD.
suite.Equal(preReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.batchStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
@@ -236,13 +286,16 @@ func (suite *PullSuite) TestPullUpdates() {
srcL = srcL.Set(1, suite.source.WriteValue(L3))
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2)
pt := startProgressTracker()
Pull(suite.source, suite.sink, sourceRef, sinkRef, 2, pt.Ch)
if suite.sinkIsLocal() {
// 3 objects read from sink: L3, L2 and C1 (when considering the shared commit).
expectedReads += 3
}
suite.Equal(expectedReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.batchStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
+8 -8
View File
@@ -91,11 +91,7 @@ func (ds *Dataset) CommitWithParents(v types.Value, p types.Set) (Dataset, error
return Dataset{store, ds.id}, err
}
func (ds *Dataset) Pull(sourceStore datas.Database, sourceRef types.Ref, concurrency int) (Dataset, error) {
return ds.pull(sourceStore, sourceRef, concurrency)
}
func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency int) (Dataset, error) {
func (ds *Dataset) Pull(sourceStore datas.Database, sourceRef types.Ref, concurrency int, progressCh chan datas.PullProgress) (Dataset, error) {
sink := *ds
sinkHeadRef := types.Ref{}
@@ -107,7 +103,7 @@ func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency
return sink, nil
}
datas.Pull(source, sink.Database(), sourceRef, sinkHeadRef, concurrency)
datas.Pull(sourceStore, sink.Database(), sourceRef, sinkHeadRef, concurrency, progressCh)
err := datas.ErrOptimisticLockFailed
for ; err == datas.ErrOptimisticLockFailed; sink, err = sink.setNewHead(sourceRef) {
}
@@ -118,8 +114,12 @@ func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency
func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct {
v := ds.store.ReadValue(r.TargetHash())
d.PanicIfTrue(v == nil, "%v cannot be found", r)
d.PanicIfTrue(!v.Type().Equals(datas.NewCommit().Type()), "Not a Commit: %+v", v)
if v == nil {
panic(r.TargetHash().String() + " not found")
}
if !v.Type().Equals(datas.CommitType()) {
panic("Not a commit: " + types.EncodedValue(v))
}
return v.(types.Struct)
}
+3 -3
View File
@@ -73,7 +73,7 @@ func TestPullTopDown(t *testing.T) {
source, err = source.Commit(updatedValue)
assert.NoError(err)
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -94,7 +94,7 @@ func TestPullFirstCommitTopDown(t *testing.T) {
source, err := source.Commit(sourceInitialValue)
assert.NoError(err)
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -113,7 +113,7 @@ func TestPullDeepRefTopDown(t *testing.T) {
source, err := source.Commit(sourceInitialValue)
assert.NoError(err)
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
+4
View File
@@ -26,6 +26,10 @@ func Clear() {
reset(time.Time{})
}
func WillPrint() bool {
return time.Now().Sub(lastTime) >= rate
}
func Printf(format string, args ...interface{}) {
now := time.Now()
if now.Sub(lastTime) < rate {