From 98804d33c5ffd7eaa49552afc01e1c9260a2ddf7 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 7 May 2021 16:31:27 -0700 Subject: [PATCH] Revert "Merge pull request #1686 from dolthub/aaron/remotestorage-streaming-download-locations" This reverts commit 13b2ebc080d086ec8383869e1eb84076be169a01, reversing changes made to 9c6d31b1f3176604a4feddd5ba2d54c670ec8c04. --- .../doltcore/remotestorage/chunk_store.go | 93 +++++++------------ go/utils/remotesrv/grpc.go | 61 ------------ 2 files changed, 35 insertions(+), 119 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 2377f986d1..0448b67082 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -249,7 +249,8 @@ func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.Ha } const ( - getLocsBatchSize = (4 * 1024) / 20 + getLocsBatchSize = 32 * 1024 + getLocsMaxConcurrency = 4 ) type GetRange remotesapi.HttpGetRange @@ -350,14 +351,10 @@ func (gr *GetRange) GetDownloadFunc(ctx context.Context, fetcher HTTPFetcher, ch } func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (map[string]*GetRange, error) { - span, ctx := tracing.StartSpan(ctx, "remotestorage.getDLLocs") - span.LogKV("num_hashes", len(hashes)) - defer span.Finish() - res := make(map[string]*GetRange) // channel for receiving results from go routines making grpc calls to get download locations for chunks - resCh := make(chan []*remotesapi.HttpGetRange) + dlLocChan := make(chan []*remotesapi.HttpGetRange) eg, ctx := errgroup.WithContext(ctx) @@ -365,7 +362,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m eg.Go(func() error { for { select { - case locs, ok := <-resCh: + case locs, ok := <-dlLocChan: if !ok { return nil } @@ -383,67 +380,47 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m } }) - // go routine for batching the get location requests, streaming the requests and streaming the responses. - eg.Go(func() error { - var reqs []*remotesapi.GetDownloadLocsRequest - hashesBytes := HashesToSlices(hashes) - batchItr(len(hashesBytes), getLocsBatchSize, func(st, end int) (stop bool) { - batch := hashesBytes[st:end] + hashesBytes := HashesToSlices(hashes) + var work []func() error + + // batchItr creates work functions which request a batch of chunk download locations and write the results to the + // dlLocChan + batchItr(len(hashesBytes), getLocsBatchSize, func(st, end int) (stop bool) { + batch := hashesBytes[st:end] + f := func() error { req := &remotesapi.GetDownloadLocsRequest{RepoId: dcs.getRepoId(), ChunkHashes: batch} - reqs = append(reqs, req) - return false - }) - op := func() error { - stream, err := dcs.csClient.StreamDownloadLocations(ctx) + resp, err := dcs.csClient.GetDownloadLocations(ctx, req) if err != nil { - return NewRpcError(err, "StreamDownloadLocations", dcs.host, nil) + return NewRpcError(err, "GetDownloadLocations", dcs.host, req) } - seg, ctx := errgroup.WithContext(ctx) - completedReqs := 0 - // Write requests - seg.Go(func() error { - for i := range reqs { - if err := stream.Send(reqs[i]); err != nil { - return NewRpcError(err, "StreamDownloadLocations", dcs.host, reqs[i]) - } - } - return stream.CloseSend() - }) - // Read responses - seg.Go(func() error { - for { - resp, err := stream.Recv() - if err != nil { - if err == io.EOF { - return nil - } - return NewRpcError(err, "StreamDownloadLocations", dcs.host, reqs[completedReqs]) - } - tosend := make([]*remotesapi.HttpGetRange, len(resp.Locs)) - for i, l := range resp.Locs { - tosend[i] = l.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange - } - select { - case resCh <- tosend: - completedReqs += 1 - case <-ctx.Done(): - return ctx.Err() - } - } - }) - err = seg.Wait() - reqs = reqs[completedReqs:] - if len(reqs) == 0 { - close(resCh) + tosend := make([]*remotesapi.HttpGetRange, len(resp.Locs)) + for i, l := range resp.Locs { + tosend[i] = l.Location.(*remotesapi.DownloadLoc_HttpGetRange).HttpGetRange } - return processGrpcErr(err) + select { + case dlLocChan <- tosend: + case <-ctx.Done(): + } + return nil } - return backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries)) + work = append(work, f) + return false + }) + + span, ctx := tracing.StartSpan(ctx, "remotestorage.getDLLocs") + span.LogKV("num_batches", len(work), "num_hashes", len(hashes)) + defer span.Finish() + + // execute the work and close the channel after as no more results will come in + eg.Go(func() error { + defer close(dlLocChan) + return concurrentExec(work, getLocsMaxConcurrency) }) if err := eg.Wait(); err != nil { return nil, err } + return res, nil } diff --git a/go/utils/remotesrv/grpc.go b/go/utils/remotesrv/grpc.go index c14e0adb76..7cf4443f1e 100644 --- a/go/utils/remotesrv/grpc.go +++ b/go/utils/remotesrv/grpc.go @@ -17,7 +17,6 @@ package main import ( "context" "fmt" - "io" "log" "os" "path/filepath" @@ -25,7 +24,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/proto" remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/remotestorage" @@ -118,7 +116,6 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) if err != nil { log.Println("Failed to sign request", err) - return nil, err } logger("The URL is " + url) @@ -130,64 +127,6 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot return &remotesapi.GetDownloadLocsResponse{Locs: locs}, nil } -func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStoreService_StreamDownloadLocationsServer) error { - logger := getReqLogger("GRPC", "StreamDownloadLocations") - defer func() { logger("finished") }() - - var repoID *remotesapi.RepoId - var cs *nbs.NomsBlockStore - for { - req, err := stream.Recv() - if err != nil { - if err == io.EOF { - return nil - } - return err - } - - if !proto.Equal(req.RepoId, repoID) { - repoID = req.RepoId - cs = rs.getStore(repoID, "StreamDownloadLoctions") - if cs == nil { - return status.Error(codes.Internal, "Could not get chunkstore") - } - logger(fmt.Sprintf("found repo %s/%s", repoID.Org, repoID.RepoName)) - } - - org := req.RepoId.Org - repoName := req.RepoId.RepoName - hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes) - locations, err := cs.GetChunkLocations(hashes) - if err != nil { - return err - } - - var locs []*remotesapi.DownloadLoc - for loc, hashToRange := range locations { - var ranges []*remotesapi.RangeChunk - for h, r := range hashToRange { - hCpy := h - ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length}) - } - - url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) - if err != nil { - log.Println("Failed to sign request", err) - return err - } - - logger("The URL is " + url) - - getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges} - locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}}) - } - - if err := stream.Send(&remotesapi.GetDownloadLocsResponse{Locs: locs}); err != nil { - return err - } - } -} - func (rs *RemoteChunkStore) getDownloadUrl(logger func(string), org, repoName, fileId string) (string, error) { return fmt.Sprintf("http://%s/%s/%s/%s", rs.HttpHost, org, repoName, fileId), nil }