Merge pull request #3123 from dolthub/aaron/pull-stats-rewrite

go/store/datas/pull: Use errgroup. Rewrite Stats publishing.
This commit is contained in:
Aaron Son
2022-04-04 11:23:30 -07:00
committed by GitHub
8 changed files with 389 additions and 370 deletions

View File

@@ -175,73 +175,32 @@ func (ts *TextSpinner) next() string {
return string([]rune{spinnerSeq[ts.seqPos]})
}
func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent, language progLanguage) {
var currentTreeLevel int
var percentBuffered float64
var tableFilesClosed int
var filesTransfered int
var ts TextSpinner
func pullerProgFunc(ctx context.Context, statsCh chan pull.Stats, language progLanguage) {
p := cli.NewEphemeralPrinter()
uploadRate := ""
for evt := range pullerEventCh {
if ctx.Err() != nil {
for {
select {
case <-ctx.Done():
return
}
switch evt.EventType {
case pull.NewLevelTWEvent:
if evt.TWEventDetails.TreeLevel != 1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
percentBuffered = 0
case stats, ok := <-statsCh:
if !ok {
return
}
case pull.DestDBHasTWEvent:
if evt.TWEventDetails.TreeLevel != -1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
}
case pull.LevelUpdateTWEvent:
if evt.TWEventDetails.TreeLevel != -1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
toBuffer := evt.TWEventDetails.ChunksInLevel - evt.TWEventDetails.ChunksAlreadyHad
if toBuffer > 0 {
percentBuffered = 100 * float64(evt.TWEventDetails.ChunksBuffered) / float64(toBuffer)
}
}
case pull.LevelDoneTWEvent:
case pull.TableFileClosedEvent:
tableFilesClosed += 1
case pull.StartUploadTableFileEvent:
case pull.UploadTableFileUpdateEvent:
bps := float64(evt.TFEventDetails.Stats.Read) / evt.TFEventDetails.Stats.Elapsed.Seconds()
uploadRate = humanize.Bytes(uint64(bps)) + "/s"
case pull.EndUploadTableFileEvent:
filesTransfered += 1
}
if currentTreeLevel == -1 {
continue
}
p.Printf("%s Tree Level: %d, Percent Buffered: %.2f%%, ", ts.next(), currentTreeLevel, percentBuffered)
if language == downloadLanguage {
p.Printf("Files Written: %d", filesTransfered)
} else {
if len(uploadRate) > 0 {
p.Printf("Files Created: %d, Files Uploaded: %d, Current Upload Speed: %s\n", tableFilesClosed, filesTransfered, uploadRate)
if language == downloadLanguage {
p.Printf("Downloaded %s chunks, %s @ %s/s.",
humanize.Comma(int64(stats.FetchedSourceChunks)),
humanize.Bytes(stats.FetchedSourceBytes),
humanize.SIWithDigits(stats.FetchedSourceBytesPerSec, 2, "B"),
)
} else {
p.Printf("Files Created: %d, Files Uploaded: %d\n", tableFilesClosed, filesTransfered)
p.Printf("Uploaded %s of %s @ %s/s.",
humanize.Bytes(stats.FinishedSendBytes),
humanize.Bytes(stats.BufferedSendBytes),
humanize.SIWithDigits(stats.SendBytesPerSec, 2, "B"),
)
}
}
p.Display()
}
p.Display()
}
func progFunc(ctx context.Context, progChan chan pull.PullProgress) {
@@ -288,8 +247,8 @@ const (
)
func buildProgStarter(language progLanguage) actions.ProgStarter {
return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent, 128)
return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats, 128)
progChan := make(chan pull.PullProgress, 128)
wg := &sync.WaitGroup{}
@@ -302,21 +261,16 @@ func buildProgStarter(language progLanguage) actions.ProgStarter {
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh, language)
pullerProgFunc(ctx, statsCh, language)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
}
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}
func bytesPerSec(bytes uint64, start time.Time) string {
bps := float64(bytes) / float64(time.Since(start).Seconds())
return humanize.Bytes(uint64(bps))
}

View File

