{dolt/,}go: Update for noms ChunkStore taking context.

Author: Aaron Son
Date:   2019-04-24 16:44:43 -07:00
Parent: 9c47b2c743
Commit: a8b4290588
3 changed files with 39 additions and 43 deletions
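
The shape of the change: every ChunkStore operation now takes a context.Context as an explicit first argument instead of DoltChunkStore carrying a stored context. A rough Go sketch of the method set DoltChunkStore satisfies after this commit, reconstructed from the signatures in the diff below (the interface name, package name and import paths are illustrative assumptions, not the verbatim noms definition):

package remotestorage

import (
    "context"

    "github.com/attic-labs/noms/go/chunks"
    "github.com/attic-labs/noms/go/hash"
)

// chunkStoreWithContext mirrors the context-aware signatures visible in the
// diff below; it is a sketch for orientation, not code from the commit.
type chunkStoreWithContext interface {
    Get(ctx context.Context, h hash.Hash) chunks.Chunk
    GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk)
    Has(ctx context.Context, h hash.Hash) bool
    HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet)
    Put(ctx context.Context, c chunks.Chunk)
    Rebase(ctx context.Context)
    Root(ctx context.Context) hash.Hash
    Commit(ctx context.Context, current, last hash.Hash) bool
    Version() string
    Close() error
}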

View File

@@ -27,4 +27,5 @@ require (
gopkg.in/square/go-jose.v2 v2.2.2
)
-replace github.com/attic-labs/noms => github.com/liquidata-inc/noms v0.0.0-20190408232856-671acb36001e
+// replace github.com/attic-labs/noms => github.com/liquidata-inc/noms v0.0.0-20190408232856-671acb36001e
+replace github.com/attic-labs/noms => ../../../noms

View File

@@ -33,19 +33,14 @@ type DoltChunkStore struct {
csClient remotesapi.ChunkStoreServiceClient
cache chunkCache
httpFetcher HttpFetcher
-context context.Context
}
func NewDoltChunkStore(org, repoName, host string, csClient remotesapi.ChunkStoreServiceClient) *DoltChunkStore {
-return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), globalHttpFetcher, context.Background()}
+return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), globalHttpFetcher}
}
func (dcs *DoltChunkStore) WithHttpFetcher(fetcher HttpFetcher) *DoltChunkStore {
-return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, fetcher, dcs.context}
-}
-func (dcs *DoltChunkStore) WithContext(context context.Context) *DoltChunkStore {
-return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, dcs.httpFetcher, context}
+return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, fetcher}
}
func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId {
@@ -57,10 +52,10 @@ func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId {
// Get the Chunk for the value of the hash in the store. If the hash is
// absent from the store EmptyChunk is returned.
-func (dcs *DoltChunkStore) Get(h hash.Hash) chunks.Chunk {
+func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
hashes := hash.HashSet{h: struct{}{}}
foundChan := make(chan *chunks.Chunk, 1)
-dcs.GetMany(hashes, foundChan)
+dcs.GetMany(ctx, hashes, foundChan)
select {
case ch := <-foundChan:
@@ -73,7 +68,7 @@ func (dcs *DoltChunkStore) Get(h hash.Hash) chunks.Chunk {
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
-func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
+func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
hashToChunk := dcs.cache.Get(hashes)
notCached := make([]hash.Hash, 0, len(hashes))
@@ -88,7 +83,7 @@ func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks
}
if len(notCached) > 0 {
-chnks, err := dcs.readChunksAndCache(notCached)
+chnks, err := dcs.readChunksAndCache(ctx, notCached)
if err != nil {
//follow noms convention
@@ -102,17 +97,17 @@ func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks
}
}
-func (dcs *DoltChunkStore) readChunksAndCache(hashes []hash.Hash) ([]chunks.Chunk, error) {
+func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash.Hash) ([]chunks.Chunk, error) {
// read all from remote and cache and put in known
hashesBytes := HashesToSlices(hashes)
req := remotesapi.GetDownloadLocsRequest{RepoId: dcs.getRepoId(), Hashes: hashesBytes}
-resp, err := dcs.csClient.GetDownloadLocations(dcs.context, &req)
+resp, err := dcs.csClient.GetDownloadLocations(ctx, &req)
if err != nil {
return nil, NewRpcError(err, "GetDownloadLocations", dcs.host, req)
}
-chnks, err := dcs.downloadChunks(resp.Locs)
+chnks, err := dcs.downloadChunks(ctx, resp.Locs)
if err != nil {
return nil, err
@@ -125,16 +120,16 @@ func (dcs *DoltChunkStore) readChunksAndCache(hashes []hash.Hash) ([]chunks.Chun
// Returns true iff the value at the address |h| is contained in the
// store
-func (dcs *DoltChunkStore) Has(h hash.Hash) bool {
+func (dcs *DoltChunkStore) Has(ctx context.Context, h hash.Hash) bool {
hashes := hash.HashSet{h: struct{}{}}
-absent := dcs.HasMany(hashes)
+absent := dcs.HasMany(ctx, hashes)
return len(absent) == 0
}
// Returns a new HashSet containing any members of |hashes| that are
// absent from the store.
-func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) {
+func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet) {
notCached := dcs.cache.Has(hashes)
if len(notCached) == 0 {
@@ -143,7 +138,7 @@ func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) {
hashSl, byteSl := HashSetToSlices(notCached)
req := remotesapi.HasChunksRequest{RepoId: dcs.getRepoId(), Hashes: byteSl}
-resp, err := dcs.csClient.HasChunks(dcs.context, &req)
+resp, err := dcs.csClient.HasChunks(ctx, &req)
if err != nil {
rpcErr := NewRpcError(err, "HasMany", dcs.host, req)
@@ -187,7 +182,7 @@ func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) {
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
-func (dcs *DoltChunkStore) Put(c chunks.Chunk) {
+func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) {
dcs.cache.Put([]chunks.Chunk{c})
}
@@ -198,9 +193,9 @@ func (dcs *DoltChunkStore) Version() string {
// Rebase brings this ChunkStore into sync with the persistent storage's
// current root.
-func (dcs *DoltChunkStore) Rebase() {
+func (dcs *DoltChunkStore) Rebase(ctx context.Context) {
req := &remotesapi.RebaseRequest{RepoId: dcs.getRepoId()}
-_, err := dcs.csClient.Rebase(dcs.context, req)
+_, err := dcs.csClient.Rebase(ctx, req)
if err != nil {
rpcErr := NewRpcError(err, "Rebase", dcs.host, req)
@@ -212,9 +207,9 @@ func (dcs *DoltChunkStore) Rebase() {
// Root returns the root of the database as of the time the ChunkStore
// was opened or the most recent call to Rebase.
-func (dcs *DoltChunkStore) Root() hash.Hash {
+func (dcs *DoltChunkStore) Root(ctx context.Context) hash.Hash {
req := &remotesapi.RootRequest{RepoId: dcs.getRepoId()}
-resp, err := dcs.csClient.Root(dcs.context, req)
+resp, err := dcs.csClient.Root(ctx, req)
if err != nil {
rpcErr := NewRpcError(err, "Root", dcs.host, req)
@@ -229,8 +224,8 @@ func (dcs *DoltChunkStore) Root() hash.Hash {
// Commit atomically attempts to persist all novel Chunks and update the
// persisted root hash from last to current (or keeps it the same).
// If last doesn't match the root in persistent storage, returns false.
-func (dcs *DoltChunkStore) Commit(current, last hash.Hash) bool {
-hashToChunkCount, err := dcs.uploadChunks()
+func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) bool {
+hashToChunkCount, err := dcs.uploadChunks(ctx)
if err != nil {
// follow noms convention
@@ -243,7 +238,7 @@ func (dcs *DoltChunkStore) Commit(current, last hash.Hash) bool {
}
req := &remotesapi.CommitRequest{RepoId: dcs.getRepoId(), Current: current[:], Last: last[:], ChunkTableInfo: chnkTblInfo}
-resp, err := dcs.csClient.Commit(dcs.context, req)
+resp, err := dcs.csClient.Commit(ctx, req)
if err != nil {
rpcErr := NewRpcError(err, "Commit", dcs.host, req)
@@ -278,7 +273,7 @@ func (dcs *DoltChunkStore) Close() error {
}
// getting this working using the simplest approach first
-func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) {
+func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) {
hashToChunk := dcs.cache.GetAndClearChunksToFlush()
if len(hashToChunk) == 0 {
@@ -312,7 +307,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) {
}
req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), Hashes: hashBytes}
-resp, err := dcs.csClient.GetUploadLocations(dcs.context, req)
+resp, err := dcs.csClient.GetUploadLocations(ctx, req)
if err != nil {
return map[hash.Hash]int{}, err
@@ -324,7 +319,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) {
data := hashToData[h]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
-err = dcs.httpPostUpload(loc.Hash, typedLoc.HttpPost, data)
+err = dcs.httpPostUpload(ctx, loc.Hash, typedLoc.HttpPost, data)
default:
break
}
@@ -337,7 +332,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) {
return hashToCount, nil
}
-func (dcs *DoltChunkStore) httpPostUpload(hashBytes []byte, post *remotesapi.HttpPostChunk, data []byte) error {
+func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostChunk, data []byte) error {
//resp, err := http(post.Url, "application/octet-stream", bytes.NewBuffer(data))
req, err := http.NewRequest(http.MethodPut, post.Url, bytes.NewBuffer(data))
if err != nil {
@@ -357,7 +352,7 @@ func (dcs *DoltChunkStore) httpPostUpload(hashBytes []byte, post *remotesapi.Htt
}
// getting this working using the simplest approach first
-func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chunks.Chunk, error) {
+func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, locs []*remotesapi.DownloadLoc) ([]chunks.Chunk, error) {
var allChunks []chunks.Chunk
for _, loc := range locs {
@@ -365,9 +360,9 @@ func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chu
var chnks []chunks.Chunk
switch typedLoc := loc.Location.(type) {
case *remotesapi.DownloadLoc_HttpGet:
-chnks, err = dcs.httpGetDownload(typedLoc.HttpGet)
+chnks, err = dcs.httpGetDownload(ctx, typedLoc.HttpGet)
case *remotesapi.DownloadLoc_HttpGetRange:
-chnks, err = dcs.httpGetRangeDownload(typedLoc.HttpGetRange)
+chnks, err = dcs.httpGetRangeDownload(ctx, typedLoc.HttpGetRange)
}
if err != nil {
@@ -380,7 +375,7 @@ func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chu
return allChunks, nil
}
-func (dcs *DoltChunkStore) httpGetDownload(httpGet *remotesapi.HttpGetChunk) ([]chunks.Chunk, error) {
+func (dcs *DoltChunkStore) httpGetDownload(ctx context.Context, httpGet *remotesapi.HttpGetChunk) ([]chunks.Chunk, error) {
hashes := httpGet.Hashes
if len(hashes) != 1 {
return nil, errors.New("not supported yet")
@@ -418,7 +413,7 @@ type bytesResult struct {
err error
}
-func getRanges(httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi.RangeChunk, resultChan chan<- bytesResult, stopChan <-chan struct{}) {
+func getRanges(ctx context.Context, httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi.RangeChunk, resultChan chan<- bytesResult, stopChan <-chan struct{}) {
for {
select {
case <-stopChan:
@@ -470,7 +465,7 @@ func getRanges(httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi
}
}
-func (dcs *DoltChunkStore) httpGetRangeDownload(getRange *remotesapi.HttpGetRange) ([]chunks.Chunk, error) {
+func (dcs *DoltChunkStore) httpGetRangeDownload(ctx context.Context, getRange *remotesapi.HttpGetRange) ([]chunks.Chunk, error) {
url := getRange.Url
rangeCount := len(getRange.Ranges)
@@ -491,7 +486,7 @@ func (dcs *DoltChunkStore) httpGetRangeDownload(getRange *remotesapi.HttpGetRang
resultChan := make(chan bytesResult, 2*concurrency)
for i := 0; i < concurrency; i++ {
-go getRanges(dcs.httpFetcher, url, rangeChan, resultChan, stopChan)
+go getRanges(ctx, dcs.httpFetcher, url, rangeChan, resultChan, stopChan)
}
for _, r := range getRange.Ranges {

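Since the store no longer holds a context, callers thread one through every call. A minimal usage sketch of the new calling convention (not part of this commit; syncRoot and its arguments are hypothetical):

// syncRoot is a hypothetical helper illustrating the per-call context style:
// Rebase, Root and Commit all receive the caller's ctx.
func syncRoot(ctx context.Context, dcs *DoltChunkStore, current, last hash.Hash) bool {
    dcs.Rebase(ctx) // refresh our view of the remote root
    if dcs.Root(ctx) != last {
        return false // remote root moved underneath us; caller should rebase and retry
    }
    return dcs.Commit(ctx, current, last) // uploads cached chunks, then commits
}
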
View File

@@ -42,7 +42,7 @@ func (rs RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasChu
hashes, hashToIndex := remotestorage.ParseByteSlices(req.Hashes)
-absent := cs.HasMany(hashes)
+absent := cs.HasMany(ctx, hashes)
indices := make([]int32, len(absent))
logger(fmt.Sprintf("missing chunks: %v", indices))
@@ -117,7 +117,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
org := req.RepoId.Org
repoName := req.RepoId.RepoName
hashes, _ := remotestorage.ParseByteSlices(req.Hashes)
-absent := cs.HasMany(hashes)
+absent := cs.HasMany(ctx, hashes)
var locs []*remotesapi.UploadLoc
for h := range hashes {
@@ -157,7 +157,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName))
err := pantoerr.PanicToError("Rebase failed", func() error {
-cs.Rebase()
+cs.Rebase(ctx)
return nil
})
@@ -182,7 +182,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
var h hash.Hash
err := pantoerr.PanicToError("Root failed", func() error {
-h = cs.Root()
+h = cs.Root(ctx)
return nil
})
@@ -220,7 +220,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
var ok bool
err := pantoerr.PanicToError("Commit failed", func() error {
-ok = cs.Commit(currHash, lastHash)
+ok = cs.Commit(ctx, currHash, lastHash)
return nil
})
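
On the server side the handlers above simply forward their gRPC request context into the chunk store, so cancellation and deadlines from the client can reach the underlying RPCs. As a consequence of the new signatures (not something this commit adds), a caller can also bound a remote operation with a deadline. A small hypothetical sketch, assuming "time" is imported alongside "context":

// rootWithTimeout is a hypothetical wrapper; the 30s bound is arbitrary and
// only illustrates that a deadline now flows through to the Root RPC.
func rootWithTimeout(parent context.Context, dcs *DoltChunkStore) hash.Hash {
    ctx, cancel := context.WithTimeout(parent, 30*time.Second)
    defer cancel()
    return dcs.Root(ctx)
}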