go/store/datas: pull.go: Clean up concurrent table file download implementation.

* Use x/sync/semaphore instead of a bounded struct{} channel (see the sketch below).

* Small improvement: stop submitting work once the errgroup ctx is canceled.

* Track which files have been successfully downloaded and ensure that we retry
  unsuccessful attempts.

* Replace some repeated cleanup-before-return with defer calls (see the sketch after the diff).
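For reference, the first two bullets amount to the following pattern. This is a minimal standalone sketch under assumed names (runAll, doWork, maxConcurrent, and numTasks are hypothetical stand-ins, not code from this commit). The key property is that sem.Acquire returns an error once the errgroup ctx has been canceled by an already-failed task, so the loop stops submitting new work instead of blocking on a full guard channel:

    package main

    import (
        "context"
        "fmt"

        "golang.org/x/sync/errgroup"
        "golang.org/x/sync/semaphore"
    )

    const maxConcurrent = 3

    // doWork stands in for downloading a single table file.
    func doWork(ctx context.Context, i int) error {
        fmt.Println("task", i)
        return nil
    }

    func runAll(ctx context.Context, numTasks int) error {
        sem := semaphore.NewWeighted(maxConcurrent)
        eg, ctx := errgroup.WithContext(ctx)
        for i := 0; i < numTasks; i++ {
            // Blocks until one of maxConcurrent slots frees up; fails as
            // soon as the errgroup ctx is canceled, so no new work is
            // submitted after the first task error.
            if err := sem.Acquire(ctx, 1); err != nil {
                break // eg.Wait() below returns the error that canceled ctx.
            }
            i := i
            eg.Go(func() error {
                defer sem.Release(1)
                return doWork(ctx, i)
            })
        }
        return eg.Wait()
    }

    func main() {
        if err := runAll(context.Background(), 10); err != nil {
            fmt.Println("clone failed:", err)
        }
    }

Unlike the bounded guard channel, the semaphore needs no manual release on every return path inside the worker; a single defer sem.Release(1) covers them all.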
Aaron Son
2020-10-14 10:21:51 -07:00
parent 6a3734ce8e
commit d4e22b2af3


@@ -32,6 +32,7 @@ import (
     "github.com/cenkalti/backoff"
     "github.com/golang/snappy"
     "golang.org/x/sync/errgroup"
+    "golang.org/x/sync/semaphore"
 
     "github.com/dolthub/dolt/go/store/atomicerr"
     "github.com/dolthub/dolt/go/store/chunks"
@@ -119,129 +120,118 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile
     return fileIds, fileIDtoTblFile
 }
 
+func CloseWithErr(c io.Closer, err *error) {
+    e := c.Close()
+    if *err == nil && e != nil {
+        *err = e
+    }
+}
+
+const concurrentTableFileDownloads = 3
+
 func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error {
     root, tblFiles, err := srcTS.Sources(ctx)
     if err != nil {
         return err
     }
 
+    report := func(e TableFileEvent) {
+        if eventCh != nil {
+            eventCh <- e
+        }
+    }
+
     // Initializes the list of fileIDs we are going to download, and the map of fileIDToTF. If this clone takes a long
     // time some of the urls within the nbs.TableFiles will expire and fail to download. At that point we will retrieve
     // the sources again, and update the fileIDToTF map with updated info, but not change the files we are downloading.
     desiredFiles, fileIDToTF := mapTableFiles(tblFiles)
+    completed := make([]bool, len(desiredFiles))
 
-    if eventCh != nil {
-        eventCh <- TableFileEvent{Listed, tblFiles}
-    }
+    report(TableFileEvent{Listed, tblFiles})
 
-    i := 0
     download := func(ctx context.Context) error {
-        var err error
-        maxGoroutines := 3
-        guard := make(chan struct{}, maxGoroutines)
-
+        sem := semaphore.NewWeighted(concurrentTableFileDownloads)
         eg, ctx := errgroup.WithContext(ctx)
-        for i < len(desiredFiles) {
-            guard <- struct{}{} // would block if guard channel is already filled
-            func(i int) {
-                eg.Go(func() error {
-                    fileID := desiredFiles[i]
-                    tblFile, ok := fileIDToTF[fileID]
-                    if !ok {
-                        // conjoin happened during clone
-                        <-guard
-                        return backoff.Permanent(errors.New("table file not found. please try again"))
-                    }
-
-                    var rd io.ReadCloser
-                    rd, err = tblFile.Open(ctx)
-                    if err != nil {
-                        <-guard
-                        return err
-                    }
-                    defer func() error {
-                        closeErr := rd.Close()
-                        if err == nil && closeErr != nil {
-                            <-guard
-                            return err
-                        }
-                        return nil
-                    }()
-
-                    if eventCh != nil {
-                        eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}}
-                    }
-
-                    err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil)
-                    if err != nil {
-                        if eventCh != nil {
-                            eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
-                        }
-                        <-guard
-                        return err
-                    }
-
-                    if eventCh != nil {
-                        eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}}
-                    }
-                    <-guard
-                    return nil
-                })
-            }(i)
-            i++
-        }
-
-        if err := eg.Wait(); err != nil {
-            close(guard)
-            // If at any point there is an error we retrieve updated TableFile information before retrying.
-            var sourcesErr error
-            _, tblFiles, sourcesErr = srcTS.Sources(ctx)
-            if sourcesErr != nil {
-                return backoff.Permanent(sourcesErr)
-            }
-            _, fileIDToTF = mapTableFiles(tblFiles)
-            return err
-        }
-        close(guard)
-        return nil
+        for i := 0; i < len(desiredFiles); i++ {
+            if completed[i] {
+                continue
+            }
+            if err := sem.Acquire(ctx, 1); err != nil {
+                // The errgroup ctx has been canceled. We will
+                // return the error from eg.Wait() below.
+                break
+            }
+            idx := i
+            eg.Go(func() (err error) {
+                defer sem.Release(1)
+
+                fileID := desiredFiles[idx]
+                tblFile, ok := fileIDToTF[fileID]
+                if !ok {
+                    // conjoin happened during clone
+                    return backoff.Permanent(errors.New("table file not found. please try again"))
+                }
+
+                var rd io.ReadCloser
+                if rd, err = tblFile.Open(ctx); err != nil {
+                    return err
+                }
+                defer CloseWithErr(rd, &err)
+
+                report(TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}})
+                err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil)
+                if err != nil {
+                    report(TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}})
+                    return err
+                }
+
+                report(TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}})
+                completed[idx] = true
+                return nil
+            })
+        }
+        return eg.Wait()
     }
 
     const maxAttempts = 3
+    previousCompletedCnt := 0
     failureCount := 0
 
+    madeProgress := func() bool {
+        currentCompletedCnt := 0
+        for _, b := range completed {
+            if b {
+                currentCompletedCnt++
+            }
+        }
+        if currentCompletedCnt == previousCompletedCnt {
+            return false
+        } else {
+            previousCompletedCnt = currentCompletedCnt
+            return true
+        }
+    }
+
     // keep going as long as progress is being made. If progress is not made retry up to maxAttempts times.
-    for failureCount < maxAttempts {
-        initialIdx := i
+    for {
         err = download(ctx)
         if err == nil {
             break
         }
         if permanent, ok := err.(*backoff.PermanentError); ok {
             return permanent.Err
-        } else if i == initialIdx {
-            failureCount++
-        } else {
+        } else if madeProgress() {
             failureCount = 0
+        } else {
+            failureCount++
         }
+        if failureCount >= maxAttempts {
+            return err
+        }
+        // If at any point there is an error we retrieve updated TableFile information before retrying.
+        if _, tblFiles, err = srcTS.Sources(ctx); err != nil {
+            return err
+        }
+        _, fileIDToTF = mapTableFiles(tblFiles)
     }
-    if err != nil {
-        return err
-    }
+
     return sinkTS.SetRootChunk(ctx, root, hash.Hash{})
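
The CloseWithErr helper introduced above leans on Go's named return values: the deferred call receives a pointer to the function's err result and promotes a Close failure into it only when the body itself succeeded. A minimal sketch of the pattern under assumed names (package fileutil and copyFile are hypothetical, not part of this commit):

    package fileutil

    import (
        "io"
        "os"
    )

    // CloseWithErr is the helper from the diff above: it records the Close
    // error into *err only if no earlier error was already set.
    func CloseWithErr(c io.Closer, err *error) {
        e := c.Close()
        if *err == nil && e != nil {
            *err = e
        }
    }

    // copyFile declares err as a named return value, so the deferred
    // CloseWithErr can fill it in after the body runs.
    func copyFile(dst io.Writer, path string) (err error) {
        var f *os.File
        if f, err = os.Open(path); err != nil {
            return err
        }
        defer CloseWithErr(f, &err)

        _, err = io.Copy(dst, f)
        return err
    }

This single defer is what lets the new download closure drop the repeated cleanup-before-return blocks of the old implementation.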