Add progLanguage

This commit is contained in:
Dhruv Sringari
2022-02-21 12:22:49 -08:00
parent 8328916026
commit e43930c1fc
7 changed files with 52 additions and 32 deletions

View File

@@ -257,7 +257,7 @@ func syncBackup(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgParseR
}
destDb, err := b.GetRemoteDB(ctx, dEnv.DoltDB.ValueReadWriter().Format())
err = actions.SyncRoots(ctx, dEnv.DoltDB, destDb, dEnv.TempTableFilesDir(), runProgFuncs, stopProgFuncs)
err = actions.SyncRoots(ctx, dEnv.DoltDB, destDb, dEnv.TempTableFilesDir(), buildProgStarter(defaultLanguage), stopProgFuncs)
switch err {
case nil:
@@ -319,7 +319,7 @@ func restoreBackup(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPar
return errhand.VerboseErrorFromError(err)
}
err = actions.SyncRoots(ctx, srcDb, dEnv.DoltDB, dEnv.TempTableFilesDir(), runProgFuncs, stopProgFuncs)
err = actions.SyncRoots(ctx, srcDb, dEnv.DoltDB, dEnv.TempTableFilesDir(), buildProgStarter(downloadLanguage), stopProgFuncs)
if err != nil {
// If we're cloning into a directory that already exists do not erase it. Otherwise
// make best effort to delete the directory we created.

View File

@@ -81,7 +81,7 @@ func (cmd FetchCmd) Exec(ctx context.Context, commandStr string, args []string,
}
updateMode := ref.UpdateMode{Force: apr.Contains(cli.ForceFlag)}
err = actions.FetchRefSpecs(ctx, dEnv.DbData(), refSpecs, r, updateMode, runProgFuncs, stopProgFuncs)
err = actions.FetchRefSpecs(ctx, dEnv.DbData(), refSpecs, r, updateMode, buildProgStarter(downloadLanguage), stopProgFuncs)
switch err {
case doltdb.ErrUpToDate:
return HandleVErrAndExitCode(nil, usage)

View File

@@ -111,7 +111,7 @@ func pullHelper(ctx context.Context, dEnv *env.DoltEnv, pullSpec *env.PullSpec)
if remoteTrackRef != nil {
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, remoteTrackRef, runProgFuncs, stopProgFuncs)
srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, remoteTrackRef, buildProgStarter(downloadLanguage), stopProgFuncs)
if err != nil {
return err
}
@@ -177,7 +177,7 @@ func pullHelper(ctx context.Context, dEnv *env.DoltEnv, pullSpec *env.PullSpec)
if err != nil {
return err
}
err = actions.FetchFollowTags(ctx, dEnv.TempTableFilesDir(), srcDB, dEnv.DoltDB, runProgFuncs, stopProgFuncs)
err = actions.FetchFollowTags(ctx, dEnv.TempTableFilesDir(), srcDB, dEnv.DoltDB, buildProgStarter(downloadLanguage), stopProgFuncs)
if err != nil {
return err

View File

@@ -112,7 +112,7 @@ func (cmd PushCmd) Exec(ctx context.Context, commandStr string, args []string, d
}
var verr errhand.VerboseError
err = actions.DoPush(ctx, dEnv.RepoStateReader(), dEnv.RepoStateWriter(), dEnv.DoltDB, dEnv.TempTableFilesDir(), opts, runProgFuncs, stopProgFuncs)
err = actions.DoPush(ctx, dEnv.RepoStateReader(), dEnv.RepoStateWriter(), dEnv.DoltDB, dEnv.TempTableFilesDir(), opts, buildProgStarter(defaultLanguage), stopProgFuncs)
if err != nil {
verr = printInfoForPushError(err, opts.Remote, opts.DestRef, opts.RemoteRef)
}
@@ -175,11 +175,11 @@ func (ts *TextSpinner) next() string {
return string([]rune{spinnerSeq[ts.seqPos]})
}
func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) {
func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent, language progLanguage) {
var pos int
var currentTreeLevel int
var percentBuffered float64
var tableFilesBuffered int
var tableFilesClosed int
var filesUploaded int
var ts TextSpinner
@@ -213,7 +213,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) {
case pull.LevelDoneTWEvent:
case pull.TableFileClosedEvent:
tableFilesBuffered += 1
tableFilesClosed += 1
case pull.StartUploadTableFileEvent:
@@ -230,10 +230,16 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) {
}
var msg string
if len(uploadRate) > 0 {
msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%%, Files Written: %d, Files Uploaded: %d, Current Upload Speed: %s", ts.next(), currentTreeLevel, percentBuffered, tableFilesBuffered, filesUploaded, uploadRate)
msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%%,", ts.next(), currentTreeLevel, percentBuffered)
if language == downloadLanguage {
msg = fmt.Sprintf("%s Files Written: %d", msg, filesUploaded)
} else {
msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%% Files Written: %d, Files Uploaded: %d", ts.next(), currentTreeLevel, percentBuffered, tableFilesBuffered, filesUploaded)
if len(uploadRate) > 0 {
msg = fmt.Sprintf("%s Files Created: %d, Files Uploaded: %d, Current Upload Speed: %s", msg, tableFilesClosed, filesUploaded, uploadRate)
} else {
msg = fmt.Sprintf("%s Files Created: %d, Files Uploaded: %d", msg, tableFilesClosed, filesUploaded)
}
}
pos = cli.DeleteAndPrint(pos, msg)
@@ -278,24 +284,34 @@ func progFunc(ctx context.Context, progChan chan pull.PullProgress) {
}
}
func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent, 128)
progChan := make(chan pull.PullProgress, 128)
wg := &sync.WaitGroup{}
// progLanguage is the language to use when displaying progress for a pull from a src db to a sink db.
type progLanguage int
wg.Add(1)
go func() {
defer wg.Done()
progFunc(ctx, progChan)
}()
const (
defaultLanguage progLanguage = iota
downloadLanguage
)
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh)
}()
func buildProgStarter(language progLanguage) actions.ProgStarter {
return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) {
pullerEventCh := make(chan pull.PullerEvent, 128)
progChan := make(chan pull.PullProgress, 128)
wg := &sync.WaitGroup{}
return wg, progChan, pullerEventCh
wg.Add(1)
go func() {
defer wg.Done()
progFunc(ctx, progChan)
}()
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(ctx, pullerEventCh, language)
}()
return wg, progChan, pullerEventCh
}
}
func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) {
@@ -303,7 +319,6 @@ func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan
close(progChan)
close(pullerEventCh)
wg.Wait()
}
func bytesPerSec(bytes uint64, start time.Time) string {

View File

@@ -149,7 +149,7 @@ func (cmd ReadTablesCmd) Exec(ctx context.Context, commandStr string, args []str
}
for _, tblName := range tblNames {
destRoot, verr = pullTableValue(ctx, dEnv, srcDB, srcRoot, destRoot, tblName, commitStr)
destRoot, verr = pullTableValue(ctx, dEnv, srcDB, srcRoot, destRoot, downloadLanguage, tblName, commitStr)
if verr != nil {
return HandleVErrAndExitCode(verr, usage)
@@ -165,7 +165,7 @@ func (cmd ReadTablesCmd) Exec(ctx context.Context, commandStr string, args []str
return 0
}
func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB, srcRoot, destRoot *doltdb.RootValue, tblName, commitStr string) (*doltdb.RootValue, errhand.VerboseError) {
func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB, srcRoot, destRoot *doltdb.RootValue, language progLanguage, tblName, commitStr string) (*doltdb.RootValue, errhand.VerboseError) {
tbl, ok, err := srcRoot.GetTable(ctx, tblName)
if !ok {
@@ -182,7 +182,8 @@ func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB
newCtx, cancelFunc := context.WithCancel(ctx)
cli.Println("Retrieving", tblName)
wg, progChan, pullerEventCh := runProgFuncs(newCtx)
runProgFunc := buildProgStarter(language)
wg, progChan, pullerEventCh := runProgFunc(newCtx)
err = dEnv.DoltDB.PullChunks(ctx, dEnv.TempTableFilesDir(), srcDB, tblHash, progChan, pullerEventCh)
stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh)
if err != nil {

View File

@@ -21,7 +21,6 @@ import (
"sync"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"

View File

@@ -337,6 +337,11 @@ func (p *Puller) Pull(ctx context.Context) error {
if !ae.IsSet() && p.wr.Size() > 0 {
// p.wr may be nil in the error case
if p.wr != nil {
p.addEvent(NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{
CurrentFileSize: int64(p.wr.ContentLength()),
}))
}
completedTables <- FilledWriters{p.wr}
}