Include MB and MB/s in sync progress (#1980)

This commit is contained in:
Ben Kalman
2016-07-06 16:26:13 -07:00
committed by GitHub
parent fa496f59ef
commit f57353ea89
3 changed files with 20 additions and 15 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/noms/go/util/status"
humanize "github.com/dustin/go-humanize"
)
var (
@@ -49,6 +50,7 @@ func runSync(args []string) int {
d.CheckError(err)
defer sinkDataset.Database().Close()
start := time.Now()
progressCh := make(chan datas.PullProgress)
lastProgressCh := make(chan datas.PullProgress)
@@ -64,15 +66,14 @@ func runSync(args []string) int {
last = info
if status.WillPrint() {
pct := 100.0 * float64(info.DoneCount) / float64(info.KnownCount)
status.Printf("Syncing - %.2f%% (%d/%d chunks)", pct, info.DoneCount, info.KnownCount)
bytesPerSec := float64(info.DoneBytes) / float64(time.Since(start).Seconds())
status.Printf("Syncing - %.2f%% (%d/%d chunks) - %s/s", pct, info.DoneCount, info.KnownCount, humanize.Bytes(uint64(bytesPerSec)))
}
}
lastProgressCh <- last
}()
start := time.Now()
err = d.Try(func() {
defer profile.MaybeStartProfile().Stop()
var err error
@@ -86,7 +87,7 @@ func runSync(args []string) int {
close(progressCh)
if last := <-lastProgressCh; last.DoneCount > 0 {
status.Printf("Done - Synced %d chunks in %s", last.DoneCount, time.Since(start).String())
status.Printf("Done - Synced %s (%d chunks) in %s", humanize.Bytes(last.DoneBytes), last.DoneCount, time.Since(start).String())
status.Done()
} else {
fmt.Println(flag.Arg(1), "is up to date.")

View File

@@ -14,7 +14,7 @@ import (
)
type PullProgress struct {
DoneCount, KnownCount uint64
DoneCount, KnownCount, DoneBytes uint64
}
// Pull objects that descend from sourceRef from srcDB to sinkDB. sinkHeadRef should point to a Commit (in sinkDB) that's an ancestor of sourceRef. This allows the algorithm to figure out which portions of data are already present in sinkDB and skip copying them.
@@ -77,13 +77,13 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
traverseWorker()
}
var doneCount, knownCount uint64
updateProgress := func(moreDone, moreKnown uint64) {
var doneCount, knownCount, doneBytes uint64
updateProgress := func(moreDone, moreKnown, moreBytes uint64) {
if progressCh == nil {
return
}
doneCount, knownCount = doneCount+moreDone, knownCount+moreKnown
progressCh <- PullProgress{doneCount, knownCount + uint64(len(srcQ))}
doneCount, knownCount, doneBytes = doneCount+moreDone, knownCount+moreKnown, doneBytes+moreBytes
progressCh <- PullProgress{doneCount, knownCount + uint64(len(srcQ)), doneBytes}
}
// hc and reachableChunks aren't goroutine-safe, so only write them here.
@@ -93,7 +93,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
srcRefs, sinkRefs, comRefs := planWork(&srcQ, &sinkQ)
srcWork, sinkWork, comWork := len(srcRefs), len(sinkRefs), len(comRefs)
if srcWork+comWork > 0 {
updateProgress(0, uint64(srcWork+comWork))
updateProgress(0, uint64(srcWork+comWork), 0)
}
// These goroutines send work to traverseWorkers, blocking when all are busy. They self-terminate when they've sent all they have.
@@ -113,7 +113,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
reachableChunks.Remove(res.readHash)
}
srcWork--
updateProgress(1, 0)
updateProgress(1, 0, uint64(res.readBytes))
case res := <-sinkResChan:
for _, reachable := range res.reachables {
heap.Push(&sinkQ, reachable)
@@ -130,7 +130,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
hc[reachable.TargetHash()] = res.readHash
}
comWork--
updateProgress(1, 0)
updateProgress(1, 0, uint64(res.readBytes))
}
}
}
@@ -147,6 +147,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
type traverseResult struct {
readHash hash.Hash
reachables types.RefSlice
readBytes int
}
// planWork deals with three possible situations:
@@ -214,14 +215,14 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database) traverseResult {
v := types.DecodeValue(c, srcDB)
d.Chk.True(v != nil, "Expected decoded chunk to be non-nil.")
sinkDB.batchStore().SchedulePut(c, srcRef.Height(), types.Hints{})
return traverseResult{h, v.Chunks()}
return traverseResult{h, v.Chunks(), len(c.Data())}
}
return traverseResult{}
}
func traverseSink(sinkRef types.Ref, db Database) traverseResult {
if sinkRef.Height() > 1 {
return traverseResult{sinkRef.TargetHash(), sinkRef.TargetValue(db).Chunks()}
return traverseResult{sinkRef.TargetHash(), sinkRef.TargetValue(db).Chunks(), 0}
}
return traverseResult{}
}
@@ -244,7 +245,7 @@ func traverseCommon(comRef, sinkHead types.Ref, db Database) traverseResult {
}
i++
}
return traverseResult{comRef.TargetHash(), chunks}
return traverseResult{comRef.TargetHash(), chunks, 0}
}
return traverseResult{}
}

View File

@@ -126,10 +126,12 @@ func (pt *progressTracker) Validate(suite *PullSuite) {
first := progress[0]
suite.Zero(first.DoneCount)
suite.True(first.KnownCount > 0)
suite.Zero(first.DoneBytes)
last := progress[len(progress)-1]
suite.True(last.DoneCount > 0)
suite.Equal(last.DoneCount, last.KnownCount)
suite.True(last.DoneBytes > 0)
for i, prog := range progress {
suite.True(prog.KnownCount >= prog.DoneCount)
@@ -137,6 +139,7 @@ func (pt *progressTracker) Validate(suite *PullSuite) {
prev := progress[i-1]
suite.True(prog.DoneCount >= prev.DoneCount)
suite.True(prog.KnownCount >= prev.KnownCount)
suite.True(prog.DoneBytes >= prev.DoneBytes)
}
}
}