go: remotesrv: Use RepoPath when it is provided.

This commit is contained in:
Aaron Son
2022-09-21 11:55:58 -07:00
parent 73f922493e
commit 91a09258ec
8 changed files with 95 additions and 79 deletions

View File

@@ -21,7 +21,7 @@ import (
)
type DBCache interface {
Get(org, repo, nbfVerStr string) (RemoteSrvStore, error)
Get(path, nbfVerStr string) (RemoteSrvStore, error)
}
type RemoteSrvStore interface {

View File

@@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
@@ -60,17 +59,33 @@ func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCac
}
}
type repoRequest interface {
GetRepoId() *remotesapi.RepoId
GetRepoPath() string
}
func getRepoPath(req repoRequest) string {
if req.GetRepoPath() != "" {
return req.GetRepoPath()
}
if req.GetRepoId() != nil {
return req.GetRepoId().Org + "/" + req.GetRepoId().RepoName
}
return ""
}
func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasChunksRequest) (*remotesapi.HasChunksResponse, error) {
logger := getReqLogger(rs.lgr, "HasChunks")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found repo %s", repoPath)
hashes, hashToIndex := remotestorage.ParseByteSlices(req.Hashes)
@@ -117,13 +132,14 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
logger := getReqLogger(rs.lgr, "GetDownloadLocations")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found repo %s", repoPath)
hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes)
@@ -168,7 +184,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
md, _ := metadata.FromIncomingContext(stream.Context())
var repoID *remotesapi.RepoId
var repoPath string
var cs RemoteSrvStore
var prefix string
for {
@@ -180,13 +196,14 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
return err
}
if !proto.Equal(req.RepoId, repoID) {
repoID = req.RepoId
cs = rs.getStore(logger, repoID)
nextPath := getRepoPath(req)
if nextPath != repoPath {
repoPath = nextPath
cs = rs.getStore(logger, repoPath)
if cs == nil {
return status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found repo %s/%s", repoID.Org, repoID.RepoName)
logger.Printf("found repo %s", repoPath)
prefix, err = rs.getRelativeStorePath(cs)
if err != nil {
@@ -228,12 +245,12 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
func (rs *RemoteChunkStore) getHost(md metadata.MD) string {
host := rs.HttpHost
if strings.HasPrefix(rs.HttpHost, ":") && rs.HttpHost != ":80" {
if strings.HasPrefix(rs.HttpHost, ":") {
hosts := md.Get(":authority")
if len(hosts) > 0 {
host = strings.Split(hosts[0], ":")[0] + rs.HttpHost
}
} else if rs.HttpHost == "" || rs.HttpHost == ":80" {
} else if rs.HttpHost == "" {
hosts := md.Get(":authority")
if len(hosts) > 0 {
host = hosts[0]
@@ -274,16 +291,15 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
logger := getReqLogger(rs.lgr, "GetUploadLocations")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found repo %s", repoPath)
org := req.RepoId.Org
repoName := req.RepoId.RepoName
tfds := parseTableFileDetails(req)
md, _ := metadata.FromIncomingContext(ctx)
@@ -291,7 +307,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
var locs []*remotesapi.UploadLoc
for _, tfd := range tfds {
h := hash.New(tfd.Id)
url, err := rs.getUploadUrl(logger, md, org, repoName, tfd)
url, err := rs.getUploadUrl(logger, md, repoPath, tfd)
if err != nil {
return nil, status.Error(codes.Internal, "Failed to get upload Url.")
@@ -306,7 +322,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
return &remotesapi.GetUploadLocsResponse{Locs: locs}, nil
}
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, org, repoName string, tfd *remotesapi.TableFileDetails) (string, error) {
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) (string, error) {
fileID := hash.New(tfd.Id).String()
params := url.Values{}
params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
@@ -315,7 +331,7 @@ func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, o
return (&url.URL{
Scheme: "http",
Host: rs.getHost(md),
Path: fmt.Sprintf("%s/%s/%s", org, repoName, fileID),
Path: fmt.Sprintf("%s/%s", repoPath, fileID),
RawQuery: params.Encode(),
}).String(), nil
}
@@ -324,18 +340,19 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
logger := getReqLogger(rs.lgr, "Rebase")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found %s", repoPath)
err := cs.Rebase(ctx)
if err != nil {
logger.Printf("error occurred during processing of Rebace rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err)
return nil, status.Errorf(codes.Internal, "failed to rebase: %v", err)
}
@@ -346,7 +363,8 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
logger := getReqLogger(rs.lgr, "Root")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
@@ -355,7 +373,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
h, err := cs.Root(ctx)
if err != nil {
logger.Printf("error occurred during processing of Root rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
logger.Printf("error occurred during processing of Root rpc of %s details: %v", repoPath, err)
return nil, status.Error(codes.Internal, "Failed to get root")
}
@@ -366,13 +384,14 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
logger := getReqLogger(rs.lgr, "Commit")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found %s", repoPath)
//should validate
updates := make(map[string]int)
@@ -394,11 +413,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
ok, err = cs.Commit(ctx, currHash, lastHash)
if err != nil {
logger.Printf("error occurred during processing of Commit of %s/%s last %s curr: %s details: %v", req.RepoId.Org, req.RepoId.RepoName, lastHash.String(), currHash.String(), err)
logger.Printf("error occurred during processing of Commit of %s last %s curr: %s details: %v", repoPath, lastHash.String(), currHash.String(), err)
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
}
logger.Printf("committed %s/%s moved from %s -> %s", req.RepoId.Org, req.RepoId.RepoName, currHash.String(), lastHash.String())
logger.Printf("committed %s moved from %s -> %s", repoPath, lastHash.String(), currHash.String())
return &remotesapi.CommitResponse{Success: ok}, nil
}
@@ -406,7 +425,8 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
logger := getReqLogger(rs.lgr, "GetRepoMetadata")
defer func() { logger.Println("finished") }()
cs := rs.getOrCreateStore(logger, req.RepoId, req.ClientRepoFormat.NbfVersion)
repoPath := getRepoPath(req)
cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
@@ -432,13 +452,14 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
logger := getReqLogger(rs.lgr, "ListTableFiles")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found repo %s", repoPath)
root, tables, appendixTables, err := cs.Sources(ctx)
@@ -500,13 +521,14 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
logger := getReqLogger(rs.lgr, "AddTableFiles")
defer func() { logger.Println("finished") }()
cs := rs.getStore(logger, req.RepoId)
repoPath := getRepoPath(req)
cs := rs.getStore(logger, repoPath)
if cs == nil {
return nil, status.Error(codes.Internal, "Could not get chunkstore")
}
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
logger.Printf("found %s", repoPath)
// should validate
updates := make(map[string]int)
@@ -524,18 +546,15 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
return &remotesapi.AddTableFilesResponse{Success: true}, nil
}
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoId *remotesapi.RepoId) RemoteSrvStore {
return rs.getOrCreateStore(logger, repoId, types.Format_Default.VersionString())
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore {
return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString())
}
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoId *remotesapi.RepoId, nbfVerStr string) RemoteSrvStore {
org := repoId.Org
repoName := repoId.RepoName
cs, err := rs.csCache.Get(org, repoName, nbfVerStr)
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore {
cs, err := rs.csCache.Get(repoPath, nbfVerStr)
if err != nil {
logger.Printf("Failed to retrieve chunkstore for %s/%s\n", org, repoName)
logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath)
}
return cs

View File

@@ -100,16 +100,15 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
return
}
tokens := strings.Split(path, "/")
if len(tokens) != 3 {
i := strings.LastIndex(path, "/")
if i < 0 || len(path[i:]) != 33 {
logger.Printf("response to: %v method: %v http response code: %v", req.RequestURI, req.Method, http.StatusNotFound)
respWr.WriteHeader(http.StatusNotFound)
return
}
org := tokens[0]
repo := tokens[1]
file := tokens[2]
filepath := path[:i]
file := path[i+1:]
q := req.URL.Query()
ncs := q.Get("num_chunks")
@@ -149,7 +148,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
return
}
statusCode = writeTableFile(req.Context(), logger, fh.dbCache, org, repo, file, num_chunks, content_hash, uint64(content_length), req.Body)
statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, num_chunks, content_hash, uint64(content_length), req.Body)
}
if statusCode != -1 {
@@ -248,7 +247,7 @@ func (u *uploadreader) Close() error {
return nil
}
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, org, repo, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) int {
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) int {
_, ok := hash.MaybeParse(fileId)
if !ok {
logger.Println(fileId, "is not a valid hash")
@@ -257,9 +256,9 @@ func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache,
logger.Println(fileId, "is valid")
cs, err := dbCache.Get(org, repo, types.Format_Default.VersionString())
cs, err := dbCache.Get(path, types.Format_Default.VersionString())
if err != nil {
logger.Println("failed to get", org+"/"+repo, "repository:", err.Error())
logger.Println("failed to get", path, "repository:", err.Error())
return http.StatusInternalServerError
}

View File

@@ -120,7 +120,8 @@ type DoltChunkStore struct {
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
var repoId *remotesapi.RepoId
tokens := strings.Split(strings.Trim(path, "/"), "/")
path = strings.Trim(path, "/")
tokens := strings.Split(path, "/")
if len(tokens) == 2 {
org := tokens[0]
repoName := tokens[1]
@@ -1263,7 +1264,7 @@ func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
}
id, token := dcs.getRepoId()
dcs.logf("Adding Table files to repo: %s/%s -\n%s", id.Org, id.RepoName, debugStr)
dcs.logf("Adding Table files to repo: %s -\n%s", dcs.repoPath, debugStr)
atReq := &remotesapi.AddTableFilesRequest{
RepoId: id,
RepoToken: token,

View File

@@ -32,9 +32,9 @@ type remotesrvStore struct {
var _ remotesrv.DBCache = remotesrvStore{}
func (s remotesrvStore) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
sess := dsess.DSessFromSess(s.ctx.Session)
db, err := sess.Provider().Database(s.ctx, repo)
db, err := sess.Provider().Database(s.ctx, path)
if err != nil {
return nil, err
}

View File

@@ -43,32 +43,28 @@ func NewLocalCSCache(filesys filesys.Filesys) *LocalCSCache {
}
}
func (cache *LocalCSCache) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
func (cache *LocalCSCache) Get(repopath, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
cache.mu.Lock()
defer cache.mu.Unlock()
id := filepath.Join(org, repo)
id := filepath.FromSlash(repopath)
if cs, ok := cache.dbs[id]; ok {
return cs, nil
}
var newCS *nbs.NomsBlockStore
if cache.fs != nil {
err := cache.fs.MkDirs(id)
if err != nil {
return nil, err
}
path, err := cache.fs.Abs(id)
if err != nil {
return nil, err
}
err := cache.fs.MkDirs(id)
if err != nil {
return nil, err
}
path, err := cache.fs.Abs(id)
if err != nil {
return nil, err
}
newCS, err = nbs.NewLocalStore(context.TODO(), nbfVerStr, path, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider())
if err != nil {
return nil, err
}
newCS, err := nbs.NewLocalStore(context.TODO(), nbfVerStr, path, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider())
if err != nil {
return nil, err
}
cache.dbs[id] = newCS
@@ -80,6 +76,6 @@ type SingletonCSCache struct {
s remotesrv.RemoteSrvStore
}
func (cache SingletonCSCache) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
func (cache SingletonCSCache) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
return cache.s, nil
}

View File

@@ -54,6 +54,7 @@ func main() {
*httpHostParam = fmt.Sprintf("%s:%d", *httpHostParam, *httpPortParam)
} else {
*httpPortParam = 80
*httpHostParam = ":80"
log.Println("'http-port' parameter not provided. Using default port 80")
}

View File

@@ -35,7 +35,7 @@ teardown() {
srv_pid=$!
cd ../
dolt clone http://localhost:50051/ignored_named/remote repo1
dolt clone http://localhost:50051/remote repo1
cd repo1
run dolt ls
[[ "$output" =~ "vals" ]] || false
@@ -62,7 +62,7 @@ SQL
cd ../
# By cloning here, we have a near-at-hand way to wait for the server to be ready.
dolt clone http://localhost:50051/ignored_named/remote cloned_remote
dolt clone http://localhost:50051/remote cloned_remote
dolt sql-client -u root <<SQL
create database created;
@@ -73,7 +73,7 @@ call dolt_add('vals');
call dolt_commit('-m', 'add some vals');
SQL
dolt clone http://localhost:50051/ignored_named/created cloned_created
dolt clone http://localhost:50051/created cloned_created
cd cloned_created
run dolt ls
[[ "$output" =~ "vals" ]] || false
@@ -93,7 +93,7 @@ SQL
srv_pid=$!
cd ../
dolt clone http://localhost:50051/test-org/remote remote_cloned
dolt clone http://localhost:50051/remote remote_cloned
cd remote_cloned
dolt sql -q 'insert into vals values (1), (2), (3), (4), (5);'
@@ -110,7 +110,7 @@ SQL
dolt sql-server --remotesapi-port 50051 &
srv_pid=$!
dolt clone http://localhost:50051/test-org/remote_one remote_one_cloned
dolt clone http://localhost:50051/remote_one remote_one_cloned
cd ../remote_two
dolt init
@@ -133,7 +133,7 @@ SQL
cd ../../
mkdir -p read_replica
cd read_replica
dolt clone http://127.0.0.1:50051/test-org/db
dolt clone http://127.0.0.1:50051/db
cd db
dolt sql <<SQL
set @@persist.dolt_read_replica_remote = 'origin';