Fix noms-sync surprising quantity (#2531)
* Use sampling for a better bytes-written estimate for noms sync

* Confirmed that remaining overestimate of data written is consistent with leveldb stats and opened #2567 to track
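The change in one picture: rather than compressing every chunk to measure exactly what the sink will write, Pull now compresses a random ~10% of chunks and extrapolates from the sample mean. A minimal standalone sketch of that idea, assuming snappy compression is the cost being estimated (the estimator type and method names here are illustrative, not the commit's API):

package main

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"

	"github.com/golang/snappy"
)

const sampleRate = 0.10 // mirrors the commit's bytesWrittenSampleRate

// estimator keeps a running mean of compressed chunk sizes from a
// random ~10% sample and extrapolates total bytes written from it.
type estimator struct {
	sampleBytes, sampleCount, doneCount uint64
}

// observe is called once per chunk copied to the sink. Only sampled
// chunks pay the cost of actually compressing the data.
func (e *estimator) observe(data []byte) {
	e.doneCount++
	if rand.Float64() < sampleRate {
		e.sampleBytes += uint64(len(snappy.Encode(nil, data)))
		e.sampleCount++
	}
}

// approxWritten extrapolates: chunks done × mean compressed size of
// the sampled chunks (divisor floored at 1, as in the diff below).
func (e *estimator) approxWritten() uint64 {
	mean := float64(e.sampleBytes) / math.Max(1, float64(e.sampleCount))
	return e.doneCount * uint64(mean)
}

func main() {
	var e estimator
	for i := 0; i < 1000; i++ {
		e.observe(bytes.Repeat([]byte{byte(i)}, 4096)) // fake 4 KB chunks
	}
	fmt.Println("approx bytes written:", e.approxWritten())
}

Because chunks are sampled independently of their contents, the sample mean tracks the true mean compressed size, and the extrapolated total tightens as more chunks are sampled.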
@@ -71,7 +71,7 @@ func runSync(args []string) int {
 			last = info
 			if status.WillPrint() {
 				pct := 100.0 * float64(info.DoneCount) / float64(info.KnownCount)
-				status.Printf("Syncing - %.2f%% (%s/s)", pct, bytesPerSec(info, start))
+				status.Printf("Syncing - %.2f%% (%s/s)", pct, bytesPerSec(info.ApproxWrittenBytes, start))
 			}
 		}
@@ -100,7 +100,8 @@ func runSync(args []string) int {
 
 	close(progressCh)
 	if last := <-lastProgressCh; last.DoneCount > 0 {
-		status.Printf("Done - Synced %s in %s (%s/s)", humanize.Bytes(last.DoneBytes), since(start), bytesPerSec(last, start))
+		status.Printf("Done - Synced %s in %s (%s/s)",
+			humanize.Bytes(last.ApproxWrittenBytes), since(start), bytesPerSec(last.ApproxWrittenBytes, start))
 		status.Done()
 	} else if !sinkExists {
 		fmt.Printf("All chunks already exist at destination! Created new dataset %s.\n", args[1])
@@ -113,11 +114,12 @@ func runSync(args []string) int {
 	return 0
 }
 
-func bytesPerSec(prog datas.PullProgress, start time.Time) string {
-	bps := float64(prog.DoneBytes) / float64(time.Since(start).Seconds())
+func bytesPerSec(bytes uint64, start time.Time) string {
+	bps := float64(bytes) / float64(time.Since(start).Seconds())
 	return humanize.Bytes(uint64(bps))
 }
 
+
 func since(start time.Time) string {
 	round := time.Second / 100
 	now := time.Now().Round(round)
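With the new signature, throughput is just raw bytes over elapsed wall-clock seconds, then humanized. A small usage sketch (the pretend elapsed time and byte count are made up):

package main

import (
	"fmt"
	"time"

	humanize "github.com/dustin/go-humanize"
)

func bytesPerSec(bytes uint64, start time.Time) string {
	bps := float64(bytes) / float64(time.Since(start).Seconds())
	return humanize.Bytes(uint64(bps))
}

func main() {
	start := time.Now().Add(-2 * time.Second) // pretend 2s have elapsed
	// ~5 MB in ~2s prints on the order of "2.5 MB/s".
	fmt.Printf("%s/s\n", bytesPerSec(5*1000*1000, start))
}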
@@ -6,12 +6,14 @@ package chunks
 
 import (
 	"fmt"
+	"math"
 	"os"
 	"sync"
 
 	"github.com/attic-labs/noms/go/constants"
 	"github.com/attic-labs/noms/go/d"
 	"github.com/attic-labs/noms/go/hash"
+	humanize "github.com/dustin/go-humanize"
 	"github.com/golang/snappy"
 	flag "github.com/juju/gnuflag"
 	"github.com/syndtr/goleveldb/leveldb"
@@ -246,7 +248,7 @@ func (l *internalLevelDBStore) Close() error {
 		fmt.Println("GetCount: ", l.getCount)
 		fmt.Println("HasCount: ", l.hasCount)
 		fmt.Println("PutCount: ", l.putCount)
-		fmt.Println("Average PutSize: ", l.putBytes/l.putCount)
+		fmt.Printf("PutSize: %s (%d/chunk)\n", humanize.Bytes(uint64(l.putCount)), l.putBytes/int64(math.Max(1, float64(l.putCount))))
 	}
 	return nil
 }
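The rewritten stats line also guards the per-chunk average: routing the divisor through math.Max means a store that never saw a Put divides by 1 instead of panicking. A tiny isolated sketch of that guard (values are illustrative):

package main

import (
	"fmt"
	"math"
)

func main() {
	var putBytes, putCount int64 // both zero: no Puts ever happened
	// Integer division by zero panics in Go; math.Max(1, ...) floors
	// the divisor at 1, so an idle store reports a 0 average instead.
	avg := putBytes / int64(math.Max(1, float64(putCount)))
	fmt.Println("average put size:", avg)
}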
@@ -5,18 +5,23 @@
 package datas
 
 import (
+	"math"
+	"math/rand"
 	"sort"
 	"sync"
 
 	"github.com/attic-labs/noms/go/d"
 	"github.com/attic-labs/noms/go/hash"
 	"github.com/attic-labs/noms/go/types"
+	"github.com/golang/snappy"
 )
 
 type PullProgress struct {
-	DoneCount, KnownCount, DoneBytes uint64
+	DoneCount, KnownCount, ApproxWrittenBytes uint64
 }
 
+const bytesWrittenSampleRate = .10
+
 // Pull objects that descends from sourceRef from srcDB to sinkDB. sinkHeadRef should point to a Commit (in sinkDB) that's an ancestor of sourceRef. This allows the algorithm to figure out which portions of data are already present in sinkDB and skip copying them.
 func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency int, progressCh chan PullProgress) {
 	srcQ, sinkQ := &types.RefByHeight{sourceRef}, &types.RefByHeight{sinkHeadRef}
@@ -41,7 +46,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
 	srcChan := make(chan types.Ref)
 	sinkChan := make(chan types.Ref)
 	comChan := make(chan types.Ref)
-	srcResChan := make(chan traverseResult)
+	srcResChan := make(chan traverseSourceResult)
 	sinkResChan := make(chan traverseResult)
 	comResChan := make(chan traverseResult)
 	done := make(chan struct{})
@@ -64,7 +69,13 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
 		for {
 			select {
 			case srcRef := <-srcChan:
-				srcResChan <- traverseSource(srcRef, srcDB, sinkDB)
+				// Hook in here to estimate the bytes written to disk during pull (since
+				// srcChan contains all chunks to be written to the sink). Rather than measuring
+				// the serialized, compressed bytes of each chunk, we take a 10% sample.
+				// There's no immediately observable performance benefit to sampling here, but there's
+				// also no appreciable loss in accuracy, so we'll keep it around.
+				takeSample := rand.Float64() < bytesWrittenSampleRate
+				srcResChan <- traverseSource(srcRef, srcDB, sinkDB, takeSample)
 			case sinkRef := <-sinkChan:
 				sinkResChan <- traverseSink(sinkRef, mostLocalDB)
 			case comRef := <-comChan:
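Each chunk flips an independent coin that lands heads with probability bytesWrittenSampleRate, so over n chunks roughly n/10 get measured. A self-contained illustration of that trial (the chunk count is made up):

package main

import (
	"fmt"
	"math/rand"
)

const bytesWrittenSampleRate = .10

func main() {
	const chunks = 10000
	sampled := 0
	for i := 0; i < chunks; i++ {
		// One independent Bernoulli trial per chunk, as in the hunk above.
		if rand.Float64() < bytesWrittenSampleRate {
			sampled++
		}
	}
	fmt.Printf("sampled %d of %d chunks (expect ~%d)\n", sampled, chunks, chunks/10)
}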
@@ -80,23 +91,25 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
 		traverseWorker()
 	}
 
-	var doneCount, knownCount, doneBytes uint64
-	updateProgress := func(moreDone, moreKnown, moreBytes uint64) {
+	var doneCount, knownCount, approxBytesWritten uint64
+	updateProgress := func(moreDone, moreKnown, moreBytesRead, moreApproxBytesWritten uint64) {
 		if progressCh == nil {
 			return
 		}
-		doneCount, knownCount, doneBytes = doneCount+moreDone, knownCount+moreKnown, doneBytes+moreBytes
-		progressCh <- PullProgress{doneCount, knownCount + uint64(srcQ.Len()), doneBytes}
+		doneCount, knownCount, approxBytesWritten = doneCount+moreDone, knownCount+moreKnown, approxBytesWritten+moreApproxBytesWritten
+		progressCh <- PullProgress{doneCount, knownCount + uint64(srcQ.Len()), approxBytesWritten}
 	}
 
 	// hc and reachableChunks aren't goroutine-safe, so only write them here.
 	hc := hintCache{}
 	reachableChunks := hash.HashSet{}
+	sampleSize := uint64(0)
+	sampleCount := uint64(0)
 	for !srcQ.Empty() {
 		srcRefs, sinkRefs, comRefs := planWork(srcQ, sinkQ)
 		srcWork, sinkWork, comWork := len(srcRefs), len(sinkRefs), len(comRefs)
 		if srcWork+comWork > 0 {
-			updateProgress(0, uint64(srcWork+comWork), 0)
+			updateProgress(0, uint64(srcWork+comWork), 0, 0)
 		}
 
 		// These goroutines send work to traverseWorkers, blocking when all are busy. They self-terminate when they've sent all they have.
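Note that the new moreBytesRead parameter is accepted but, per the body above, never folded into the PullProgress that goes out on the channel; only the write estimate is. A reduced sketch of this closure-over-counters pattern (the channel plumbing is trimmed to essentials):

package main

import "fmt"

type PullProgress struct {
	DoneCount, KnownCount, ApproxWrittenBytes uint64
}

func main() {
	progressCh := make(chan PullProgress, 4)

	var doneCount, knownCount, approxBytesWritten uint64
	updateProgress := func(moreDone, moreKnown, moreBytesRead, moreApproxBytesWritten uint64) {
		// As in the diff: moreBytesRead is accepted but not accumulated;
		// only the approximate write total reaches the channel.
		doneCount += moreDone
		knownCount += moreKnown
		approxBytesWritten += moreApproxBytesWritten
		progressCh <- PullProgress{doneCount, knownCount, approxBytesWritten}
	}

	updateProgress(0, 2, 0, 0)     // two chunks discovered
	updateProgress(1, 0, 512, 100) // one chunk done: 512 B read, ~100 B written
	close(progressCh)
	for p := range progressCh {
		fmt.Printf("%+v\n", p)
	}
}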
@@ -112,11 +125,16 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
 				srcQ.PushBack(reachable)
 				reachableChunks.Insert(reachable.TargetHash())
 			}
+			if res.writeBytes > 0 {
+				sampleSize += uint64(res.writeBytes)
+				sampleCount += 1
+			}
 			if !res.readHash.IsEmpty() {
 				reachableChunks.Remove(res.readHash)
 			}
 			srcWork--
-			updateProgress(1, 0, uint64(res.readBytes))
+
+			updateProgress(1, 0, uint64(res.readBytes), sampleSize/uint64(math.Max(1, float64(sampleCount))))
 		case res := <-sinkResChan:
 			for _, reachable := range res.reachables {
 				sinkQ.PushBack(reachable)
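The accrual above adds one "average sampled chunk" (sampleSize/sampleCount, with the divisor floored at 1) per completed source chunk, so ApproxWrittenBytes ends up near doneCount times the mean compressed size observed so far. A small numeric sketch with made-up sizes:

package main

import (
	"fmt"
	"math"
)

func main() {
	var sampleSize, sampleCount, approx uint64

	// Suppose 3 of 30 chunks were sampled at these compressed sizes.
	for _, writeBytes := range []uint64{900, 1100, 1000} {
		sampleSize += writeBytes
		sampleCount++
	}

	// Every completed chunk adds the current running mean, as the hunk does.
	for done := 0; done < 30; done++ {
		approx += sampleSize / uint64(math.Max(1, float64(sampleCount)))
	}
	fmt.Println(approx) // 30 chunks × 1000-byte mean = 30000
}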
@@ -133,7 +151,7 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
 				hc[reachable.TargetHash()] = res.readHash
 			}
 			comWork--
-			updateProgress(1, 0, uint64(res.readBytes))
+			updateProgress(1, 0, uint64(res.readBytes), 0)
 		}
 	}
 	sort.Sort(sinkQ)
@@ -157,6 +175,11 @@ type traverseResult struct {
 	readBytes int
 }
 
+type traverseSourceResult struct {
+	traverseResult
+	writeBytes int
+}
+
 // planWork deals with three possible situations:
 // - head of srcQ is higher than head of sinkQ
 // - head of sinkQ is higher than head of srcQ
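traverseSourceResult embeds traverseResult, so existing code reading fields like res.readBytes keeps working while source results gain writeBytes. A minimal illustration of that field promotion (traverseResult is reduced here to the one field named in the hunk):

package main

import "fmt"

type traverseResult struct {
	readBytes int // readHash and reachables omitted for brevity
}

// Embedding promotes traverseResult's fields: callers keep writing
// res.readBytes while source results additionally carry writeBytes.
type traverseSourceResult struct {
	traverseResult
	writeBytes int
}

func main() {
	res := traverseSourceResult{traverseResult{readBytes: 2048}, 512}
	fmt.Println(res.readBytes, res.writeBytes) // prints: 2048 512
}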
@@ -211,7 +234,7 @@ func sendWork(ch chan<- types.Ref, refs types.RefSlice) {
 
 type hintCache map[hash.Hash]hash.Hash
 
-func traverseSource(srcRef types.Ref, srcDB, sinkDB Database) traverseResult {
+func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritten bool) traverseSourceResult {
 	h := srcRef.TargetHash()
 	if !sinkDB.has(h) {
 		srcBS := srcDB.validatingBatchStore()
@@ -219,9 +242,16 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database) traverseResult {
 		v := types.DecodeValue(c, srcDB)
 		d.PanicIfFalse(v != nil, "Expected decoded chunk to be non-nil.")
 		sinkDB.validatingBatchStore().SchedulePut(c, srcRef.Height(), types.Hints{})
-		return traverseResult{h, v.Chunks(), len(c.Data())}
+		bytesWritten := 0
+		if estimateBytesWritten {
+			// TODO: Probably better to hide this behind the BatchStore abstraction since
+			// write size is implementation specific.
+			bytesWritten = len(snappy.Encode(nil, c.Data()))
+		}
+		ts := traverseSourceResult{traverseResult{h, v.Chunks(), len(c.Data())}, bytesWritten}
+		return ts
 	}
-	return traverseResult{}
+	return traverseSourceResult{}
 }
 
 func traverseSink(sinkRef types.Ref, db Database) traverseResult {
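The estimate mirrors the snappy compression the chunk store applies before writing (hence the TODO about hiding it behind the BatchStore abstraction), so len(snappy.Encode(nil, data)) approximates per-chunk write size; the residual discrepancy against leveldb's own stats is what #2567 tracks, per the commit message. A standalone sketch of the measurement:

package main

import (
	"bytes"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	// Repetitive data compresses well; incompressible data would not.
	data := bytes.Repeat([]byte("noms"), 1024) // 4096 raw bytes

	// The same call the hunk uses to estimate per-chunk write size.
	compressed := len(snappy.Encode(nil, data))
	fmt.Printf("raw=%d compressed=%d\n", len(data), compressed)
}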
@@ -128,19 +128,18 @@ func (pt *progressTracker) Validate(suite *PullSuite) {
 	first := progress[0]
 	suite.Zero(first.DoneCount)
 	suite.True(first.KnownCount > 0)
-	suite.Zero(first.DoneBytes)
+	suite.Zero(first.ApproxWrittenBytes)
 
 	last := progress[len(progress)-1]
 	suite.True(last.DoneCount > 0)
 	suite.Equal(last.DoneCount, last.KnownCount)
-	suite.True(last.DoneBytes > 0)
 
 	for i, prog := range progress {
 		suite.True(prog.KnownCount >= prog.DoneCount)
 		if i > 0 {
 			prev := progress[i-1]
 			suite.True(prog.DoneCount >= prev.DoneCount)
-			suite.True(prog.DoneBytes >= prev.DoneBytes)
+			suite.True(prog.ApproxWrittenBytes >= prev.ApproxWrittenBytes)
 		}
 	}
 }