From fc81b2494c75ece5e868cbb9b2d6f8635ccaa454 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 7 May 2021 09:34:28 -0700 Subject: [PATCH] go/utils/remotesrv: Update for StreamDownloadLocations. --- go/utils/remotesrv/grpc.go | 61 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/go/utils/remotesrv/grpc.go b/go/utils/remotesrv/grpc.go index 7cf4443f1e..c14e0adb76 100644 --- a/go/utils/remotesrv/grpc.go +++ b/go/utils/remotesrv/grpc.go @@ -17,6 +17,7 @@ package main import ( "context" "fmt" + "io" "log" "os" "path/filepath" @@ -24,6 +25,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1" "github.com/dolthub/dolt/go/libraries/doltcore/remotestorage" @@ -116,6 +118,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) if err != nil { log.Println("Failed to sign request", err) + return nil, err } logger("The URL is " + url) @@ -127,6 +130,64 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot return &remotesapi.GetDownloadLocsResponse{Locs: locs}, nil } +func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStoreService_StreamDownloadLocationsServer) error { + logger := getReqLogger("GRPC", "StreamDownloadLocations") + defer func() { logger("finished") }() + + var repoID *remotesapi.RepoId + var cs *nbs.NomsBlockStore + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + if !proto.Equal(req.RepoId, repoID) { + repoID = req.RepoId + cs = rs.getStore(repoID, "StreamDownloadLoctions") + if cs == nil { + return status.Error(codes.Internal, "Could not get chunkstore") + } + logger(fmt.Sprintf("found repo %s/%s", repoID.Org, repoID.RepoName)) + } + + org := req.RepoId.Org + repoName := req.RepoId.RepoName + hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes) + locations, err := cs.GetChunkLocations(hashes) + if err != nil { + return err + } + + var locs []*remotesapi.DownloadLoc + for loc, hashToRange := range locations { + var ranges []*remotesapi.RangeChunk + for h, r := range hashToRange { + hCpy := h + ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length}) + } + + url, err := rs.getDownloadUrl(logger, org, repoName, loc.String()) + if err != nil { + log.Println("Failed to sign request", err) + return err + } + + logger("The URL is " + url) + + getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges} + locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}}) + } + + if err := stream.Send(&remotesapi.GetDownloadLocsResponse{Locs: locs}); err != nil { + return err + } + } +} + func (rs *RemoteChunkStore) getDownloadUrl(logger func(string), org, repoName, fileId string) (string, error) { return fmt.Sprintf("http://%s/%s/%s/%s", rs.HttpHost, org, repoName, fileId), nil }