@@ -1147,7 +1147,7 @@ func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error {
// PullChunks initiates a pull into this database from the source database
// given, pulling all chunks reachable from the given targetHash. Pull progress
// is communicated over the provided channel.
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, targetHash hash.Hash, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, targetHash hash.Hash, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
srcCS := datas.ChunkStoreFromDatabase(srcDB.db)
destCS := datas.ChunkStoreFromDatabase(ddb.db)
wrf, err := types.WalkRefsForChunkStore(srcCS)
@@ -1156,7 +1156,7 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB
}
if datas.CanUsePuller(srcDB.db) && datas.CanUsePuller(ddb.db) {
puller, err := pull.NewPuller(ctx, tempDir, defaultChunksPerTF, srcCS, destCS, wrf, targetHash, pullerEventCh)
puller, err := pull.NewPuller(ctx, tempDir, defaultChunksPerTF, srcCS, destCS, wrf, targetHash, statsCh)
if err == pull.ErrDBUpToDate {
return nil
} else if err != nil {

View File

@@ -236,7 +236,7 @@ func mustForkDB(t *testing.T, fromDB *doltdb.DoltDB, bn string, cm *doltdb.Commi
err = forkEnv.InitRepo(context.Background(), types.Format_Default, "Bill Billerson", "bill@billerson.com", env.DefaultInitBranch)
require.NoError(t, err)
p1 := make(chan pull.PullProgress)
p2 := make(chan pull.PullerEvent)
p2 := make(chan pull.Stats)
go func() {
for range p1 {
}

View File

@@ -21,7 +21,7 @@ import (
"github.com/dolthub/dolt/go/store/datas/pull"
)
func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent) {
func pullerProgFunc(ctx context.Context, statsCh <-chan pull.Stats) {
for {
select {
case <-ctx.Done():
@@ -31,7 +31,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent)
select {
case <-ctx.Done():
return
case <-pullerEventCh:
case <-statsCh:
default:
}
}
@@ -53,8 +53,8 @@ func progFunc(ctx context.Context, progChan <-chan pull.PullProgress) {
}
}
func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent)
func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats)
progChan := make(chan pull.PullProgress)
wg := &sync.WaitGroup{}
@@ -67,15 +67,15 @@ func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgr
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh)
pullerProgFunc(ctx, statsCh)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
func NoopStopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func NoopStopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}

View File

