mirror of https://github.com/dolthub/dolt.git
Merge pull request #934 from VinaiRachakonda/vinai/clone-faster-with-routines
Add goroutines to clone
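The change replaces clone's sequential download loop with a bounded pool of goroutines: a weighted semaphore caps concurrency, and an errgroup propagates the first failure and cancels the rest. Below is a minimal, self-contained sketch of that pattern, not the commit's code; downloadAll, fetch, and maxConcurrent are hypothetical names.

// A sketch of the bounded-concurrency download pattern this PR introduces:
// at most maxConcurrent downloads run at once, and the first failure
// cancels the rest via the errgroup's context.
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

const maxConcurrent = 3

func downloadAll(ctx context.Context, fileIDs []string, fetch func(context.Context, string) error) error {
	sem := semaphore.NewWeighted(maxConcurrent)
	eg, ctx := errgroup.WithContext(ctx)
	for _, id := range fileIDs {
		if err := sem.Acquire(ctx, 1); err != nil {
			break // the group context was canceled; eg.Wait() returns the cause
		}
		id := id // capture the loop variable for the goroutine
		eg.Go(func() error {
			defer sem.Release(1)
			return fetch(ctx, id)
		})
	}
	return eg.Wait()
}

func main() {
	ids := []string{"a", "b", "c", "d"}
	err := downloadAll(context.Background(), ids, func(ctx context.Context, id string) error {
		fmt.Println("fetched", id)
		return nil
	})
	fmt.Println("err =", err)
}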
@@ -80,7 +80,7 @@ require (
 	go.uber.org/zap v1.15.0
 	golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
 	golang.org/x/net v0.0.0-20200904194848-62affa334b73
-	golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
+	golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520
 	golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f
 	google.golang.org/api v0.32.0
 	google.golang.org/grpc v1.32.0
@@ -806,6 +806,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a h1:WXEvlFVvvGxCJLG6REjsT03i
 golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 h1:qwRHBd0NqMbJxfbotnDhm2ByMI1Shq4Y6oRJo21SGJA=
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 h1:Bx6FllMpG4NWDOfhMBz1VR2QYNp/SAOHPIAsaVmxfPo=
+golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -31,6 +31,8 @@ import (
 	"github.com/cenkalti/backoff"
 	"github.com/golang/snappy"
+	"golang.org/x/sync/errgroup"
+	"golang.org/x/sync/semaphore"

 	"github.com/dolthub/dolt/go/store/atomicerr"
 	"github.com/dolthub/dolt/go/store/chunks"
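The two new imports cooperate: errgroup.WithContext derives a context that is canceled as soon as any goroutine in the group returns an error, and a semaphore.Weighted.Acquire waiting on that context then aborts with the context's error. A small hypothetical demonstration of that interaction, not part of the commit:

// Once a goroutine in an errgroup fails, the group's context is canceled,
// and a semaphore.Acquire blocked on that context returns the context's
// error. That is why the clone loop can simply break on an Acquire failure
// and let eg.Wait() surface the original download error.
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(1)
	_ = sem.Acquire(context.Background(), 1) // hold the only slot so the next Acquire must wait

	eg, ctx := errgroup.WithContext(context.Background())
	eg.Go(func() error {
		return errors.New("download failed") // cancels the group context
	})
	fmt.Println("first error:", eg.Wait()) // "download failed"; ctx is now canceled

	if err := sem.Acquire(ctx, 1); err != nil {
		fmt.Println("acquire aborted:", err) // "context canceled"
	}
}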
@@ -62,6 +64,7 @@ func makeProgTrack(progressCh chan PullProgress) func(moreDone, moreKnown, moreA
 }

 func Clone(ctx context.Context, srcDB, sinkDB Database, eventCh chan<- TableFileEvent) error {
 	srcCS := srcDB.chunkStore().(interface{})
 	sinkCS := sinkDB.chunkStore().(interface{})
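The hunk below also funnels all progress notifications through a small report closure, so a nil event channel is handled in one place instead of at every send site. A hypothetical standalone sketch of that guard; Event and the channel here are stand-ins:

// Wrapping the send in one closure removes the repeated
// `if eventCh != nil` checks at every report site.
package main

import "fmt"

type Event struct{ Kind string }

func main() {
	var eventCh chan<- Event // nil: the caller did not ask for events

	report := func(e Event) {
		if eventCh != nil {
			eventCh <- e
		}
	}

	report(Event{Kind: "DownloadStart"}) // safe no-op when eventCh is nil
	fmt.Println("no send attempted on the nil channel")
}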
@@ -117,118 +120,118 @@ func mapTableFiles(tblFiles []nbs.TableFile) ([]string, map[string]nbs.TableFile
 	return fileIds, fileIDtoTblFile
 }

+func CloseWithErr(c io.Closer, err *error) {
+	e := c.Close()
+	if *err == nil && e != nil {
+		*err = e
+	}
+}
+
+const concurrentTableFileDownloads = 3
+
 func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<- TableFileEvent) error {
 	root, tblFiles, err := srcTS.Sources(ctx)

 	if err != nil {
 		return err
 	}

+	report := func(e TableFileEvent) {
+		if eventCh != nil {
+			eventCh <- e
+		}
+	}
+
+	// Initializes the list of fileIDs we are going to download, and the map of fileIDToTF. If this clone takes a long
+	// time some of the urls within the nbs.TableFiles will expire and fail to download. At that point we will retrieve
+	// the sources again, and update the fileIDToTF map with updated info, but not change the files we are downloading.
 	desiredFiles, fileIDToTF := mapTableFiles(tblFiles)
+	completed := make([]bool, len(desiredFiles))

-	if eventCh != nil {
-		eventCh <- TableFileEvent{Listed, tblFiles}
-	}
+	report(TableFileEvent{Listed, tblFiles})

-	i := 0
 	download := func(ctx context.Context) error {
-		var err error
-		for i < len(desiredFiles) {
-			fileID := desiredFiles[i]
-			tblFile, ok := fileIDToTF[fileID]
-
-			if !ok {
-				// conjoin happened during clone
-				return backoff.Permanent(errors.New("table file not found. please try again"))
-			}
-
-			err = func() (err error) {
-				var rd io.ReadCloser
-				rd, err = tblFile.Open(ctx)
-
-				if err != nil {
-					return err
-				}
-
-				defer func() {
-					closeErr := rd.Close()
-
-					if err == nil && closeErr != nil {
-						err = closeErr
-					}
-				}()
-
-				if eventCh != nil {
-					eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}}
-				}
-
-				err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil)
-
-				if err != nil {
-					if eventCh != nil {
-						eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
-					}
-
-					return err
-				}
-
-				if eventCh != nil {
-					eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}}
-				}
-
-				return nil
-			}()
-
-			if err != nil {
-				// If at any point there is an error we retrieve updated TableFile information before retrying.
-				var sourcesErr error
-				_, tblFiles, sourcesErr = srcTS.Sources(ctx)
-
-				if sourcesErr != nil {
-					return backoff.Permanent(sourcesErr)
-				}
-
-				_, fileIDToTF = mapTableFiles(tblFiles)
-
-				return err
-			}
-
-			i++
-		}
-
-		return nil
+		sem := semaphore.NewWeighted(concurrentTableFileDownloads)
+		eg, ctx := errgroup.WithContext(ctx)
+		for i := 0; i < len(desiredFiles); i++ {
+			if completed[i] {
+				continue
+			}
+			if err := sem.Acquire(ctx, 1); err != nil {
+				// The errgroup ctx has been canceled. We will
+				// return the error from eg.Wait() below.
+				break
+			}
+
+			idx := i
+			eg.Go(func() (err error) {
+				defer sem.Release(1)
+
+				fileID := desiredFiles[idx]
+				tblFile, ok := fileIDToTF[fileID]
+				if !ok {
+					// conjoin happened during clone
+					return backoff.Permanent(errors.New("table file not found. please try again"))
+				}
+
+				var rd io.ReadCloser
+				if rd, err = tblFile.Open(ctx); err != nil {
+					return err
+				}
+				defer CloseWithErr(rd, &err)
+
+				report(TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}})
+				err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil)
+				if err != nil {
+					report(TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}})
+					return err
+				}
+
+				report(TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}})
+				completed[idx] = true
+				return nil
+			})
+		}
+
+		return eg.Wait()
 	}

 	const maxAttempts = 3
+	previousCompletedCnt := 0
 	failureCount := 0

+	madeProgress := func() bool {
+		currentCompletedCnt := 0
+		for _, b := range completed {
+			if b {
+				currentCompletedCnt++
+			}
+		}
+		if currentCompletedCnt == previousCompletedCnt {
+			return false
+		} else {
+			previousCompletedCnt = currentCompletedCnt
+			return true
+		}
+	}
+
 	// keep going as long as progress is being made. If progress is not made retry up to maxAttempts times.
-	for failureCount < maxAttempts {
-		initialIdx := i
-		err = download(ctx)
-
-		if err == nil {
-			break
-		}
-
-		if permanent, ok := err.(*backoff.PermanentError); ok {
-			return permanent.Err
-		} else if i == initialIdx {
-			failureCount++
-		} else {
-			failureCount = 0
-		}
-	}
-
-	if err != nil {
-		return err
-	}
+	for {
+		err = download(ctx)
+		if err == nil {
+			break
+		}
+		if permanent, ok := err.(*backoff.PermanentError); ok {
+			return permanent.Err
+		} else if madeProgress() {
+			failureCount = 0
+		} else {
+			failureCount++
+		}
+		if failureCount >= maxAttempts {
+			return err
+		}
+
+		// If at any point there is an error we retrieve updated TableFile information before retrying.
+		if _, tblFiles, err = srcTS.Sources(ctx); err != nil {
+			return err
+		}
+		_, fileIDToTF = mapTableFiles(tblFiles)
+	}

 	return sinkTS.SetRootChunk(ctx, root, hash.Hash{})
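The retry policy at the end of the new clone is worth restating: a full download pass is retried as long as each pass completes at least one more table file, and the clone gives up only after maxAttempts consecutive passes with no progress. Below is a compact, hypothetical sketch of just that policy; downloadWithRetry and countDone are illustrative names, not the commit's.

// Retry a whole download pass while progress is being made; abort after
// maxAttempts consecutive passes that complete no additional files.
package main

import (
	"context"
	"errors"
	"fmt"
)

const maxAttempts = 3

func downloadWithRetry(ctx context.Context, completed []bool, download func(context.Context) error) error {
	countDone := func() int {
		n := 0
		for _, b := range completed {
			if b {
				n++
			}
		}
		return n
	}

	prevDone := 0
	failures := 0
	for {
		err := download(ctx)
		if err == nil {
			return nil
		}
		if done := countDone(); done > prevDone {
			prevDone = done
			failures = 0 // progress was made; reset the failure budget
		} else {
			failures++
		}
		if failures >= maxAttempts {
			return err
		}
	}
}

func main() {
	completed := make([]bool, 3)
	i := 0
	err := downloadWithRetry(context.Background(), completed, func(ctx context.Context) error {
		if i < len(completed) {
			completed[i] = true // each pass completes one more file, then fails
			i++
			return errors.New("transient failure")
		}
		return nil
	})
	fmt.Println("err =", err) // nil: the final pass succeeds
}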