This commit is contained in:
Dhruv Sringari
2022-02-16 14:52:34 -08:00
parent 518f542654
commit a18bfc6c27
7 changed files with 84 additions and 20 deletions
+33
View File
@@ -20,6 +20,7 @@ import (
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
@@ -28,9 +29,12 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/libraries/utils/strhelp"
"github.com/dolthub/dolt/go/store/datas/pull"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/types"
"github.com/dustin/go-humanize"
)
var ErrRepositoryExists = errors.New("data repository already exists")
@@ -93,6 +97,8 @@ func cloneProg(eventCh <-chan pull.TableFileEvent) {
chunks int64
chunksDownloading int64
chunksDownloaded int64
currStats = make(map[string]iohelp.ReadStats)
tableFiles = make(map[string]*nbs.TableFile)
cliPos int
)
@@ -101,28 +107,55 @@ func cloneProg(eventCh <-chan pull.TableFileEvent) {
switch tblFEvt.EventType {
case pull.Listed:
for _, tf := range tblFEvt.TableFiles {
c := tf
tableFiles[c.FileID()] = &c
chunks += int64(tf.NumChunks())
}
case pull.DownloadStart:
for _, tf := range tblFEvt.TableFiles {
chunksDownloading += int64(tf.NumChunks())
}
case pull.DownloadStats:
for i, s := range tblFEvt.Stats {
tf := tblFEvt.TableFiles[i]
currStats[tf.FileID()] = s
}
case pull.DownloadSuccess:
for _, tf := range tblFEvt.TableFiles {
chunksDownloading -= int64(tf.NumChunks())
chunksDownloaded += int64(tf.NumChunks())
delete(currStats, tf.FileID())
}
case pull.DownloadFailed:
// Ignore for now and output errors on the main thread
for _, tf := range tblFEvt.TableFiles {
delete(currStats, tf.FileID())
}
}
str := fmt.Sprintf("%s of %s chunks complete. %s chunks being downloaded currently.", strhelp.CommaIfy(chunksDownloaded), strhelp.CommaIfy(chunks), strhelp.CommaIfy(chunksDownloading))
for _, fileId := range sortedKeys(currStats) {
s := currStats[fileId]
bps := float64(s.Read) / s.Elapsed.Seconds()
rate := humanize.Bytes(uint64(bps)) + "/s"
str = fmt.Sprintf("%s\nFile: %s (%s chunks) - %.2f%% downloaded, %s",
str, fileId, strhelp.CommaIfy(int64((*tableFiles[fileId]).NumChunks())), s.Percent*100, rate)
}
cliPos = cli.DeleteAndPrint(cliPos, str)
}
cli.Println()
}
// sortedKeys returns the keys of m in ascending lexicographic order,
// giving callers a deterministic iteration order over the stats map.
func sortedKeys(m map[string]iohelp.ReadStats) []string {
	fileIDs := make([]string, 0, len(m))
	for fileID := range m {
		fileIDs = append(fileIDs, fileID)
	}
	sort.Strings(fileIDs)
	return fileIDs
}
func CloneRemote(ctx context.Context, srcDB *doltdb.DoltDB, remoteName, branch string, dEnv *env.DoltEnv) error {
eventCh := make(chan pull.TableFileEvent, 128)
@@ -1334,7 +1334,7 @@ func sanitizeSignedUrl(url string) string {
}
// Open returns an io.ReadCloser which can be used to read the bytes of a table file.
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, error) {
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
if drtf.info.RefreshAfter != nil && drtf.info.RefreshAfter.AsTime().After(time.Now()) {
resp, err := drtf.dcs.csClient.RefreshTableFileUrl(ctx, drtf.info.RefreshRequest)
if err == nil {
@@ -1345,20 +1345,20 @@ func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, error)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, drtf.info.Url, nil)
if err != nil {
return nil, err
return nil, 0, err
}
resp, err := drtf.dcs.httpFetcher.Do(req)
if err != nil {
return nil, err
return nil, 0, err
}
if resp.StatusCode/100 != 2 {
defer resp.Body.Close()
body := make([]byte, 4096)
n, _ := io.ReadFull(resp.Body, body)
return nil, fmt.Errorf("%w: status code: %d;\nurl: %s\n\nbody:\n\n%s\n", ErrRemoteTableFileGet, resp.StatusCode, sanitizeSignedUrl(drtf.info.Url), string(body[0:n]))
return nil, 0, fmt.Errorf("%w: status code: %d;\nurl: %s\n\nbody:\n\n%s\n", ErrRemoteTableFileGet, resp.StatusCode, sanitizeSignedUrl(drtf.info.Url), string(body[0:n]))
}
return resp.Body, nil
return resp.Body, uint64(resp.ContentLength), nil
}
@@ -63,6 +63,17 @@ func (rws *ReaderWithStats) Start(updateFunc func(ReadStats)) {
}()
}
// Close closes this reader. Only one of Close or Stop should be called
func (rws *ReaderWithStats) Close() error {
	// Signal the stats-reporting goroutine to shut down.
	close(rws.closeCh)
	// Propagate the close to the wrapped reader when it supports closing.
	closer, ok := rws.rd.(io.Closer)
	if !ok {
		return nil
	}
	return closer.Close()
}
func (rws *ReaderWithStats) Stop() {
close(rws.closeCh)
+19 -7
View File
@@ -20,6 +20,7 @@ import (
"io"
"github.com/cenkalti/backoff"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
@@ -59,6 +60,7 @@ type CloneTableFileEvent int
const (
	// Listed indicates the full set of table files to clone has been enumerated.
	Listed = iota
	// DownloadStart indicates a table file download has begun.
	DownloadStart
	// DownloadStats carries in-progress read statistics for downloading files.
	DownloadStats
	// DownloadSuccess indicates a table file was downloaded and written successfully.
	DownloadSuccess
	// DownloadFailed indicates a table file download or write failed.
	DownloadFailed
)
@@ -66,6 +68,7 @@ const (
// TableFileEvent is reported on the clone event channel to describe
// download progress for one or more table files.
type TableFileEvent struct {
	EventType  CloneTableFileEvent
	TableFiles []nbs.TableFile
	// Stats is populated for DownloadStats events; consumers index it in
	// parallel with TableFiles (Stats[i] describes TableFiles[i]).
	Stats []iohelp.ReadStats
}
// mapTableFiles returns the list of all fileIDs for the table files, and a map from fileID to nbs.TableFile
@@ -111,7 +114,7 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
desiredFiles, fileIDToTF, fileIDToNumChunks := mapTableFiles(tblFiles)
completed := make([]bool, len(desiredFiles))
report(TableFileEvent{Listed, tblFiles})
report(TableFileEvent{EventType: Listed, TableFiles: tblFiles})
download := func(ctx context.Context) error {
sem := semaphore.NewWeighted(concurrentTableFileDownloads)
@@ -136,20 +139,29 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
return backoff.Permanent(errors.New("table file not found. please try again"))
}
var rd io.ReadCloser
if rd, err = tblFile.Open(ctx); err != nil {
rd, contentLength, err := tblFile.Open(ctx)
if err != nil {
return err
}
defer CloseWithErr(rd, &err)
rdStats := iohelp.NewReaderWithStats(rd, int64(contentLength))
defer CloseWithErr(rdStats, &err)
report(TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}})
rdStats.Start(func(s iohelp.ReadStats) {
report(TableFileEvent{
EventType: DownloadStats,
TableFiles: []nbs.TableFile{tblFile},
Stats: []iohelp.ReadStats{s},
})
})
report(TableFileEvent{EventType: DownloadStart, TableFiles: []nbs.TableFile{tblFile}})
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd, 0, nil)
if err != nil {
report(TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}})
report(TableFileEvent{EventType: DownloadFailed, TableFiles: []nbs.TableFile{tblFile}})
return err
}
report(TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}})
report(TableFileEvent{EventType: DownloadSuccess, TableFiles: []nbs.TableFile{tblFile}})
completed[idx] = true
return nil
})
+3
View File
@@ -133,6 +133,9 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
plan, err := planConjoin(sources, stats)
for _, source := range sources {
source.Close()
}
if err != nil {
return emptyChunkSource{}, err
+11 -6
View File
@@ -1135,7 +1135,7 @@ func (nbs *NomsBlockStore) StatsSummary() string {
// tableFile is our implementation of TableFile.
// NOTE(review): this span of the diff rendered both the pre- and post-change
// signatures of `open`; only the post-change signature is kept here.
type tableFile struct {
	// info describes the table file's spec (chunk count, etc.).
	info TableSpecInfo
	// open opens the table file for reading, also returning the content
	// length in bytes.
	open func(ctx context.Context) (io.ReadCloser, uint64, error)
}
// FileID gets the id of the file
@@ -1148,8 +1148,8 @@ func (tf tableFile) NumChunks() int {
return int(tf.info.GetChunkCount())
}
// Open returns an io.ReadCloser which can be used to read the bytes of a table file.
func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, error) {
// Open returns an io.ReadCloser which can be used to read the bytes of a table file and the content length in bytes.
func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
return tf.open(ctx)
}
@@ -1210,13 +1210,18 @@ func getTableFiles(css map[addr]chunkSource, contents manifestContents, numSpecs
func newTableFile(cs chunkSource, info tableSpec) tableFile {
return tableFile{
info: info,
open: func(ctx context.Context) (io.ReadCloser, error) {
open: func(ctx context.Context) (io.ReadCloser, uint64, error) {
r, err := cs.reader(ctx)
if err != nil {
return nil, err
return nil, 0, err
}
return io.NopCloser(r), nil
len, err := cs.uncompressedLen()
if err != nil {
return nil, 0, err
}
return io.NopCloser(r), len, nil
},
}
}
+2 -2
View File
@@ -283,8 +283,8 @@ type TableFile interface {
// NumChunks returns the number of chunks in a table file
NumChunks() int
// Open returns an io.ReadCloser which can be used to read the bytes of a table file.
Open(ctx context.Context) (io.ReadCloser, error)
// Open returns an io.ReadCloser which can be used to read the bytes of a table file and the content length in bytes.
Open(ctx context.Context) (io.ReadCloser, uint64, error)
}
// Describes what is possible to do with TableFiles in a TableFileStore.