go/store/datas/pull: Remove PullerEvent status updates. Publish pull stats through a Stats struct instead.

This commit is contained in:
Aaron Son
2022-03-29 16:37:58 -07:00
parent e0abfac106
commit 096269b291
8 changed files with 167 additions and 269 deletions
+24 -70
View File
@@ -175,73 +175,32 @@ func (ts *TextSpinner) next() string {
return string([]rune{spinnerSeq[ts.seqPos]})
}
func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent, language progLanguage) {
var currentTreeLevel int
var percentBuffered float64
var tableFilesClosed int
var filesTransfered int
var ts TextSpinner
func pullerProgFunc(ctx context.Context, statsCh chan pull.Stats, language progLanguage) {
p := cli.NewEphemeralPrinter()
uploadRate := ""
for evt := range pullerEventCh {
if ctx.Err() != nil {
for {
select {
case <-ctx.Done():
return
}
switch evt.EventType {
case pull.NewLevelTWEvent:
if evt.TWEventDetails.TreeLevel != 1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
percentBuffered = 0
case stats, ok := <-statsCh:
if !ok {
return
}
case pull.DestDBHasTWEvent:
if evt.TWEventDetails.TreeLevel != -1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
}
case pull.LevelUpdateTWEvent:
if evt.TWEventDetails.TreeLevel != -1 {
currentTreeLevel = evt.TWEventDetails.TreeLevel
toBuffer := evt.TWEventDetails.ChunksInLevel - evt.TWEventDetails.ChunksAlreadyHad
if toBuffer > 0 {
percentBuffered = 100 * float64(evt.TWEventDetails.ChunksBuffered) / float64(toBuffer)
}
}
case pull.LevelDoneTWEvent:
case pull.TableFileClosedEvent:
tableFilesClosed += 1
case pull.StartUploadTableFileEvent:
case pull.UploadTableFileUpdateEvent:
bps := float64(evt.TFEventDetails.Stats.Read) / evt.TFEventDetails.Stats.Elapsed.Seconds()
uploadRate = humanize.Bytes(uint64(bps)) + "/s"
case pull.EndUploadTableFileEvent:
filesTransfered += 1
}
if currentTreeLevel == -1 {
continue
}
p.Printf("%s Tree Level: %d, Percent Buffered: %.2f%%, ", ts.next(), currentTreeLevel, percentBuffered)
if language == downloadLanguage {
p.Printf("Files Written: %d", filesTransfered)
} else {
if len(uploadRate) > 0 {
p.Printf("Files Created: %d, Files Uploaded: %d, Current Upload Speed: %s\n", tableFilesClosed, filesTransfered, uploadRate)
if language == downloadLanguage {
p.Printf("Downloaded %s chunks, %s @ %s/s.",
humanize.Comma(int64(stats.FetchedSourceChunks)),
humanize.Bytes(stats.FetchedSourceBytes),
humanize.SIWithDigits(stats.FetchedSourceBytesPerSec, 2, "B"),
)
} else {
p.Printf("Files Created: %d, Files Uploaded: %d\n", tableFilesClosed, filesTransfered)
p.Printf("Uploaded %s of %s @ %s/s.",
humanize.Bytes(stats.FinishedSendBytes),
humanize.Bytes(stats.BufferedSendBytes),
humanize.SIWithDigits(stats.SendBytesPerSec, 2, "B"),
)
}
}
p.Display()
}
p.Display()
}
func progFunc(ctx context.Context, progChan chan pull.PullProgress) {
@@ -288,8 +247,8 @@ const (
)
func buildProgStarter(language progLanguage) actions.ProgStarter {
return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent, 128)
return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats, 128)
progChan := make(chan pull.PullProgress, 128)
wg := &sync.WaitGroup{}
@@ -302,21 +261,16 @@ func buildProgStarter(language progLanguage) actions.ProgStarter {
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh, language)
pullerProgFunc(ctx, statsCh, language)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
}
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}
func bytesPerSec(bytes uint64, start time.Time) string {
bps := float64(bytes) / float64(time.Since(start).Seconds())
return humanize.Bytes(uint64(bps))
}
+2 -2
View File
@@ -1147,7 +1147,7 @@ func (ddb *DoltDB) pruneUnreferencedDatasets(ctx context.Context) error {
// PullChunks initiates a pull into this database from the source database
// given, pulling all chunks reachable from the given targetHash. Pull progress
// is communicated over the provided channel.
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, targetHash hash.Hash, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, targetHash hash.Hash, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
srcCS := datas.ChunkStoreFromDatabase(srcDB.db)
destCS := datas.ChunkStoreFromDatabase(ddb.db)
wrf, err := types.WalkRefsForChunkStore(srcCS)
@@ -1156,7 +1156,7 @@ func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB
}
if datas.CanUsePuller(srcDB.db) && datas.CanUsePuller(ddb.db) {
puller, err := pull.NewPuller(ctx, tempDir, defaultChunksPerTF, srcCS, destCS, wrf, targetHash, pullerEventCh)
puller, err := pull.NewPuller(ctx, tempDir, defaultChunksPerTF, srcCS, destCS, wrf, targetHash, statsCh)
if err == pull.ErrDBUpToDate {
return nil
} else if err != nil {
@@ -236,7 +236,7 @@ func mustForkDB(t *testing.T, fromDB *doltdb.DoltDB, bn string, cm *doltdb.Commi
err = forkEnv.InitRepo(context.Background(), types.Format_Default, "Bill Billerson", "bill@billerson.com", env.DefaultInitBranch)
require.NoError(t, err)
p1 := make(chan pull.PullProgress)
p2 := make(chan pull.PullerEvent)
p2 := make(chan pull.Stats)
go func() {
for range p1 {
}
+8 -8
View File
@@ -21,7 +21,7 @@ import (
"github.com/dolthub/dolt/go/store/datas/pull"
)
func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent) {
func pullerProgFunc(ctx context.Context, statsCh <-chan pull.Stats) {
for {
select {
case <-ctx.Done():
@@ -31,7 +31,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent)
select {
case <-ctx.Done():
return
case <-pullerEventCh:
case <-statsCh:
default:
}
}
@@ -53,8 +53,8 @@ func progFunc(ctx context.Context, progChan <-chan pull.PullProgress) {
}
}
func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent)
func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats)
progChan := make(chan pull.PullProgress)
wg := &sync.WaitGroup{}
@@ -67,15 +67,15 @@ func NoopRunProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgr
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh)
pullerProgFunc(ctx, statsCh)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
func NoopStopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func NoopStopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}
+25 -25
View File
@@ -42,15 +42,15 @@ var ErrFailedToDeleteBackup = errors.New("failed to delete backup")
var ErrFailedToGetBackupDb = errors.New("failed to get backup db")
var ErrUnknownPushErr = errors.New("unknown push error")
type ProgStarter func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent)
type ProgStopper func(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent)
type ProgStarter func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats)
type ProgStopper func(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats)
// Push will update a destination branch, in a given destination database if it can be done as a fast forward merge.
// This is accomplished first by verifying that the remote tracking reference for the source database can be updated to
// the given commit via a fast forward merge. If this is the case, an attempt will be made to update the branch in the
// destination db to the given commit via fast forward move. If that succeeds the tracking branch is updated in the
// source db.
func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
var err error
if mode == ref.FastForwardOnly {
canFF, err := srcDB.CanFastForward(ctx, remoteRef, commit)
@@ -68,7 +68,7 @@ func Push(ctx context.Context, tempTableDir string, mode ref.UpdateMode, destRef
return err
}
err = destDB.PullChunks(ctx, tempTableDir, srcDB, rf.TargetHash(), progChan, pullerEventCh)
err = destDB.PullChunks(ctx, tempTableDir, srcDB, rf.TargetHash(), progChan, statsCh)
if err != nil {
return err
@@ -140,7 +140,7 @@ func DoPush(ctx context.Context, rsr env.RepoStateReader, rsw env.RepoStateWrite
}
// PushTag pushes a commit tag and all underlying data from a local source database to a remote destination database.
func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB, destDB *doltdb.DoltDB, tag *doltdb.Tag, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB, destDB *doltdb.DoltDB, tag *doltdb.Tag, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
var err error
addr, err := tag.GetAddr()
@@ -148,7 +148,7 @@ func PushTag(ctx context.Context, tempTableDir string, destRef ref.TagRef, srcDB
return err
}
err = destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, pullerEventCh)
err = destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, statsCh)
if err != nil {
return err
@@ -188,9 +188,9 @@ func PushToRemoteBranch(ctx context.Context, rsr env.RepoStateReader, tempTableD
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = Push(ctx, tempTableDir, mode, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = Push(ctx, tempTableDir, mode, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
switch err {
case nil:
@@ -211,9 +211,9 @@ func pushTagToRemote(ctx context.Context, tempTableDir string, srcRef, destRef r
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = PushTag(ctx, tempTableDir, destRef.(ref.TagRef), localDB, remoteDB, tg, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = PushTag(ctx, tempTableDir, destRef.(ref.TagRef), localDB, remoteDB, tg, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err != nil {
return err
@@ -250,24 +250,24 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef
}
// FetchCommit fetches a commit and all underlying data from a remote source database to the local destination database.
func FetchCommit(ctx context.Context, tempTablesDir string, srcDB, destDB *doltdb.DoltDB, srcDBCommit *doltdb.Commit, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func FetchCommit(ctx context.Context, tempTablesDir string, srcDB, destDB *doltdb.DoltDB, srcDBCommit *doltdb.Commit, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
stRef, err := srcDBCommit.GetStRef()
if err != nil {
return err
}
return destDB.PullChunks(ctx, tempTablesDir, srcDB, stRef.TargetHash(), progChan, pullerEventCh)
return destDB.PullChunks(ctx, tempTablesDir, srcDB, stRef.TargetHash(), progChan, statsCh)
}
// FetchTag fetches a commit tag and all underlying data from a remote source database to the local destination database.
func FetchTag(ctx context.Context, tempTableDir string, srcDB, destDB *doltdb.DoltDB, srcDBTag *doltdb.Tag, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) error {
func FetchTag(ctx context.Context, tempTableDir string, srcDB, destDB *doltdb.DoltDB, srcDBTag *doltdb.Tag, progChan chan pull.PullProgress, statsCh chan pull.Stats) error {
addr, err := srcDBTag.GetAddr()
if err != nil {
return err
}
return destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, pullerEventCh)
return destDB.PullChunks(ctx, tempTableDir, srcDB, addr, progChan, statsCh)
}
// Clone pulls all data from a remote source database to a local destination database.
@@ -309,9 +309,9 @@ func FetchFollowTags(ctx context.Context, tempTableDir string, srcDB, destDB *do
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = FetchTag(ctx, tempTableDir, srcDB, destDB, tag, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = FetchTag(ctx, tempTableDir, srcDB, destDB, tag, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == nil {
cli.Println()
} else if err == pull.ErrDBUpToDate {
@@ -363,9 +363,9 @@ func FetchRemoteBranch(
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
err = FetchCommit(ctx, tempTablesDir, srcDB, destDB, srcDBCommit, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, pullerEventCh)
wg, progChan, statsCh := progStarter(newCtx)
err = FetchCommit(ctx, tempTablesDir, srcDB, destDB, srcDBCommit, progChan, statsCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == pull.ErrDBUpToDate {
err = nil
}
@@ -466,15 +466,15 @@ func SyncRoots(ctx context.Context, srcDb, destDb *doltdb.DoltDB, tempTableDir s
}
newCtx, cancelFunc := context.WithCancel(ctx)
wg, progChan, pullerEventCh := progStarter(newCtx)
wg, progChan, statsCh := progStarter(newCtx)
defer func() {
progStopper(cancelFunc, wg, progChan, pullerEventCh)
progStopper(cancelFunc, wg, progChan, statsCh)
if err == nil {
cli.Println()
}
}()
err = destDb.PullChunks(ctx, tempTableDir, srcDb, srcRoot, progChan, pullerEventCh)
err = destDb.PullChunks(ctx, tempTableDir, srcDb, srcRoot, progChan, statsCh)
if err != nil {
return err
}
@@ -153,7 +153,7 @@ func (d DoltPullFunc) Eval(ctx *sql.Context, row sql.Row) (interface{}, error) {
return noConflicts, nil
}
func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent) {
func pullerProgFunc(ctx context.Context, statsCh <-chan pull.Stats) {
for {
if ctx.Err() != nil {
return
@@ -161,7 +161,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh <-chan pull.PullerEvent)
select {
case <-ctx.Done():
return
case <-pullerEventCh:
case <-statsCh:
default:
}
}
@@ -181,8 +181,8 @@ func progFunc(ctx context.Context, progChan <-chan pull.PullProgress) {
}
}
func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent)
func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.Stats) {
statsCh := make(chan pull.Stats)
progChan := make(chan pull.PullProgress)
wg := &sync.WaitGroup{}
@@ -195,15 +195,15 @@ func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress,
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh)
pullerProgFunc(ctx, statsCh)
}()
return wg, progChan, pullerEventCh
return wg, progChan, statsCh
}
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, statsCh chan pull.Stats) {
cancel()
close(progChan)
close(pullerEventCh)
close(statsCh)
wg.Wait()
}
+93 -140
View File
@@ -24,14 +24,14 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"golang.org/x/sync/semaphore"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
@@ -75,56 +75,15 @@ type Puller struct {
tempDir string
chunksPerTF int
eventCh chan PullerEvent
pushLog *log.Logger
stats *stats
}
type PullerEventType int
const (
NewLevelTWEvent PullerEventType = iota
DestDBHasTWEvent
LevelUpdateTWEvent
LevelDoneTWEvent
TableFileClosedEvent
StartUploadTableFileEvent
UploadTableFileUpdateEvent
EndUploadTableFileEvent
)
type TreeWalkEventDetails struct {
TreeLevel int
ChunksInLevel int
ChunksAlreadyHad int
ChunksBuffered int
ChildrenFound int
}
type TableFileEventDetails struct {
CurrentFileSize int64
Stats iohelp.ReadStats
}
type PullerEvent struct {
EventType PullerEventType
TWEventDetails TreeWalkEventDetails
TFEventDetails TableFileEventDetails
}
func NewTWPullerEvent(et PullerEventType, details *TreeWalkEventDetails) PullerEvent {
return PullerEvent{EventType: et, TWEventDetails: *details}
}
func NewTFPullerEvent(et PullerEventType, details *TableFileEventDetails) PullerEvent {
return PullerEvent{EventType: et, TFEventDetails: *details}
statsCh chan Stats
stats *stats
}
// NewPuller creates a new Puller instance to do the syncing. If a nil puller is returned without error that means
// that there is nothing to pull and the sinkDB is already up to date.
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sinkCS chunks.ChunkStore, walkRefs WalkRefs, rootChunkHash hash.Hash, eventCh chan PullerEvent) (*Puller, error) {
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sinkCS chunks.ChunkStore, walkRefs WalkRefs, rootChunkHash hash.Hash, statsCh chan Stats) (*Puller, error) {
// Sanity Check
exists, err := srcCS.Has(ctx, rootChunkHash)
@@ -181,8 +140,8 @@ func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcCS, sink
tempDir: tempDir,
wr: wr,
chunksPerTF: chunksPerTF,
eventCh: eventCh,
pushLog: pushLogger,
statsCh: statsCh,
stats: &stats{},
}
@@ -203,6 +162,7 @@ type tempTblFile struct {
id string
path string
numChunks int
chunksLen uint64
contentLen uint64
contentHash []byte
}
@@ -218,18 +178,21 @@ func (c countingReader) Read(p []byte) (int, error) {
return n, err
}
func updateBytesPerSecond(s *stats) (cancel func()) {
func emitStats(s *stats, ch chan Stats) (cancel func()) {
done := make(chan struct{})
var wg sync.WaitGroup
wg.Add(2)
cancel = func() {
close(done)
wg.Wait()
}
sampleduration := 100 * time.Millisecond
samplesinsec := uint64((1 * time.Second) / sampleduration)
weight := 0.9
ticker := time.NewTicker(sampleduration)
go func() {
defer wg.Done()
sampleduration := 100 * time.Millisecond
samplesinsec := uint64((1 * time.Second) / sampleduration)
weight := 0.1
ticker := time.NewTicker(sampleduration)
defer ticker.Stop()
var lastSendBytes, lastFetchedBytes uint64
for {
@@ -248,21 +211,40 @@ func updateBytesPerSecond(s *stats) (cancel func()) {
smoothedSendBPS := newSendBPS
if curSendBPS != 0 {
smoothedSendBPS = newSendBPS + weight * (curSendBPS - newSendBPS)
smoothedSendBPS = curSendBPS + weight*(newSendBPS-curSendBPS)
}
smoothedFetchBPS := newFetchedBPS
if curFetchedBPS != 0 {
smoothedFetchBPS = newFetchedBPS + weight * (curFetchedBPS - newFetchedBPS)
smoothedFetchBPS = curFetchedBPS + weight*(newFetchedBPS-curFetchedBPS)
}
atomic.StoreUint64(&s.sendBytesPerSec, math.Float64bits(smoothedSendBPS))
atomic.StoreUint64(&s.fetchedSourceBytesPerSec, math.Float64bits(smoothedFetchBPS))
lastSendBytes = newSendBytes
lastFetchedBytes = newFetchedBytes
case <-done:
return
}
}
}()
go func() {
defer wg.Done()
updateduration := 1 * time.Second
ticker := time.NewTicker(updateduration)
for {
select {
case <-ticker.C:
ch <- s.read()
case <-done:
ch <- s.read()
return
}
}
}()
return cancel
}
@@ -271,24 +253,35 @@ type stats struct {
bufferedSendBytes uint64
sendBytesPerSec uint64
totalSourceChunks uint64
fetchedSourceChunks uint64
fetchedSourceBytes uint64
totalSourceChunks uint64
fetchedSourceChunks uint64
fetchedSourceBytes uint64
fetchedSourceBytesPerSec uint64
sendBytesPerSecF float64
fetchedSourceBytesPerSecF float64
}
func (s *stats) read() stats {
var ret stats;
ret.finishedSendBytes = atomic.LoadUint64(&s.finishedSendBytes)
ret.bufferedSendBytes = atomic.LoadUint64(&s.bufferedSendBytes)
ret.sendBytesPerSecF = math.Float64frombits(atomic.LoadUint64(&s.sendBytesPerSec))
ret.totalSourceChunks = atomic.LoadUint64(&s.totalSourceChunks)
ret.fetchedSourceChunks = atomic.LoadUint64(&s.fetchedSourceChunks)
ret.fetchedSourceBytes = atomic.LoadUint64(&s.fetchedSourceBytes)
ret.fetchedSourceBytesPerSecF = math.Float64frombits(atomic.LoadUint64(&s.fetchedSourceBytesPerSec))
type Stats struct {
FinishedSendBytes uint64
BufferedSendBytes uint64
SendBytesPerSec float64
TotalSourceChunks uint64
FetchedSourceChunks uint64
FetchedSourceBytes uint64
FetchedSourceBytesPerSec float64
}
func (s *stats) read() Stats {
var ret Stats
ret.FinishedSendBytes = atomic.LoadUint64(&s.finishedSendBytes)
ret.BufferedSendBytes = atomic.LoadUint64(&s.bufferedSendBytes)
ret.SendBytesPerSec = math.Float64frombits(atomic.LoadUint64(&s.sendBytesPerSec))
ret.TotalSourceChunks = atomic.LoadUint64(&s.totalSourceChunks)
ret.FetchedSourceChunks = atomic.LoadUint64(&s.fetchedSourceChunks)
ret.FetchedSourceBytes = atomic.LoadUint64(&s.fetchedSourceBytes)
ret.FetchedSourceBytesPerSec = math.Float64frombits(atomic.LoadUint64(&s.fetchedSourceBytesPerSec))
return ret
}
@@ -303,14 +296,27 @@ func (p *Puller) uploadTempTableFile(ctx context.Context, tmpTblFile tempTblFile
_ = file.Remove(tmpTblFile.path)
}()
// By tracking the number of bytes uploaded here,
// we can add bytes on to our bufferedSendBytes when
// we have to retry a table file write.
var localUploaded uint64
return p.sinkDBCS.(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
f, err := os.Open(tmpTblFile.path)
if err != nil {
return nil, 0, err
}
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize))
fWithStats := countingReader{f, &p.stats.finishedSendBytes}
if localUploaded == 0 {
// So far, we've added all the bytes for the compressed chunk data.
// We add the remaining bytes here --- bytes for the index and the
// table file footer.
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(fileSize)-tmpTblFile.chunksLen)
} else {
// A retry. We treat it as if what was already uploaded was rebuffered.
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(localUploaded))
localUploaded = 0
}
fWithStats := countingReader{countingReader{f, &localUploaded}, &p.stats.finishedSendBytes}
return fWithStats, uint64(fileSize), nil
})
@@ -327,11 +333,10 @@ LOOP:
break LOOP
}
p.tablefileSema.Release(1)
if err := p.addEvent(ctx, NewTFPullerEvent(StartUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(tblFile.wr.ContentLength()),
})); err != nil {
return err
}
// content length before we finish the write, which will
// add the index and table file footer.
chunksLen := tblFile.wr.ContentLength()
id, err := tblFile.wr.Finish()
if err != nil {
@@ -348,6 +353,7 @@ LOOP:
id: id,
path: path,
numChunks: tblFile.wr.Size(),
chunksLen: chunksLen,
contentLen: tblFile.wr.ContentLength(),
contentHash: tblFile.wr.GetMD5(),
}
@@ -356,12 +362,6 @@ LOOP:
return err
}
if err := p.addEvent(ctx, NewTFPullerEvent(EndUploadTableFileEvent, &TableFileEventDetails{
CurrentFileSize: int64(ttf.contentLen),
})); err != nil {
return err
}
fileIdToNumChunks[id] = ttf.numChunks
case <-ctx.Done():
return ctx.Err()
@@ -373,10 +373,10 @@ LOOP:
// Pull executes the sync operation
func (p *Puller) Pull(ctx context.Context) error {
c := updateBytesPerSecond(p.stats)
defer c()
twDetails := &TreeWalkEventDetails{TreeLevel: -1}
if p.statsCh != nil {
c := emitStats(p.stats, p.statsCh)
defer c()
}
leaves := make(hash.HashSet)
absent := make(hash.HashSet)
@@ -397,25 +397,14 @@ func (p *Puller) Pull(ctx context.Context) error {
for len(absent) > 0 {
limitToNewChunks(absent, p.downloaded)
chunksInLevel := len(absent)
twDetails.ChunksInLevel = chunksInLevel
if err := p.addEvent(ctx, NewTWPullerEvent(NewLevelTWEvent, twDetails)); err != nil {
return err
}
var err error
absent, err = p.sinkDBCS.HasMany(ctx, absent)
if err != nil {
return err
}
twDetails.ChunksAlreadyHad = chunksInLevel - len(absent)
if err := p.addEvent(ctx, NewTWPullerEvent(DestDBHasTWEvent, twDetails)); err != nil {
return err
}
if len(absent) > 0 {
leaves, absent, err = p.getCmp(ctx, twDetails, leaves, absent, completedTables)
leaves, absent, err = p.getCmp(ctx, leaves, absent, completedTables)
if err != nil {
return err
}
@@ -423,11 +412,6 @@ func (p *Puller) Pull(ctx context.Context) error {
}
if p.wr != nil {
if err := p.addEvent(ctx, NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
})); err != nil {
return err
}
select {
case completedTables <- FilledWriters{p.wr}:
case <-ctx.Done():
@@ -456,7 +440,7 @@ func limitToNewChunks(absent hash.HashSet, downloaded hash.HashSet) {
}
}
func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, leaves, batch hash.HashSet, completedTables chan FilledWriters) (hash.HashSet, hash.HashSet, error) {
func (p *Puller) getCmp(ctx context.Context, leaves, batch hash.HashSet, completedTables chan FilledWriters) (hash.HashSet, hash.HashSet, error) {
found := make(chan nbs.CompressedChunk, 4096)
processed := make(chan CmpChnkAndRefs, 4096)
@@ -479,7 +463,7 @@ func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, le
})
eg.Go(func() error {
LOOP:
LOOP:
for {
select {
case cmpChnk, ok := <-found:
@@ -521,40 +505,29 @@ LOOP:
return nil
})
var maxHeight int
batchSize := len(batch)
nextLeaves := make(hash.HashSet, batchSize)
nextLevel := make(hash.HashSet, batchSize)
eg.Go(func() error {
twDetails.ChunksBuffered = 0
LOOP:
var seen int
LOOP:
for {
select {
case cmpAndRef, ok := <-processed:
if !ok {
break LOOP
}
twDetails.ChunksBuffered++
if twDetails.ChunksBuffered%100 == 0 {
if err := p.addEvent(ctx, NewTWPullerEvent(LevelUpdateTWEvent, twDetails)); err != nil {
return err
}
}
seen++
err := p.wr.AddCmpChunk(cmpAndRef.cmpChnk)
if err != nil {
return err
}
if p.wr.Size() >= p.chunksPerTF {
if err := p.addEvent(ctx, NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
})); err != nil {
return err
}
atomic.AddUint64(&p.stats.bufferedSendBytes, uint64(len(cmpAndRef.cmpChnk.FullCompressedChunk)))
if p.wr.Size() >= p.chunksPerTF {
select {
case completedTables <- FilledWriters{p.wr}:
case <-ctx.Done():
@@ -573,27 +546,18 @@ LOOP:
for h, height := range cmpAndRef.refs {
nextLevel.Insert(h)
twDetails.ChildrenFound++
if height == 1 {
nextLeaves.Insert(h)
}
if height > maxHeight {
maxHeight = height
}
}
case <-ctx.Done():
return ctx.Err()
}
}
if twDetails.ChunksBuffered != len(batch) {
if seen != len(batch) {
return errors.New("failed to get all chunks.")
}
if err := p.addEvent(ctx, NewTWPullerEvent(LevelDoneTWEvent, twDetails)); err != nil {
return err
}
twDetails.TreeLevel = maxHeight
return nil
})
@@ -603,14 +567,3 @@ LOOP:
}
return nextLeaves, nextLevel, nil
}
func (p *Puller) addEvent(ctx context.Context, evt PullerEvent) error {
if p.eventCh != nil {
select {
case p.eventCh <- evt:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
+6 -15
View File
@@ -291,24 +291,15 @@ func TestPuller(t *testing.T) {
for k, rootAddr := range states {
t.Run(k, func(t *testing.T) {
eventCh := make(chan PullerEvent, 128)
statsCh := make(chan Stats, 16)
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer wg.Done()
for evt := range eventCh {
var details interface{}
switch evt.EventType {
case NewLevelTWEvent, DestDBHasTWEvent, LevelUpdateTWEvent:
details = evt.TWEventDetails
default:
details = evt.TFEventDetails
}
jsonBytes, err := json.Marshal(details)
for evt := range statsCh {
jsonBytes, err := json.Marshal(evt)
if err == nil {
t.Logf("event_type: %d details: %s\n", evt.EventType, string(jsonBytes))
t.Logf("stats: %s\n", string(jsonBytes))
}
}
}()
@@ -321,11 +312,11 @@ func TestPuller(t *testing.T) {
require.NoError(t, err)
wrf, err := types.WalkRefsForChunkStore(datas.ChunkStoreFromDatabase(db))
require.NoError(t, err)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootAddr, eventCh)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootAddr, statsCh)
require.NoError(t, err)
err = plr.Pull(ctx)
close(eventCh)
close(statsCh)
require.NoError(t, err)
wg.Wait()