From e43930c1fc322cacced8aad66dcd6009706431bc Mon Sep 17 00:00:00 2001 From: Dhruv Sringari Date: Mon, 21 Feb 2022 12:22:49 -0800 Subject: [PATCH] Add progLanguage --- go/cmd/dolt/commands/backup.go | 4 +- go/cmd/dolt/commands/fetch.go | 2 +- go/cmd/dolt/commands/pull.go | 4 +- go/cmd/dolt/commands/push.go | 61 ++++++++++++-------- go/cmd/dolt/commands/read_tables.go | 7 ++- go/libraries/doltcore/env/actions/remotes.go | 1 - go/store/datas/pull/puller.go | 5 ++ 7 files changed, 52 insertions(+), 32 deletions(-) diff --git a/go/cmd/dolt/commands/backup.go b/go/cmd/dolt/commands/backup.go index fd9b86b28a..958be61250 100644 --- a/go/cmd/dolt/commands/backup.go +++ b/go/cmd/dolt/commands/backup.go @@ -257,7 +257,7 @@ func syncBackup(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgParseR } destDb, err := b.GetRemoteDB(ctx, dEnv.DoltDB.ValueReadWriter().Format()) - err = actions.SyncRoots(ctx, dEnv.DoltDB, destDb, dEnv.TempTableFilesDir(), runProgFuncs, stopProgFuncs) + err = actions.SyncRoots(ctx, dEnv.DoltDB, destDb, dEnv.TempTableFilesDir(), buildProgStarter(defaultLanguage), stopProgFuncs) switch err { case nil: @@ -319,7 +319,7 @@ func restoreBackup(ctx context.Context, dEnv *env.DoltEnv, apr *argparser.ArgPar return errhand.VerboseErrorFromError(err) } - err = actions.SyncRoots(ctx, srcDb, dEnv.DoltDB, dEnv.TempTableFilesDir(), runProgFuncs, stopProgFuncs) + err = actions.SyncRoots(ctx, srcDb, dEnv.DoltDB, dEnv.TempTableFilesDir(), buildProgStarter(downloadLanguage), stopProgFuncs) if err != nil { // If we're cloning into a directory that already exists do not erase it. Otherwise // make best effort to delete the directory we created. diff --git a/go/cmd/dolt/commands/fetch.go b/go/cmd/dolt/commands/fetch.go index bf54bea12d..053aa3c5aa 100644 --- a/go/cmd/dolt/commands/fetch.go +++ b/go/cmd/dolt/commands/fetch.go @@ -81,7 +81,7 @@ func (cmd FetchCmd) Exec(ctx context.Context, commandStr string, args []string, } updateMode := ref.UpdateMode{Force: apr.Contains(cli.ForceFlag)} - err = actions.FetchRefSpecs(ctx, dEnv.DbData(), refSpecs, r, updateMode, runProgFuncs, stopProgFuncs) + err = actions.FetchRefSpecs(ctx, dEnv.DbData(), refSpecs, r, updateMode, buildProgStarter(downloadLanguage), stopProgFuncs) switch err { case doltdb.ErrUpToDate: return HandleVErrAndExitCode(nil, usage) diff --git a/go/cmd/dolt/commands/pull.go b/go/cmd/dolt/commands/pull.go index 1136184d76..5720c97fa5 100644 --- a/go/cmd/dolt/commands/pull.go +++ b/go/cmd/dolt/commands/pull.go @@ -111,7 +111,7 @@ func pullHelper(ctx context.Context, dEnv *env.DoltEnv, pullSpec *env.PullSpec) if remoteTrackRef != nil { - srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, remoteTrackRef, runProgFuncs, stopProgFuncs) + srcDBCommit, err := actions.FetchRemoteBranch(ctx, dEnv.TempTableFilesDir(), pullSpec.Remote, srcDB, dEnv.DoltDB, pullSpec.Branch, remoteTrackRef, buildProgStarter(downloadLanguage), stopProgFuncs) if err != nil { return err } @@ -177,7 +177,7 @@ func pullHelper(ctx context.Context, dEnv *env.DoltEnv, pullSpec *env.PullSpec) if err != nil { return err } - err = actions.FetchFollowTags(ctx, dEnv.TempTableFilesDir(), srcDB, dEnv.DoltDB, runProgFuncs, stopProgFuncs) + err = actions.FetchFollowTags(ctx, dEnv.TempTableFilesDir(), srcDB, dEnv.DoltDB, buildProgStarter(downloadLanguage), stopProgFuncs) if err != nil { return err diff --git a/go/cmd/dolt/commands/push.go b/go/cmd/dolt/commands/push.go index 61858ceed5..356af14336 100644 --- a/go/cmd/dolt/commands/push.go +++ b/go/cmd/dolt/commands/push.go @@ -112,7 +112,7 @@ func (cmd PushCmd) Exec(ctx context.Context, commandStr string, args []string, d } var verr errhand.VerboseError - err = actions.DoPush(ctx, dEnv.RepoStateReader(), dEnv.RepoStateWriter(), dEnv.DoltDB, dEnv.TempTableFilesDir(), opts, runProgFuncs, stopProgFuncs) + err = actions.DoPush(ctx, dEnv.RepoStateReader(), dEnv.RepoStateWriter(), dEnv.DoltDB, dEnv.TempTableFilesDir(), opts, buildProgStarter(defaultLanguage), stopProgFuncs) if err != nil { verr = printInfoForPushError(err, opts.Remote, opts.DestRef, opts.RemoteRef) } @@ -175,11 +175,11 @@ func (ts *TextSpinner) next() string { return string([]rune{spinnerSeq[ts.seqPos]}) } -func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) { +func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent, language progLanguage) { var pos int var currentTreeLevel int var percentBuffered float64 - var tableFilesBuffered int + var tableFilesClosed int var filesUploaded int var ts TextSpinner @@ -213,7 +213,7 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) { case pull.LevelDoneTWEvent: case pull.TableFileClosedEvent: - tableFilesBuffered += 1 + tableFilesClosed += 1 case pull.StartUploadTableFileEvent: @@ -230,10 +230,16 @@ func pullerProgFunc(ctx context.Context, pullerEventCh chan pull.PullerEvent) { } var msg string - if len(uploadRate) > 0 { - msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%%, Files Written: %d, Files Uploaded: %d, Current Upload Speed: %s", ts.next(), currentTreeLevel, percentBuffered, tableFilesBuffered, filesUploaded, uploadRate) + msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%%,", ts.next(), currentTreeLevel, percentBuffered) + + if language == downloadLanguage { + msg = fmt.Sprintf("%s Files Written: %d", msg, filesUploaded) } else { - msg = fmt.Sprintf("%s Tree Level: %d, Percent Buffered: %.2f%% Files Written: %d, Files Uploaded: %d", ts.next(), currentTreeLevel, percentBuffered, tableFilesBuffered, filesUploaded) + if len(uploadRate) > 0 { + msg = fmt.Sprintf("%s Files Created: %d, Files Uploaded: %d, Current Upload Speed: %s", msg, tableFilesClosed, filesUploaded, uploadRate) + } else { + msg = fmt.Sprintf("%s Files Created: %d, Files Uploaded: %d", msg, tableFilesClosed, filesUploaded) + } } pos = cli.DeleteAndPrint(pos, msg) @@ -278,24 +284,34 @@ func progFunc(ctx context.Context, progChan chan pull.PullProgress) { } } -func runProgFuncs(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) { - pullerEventCh := make(chan pull.PullerEvent, 128) - progChan := make(chan pull.PullProgress, 128) - wg := &sync.WaitGroup{} +// progLanguage is the language to use when displaying progress for a pull from a src db to a sink db. +type progLanguage int - wg.Add(1) - go func() { - defer wg.Done() - progFunc(ctx, progChan) - }() +const ( + defaultLanguage progLanguage = iota + downloadLanguage +) - wg.Add(1) - go func() { - defer wg.Done() - pullerProgFunc(ctx, pullerEventCh) - }() +func buildProgStarter(language progLanguage) actions.ProgStarter { + return func(ctx context.Context) (*sync.WaitGroup, chan pull.PullProgress, chan pull.PullerEvent) { + pullerEventCh := make(chan pull.PullerEvent, 128) + progChan := make(chan pull.PullProgress, 128) + wg := &sync.WaitGroup{} - return wg, progChan, pullerEventCh + wg.Add(1) + go func() { + defer wg.Done() + progFunc(ctx, progChan) + }() + + wg.Add(1) + go func() { + defer wg.Done() + pullerProgFunc(ctx, pullerEventCh, language) + }() + + return wg, progChan, pullerEventCh + } } func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan pull.PullProgress, pullerEventCh chan pull.PullerEvent) { @@ -303,7 +319,6 @@ func stopProgFuncs(cancel context.CancelFunc, wg *sync.WaitGroup, progChan chan close(progChan) close(pullerEventCh) wg.Wait() - } func bytesPerSec(bytes uint64, start time.Time) string { diff --git a/go/cmd/dolt/commands/read_tables.go b/go/cmd/dolt/commands/read_tables.go index 1a278801da..e8dc64da58 100644 --- a/go/cmd/dolt/commands/read_tables.go +++ b/go/cmd/dolt/commands/read_tables.go @@ -149,7 +149,7 @@ func (cmd ReadTablesCmd) Exec(ctx context.Context, commandStr string, args []str } for _, tblName := range tblNames { - destRoot, verr = pullTableValue(ctx, dEnv, srcDB, srcRoot, destRoot, tblName, commitStr) + destRoot, verr = pullTableValue(ctx, dEnv, srcDB, srcRoot, destRoot, downloadLanguage, tblName, commitStr) if verr != nil { return HandleVErrAndExitCode(verr, usage) @@ -165,7 +165,7 @@ func (cmd ReadTablesCmd) Exec(ctx context.Context, commandStr string, args []str return 0 } -func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB, srcRoot, destRoot *doltdb.RootValue, tblName, commitStr string) (*doltdb.RootValue, errhand.VerboseError) { +func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB, srcRoot, destRoot *doltdb.RootValue, language progLanguage, tblName, commitStr string) (*doltdb.RootValue, errhand.VerboseError) { tbl, ok, err := srcRoot.GetTable(ctx, tblName) if !ok { @@ -182,7 +182,8 @@ func pullTableValue(ctx context.Context, dEnv *env.DoltEnv, srcDB *doltdb.DoltDB newCtx, cancelFunc := context.WithCancel(ctx) cli.Println("Retrieving", tblName) - wg, progChan, pullerEventCh := runProgFuncs(newCtx) + runProgFunc := buildProgStarter(language) + wg, progChan, pullerEventCh := runProgFunc(newCtx) err = dEnv.DoltDB.PullChunks(ctx, dEnv.TempTableFilesDir(), srcDB, tblHash, progChan, pullerEventCh) stopProgFuncs(cancelFunc, wg, progChan, pullerEventCh) if err != nil { diff --git a/go/libraries/doltcore/env/actions/remotes.go b/go/libraries/doltcore/env/actions/remotes.go index 10d08dc485..ac5f4fa26c 100644 --- a/go/libraries/doltcore/env/actions/remotes.go +++ b/go/libraries/doltcore/env/actions/remotes.go @@ -21,7 +21,6 @@ import ( "sync" "github.com/dolthub/dolt/go/cmd/dolt/cli" - eventsapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" diff --git a/go/store/datas/pull/puller.go b/go/store/datas/pull/puller.go index a118ef2f7d..175095ed36 100644 --- a/go/store/datas/pull/puller.go +++ b/go/store/datas/pull/puller.go @@ -337,6 +337,11 @@ func (p *Puller) Pull(ctx context.Context) error { if !ae.IsSet() && p.wr.Size() > 0 { // p.wr may be nil in the error case + if p.wr != nil { + p.addEvent(NewTFPullerEvent(TableFileClosedEvent, &TableFileEventDetails{ + CurrentFileSize: int64(p.wr.ContentLength()), + })) + } completedTables <- FilledWriters{p.wr} }