go/utils/remotesrv: Update for StreamDownloadLocations.

This commit is contained in:
Aaron Son
2021-05-07 09:34:28 -07:00
parent d9f6d850a7
commit fc81b2494c

View File

@@ -17,6 +17,7 @@ package main
import (
"context"
"fmt"
"io"
"log"
"os"
"path/filepath"
@@ -24,6 +25,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
@@ -116,6 +118,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
url, err := rs.getDownloadUrl(logger, org, repoName, loc.String())
if err != nil {
log.Println("Failed to sign request", err)
return nil, err
}
logger("The URL is " + url)
@@ -127,6 +130,64 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
return &remotesapi.GetDownloadLocsResponse{Locs: locs}, nil
}
func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStoreService_StreamDownloadLocationsServer) error {
logger := getReqLogger("GRPC", "StreamDownloadLocations")
defer func() { logger("finished") }()
var repoID *remotesapi.RepoId
var cs *nbs.NomsBlockStore
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
if !proto.Equal(req.RepoId, repoID) {
repoID = req.RepoId
cs = rs.getStore(repoID, "StreamDownloadLoctions")
if cs == nil {
return status.Error(codes.Internal, "Could not get chunkstore")
}
logger(fmt.Sprintf("found repo %s/%s", repoID.Org, repoID.RepoName))
}
org := req.RepoId.Org
repoName := req.RepoId.RepoName
hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes)
locations, err := cs.GetChunkLocations(hashes)
if err != nil {
return err
}
var locs []*remotesapi.DownloadLoc
for loc, hashToRange := range locations {
var ranges []*remotesapi.RangeChunk
for h, r := range hashToRange {
hCpy := h
ranges = append(ranges, &remotesapi.RangeChunk{Hash: hCpy[:], Offset: r.Offset, Length: r.Length})
}
url, err := rs.getDownloadUrl(logger, org, repoName, loc.String())
if err != nil {
log.Println("Failed to sign request", err)
return err
}
logger("The URL is " + url)
getRange := &remotesapi.HttpGetRange{Url: url, Ranges: ranges}
locs = append(locs, &remotesapi.DownloadLoc{Location: &remotesapi.DownloadLoc_HttpGetRange{HttpGetRange: getRange}})
}
if err := stream.Send(&remotesapi.GetDownloadLocsResponse{Locs: locs}); err != nil {
return err
}
}
}
func (rs *RemoteChunkStore) getDownloadUrl(logger func(string), org, repoName, fileId string) (string, error) {
return fmt.Sprintf("http://%s/%s/%s/%s", rs.HttpHost, org, repoName, fileId), nil
}