@@ -42,15 +42,15 @@ var ErrFailedToDeleteBackup = errors.New("failed to delete backup")
var ErrFailedToGetBackupDb = errors.New("failed to get backup db")
var ErrUnknownPushErr = errors.New("unknown push error")
type ProgStarter func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent)
type ProgStopper func(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent)
type ProgStarter func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats)
type ProgStopper func(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats)
// Push will update a destination branch, in a given destination database if it can be done as a fast forward merge.
// This is accomplished first by verifying that the remote tracking reference for the source database can be updated to
// the given commit via a fast forward merge. If this is the case, an attempt will be made to update the branch in the
// destination db to the given commit via fast forward move. If that succeeds the tracking branch is updated in the
// source db.
func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
var err error
if mode == ref.FastForwardOnly {
canFF, err := srcDB.CanFastForward(ctx, remoteRef, commit)
@@ -68,7 +68,7 @@ func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef
return err
}
err = destDB.PullChunks(ctx, tempTableDir, srcDB, rf.TargetHash(), progChan, pullerEventCh)
err = destDB.PullChunks(ctx, tempTableDir, srcDB, rf.TargetHash(), progChan, statsCh)
if err != nil {
return err
@@ -140,7 +140,7 @@ func DoPush(ctx context.Context, rsr env.RepoStateReader, rsw env.RepoStateWrite
}
// PushTag pushes a commit tag and all underlying data from a local source database to a remote destination database.
func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB, destDB *doltdb.DoltDB, tag *doltdb.Tag, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB, destDB *doltdb.DoltDB, tag *doltdb.Tag, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
var err error
addr, err := tag.GetAddr()
@@ -148,7 +148,7 @@ func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB
return err
}
err = destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, pullerEventCh)
err = destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, statsCh)
if err != nil {
return err
@@ -188,9 +188,9 @@ func PushToRemoteBranch(ctx context.Context, rsr env.RepoStateReader, tempTableD
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = Push(ctx, tempTableDir, mode, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = Push(ctx, tempTableDir, mode, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
switch err {
case nil:
@@ -211,9 +211,9 @@ func pushTagToRemote(ctx context.Context, tempTableDir string, srcRef, destRef r
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = PushTag(ctx, tempTableDir, destRef.(ref.TagRef), localDB, remoteDB, tg, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = PushTag(ctx, tempTableDir, destRef.(ref.TagRef), localDB, remoteDB, tg, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err != nil {
return err
@@ -250,24 +250,24 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef
}
// FetchCommit fetches a commit and all underlying data from a remote source database to the local destination database.
func FetchCommit(ctx context.Context, tempTablesDir string, srcDB, destDB *doltdb.DoltDB, srcDBCommit *doltdb.Commit, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func FetchCommit(ctx context.Context, tempTablesDir string, srcDB, destDB *doltdb.DoltDB, srcDBCommit *doltdb.Commit, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
stRef, err := srcDBCommit.GetStRef()
if err != nil {
return err
}
return destDB.PullChunks(ctx, tempTablesDir, srcDB, stRef.TargetHash(), progChan, pullerEventCh)
return destDB.PullChunks(ctx, tempTablesDir, srcDB, stRef.TargetHash(), progChan, statsCh)
}
// FetchTag fetches a commit tag and all underlying data from a remote source database to the local destination database.
func FetchTag(ctx context.Context, tempTableDir string, srcDB, destDB *doltdb.DoltDB, srcDBTag *doltdb.Tag, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func FetchTag(ctx context.Context, tempTableDir string, srcDB, destDB *doltdb.DoltDB, srcDBTag *doltdb.Tag, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
addr, err := srcDBTag.GetAddr()
if err != nil {
return err
}
return destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, pullerEventCh)
return destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, statsCh)
}
// Clone pulls all data from a remote source database to a local destination database.
@@ -309,9 +309,9 @@ func FetchFollowTags(ctx context.Context, tempTableDir string, srcDB, destDB *do
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = FetchTag(ctx, tempTableDir, srcDB, destDB, tag, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = FetchTag(ctx, tempTableDir, srcDB, destDB, tag, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == nil {
cli.Println()
} else if err == pull.ErrDBUpToDate {
@@ -363,9 +363,9 @@ func FetchRemoteBranch(
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = FetchCommit(ctx, tempTablesDir, srcDB, destDB, srcDBCommit, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = FetchCommit(ctx, tempTablesDir, srcDB, destDB, srcDBCommit, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == pull.ErrDBUpToDate {
err = nil
}
@@ -466,15 +466,15 @@ func SyncRoots(ctx context.Context, srcDb, destDb *doltdb.DoltDB, tempTableDir s
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
wg, progChan, statsCh := progStarter(newCtx)
defer func() {
progStopper(cancelFunc, wg, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == nil {
cli.Println()
}
}()
err = destDb.PullChunks(ctx, tempTableDir, srcDb, srcRoot, progChan, pullerEventCh)
err = destDb.PullChunks(ctx, tempTableDir, srcDb, srcRoot, progChan, statsCh)
if err != nil {
return err
}

View File

@@ -153,7 +153,7 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
return noConflicts, nil
}
func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent) {
func pullerProgFunc(ctx context.Context, statsCh <-chan pull.Stats) {
for {
if ctx.Err() != nil {
return
@@ -161,7 +161,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent)
select {
case <-ctx.Done():
return
case <-pullerEventCh:
case <-statsCh:
default:
}
}
@@ -181,8 +181,8 @@ func progFunc(ctx context.Context, progChan <-chan pull.PullProgress) {
}
}
func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent)
func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats)
progChan := make(chan pull.PullProgress)
wg := &sync.WaitGroup{}
@@ -195,15 +195,15 @@ func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress,
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh)
pullerProgFunc(ctx, statsCh)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}

View File

@@ -20,16 +20,18 @@ import (
"fmt"
"io"
"log"
"math"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
@@ -73,54 +75,15 @@ type Puller struct {
tempDir string
chunksPerTF int
eventCh chan PullerEvent
pushLog *log.Logger
}
type PullerEventType int
const (
NewLevelTWEvent PullerEventType = iota
DestDBHasTWEvent
LevelUpdateTWEvent
LevelDoneTWEvent
TableFileClosedEvent
StartUploadTableFileEvent
UploadTableFileUpdateEvent
EndUploadTableFileEvent
)
type TreeWalkEventDetails struct {
TreeLevel int
ChunksInLevel int
ChunksAlreadyHad int
ChunksBuffered int
ChildrenFound int
}
type TableFileEventDetails struct {
CurrentFileSize int64
Stats iohelp.ReadStats
}
type PullerEvent struct {
EventType PullerEventType
TWEventDetails TreeWalkEventDetails
TFEventDetails TableFileEventDetails
}
func NewTWPullerEvent(et PullerEventType, details *TreeWalkEventDetails) PullerEvent {
return PullerEvent{EventType: et, TWEventDetails: *details}
}
func NewTFPullerEvent(et PullerEventType, details *TableFileEventDetails) PullerEvent {
return PullerEvent{EventType: et, TFEventDetails: *details}
statsCh chan Stats
stats *stats
}
// NewPuller creates a new Puller instance to do the syncing. If a nil puller is returned without error that means
// that there is nothing to pull and the sinkDB is already up to date.
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sinkCS chunks.ChunkStore, walkRefs WalkRefs, rootChunkHash hash.Hash, eventCh chan PullerEvent) (*Puller, error) {
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sinkCS chunks.ChunkStore, walkRefs WalkRefs, rootChunkHash hash.Hash, statsCh chan Stats) (*Puller, error) {
// Sanity Check
exists, err := srcCS.Has(ctx, rootChunkHash)
@@ -177,8 +140,9 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sink
tempDir: tempDir,
wr: wr,
chunksPerTF: chunksPerTF,
eventCh: eventCh,
pushLog: pushLogger,
statsCh: statsCh,
stats: &stats{},
}
if lcs, ok := sinkCS.(chunks.LoggingChunkStore); ok {
@@ -198,10 +162,129 @@ type tempTblFile struct {
id string
path string
numChunks int
chunksLen uint64
contentLen uint64
contentHash []byte
}
// countingReader decorates an io.ReadCloser so that every Read also adds the
// number of bytes it produced to the counter cnt points at. The counter is
// only ever touched through sync/atomic. Close is inherited unchanged from
// the embedded ReadCloser.
type countingReader struct {
	io.ReadCloser
	cnt *uint64
}

// Read delegates to the embedded ReadCloser and then atomically adds the
// reported byte count to *cnt. The count is recorded even when the
// underlying Read also returns an error.
func (cr countingReader) Read(buf []byte) (int, error) {
	got, err := cr.ReadCloser.Read(buf)
	atomic.AddUint64(cr.cnt, uint64(got))
	return got, err
}
// emitStats starts two background goroutines that turn the raw counters in s
// into periodic Stats snapshots on ch, and returns a cancel function that
// stops them.
//
// The first goroutine samples s.finishedSendBytes and s.fetchedSourceBytes
// every 100ms, converts the deltas to bytes/second and folds them into the
// smoothed rates stored (as float64 bit patterns) in s.sendBytesPerSec and
// s.fetchedSourceBytesPerSec.
//
// The second goroutine sends s.read() on ch once per second, plus one final
// snapshot when cancelled.
//
// The sends on ch are blocking: the receiver must keep draining ch until
// cancel has returned, otherwise cancel will not return. cancel closes an
// internal channel and must therefore be called exactly once.
func emitStats(s *stats, ch chan Stats) (cancel func()) {
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(2)
	cancel = func() {
		close(done)
		wg.Wait()
	}

	// Rate sampler.
	go func() {
		defer wg.Done()
		sampleduration := 100 * time.Millisecond
		samplesinsec := uint64((1 * time.Second) / sampleduration)
		// Share of each new sample that is blended into the running rate.
		weight := 0.1
		ticker := time.NewTicker(sampleduration)
		defer ticker.Stop()
		var lastSendBytes, lastFetchedBytes uint64
		for {
			select {
			case <-ticker.C:
				newSendBytes := atomic.LoadUint64(&s.finishedSendBytes)
				newFetchedBytes := atomic.LoadUint64(&s.fetchedSourceBytes)
				sendBytesDiff := newSendBytes - lastSendBytes
				fetchedBytesDiff := newFetchedBytes - lastFetchedBytes

				// Scale the per-sample delta up to a per-second rate.
				newSendBPS := float64(sendBytesDiff * samplesinsec)
				newFetchedBPS := float64(fetchedBytesDiff * samplesinsec)

				curSendBPS := math.Float64frombits(atomic.LoadUint64(&s.sendBytesPerSec))
				curFetchedBPS := math.Float64frombits(atomic.LoadUint64(&s.fetchedSourceBytesPerSec))

				// While the stored rate is still zero the new sample is
				// taken as-is; afterwards it is blended in by weight.
				smoothedSendBPS := newSendBPS
				if curSendBPS != 0 {
					smoothedSendBPS = curSendBPS + weight*(newSendBPS-curSendBPS)
				}

				smoothedFetchBPS := newFetchedBPS
				if curFetchedBPS != 0 {
					smoothedFetchBPS = curFetchedBPS + weight*(newFetchedBPS-curFetchedBPS)
				}

				atomic.StoreUint64(&s.sendBytesPerSec, math.Float64bits(smoothedSendBPS))
				atomic.StoreUint64(&s.fetchedSourceBytesPerSec, math.Float64bits(smoothedFetchBPS))

				lastSendBytes = newSendBytes
				lastFetchedBytes = newFetchedBytes
			case <-done:
				return
			}
		}
	}()

	// Snapshot publisher.
	go func() {
		defer wg.Done()
		updateduration := 1 * time.Second
		ticker := time.NewTicker(updateduration)
		// Stop the ticker when this goroutine exits, matching the sampler
		// above; previously this ticker was never stopped.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				ch <- s.read()
			case <-done:
				// Always publish one last snapshot so the receiver sees
				// the final totals.
				ch <- s.read()
				return
			}
		}
	}()

	return cancel
}
// stats holds the live progress counters for a Puller. The uint64 fields are
// read and written with sync/atomic (see emitStats and (*stats).read), so
// they must not be accessed with plain loads and stores while a pull is
// running.
type stats struct {
// finishedSendBytes counts bytes read through the countingReader that
// wraps table file uploads.
finishedSendBytes uint64
// bufferedSendBytes counts bytes queued for upload: compressed chunk
// bytes as chunks are added to a table file, plus the remaining file
// bytes (or the re-sent bytes on a retry) when an upload is opened.
bufferedSendBytes uint64
// sendBytesPerSec is the smoothed upload rate, stored as the bit pattern
// of a float64 (math.Float64bits); maintained by emitStats.
sendBytesPerSec uint64
// totalSourceChunks is increased by the batch size each time getCmp
// starts fetching a batch.
totalSourceChunks uint64
// fetchedSourceChunks counts compressed chunks delivered by the source
// chunk store.
fetchedSourceChunks uint64
// fetchedSourceBytes counts the compressed bytes of those chunks.
fetchedSourceBytes uint64
// fetchedSourceBytesPerSec is the smoothed fetch rate, stored as the bit
// pattern of a float64; maintained by emitStats.
fetchedSourceBytesPerSec uint64
// NOTE(review): the two float64 fields below are not referenced in the
// visible code; the rates are kept in the uint64 fields above. Confirm
// whether they can be removed.
sendBytesPerSecF float64
fetchedSourceBytesPerSecF float64
}
// Stats is a point-in-time snapshot of a Puller's progress counters, as
// produced by (*stats).read and published on the stats channel passed to
// NewPuller. Unlike the internal stats type it is a plain value and carries
// the transfer rates as float64.
type Stats struct {
// FinishedSendBytes is the number of bytes uploaded so far.
FinishedSendBytes uint64
// BufferedSendBytes is the number of bytes queued for upload so far.
BufferedSendBytes uint64
// SendBytesPerSec is the smoothed upload rate in bytes per second.
SendBytesPerSec float64
// TotalSourceChunks is the number of chunks requested from the source so
// far.
TotalSourceChunks uint64
// FetchedSourceChunks is the number of chunks received from the source.
FetchedSourceChunks uint64
// FetchedSourceBytes is the number of compressed bytes received from the
// source.
FetchedSourceBytes uint64
// FetchedSourceBytesPerSec is the smoothed fetch rate in bytes per
// second.
FetchedSourceBytesPerSec float64
}
// read captures the current counters as an exported Stats value. Every field
// is fetched with its own atomic load, so each value is individually
// untorn, but the snapshot as a whole is not taken under a single lock. The
// two rate fields are stored as float64 bit patterns and are decoded here.
func (s *stats) read() Stats {
	// loadRate decodes a float64 that is stored as its uint64 bit pattern.
	loadRate := func(bits *uint64) float64 {
		return math.Float64frombits(atomic.LoadUint64(bits))
	}

	return Stats{
		FinishedSendBytes:        atomic.LoadUint64(&s.finishedSendBytes),
		BufferedSendBytes:        atomic.LoadUint64(&s.bufferedSendBytes),
		SendBytesPerSec:          loadRate(&s.sendBytesPerSec),
		TotalSourceChunks:        atomic.LoadUint64(&s.totalSourceChunks),
		FetchedSourceChunks:      atomic.LoadUint64(&s.fetchedSourceChunks),
		FetchedSourceBytes:       atomic.LoadUint64(&s.fetchedSourceBytes),
		FetchedSourceBytesPerSec: loadRate(&s.fetchedSourceBytesPerSec),
	}
}
func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile) error {
fi, err := os.Stat(tmpTblFile.path)
if err != nil {
@@ -210,145 +293,136 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile
fileSize := fi.Size()
defer func() {
go func() {
_ = file.Remove(tmpTblFile.path)
}()
_ = file.Remove(tmpTblFile.path)
}()
// By tracking the number of bytes uploaded here,
// we can add bytes on to our bufferedSendBytes when
// we have to retry a table file write.
var localUploaded uint64
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
f, err := os.Open(tmpTblFile.path)
if err != nil {
return nil, 0, err
}
fWithStats := iohelp.NewReaderWithStats(f, fileSize)
fWithStats.Start(func(stats iohelp.ReadStats) {
p.addEvent(NewTFPullerEvent(UploadTableFileUpdateEvent, &TableFileEventDetails{
CurrentFileSize: fileSize,
Stats: stats,
}))
})
if localUploaded == 0 {
// So far, we've added all the bytes for the compressed chunk data.
// We add the remaining bytes here --- bytes for the index and the
// table file footer.
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen)
} else {
// A retry. We treat it as if what was already uploaded was rebuffered.
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded))
localUploaded = 0
}
fWithStats := countingReader{countingReader{f, &localUploaded}, &p.stats.finishedSendBytes}
return fWithStats, uint64(fileSize), nil
})
}
func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.AtomicError, completedTables <-chan FilledWriters) {
func (p *Puller) processCompletedTables(ctx context.Context, completedTables <-chan FilledWriters) error {
fileIdToNumChunks := make(map[string]int)
var err error
for tblFile := range completedTables {
p.tablefileSema.Release(1)
LOOP:
for {
select {
case tblFile, ok := <-completedTables:
if !ok {
break LOOP
}
p.tablefileSema.Release(1)
if ae.IsSet() {
continue // drain
// content length before we finish the write, which will
// add the index and table file footer.
chunksLen := tblFile.wr.ContentLength()
id, err := tblFile.wr.Finish()
if err != nil {
return err
}
path := filepath.Join(p.tempDir, id)
err = tblFile.wr.FlushToFile(path)
if err != nil {
return err
}
ttf := tempTblFile{
id: id,
path: path,
numChunks: tblFile.wr.Size(),
chunksLen: chunksLen,
contentLen: tblFile.wr.ContentLength(),
contentHash: tblFile.wr.GetMD5(),
}
err = p.uploadTempTableFile(ctx, ttf)
if err != nil {
return err
}
fileIdToNumChunks[id] = ttf.numChunks
case <-ctx.Done():
return ctx.Err()
}
p.addEvent(NewTFPullerEvent(StartUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(tblFile.wr.ContentLength()),
}))
var id string
id, err = tblFile.wr.Finish()
if ae.SetIfError(err) {
continue
}
path := filepath.Join(p.tempDir, id)
err = tblFile.wr.FlushToFile(path)
if ae.SetIfError(err) {
continue
}
ttf := tempTblFile{
id: id,
path: path,
numChunks: tblFile.wr.Size(),
contentLen: tblFile.wr.ContentLength(),
contentHash: tblFile.wr.GetMD5(),
}
err = p.uploadTempTableFile(ctx, ttf)
if ae.SetIfError(err) {
continue
}
p.addEvent(NewTFPullerEvent(EndUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(ttf.contentLen),
}))
fileIdToNumChunks[id] = ttf.numChunks
}
if ae.IsSet() {
return
}
err = p.sinkDBCS.(nbs.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks)
ae.SetIfError(err)
return p.sinkDBCS.(nbs.TableFileStore).AddTableFilesToManifest(ctx, fileIdToNumChunks)
}
// Pull executes the sync operation
func (p *Puller) Pull(ctx context.Context) error {
twDetails := &TreeWalkEventDetails{TreeLevel: -1}
if p.statsCh != nil {
c := emitStats(p.stats, p.statsCh)
defer c()
}
leaves := make(hash.HashSet)
absent := make(hash.HashSet)
absent.Insert(p.rootChunkHash)
ae := atomicerr.New()
wg := &sync.WaitGroup{}
eg, ctx := errgroup.WithContext(ctx)
completedTables := make(chan FilledWriters, 8)
wg.Add(1)
go func() {
defer wg.Done()
p.processCompletedTables(ctx, ae, completedTables)
}()
eg.Go(func() error {
return p.processCompletedTables(ctx, completedTables)
})
p.tablefileSema.Acquire(ctx, 1)
for len(absent) > 0 {
limitToNewChunks(absent, p.downloaded)
chunksInLevel := len(absent)
twDetails.ChunksInLevel = chunksInLevel
p.addEvent(NewTWPullerEvent(NewLevelTWEvent, twDetails))
var err error
absent, err = p.sinkDBCS.HasMany(ctx, absent)
if ae.SetIfError(err) {
break
eg.Go(func() error {
if err := p.tablefileSema.Acquire(ctx, 1); err != nil {
return err
}
for len(absent) > 0 {
limitToNewChunks(absent, p.downloaded)
twDetails.ChunksAlreadyHad = chunksInLevel - len(absent)
p.addEvent(NewTWPullerEvent(DestDBHasTWEvent, twDetails))
var err error
absent, err = p.sinkDBCS.HasMany(ctx, absent)
if err != nil {
return err
}
if len(absent) > 0 {
leaves, absent, err = p.getCmp(ctx, twDetails, leaves, absent, completedTables)
if ae.SetIfError(err) {
break
if len(absent) > 0 {
leaves, absent, err = p.getCmp(ctx, leaves, absent, completedTables)
if err != nil {
return err
}
}
}
}
if !ae.IsSet() && p.wr.Size() > 0 {
// p.wr may be nil in the error case
if p.wr != nil {
p.addEvent(NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
}))
if p.wr != nil && p.wr.Size() > 0 {
select {
case completedTables <- FilledWriters{p.wr}:
case <-ctx.Done():
return ctx.Err()
}
}
completedTables <- FilledWriters{p.wr}
}
close(completedTables)
return nil
})
close(completedTables)
wg.Wait()
return ae.Get()
return eg.Wait()
}
func limitToNewChunks(absent hash.HashSet, downloaded hash.HashSet) {
@@ -366,130 +440,130 @@ func limitToNewChunks(absent hash.HashSet, downloaded hash.HashSet) {
}
}
func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, leaves, batch hash.HashSet, completedTables chan FilledWriters) (hash.HashSet, hash.HashSet, error) {
func (p *Puller) getCmp(ctx context.Context, leaves, batch hash.HashSet, completedTables chan FilledWriters) (hash.HashSet, hash.HashSet, error) {
found := make(chan nbs.CompressedChunk, 4096)
processed := make(chan CmpChnkAndRefs, 4096)
ae := atomicerr.New()
go func() {
defer close(found)
atomic.AddUint64(&p.stats.totalSourceChunks, uint64(len(batch)))
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
err := p.srcChunkStore.GetManyCompressed(ctx, batch, func(ctx context.Context, c nbs.CompressedChunk) {
atomic.AddUint64(&p.stats.fetchedSourceBytes, uint64(len(c.FullCompressedChunk)))
atomic.AddUint64(&p.stats.fetchedSourceChunks, uint64(1))
select {
case found <- c:
case <-ctx.Done():
}
})
ae.SetIfError(err)
}()
if err != nil {
return err
}
close(found)
return nil
})
batchSize := len(batch)
numChunkWorkers := (batchSize / 1024) + 1
if numChunkWorkers > maxChunkWorkers {
numChunkWorkers = maxChunkWorkers
}
go func() {
defer close(processed)
for cmpChnk := range found {
if ae.IsSet() {
break
}
p.downloaded.Insert(cmpChnk.H)
if leaves.Has(cmpChnk.H) {
processed <- CmpChnkAndRefs{cmpChnk: cmpChnk}
} else {
chnk, err := cmpChnk.ToChunk()
if ae.SetIfError(err) {
return
eg.Go(func() error {
LOOP:
for {
select {
case cmpChnk, ok := <-found:
if !ok {
break LOOP
}
refs := make(map[hash.Hash]int)
if err := p.wrf(chnk, func(h hash.Hash, height uint64) error {
refs[h] = int(height)
return nil
}); ae.SetIfError(err) {
return
p.downloaded.Insert(cmpChnk.H)
if leaves.Has(cmpChnk.H) {
select {
case processed <- CmpChnkAndRefs{cmpChnk: cmpChnk}:
case <-ctx.Done():
return ctx.Err()
}
} else {
chnk, err := cmpChnk.ToChunk()
if err != nil {
return err
}
refs := make(map[hash.Hash]int)
err = p.wrf(chnk, func(h hash.Hash, height uint64) error {
refs[h] = int(height)
return nil
})
if err != nil {
return err
}
select {
case processed <- CmpChnkAndRefs{cmpChnk: cmpChnk, refs: refs}:
case <-ctx.Done():
return ctx.Err()
}
}
processed <- CmpChnkAndRefs{cmpChnk: cmpChnk, refs: refs}
case <-ctx.Done():
return ctx.Err()
}
}
}()
var err error
var maxHeight int
close(processed)
return nil
})
batchSize := len(batch)
nextLeaves := make(hash.HashSet, batchSize)
nextLevel := make(hash.HashSet, batchSize)
twDetails.ChunksBuffered = 0
for cmpAndRef := range processed {
if err != nil {
// drain to prevent deadlock
continue
}
eg.Go(func() error {
var seen int
LOOP:
for {
select {
case cmpAndRef, ok := <-processed:
if !ok {
break LOOP
}
seen++
twDetails.ChunksBuffered++
err := p.wr.AddCmpChunk(cmpAndRef.cmpChnk)
if err != nil {
return err
}
if twDetails.ChunksBuffered%100 == 0 {
p.addEvent(NewTWPullerEvent(LevelUpdateTWEvent, twDetails))
}
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(len(cmpAndRef.cmpChnk.FullCompressedChunk)))
err = p.wr.AddCmpChunk(cmpAndRef.cmpChnk)
if p.wr.Size() >= p.chunksPerTF {
select {
case completedTables <- FilledWriters{p.wr}:
case <-ctx.Done():
return ctx.Err()
}
p.wr = nil
if ae.SetIfError(err) {
continue
}
if err := p.tablefileSema.Acquire(ctx, 1); err != nil {
return err
}
p.wr, err = nbs.NewCmpChunkTableWriter(p.tempDir)
if err != nil {
return err
}
}
if p.wr.Size() >= p.chunksPerTF {
p.addEvent(NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
}))
for h, height := range cmpAndRef.refs {
nextLevel.Insert(h)
completedTables <- FilledWriters{p.wr}
p.wr = nil
p.tablefileSema.Acquire(ctx, 1)
p.wr, err = nbs.NewCmpChunkTableWriter(p.tempDir)
if ae.SetIfError(err) {
continue
}
}
for h, height := range cmpAndRef.refs {
nextLevel.Insert(h)
twDetails.ChildrenFound++
if height == 1 {
nextLeaves.Insert(h)
}
if height > maxHeight {
maxHeight = height
if height == 1 {
nextLeaves.Insert(h)
}
}
case <-ctx.Done():
return ctx.Err()
}
}
}
if seen != len(batch) {
return errors.New("failed to get all chunks.")
}
return nil
})
if err := ae.Get(); err != nil {
err := eg.Wait()
if err != nil {
return nil, nil, err
}
if twDetails.ChunksBuffered != len(batch) {
return nil, nil, errors.New("failed to get all chunks.")
}
p.addEvent(NewTWPullerEvent(LevelDoneTWEvent, twDetails))
twDetails.TreeLevel = maxHeight
return nextLeaves, nextLevel, nil
}
func (p *Puller) addEvent(evt PullerEvent) {
if p.eventCh != nil {
p.eventCh <- evt
}
}

View File

@@ -291,24 +291,15 @@ func TestPuller(t *testing.T) {
for k, rootAddr := range states {
t.Run(k, func(t *testing.T) {
eventCh := make(chan PullerEvent, 128)
statsCh := make(chan Stats, 16)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
for evt := range eventCh {
var details interface{}
switch evt.EventType {
case NewLevelTWEvent, DestDBHasTWEvent, LevelUpdateTWEvent:
details = evt.TWEventDetails
default:
details = evt.TFEventDetails
}
jsonBytes, err := json.Marshal(details)
for evt := range statsCh {
jsonBytes, err := json.Marshal(evt)
if err == nil {
t.Logf("event_type: %d details: %s\n", evt.EventType, string(jsonBytes))
t.Logf("stats: %s\n", string(jsonBytes))
}
}
}()
@@ -321,11 +312,11 @@ func TestPuller(t *testing.T) {
require.NoError(t, err)
wrf, err := types.WalkRefsForChunkStore(datas.ChunkStoreFromDatabase(db))
require.NoError(t, err)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootAddr, eventCh)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootAddr, statsCh)
require.NoError(t, err)
err = plr.Pull(ctx)
close(eventCh)
close(statsCh)
require.NoError(t, err)
wg.Wait()