go/store/datas/pull: Iterate on new stats.

Aaron Son
2022-03-29 12:25:45 -07:00
parent 6679007caf
commit e0abfac106


@@ -20,9 +20,12 @@ import (
"fmt"
"io"
"log"
"math"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/errgroup"
@@ -74,6 +77,8 @@ type Puller struct {
	eventCh chan PullerEvent
	pushLog *log.Logger

	stats *stats
}

type PullerEventType int
@@ -178,6 +183,7 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sink
		chunksPerTF: chunksPerTF,
		eventCh:     eventCh,
		pushLog:     pushLogger,
		stats:       &stats{},
	}

	if lcs, ok := sinkCS.(chunks.LoggingChunkStore); ok {
@@ -201,6 +207,91 @@ type tempTblFile struct {
	contentHash []byte
}
// countingReader wraps a ReadCloser, atomically adding every byte count
// returned by Read into *cnt so other goroutines can observe progress.
type countingReader struct {
	io.ReadCloser
	cnt *uint64
}

func (c countingReader) Read(p []byte) (int, error) {
	n, err := c.ReadCloser.Read(p)
	atomic.AddUint64(c.cnt, uint64(n))
	return n, err
}
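
countingReader is the whole byte-accounting mechanism on the send side: each Read tallies into a shared counter that other goroutines can load. A minimal, self-contained usage sketch (the input reader and counter here are illustrative, not from the commit):

package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// same shape as the countingReader in the diff above
type countingReader struct {
	io.ReadCloser
	cnt *uint64
}

func (c countingReader) Read(p []byte) (int, error) {
	n, err := c.ReadCloser.Read(p)
	atomic.AddUint64(c.cnt, uint64(n))
	return n, err
}

func main() {
	var total uint64
	r := countingReader{io.NopCloser(strings.NewReader("hello")), &total}
	if _, err := io.Copy(io.Discard, r); err != nil {
		panic(err)
	}
	fmt.Println(atomic.LoadUint64(&total), "bytes read") // 5 bytes read
}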
// updateBytesPerSecond starts a goroutine that samples the send and fetch
// byte counters every 100ms and publishes exponentially smoothed
// bytes-per-second rates into s. The returned cancel func stops the sampler.
func updateBytesPerSecond(s *stats) (cancel func()) {
	done := make(chan struct{})
	cancel = func() {
		close(done)
	}
	sampleDuration := 100 * time.Millisecond
	samplesInSec := uint64((1 * time.Second) / sampleDuration)
	weight := 0.9
	ticker := time.NewTicker(sampleDuration)
	go func() {
		defer ticker.Stop()
		var lastSendBytes, lastFetchedBytes uint64
		for {
			select {
			case <-ticker.C:
				newSendBytes := atomic.LoadUint64(&s.finishedSendBytes)
				newFetchedBytes := atomic.LoadUint64(&s.fetchedSourceBytes)
				sendBytesDiff := newSendBytes - lastSendBytes
				fetchedBytesDiff := newFetchedBytes - lastFetchedBytes
				lastSendBytes = newSendBytes
				lastFetchedBytes = newFetchedBytes
				newSendBPS := float64(sendBytesDiff * samplesInSec)
				newFetchedBPS := float64(fetchedBytesDiff * samplesInSec)
				curSendBPS := math.Float64frombits(atomic.LoadUint64(&s.sendBytesPerSec))
				curFetchedBPS := math.Float64frombits(atomic.LoadUint64(&s.fetchedSourceBytesPerSec))
				smoothedSendBPS := newSendBPS
				if curSendBPS != 0 {
					smoothedSendBPS = newSendBPS + weight*(curSendBPS-newSendBPS)
				}
				smoothedFetchBPS := newFetchedBPS
				if curFetchedBPS != 0 {
					smoothedFetchBPS = newFetchedBPS + weight*(curFetchedBPS-newFetchedBPS)
				}
				atomic.StoreUint64(&s.sendBytesPerSec, math.Float64bits(smoothedSendBPS))
				atomic.StoreUint64(&s.fetchedSourceBytesPerSec, math.Float64bits(smoothedFetchBPS))
			case <-done:
				return
			}
		}
	}()
	return cancel
}
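
Two details in updateBytesPerSecond are worth noting: each 100ms sample is folded into an exponentially weighted moving average (smoothed = sample + 0.9*(current - sample), i.e. 0.9*current + 0.1*sample), and the float64 rates are published through uint64 atomics by round-tripping their IEEE-754 bit patterns. A minimal sketch of the bit-pattern trick in isolation (helper names are illustrative):

package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// storeFloat and loadFloat publish a float64 through a uint64 atomically
// by converting to and from its IEEE-754 bit pattern.
func storeFloat(addr *uint64, v float64) { atomic.StoreUint64(addr, math.Float64bits(v)) }

func loadFloat(addr *uint64) float64 { return math.Float64frombits(atomic.LoadUint64(addr)) }

func main() {
	var bps uint64
	storeFloat(&bps, 1024.5)
	fmt.Println(loadFloat(&bps)) // 1024.5
}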
// stats is a bag of atomically updated counters. The *PerSec fields store
// math.Float64bits bit patterns; the *PerSecF fields are populated only on
// the snapshots returned by read().
type stats struct {
	finishedSendBytes uint64
	bufferedSendBytes uint64
	sendBytesPerSec   uint64

	totalSourceChunks        uint64
	fetchedSourceChunks      uint64
	fetchedSourceBytes       uint64
	fetchedSourceBytesPerSec uint64

	sendBytesPerSecF          float64
	fetchedSourceBytesPerSecF float64
}
// read loads each counter atomically and returns a snapshot suitable for
// display. Fields are loaded individually, not as one consistent view.
func (s *stats) read() stats {
	var ret stats
	ret.finishedSendBytes = atomic.LoadUint64(&s.finishedSendBytes)
	ret.bufferedSendBytes = atomic.LoadUint64(&s.bufferedSendBytes)
	ret.sendBytesPerSecF = math.Float64frombits(atomic.LoadUint64(&s.sendBytesPerSec))
	ret.totalSourceChunks = atomic.LoadUint64(&s.totalSourceChunks)
	ret.fetchedSourceChunks = atomic.LoadUint64(&s.fetchedSourceChunks)
	ret.fetchedSourceBytes = atomic.LoadUint64(&s.fetchedSourceBytes)
	ret.fetchedSourceBytesPerSecF = math.Float64frombits(atomic.LoadUint64(&s.fetchedSourceBytesPerSec))
	return ret
}
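
read() gives callers a display-safe snapshot without exposing the atomics. A hypothetical in-package consumer might render a progress line from it; this helper is illustrative and assumes the stats type above plus an "fmt" import:

// hypothetical helper in the same package; field names come from the diff
// above, the formatting is illustrative
func progressLine(s *stats) string {
	snap := s.read()
	return fmt.Sprintf("sent %d/%d bytes (%.0f B/s), fetched %d/%d chunks (%.0f B/s)",
		snap.finishedSendBytes, snap.bufferedSendBytes, snap.sendBytesPerSecF,
		snap.fetchedSourceChunks, snap.totalSourceChunks, snap.fetchedSourceBytesPerSecF)
}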
func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile) error {
	fi, err := os.Stat(tmpTblFile.path)
	if err != nil {
@@ -218,13 +309,8 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile
		return nil, 0, err
	}
-	fWithStats := iohelp.NewReaderWithStats(f, fileSize)
-	fWithStats.Start(func(stats iohelp.ReadStats) {
-		p.addEvent(ctx, NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
-			CurrentFileSize: fileSize,
-			Stats:           stats,
-		}))
-	})
+	atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize))
+	fWithStats := countingReader{f, &p.stats.finishedSendBytes}
	return fWithStats, uint64(fileSize), nil
})
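
The upload path now does its accounting in two steps: the full file size is credited to bufferedSendBytes as soon as the temp table file is handed to the uploader, and finishedSendBytes then advances byte-by-byte as the countingReader is drained. A hypothetical helper showing the same pattern, assuming the stats and countingReader types above plus "io" and "sync/atomic" imports:

// hypothetical wrapper, not part of the commit
func wrapForUpload(f io.ReadCloser, fileSize uint64, s *stats) io.ReadCloser {
	atomic.AddUint64(&s.bufferedSendBytes, fileSize) // queued for send
	return countingReader{f, &s.finishedSendBytes}   // counts bytes actually sent
}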
@@ -287,6 +373,9 @@ LOOP:
// Pull executes the sync operation
func (p *Puller) Pull(ctx context.Context) error {
	c := updateBytesPerSecond(p.stats)
	defer c()

	twDetails := &TreeWalkEventDetails{TreeLevel: -1}

	leaves := make(hash.HashSet)
@@ -371,9 +460,12 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
	found := make(chan nbs.CompressedChunk, 4096)
	processed := make(chan CmpChnkAndRefs, 4096)

	atomic.AddUint64(&p.stats.totalSourceChunks, uint64(len(batch)))

	eg, ctx := errgroup.WithContext(ctx)
	eg.Go(func() error {
		err := p.srcChunkStore.GetManyCompressed(ctx, batch, func(ctx context.Context, c nbs.CompressedChunk) {
			atomic.AddUint64(&p.stats.fetchedSourceBytes, uint64(len(c.FullCompressedChunk)))
			atomic.AddUint64(&p.stats.fetchedSourceChunks, uint64(1))
			select {
			case found <- c:
			case <-ctx.Done():
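
On the fetch side, totalSourceChunks grows by the batch size up front, while fetchedSourceChunks and fetchedSourceBytes are bumped from inside the GetManyCompressed callback, so completion can be derived from any snapshot. A hypothetical helper, assuming the stats type above:

// hypothetical helper, not part of the commit
func fetchPercent(snap stats) float64 {
	if snap.totalSourceChunks == 0 {
		return 0
	}
	return 100 * float64(snap.fetchedSourceChunks) / float64(snap.totalSourceChunks)
}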