From af86c92329df55a3a5d85f6cda3d457c18b77319 Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Tue, 13 Oct 2020 20:14:17 -0400 Subject: [PATCH 1/7] Routine --- go/cmd/dolt/commands/clone.go | 3 +- go/store/datas/pull.go | 93 ++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 41 deletions(-) diff --git a/go/cmd/dolt/commands/clone.go b/go/cmd/dolt/commands/clone.go index b79588a2b3..4af491983b 100644 --- a/go/cmd/dolt/commands/clone.go +++ b/go/cmd/dolt/commands/clone.go @@ -247,7 +247,6 @@ func cloneProg(eventCh <-chan datas.TableFileEvent) { chunksDownloaded int64 cliPos int ) - cliPos = cli.DeleteAndPrint(cliPos, "Retrieving remote information.") for tblFEvt := range eventCh { switch tblFEvt.EventType { @@ -286,8 +285,8 @@ func cloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch s }() err := actions.Clone(ctx, srcDB, dEnv.DoltDB, eventCh) - close(eventCh) + close(eventCh) wg.Wait() if err != nil { diff --git a/go/store/datas/pull.go b/go/store/datas/pull.go index 7a84799f78..5d47b586e8 100644 --- a/go/store/datas/pull.go +++ b/go/store/datas/pull.go @@ -29,6 +29,8 @@ import ( "math" "math/rand" + "golang.org/x/sync/errgroup" + "github.com/cenkalti/backoff" "github.com/golang/snappy" @@ -62,6 +64,7 @@ func makeProgTrack(progressCh chan PullProgress) func(moreDone, moreKnown, moreA } func Clone(ctx context.Context, srcDB, sinkDB Database, eventCh chan<- TableFileEvent) error { + srcCS := srcDB.chunkStore().(interface{}) sinkCS := sinkDB.chunkStore().(interface{}) @@ -118,6 +121,7 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile } func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error { + //cli.DeleteAndPrint(0, "HEREEEE\n") root, tblFiles, err := srcTS.Sources(ctx) if err != nil { @@ -136,60 +140,71 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- i := 0 download := func(ctx context.Context) error { var 
err error + + maxGoroutines := 3 + guard := make(chan struct{}, maxGoroutines) + eg, ctx := errgroup.WithContext(ctx) + for i < len(desiredFiles) { - fileID := desiredFiles[i] - tblFile, ok := fileIDToTF[fileID] + guard <- struct{}{} // would block if guard channel is already filled + func(i int) { + eg.Go(func() error { + fileID := desiredFiles[i] + tblFile, ok := fileIDToTF[fileID] - if !ok { - // conjoin happened during clone - return backoff.Permanent(errors.New("table file not found. please try again")) - } - - err = func() (err error) { - var rd io.ReadCloser - rd, err = tblFile.Open(ctx) - - if err != nil { - return err - } - - defer func() { - closeErr := rd.Close() - - if err == nil && closeErr != nil { - err = closeErr + if !ok { + // conjoin happened during clone + <-guard + return backoff.Permanent(errors.New("table file not found. please try again")) } - }() - if eventCh != nil { - eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}} - } + var rd io.ReadCloser + rd, err = tblFile.Open(ctx) - err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil) + if err != nil { + <-guard + return err + } + + defer func() error { + closeErr := rd.Close() + + if err == nil && closeErr != nil { + <-guard + return err + } + return nil + }() - if err != nil { if eventCh != nil { - eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}} + eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}} } - return err - } + err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil) - if eventCh != nil { - eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}} - } + if err != nil { + if eventCh != nil { + eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}} + } - return nil - }() + <-guard + return err + } - if err != nil { - break - } + if eventCh != nil { + eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}} + } + + <-guard + return nil + }) + 
}(i) i++ } - if err != nil { + if err := eg.Wait(); err != nil { + close(guard) // If at any point there is an error we retrieve updated TableFile information before retrying. var sourcesErr error _, tblFiles, sourcesErr = srcTS.Sources(ctx) @@ -202,7 +217,7 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- return err } - + close(guard) return nil } From 38b6672b961a919f7cf27a2a04e78294518779db Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Tue, 13 Oct 2020 20:16:58 -0400 Subject: [PATCH 2/7] Clean up pr --- go/cmd/dolt/commands/clone.go | 1 - go/store/datas/pull.go | 1 - 2 files changed, 2 deletions(-) diff --git a/go/cmd/dolt/commands/clone.go b/go/cmd/dolt/commands/clone.go index 4af491983b..87e0a7ccad 100644 --- a/go/cmd/dolt/commands/clone.go +++ b/go/cmd/dolt/commands/clone.go @@ -285,7 +285,6 @@ func cloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch s }() err := actions.Clone(ctx, srcDB, dEnv.DoltDB, eventCh) - close(eventCh) wg.Wait() diff --git a/go/store/datas/pull.go b/go/store/datas/pull.go index 5d47b586e8..216a1032f0 100644 --- a/go/store/datas/pull.go +++ b/go/store/datas/pull.go @@ -121,7 +121,6 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile } func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error { - //cli.DeleteAndPrint(0, "HEREEEE\n") root, tblFiles, err := srcTS.Sources(ctx) if err != nil { From efbb1bc3fc3f94a08b722a94439a3688e4aaab43 Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Tue, 13 Oct 2020 20:28:17 -0400 Subject: [PATCH 3/7] Add errgroup pkg --- go/go.mod | 2 +- go/go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go/go.mod b/go/go.mod index ba2d65f7a7..b96f60620e 100644 --- a/go/go.mod +++ b/go/go.mod @@ -80,7 +80,7 @@ require ( go.uber.org/zap v1.15.0 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 golang.org/x/net v0.0.0-20200904194848-62affa334b73 - 
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 + golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f google.golang.org/api v0.32.0 google.golang.org/grpc v1.32.0 diff --git a/go/go.sum b/go/go.sum index c76900ac3a..e3a2fa0cbb 100644 --- a/go/go.sum +++ b/go/go.sum @@ -806,6 +806,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03i golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 h1:Bx6FllMpG4NWDOfhMBz1VR2QYNp/SAOHPIAsaVmxfPo= +golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= From 434700971817494e13ccfbc81059402235c9c6d7 Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Tue, 13 Oct 2020 20:30:08 -0400 Subject: [PATCH 4/7] Remove change to clone.go --- go/cmd/dolt/commands/clone.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/cmd/dolt/commands/clone.go b/go/cmd/dolt/commands/clone.go index 87e0a7ccad..b79588a2b3 100644 --- a/go/cmd/dolt/commands/clone.go +++ b/go/cmd/dolt/commands/clone.go @@ -247,6 +247,7 @@ func cloneProg(eventCh <-chan datas.TableFileEvent) { chunksDownloaded int64 cliPos int ) + cliPos = cli.DeleteAndPrint(cliPos, "Retrieving remote information.") for tblFEvt := range eventCh { switch tblFEvt.EventType { @@ -286,6 +287,7 @@ func cloneRemote(ctx 
context.Context, srcDB *doltdb.DoltDB, remoteName, branch s err := actions.Clone(ctx, srcDB, dEnv.DoltDB, eventCh) close(eventCh) + wg.Wait() if err != nil { From 21d280512f1cc65d681466f6a718a32d7c6dfa9e Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Wed, 14 Oct 2020 01:08:59 -0400 Subject: [PATCH 5/7] Fix gogroup issue? --- go/store/datas/pull.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/go/store/datas/pull.go b/go/store/datas/pull.go index 216a1032f0..77b2f226dc 100644 --- a/go/store/datas/pull.go +++ b/go/store/datas/pull.go @@ -32,13 +32,12 @@ import ( "golang.org/x/sync/errgroup" "github.com/cenkalti/backoff" - "github.com/golang/snappy" - "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nbs" "github.com/dolthub/dolt/go/store/types" + "github.com/golang/snappy" ) type PullProgress struct { From 6a3734ce8ef3786aeb92153e07cb4cc450ec0d4e Mon Sep 17 00:00:00 2001 From: VinaiRachakonda Date: Wed, 14 Oct 2020 10:34:45 -0400 Subject: [PATCH 6/7] Fix lint issue --- go/store/datas/pull.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/store/datas/pull.go b/go/store/datas/pull.go index 77b2f226dc..06efb3a5ad 100644 --- a/go/store/datas/pull.go +++ b/go/store/datas/pull.go @@ -29,15 +29,15 @@ import ( "math" "math/rand" + "github.com/cenkalti/backoff" + "github.com/golang/snappy" "golang.org/x/sync/errgroup" - "github.com/cenkalti/backoff" "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nbs" "github.com/dolthub/dolt/go/store/types" - "github.com/golang/snappy" ) type PullProgress struct { From d4e22b2af3071bf02a4cc80e86a1489f27e72363 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 14 Oct 2020 10:21:51 -0700 Subject: [PATCH 7/7] go/store/datas: pull.go: Clean up concurrent table file 
download implementation. * Use x/sync/semaphore instead of bounded struct{} channel. * Small improvement to stop submitting work once errgroup ctx is canceled. * Track which files have been successfully downloaded and ensure that we retry unsuccessful attempts. * Replace some repeated-cleanup-before-return with defer calls. --- go/store/datas/pull.go | 180 +++++++++++++++++++---------------------- 1 file changed, 85 insertions(+), 95 deletions(-) diff --git a/go/store/datas/pull.go b/go/store/datas/pull.go index 06efb3a5ad..4d48af3462 100644 --- a/go/store/datas/pull.go +++ b/go/store/datas/pull.go @@ -32,6 +32,7 @@ import ( "github.com/cenkalti/backoff" "github.com/golang/snappy" "golang.org/x/sync/errgroup" + "golang.org/x/sync/semaphore" "github.com/dolthub/dolt/go/store/atomicerr" "github.com/dolthub/dolt/go/store/chunks" @@ -119,129 +120,118 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile return fileIds, fileIDtoTblFile } +func CloseWithErr(c io.Closer, err *error) { + e := c.Close() + if *err == nil && e != nil { + *err = e + } +} + +const concurrentTableFileDownloads = 3 + func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error { root, tblFiles, err := srcTS.Sources(ctx) - if err != nil { return err } + report := func(e TableFileEvent) { + if eventCh != nil { + eventCh <- e + } + } + // Initializes the list of fileIDs we are going to download, and the map of fileIDToTF. If this clone takes a long // time some of the urls within the nbs.TableFiles will expire and fail to download. At that point we will retrieve // the sources again, and update the fileIDToTF map with updated info, but not change the files we are downloading. 
desiredFiles, fileIDToTF := mapTableFiles(tblFiles) + completed := make([]bool, len(desiredFiles)) - if eventCh != nil { - eventCh <- TableFileEvent{Listed, tblFiles} - } + report(TableFileEvent{Listed, tblFiles}) - i := 0 download := func(ctx context.Context) error { - var err error - - maxGoroutines := 3 - guard := make(chan struct{}, maxGoroutines) + sem := semaphore.NewWeighted(concurrentTableFileDownloads) eg, ctx := errgroup.WithContext(ctx) - - for i < len(desiredFiles) { - guard <- struct{}{} // would block if guard channel is already filled - func(i int) { - eg.Go(func() error { - fileID := desiredFiles[i] - tblFile, ok := fileIDToTF[fileID] - - if !ok { - // conjoin happened during clone - <-guard - return backoff.Permanent(errors.New("table file not found. please try again")) - } - - var rd io.ReadCloser - rd, err = tblFile.Open(ctx) - - if err != nil { - <-guard - return err - } - - defer func() error { - closeErr := rd.Close() - - if err == nil && closeErr != nil { - <-guard - return err - } - return nil - }() - - if eventCh != nil { - eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}} - } - - err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil) - - if err != nil { - if eventCh != nil { - eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}} - } - - <-guard - return err - } - - if eventCh != nil { - eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}} - } - - <-guard - return nil - }) - }(i) - - i++ - } - - if err := eg.Wait(); err != nil { - close(guard) - // If at any point there is an error we retrieve updated TableFile information before retrying. - var sourcesErr error - _, tblFiles, sourcesErr = srcTS.Sources(ctx) - - if sourcesErr != nil { - return backoff.Permanent(sourcesErr) + for i := 0; i < len(desiredFiles); i++ { + if completed[i] { + continue } + if err := sem.Acquire(ctx, 1); err != nil { + // The errgroup ctx has been canceled. 
We will + // return the error from eg.Wait() below. + break + } + idx := i + eg.Go(func() (err error) { + defer sem.Release(1) - _, fileIDToTF = mapTableFiles(tblFiles) + fileID := desiredFiles[idx] + tblFile, ok := fileIDToTF[fileID] + if !ok { + // conjoin happened during clone + return backoff.Permanent(errors.New("table file not found. please try again")) + } - return err + var rd io.ReadCloser + if rd, err = tblFile.Open(ctx); err != nil { + return err + } + defer CloseWithErr(rd, &err) + + report(TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}}) + err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil) + if err != nil { + report(TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}) + return err + } + + report(TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}}) + completed[idx] = true + return nil + }) } - close(guard) - return nil + + return eg.Wait() } const maxAttempts = 3 + previousCompletedCnt := 0 failureCount := 0 - // keep going as long as progress is being made. If progress is not made retry up to maxAttempts times. - for failureCount < maxAttempts { - initialIdx := i - err = download(ctx) - - if err == nil { - break + madeProgress := func() bool { + currentCompletedCnt := 0 + for _, b := range completed { + if b { + currentCompletedCnt++ + } } - - if permanent, ok := err.(*backoff.PermanentError); ok { - return permanent.Err - } else if i == initialIdx { - failureCount++ + if currentCompletedCnt == previousCompletedCnt { + return false } else { - failureCount = 0 + previousCompletedCnt = currentCompletedCnt + return true } } - if err != nil { - return err + // keep going as long as progress is being made. If progress is not made retry up to maxAttempts times. 
+ for { + err = download(ctx) + if err == nil { + break + } + if permanent, ok := err.(*backoff.PermanentError); ok { + return permanent.Err + } else if madeProgress() { + failureCount = 0 + } else { + failureCount++ + } + if failureCount >= maxAttempts { + return err + } + if _, tblFiles, err = srcTS.Sources(ctx); err != nil { + return err + } } return sinkTS.SetRootChunk(ctx, root, hash.Hash{})