diff --git a/go/go.mod b/go/go.mod index e4fb54032e..85df2e0d0c 100644 --- a/go/go.mod +++ b/go/go.mod @@ -27,4 +27,5 @@ require ( gopkg.in/square/go-jose.v2 v2.2.2 ) -replace github.com/attic-labs/noms => github.com/liquidata-inc/noms v0.0.0-20190408232856-671acb36001e +// replace github.com/attic-labs/noms => github.com/liquidata-inc/noms v0.0.0-20190408232856-671acb36001e +replace github.com/attic-labs/noms => ../../../noms diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 85387ea7ad..10d459c90a 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -33,19 +33,14 @@ type DoltChunkStore struct { csClient remotesapi.ChunkStoreServiceClient cache chunkCache httpFetcher HttpFetcher - context context.Context } func NewDoltChunkStore(org, repoName, host string, csClient remotesapi.ChunkStoreServiceClient) *DoltChunkStore { - return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), globalHttpFetcher, context.Background()} + return &DoltChunkStore{org, repoName, host, csClient, newMapChunkCache(), globalHttpFetcher} } func (dcs *DoltChunkStore) WithHttpFetcher(fetcher HttpFetcher) *DoltChunkStore { - return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, fetcher, dcs.context} -} - -func (dcs *DoltChunkStore) WithContext(context context.Context) *DoltChunkStore { - return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, dcs.httpFetcher, context} + return &DoltChunkStore{dcs.org, dcs.repoName, dcs.host, dcs.csClient, dcs.cache, fetcher} } func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId { @@ -57,10 +52,10 @@ func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId { // Get the Chunk for the value of the hash in the store. If the hash is // absent from the store EmptyChunk is returned. -func (dcs *DoltChunkStore) Get(h hash.Hash) chunks.Chunk { +func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk { hashes := hash.HashSet{h: struct{}{}} foundChan := make(chan *chunks.Chunk, 1) - dcs.GetMany(hashes, foundChan) + dcs.GetMany(ctx, hashes, foundChan) select { case ch := <-foundChan: @@ -73,7 +68,7 @@ func (dcs *DoltChunkStore) Get(h hash.Hash) chunks.Chunk { // GetMany gets the Chunks with |hashes| from the store. On return, // |foundChunks| will have been fully sent all chunks which have been // found. Any non-present chunks will silently be ignored. -func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) { +func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) { hashToChunk := dcs.cache.Get(hashes) notCached := make([]hash.Hash, 0, len(hashes)) @@ -88,7 +83,7 @@ func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks } if len(notCached) > 0 { - chnks, err := dcs.readChunksAndCache(notCached) + chnks, err := dcs.readChunksAndCache(ctx, notCached) if err != nil { //follow noms convention @@ -102,17 +97,17 @@ func (dcs *DoltChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks } } -func (dcs *DoltChunkStore) readChunksAndCache(hashes []hash.Hash) ([]chunks.Chunk, error) { +func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes []hash.Hash) ([]chunks.Chunk, error) { // read all from remote and cache and put in known hashesBytes := HashesToSlices(hashes) req := remotesapi.GetDownloadLocsRequest{RepoId: dcs.getRepoId(), Hashes: hashesBytes} - resp, err := dcs.csClient.GetDownloadLocations(dcs.context, &req) + resp, err := dcs.csClient.GetDownloadLocations(ctx, &req) if err != nil { return nil, NewRpcError(err, "GetDownloadLocations", dcs.host, req) } - chnks, err := dcs.downloadChunks(resp.Locs) + chnks, err := dcs.downloadChunks(ctx, resp.Locs) if err != nil { return nil, err @@ -125,16 +120,16 @@ func (dcs *DoltChunkStore) readChunksAndCache(hashes []hash.Hash) ([]chunks.Chun // Returns true iff the value at the address |h| is contained in the // store -func (dcs *DoltChunkStore) Has(h hash.Hash) bool { +func (dcs *DoltChunkStore) Has(ctx context.Context, h hash.Hash) bool { hashes := hash.HashSet{h: struct{}{}} - absent := dcs.HasMany(hashes) + absent := dcs.HasMany(ctx, hashes) return len(absent) == 0 } // Returns a new HashSet containing any members of |hashes| that are // absent from the store. -func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) { +func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet) { notCached := dcs.cache.Has(hashes) if len(notCached) == 0 { @@ -143,7 +138,7 @@ func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) { hashSl, byteSl := HashSetToSlices(notCached) req := remotesapi.HasChunksRequest{RepoId: dcs.getRepoId(), Hashes: byteSl} - resp, err := dcs.csClient.HasChunks(dcs.context, &req) + resp, err := dcs.csClient.HasChunks(ctx, &req) if err != nil { rpcErr := NewRpcError(err, "HasMany", dcs.host, req) @@ -187,7 +182,7 @@ func (dcs *DoltChunkStore) HasMany(hashes hash.HashSet) (absent hash.HashSet) { // subsequent Get and Has calls, but must not be persistent until a call // to Flush(). Put may be called concurrently with other calls to Put(), // Get(), GetMany(), Has() and HasMany(). -func (dcs *DoltChunkStore) Put(c chunks.Chunk) { +func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) { dcs.cache.Put([]chunks.Chunk{c}) } @@ -198,9 +193,9 @@ func (dcs *DoltChunkStore) Version() string { // Rebase brings this ChunkStore into sync with the persistent storage's // current root. -func (dcs *DoltChunkStore) Rebase() { +func (dcs *DoltChunkStore) Rebase(ctx context.Context) { req := &remotesapi.RebaseRequest{RepoId: dcs.getRepoId()} - _, err := dcs.csClient.Rebase(dcs.context, req) + _, err := dcs.csClient.Rebase(ctx, req) if err != nil { rpcErr := NewRpcError(err, "Rebase", dcs.host, req) @@ -212,9 +207,9 @@ func (dcs *DoltChunkStore) Rebase() { // Root returns the root of the database as of the time the ChunkStore // was opened or the most recent call to Rebase. -func (dcs *DoltChunkStore) Root() hash.Hash { +func (dcs *DoltChunkStore) Root(ctx context.Context) hash.Hash { req := &remotesapi.RootRequest{RepoId: dcs.getRepoId()} - resp, err := dcs.csClient.Root(dcs.context, req) + resp, err := dcs.csClient.Root(ctx, req) if err != nil { rpcErr := NewRpcError(err, "Root", dcs.host, req) @@ -229,8 +224,8 @@ func (dcs *DoltChunkStore) Root() hash.Hash { // Commit atomically attempts to persist all novel Chunks and update the // persisted root hash from last to current (or keeps it the same). // If last doesn't match the root in persistent storage, returns false. -func (dcs *DoltChunkStore) Commit(current, last hash.Hash) bool { - hashToChunkCount, err := dcs.uploadChunks() +func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash) bool { + hashToChunkCount, err := dcs.uploadChunks(ctx) if err != nil { // follow noms convention @@ -243,7 +238,7 @@ func (dcs *DoltChunkStore) Commit(current, last hash.Hash) bool { } req := &remotesapi.CommitRequest{RepoId: dcs.getRepoId(), Current: current[:], Last: last[:], ChunkTableInfo: chnkTblInfo} - resp, err := dcs.csClient.Commit(dcs.context, req) + resp, err := dcs.csClient.Commit(ctx, req) if err != nil { rpcErr := NewRpcError(err, "Commit", dcs.host, req) @@ -278,7 +273,7 @@ func (dcs *DoltChunkStore) Close() error { } // getting this working using the simplest approach first -func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) { +func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int, error) { hashToChunk := dcs.cache.GetAndClearChunksToFlush() if len(hashToChunk) == 0 { @@ -312,7 +307,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) { } req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), Hashes: hashBytes} - resp, err := dcs.csClient.GetUploadLocations(dcs.context, req) + resp, err := dcs.csClient.GetUploadLocations(ctx, req) if err != nil { return map[hash.Hash]int{}, err @@ -324,7 +319,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) { data := hashToData[h] switch typedLoc := loc.Location.(type) { case *remotesapi.UploadLoc_HttpPost: - err = dcs.httpPostUpload(loc.Hash, typedLoc.HttpPost, data) + err = dcs.httpPostUpload(ctx, loc.Hash, typedLoc.HttpPost, data) default: break } @@ -337,7 +332,7 @@ func (dcs *DoltChunkStore) uploadChunks() (map[hash.Hash]int, error) { return hashToCount, nil } -func (dcs *DoltChunkStore) httpPostUpload(hashBytes []byte, post *remotesapi.HttpPostChunk, data []byte) error { +func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostChunk, data []byte) error { //resp, err := http(post.Url, "application/octet-stream", bytes.NewBuffer(data)) req, err := http.NewRequest(http.MethodPut, post.Url, bytes.NewBuffer(data)) if err != nil { @@ -357,7 +352,7 @@ func (dcs *DoltChunkStore) httpPostUpload(hashBytes []byte, post *remotesapi.Htt } // getting this working using the simplest approach first -func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chunks.Chunk, error) { +func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, locs []*remotesapi.DownloadLoc) ([]chunks.Chunk, error) { var allChunks []chunks.Chunk for _, loc := range locs { @@ -365,9 +360,9 @@ func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chu var chnks []chunks.Chunk switch typedLoc := loc.Location.(type) { case *remotesapi.DownloadLoc_HttpGet: - chnks, err = dcs.httpGetDownload(typedLoc.HttpGet) + chnks, err = dcs.httpGetDownload(ctx, typedLoc.HttpGet) case *remotesapi.DownloadLoc_HttpGetRange: - chnks, err = dcs.httpGetRangeDownload(typedLoc.HttpGetRange) + chnks, err = dcs.httpGetRangeDownload(ctx, typedLoc.HttpGetRange) } if err != nil { @@ -380,7 +375,7 @@ func (dcs *DoltChunkStore) downloadChunks(locs []*remotesapi.DownloadLoc) ([]chu return allChunks, nil } -func (dcs *DoltChunkStore) httpGetDownload(httpGet *remotesapi.HttpGetChunk) ([]chunks.Chunk, error) { +func (dcs *DoltChunkStore) httpGetDownload(ctx context.Context, httpGet *remotesapi.HttpGetChunk) ([]chunks.Chunk, error) { hashes := httpGet.Hashes if len(hashes) != 1 { return nil, errors.New("not supported yet") @@ -418,7 +413,7 @@ type bytesResult struct { err error } -func getRanges(httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi.RangeChunk, resultChan chan<- bytesResult, stopChan <-chan struct{}) { +func getRanges(ctx context.Context, httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi.RangeChunk, resultChan chan<- bytesResult, stopChan <-chan struct{}) { for { select { case <-stopChan: @@ -470,7 +465,7 @@ func getRanges(httpFetcher HttpFetcher, url string, rangeChan <-chan *remotesapi } } -func (dcs *DoltChunkStore) httpGetRangeDownload(getRange *remotesapi.HttpGetRange) ([]chunks.Chunk, error) { +func (dcs *DoltChunkStore) httpGetRangeDownload(ctx context.Context, getRange *remotesapi.HttpGetRange) ([]chunks.Chunk, error) { url := getRange.Url rangeCount := len(getRange.Ranges) @@ -491,7 +486,7 @@ func (dcs *DoltChunkStore) httpGetRangeDownload(getRange *remotesapi.HttpGetRang resultChan := make(chan bytesResult, 2*concurrency) for i := 0; i < concurrency; i++ { - go getRanges(dcs.httpFetcher, url, rangeChan, resultChan, stopChan) + go getRanges(ctx, dcs.httpFetcher, url, rangeChan, resultChan, stopChan) } for _, r := range getRange.Ranges { diff --git a/go/utils/remotesrv/grpc.go b/go/utils/remotesrv/grpc.go index e4a79981f6..a9a3f9b9ff 100644 --- a/go/utils/remotesrv/grpc.go +++ b/go/utils/remotesrv/grpc.go @@ -42,7 +42,7 @@ func (rs RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasChu hashes, hashToIndex := remotestorage.ParseByteSlices(req.Hashes) - absent := cs.HasMany(hashes) + absent := cs.HasMany(ctx, hashes) indices := make([]int32, len(absent)) logger(fmt.Sprintf("missing chunks: %v", indices)) @@ -117,7 +117,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes org := req.RepoId.Org repoName := req.RepoId.RepoName hashes, _ := remotestorage.ParseByteSlices(req.Hashes) - absent := cs.HasMany(hashes) + absent := cs.HasMany(ctx, hashes) var locs []*remotesapi.UploadLoc for h := range hashes { @@ -157,7 +157,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)) err := pantoerr.PanicToError("Rebase failed", func() error { - cs.Rebase() + cs.Rebase(ctx) return nil }) @@ -182,7 +182,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques var h hash.Hash err := pantoerr.PanicToError("Root failed", func() error { - h = cs.Root() + h = cs.Root(ctx) return nil }) @@ -220,7 +220,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe var ok bool err := pantoerr.PanicToError("Commit failed", func() error { - ok = cs.Commit(currHash, lastHash) + ok = cs.Commit(ctx, currHash, lastHash) return nil })