go/libraries/doltcore/remotestorage: Add a way to tune download concurrency, cache implementation.

This commit is contained in:
Aaron Son
2021-01-08 10:58:22 -08:00
parent 438a92ad19
commit 1caea0c516
2 changed files with 25 additions and 16 deletions

View File

@@ -19,8 +19,8 @@ import (
"github.com/dolthub/dolt/go/store/nbs"
)
// chunkCache is an interface used for caching chunks
type chunkCache interface {
// ChunkCache is an interface used for caching chunks
type ChunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []nbs.CompressedChunk)

View File

@@ -94,14 +94,15 @@ type HTTPFetcher interface {
}
type DoltChunkStore struct {
org string
repoName string
host string
csClient remotesapi.ChunkStoreServiceClient
cache chunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
httpFetcher HTTPFetcher
org string
repoName string
host string
csClient remotesapi.ChunkStoreServiceClient
cache ChunkCache
metadata *remotesapi.GetRepoMetadataResponse
nbf *types.NomsBinFormat
httpFetcher HTTPFetcher
downloadConcurrency int
}
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
@@ -134,15 +135,23 @@ func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoN
return nil, err
}
return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), metadata, nbf, globalHttpFetcher}, nil
return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), metadata, nbf, globalHttpFetcher, defaultDownloadConcurrency}, nil
}
func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, dcs.metadata, dcs.nbf, fetcher}
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, dcs.metadata, dcs.nbf, fetcher, dcs.downloadConcurrency}
}
func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, noopChunkCache, dcs.metadata, dcs.nbf, dcs.httpFetcher}
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, noopChunkCache, dcs.metadata, dcs.nbf, dcs.httpFetcher, dcs.downloadConcurrency}
}
func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, cache, dcs.metadata, dcs.nbf, dcs.httpFetcher, dcs.downloadConcurrency}
}
func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency int) *DoltChunkStore {
return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, dcs.metadata, dcs.nbf, dcs.httpFetcher, concurrency}
}
func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId {
@@ -769,8 +778,8 @@ func aggregateDownloads(aggDistance uint64, resourceGets map[string]*GetRange) [
}
const (
chunkAggDistance = 8 * 1024
maxDownloadConcurrency = 64
chunkAggDistance = 8 * 1024
defaultDownloadConcurrency = 16
)
// creates work functions for each download and executes them in parallel. The work functions write downloaded chunks
@@ -801,7 +810,7 @@ func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, resourceGets map[
}
// execute the work
err := concurrentExec(work, maxDownloadConcurrency)
err := concurrentExec(work, dcs.downloadConcurrency)
return err
}