mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-23 05:13:00 -05:00
Merge pull request #4387 from dolthub/aaron/remotestorage-remotesrv-support-repo-path
remotestorage,remotesrv: Be less restrictive in repository paths for remotesapi repos that are not served through dolthub.
This commit is contained in:
@@ -26,7 +26,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
"github.com/dolthub/dolt/go/libraries/events"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/argparser"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/earl"
|
||||
@@ -203,15 +202,8 @@ func createRemote(ctx context.Context, remoteName, remoteUrl string, params map[
|
||||
|
||||
r := env.NewRemote(remoteName, remoteUrl, params)
|
||||
ddb, err := r.GetRemoteDB(ctx, types.Format_Default, dEnv)
|
||||
|
||||
if err != nil {
|
||||
bdr := errhand.BuildDError("error: failed to get remote db").AddCause(err)
|
||||
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
urlObj, _ := earl.Parse(remoteUrl)
|
||||
bdr.AddDetails("'%s' should be in the format 'organization/repo'", urlObj.Path)
|
||||
}
|
||||
|
||||
return env.NoRemote, nil, bdr.Build()
|
||||
}
|
||||
|
||||
|
||||
@@ -111,9 +111,7 @@ func (cmd PushCmd) Exec(ctx context.Context, commandStr string, args []string, d
|
||||
|
||||
remoteDB, err := opts.Remote.GetRemoteDB(ctx, dEnv.DoltDB.ValueReadWriter().Format(), dEnv)
|
||||
if err != nil {
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
err = actions.HandleInvalidDoltSpecPathErr(opts.Remote.Name, opts.Remote.Url, err)
|
||||
}
|
||||
err = actions.HandleInitRemoteStorageClientErr(opts.Remote.Name, opts.Remote.Url, err)
|
||||
return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
|
||||
}
|
||||
|
||||
|
||||
@@ -103,12 +103,8 @@ func (fact DoltRemoteFactory) newChunkStore(ctx context.Context, nbf *types.Noms
|
||||
|
||||
csClient := remotesapi.NewChunkStoreServiceClient(conn)
|
||||
cs, err := remotestorage.NewDoltChunkStoreFromPath(ctx, nbf, urlObj.Path, urlObj.Host, csClient)
|
||||
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
return nil, fmt.Errorf("invalid dolt url '%s'", urlObj.String())
|
||||
} else if err != nil {
|
||||
// TODO: Make this error more expressive
|
||||
return nil, err
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not access dolt url '%s': %w", urlObj.String(), err)
|
||||
}
|
||||
|
||||
if _, ok := params[NoCachingParameter]; ok {
|
||||
|
||||
@@ -26,7 +26,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dtestutils"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
@@ -322,10 +321,7 @@ func (mr *MultiRepoTestSetup) PushToRemote(dbName, remoteName, branchName string
|
||||
|
||||
remoteDB, err := opts.Remote.GetRemoteDB(ctx, dEnv.DoltDB.ValueReadWriter().Format(), mr.MrEnv.GetEnv(dbName))
|
||||
if err != nil {
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
mr.Errhand(actions.HandleInvalidDoltSpecPathErr(opts.Remote.Name, opts.Remote.Url, err))
|
||||
}
|
||||
mr.Errhand(fmt.Sprintf("Failed to get remote database: %s", err.Error()))
|
||||
mr.Errhand(actions.HandleInitRemoteStorageClientErr(opts.Remote.Name, opts.Remote.Url, err))
|
||||
}
|
||||
|
||||
tmpDir, err := dEnv.TempTableFilesDir()
|
||||
|
||||
+2
-8
@@ -471,13 +471,7 @@ func SyncRoots(ctx context.Context, srcDb, destDb *doltdb.DoltDB, tempTableDir s
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandleInvalidDoltSpecPathErr(name, url string, err error) error {
|
||||
urlObj, _ := earl.Parse(url)
|
||||
path := urlObj.Path
|
||||
if path[0] == '/' {
|
||||
path = path[1:]
|
||||
}
|
||||
|
||||
var detail = fmt.Sprintf("the remote: %s %s '%s' should be in the format 'organization/repo'", name, url, path)
|
||||
func HandleInitRemoteStorageClientErr(name, url string, err error) error {
|
||||
var detail = fmt.Sprintf("the remote: %s '%s' could not be accessed", name, url)
|
||||
return fmt.Errorf("%w; %s; %s", ErrFailedToGetRemoteDb, detail, err.Error())
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ import (
|
||||
)
|
||||
|
||||
type DBCache interface {
|
||||
Get(org, repo, nbfVerStr string) (RemoteSrvStore, error)
|
||||
Get(path, nbfVerStr string) (RemoteSrvStore, error)
|
||||
}
|
||||
|
||||
type RemoteSrvStore interface {
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
remotesapi "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
@@ -60,17 +59,33 @@ func NewHttpFSBackedChunkStore(lgr *logrus.Entry, httpHost string, csCache DBCac
|
||||
}
|
||||
}
|
||||
|
||||
type repoRequest interface {
|
||||
GetRepoId() *remotesapi.RepoId
|
||||
GetRepoPath() string
|
||||
}
|
||||
|
||||
func getRepoPath(req repoRequest) string {
|
||||
if req.GetRepoPath() != "" {
|
||||
return req.GetRepoPath()
|
||||
}
|
||||
if req.GetRepoId() != nil {
|
||||
return req.GetRepoId().Org + "/" + req.GetRepoId().RepoName
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasChunksRequest) (*remotesapi.HasChunksResponse, error) {
|
||||
logger := getReqLogger(rs.lgr, "HasChunks")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found repo %s", repoPath)
|
||||
|
||||
hashes, hashToIndex := remotestorage.ParseByteSlices(req.Hashes)
|
||||
|
||||
@@ -117,13 +132,14 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
logger := getReqLogger(rs.lgr, "GetDownloadLocations")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found repo %s", repoPath)
|
||||
|
||||
hashes, _ := remotestorage.ParseByteSlices(req.ChunkHashes)
|
||||
|
||||
@@ -168,7 +184,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
|
||||
md, _ := metadata.FromIncomingContext(stream.Context())
|
||||
|
||||
var repoID *remotesapi.RepoId
|
||||
var repoPath string
|
||||
var cs RemoteSrvStore
|
||||
var prefix string
|
||||
for {
|
||||
@@ -180,13 +196,14 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
return err
|
||||
}
|
||||
|
||||
if !proto.Equal(req.RepoId, repoID) {
|
||||
repoID = req.RepoId
|
||||
cs = rs.getStore(logger, repoID)
|
||||
nextPath := getRepoPath(req)
|
||||
if nextPath != repoPath {
|
||||
repoPath = nextPath
|
||||
cs = rs.getStore(logger, repoPath)
|
||||
if cs == nil {
|
||||
return status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
logger.Printf("found repo %s/%s", repoID.Org, repoID.RepoName)
|
||||
logger.Printf("found repo %s", repoPath)
|
||||
|
||||
prefix, err = rs.getRelativeStorePath(cs)
|
||||
if err != nil {
|
||||
@@ -228,12 +245,12 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
|
||||
func (rs *RemoteChunkStore) getHost(md metadata.MD) string {
|
||||
host := rs.HttpHost
|
||||
if strings.HasPrefix(rs.HttpHost, ":") && rs.HttpHost != ":80" {
|
||||
if strings.HasPrefix(rs.HttpHost, ":") {
|
||||
hosts := md.Get(":authority")
|
||||
if len(hosts) > 0 {
|
||||
host = strings.Split(hosts[0], ":")[0] + rs.HttpHost
|
||||
}
|
||||
} else if rs.HttpHost == "" || rs.HttpHost == ":80" {
|
||||
} else if rs.HttpHost == "" {
|
||||
hosts := md.Get(":authority")
|
||||
if len(hosts) > 0 {
|
||||
host = hosts[0]
|
||||
@@ -274,16 +291,15 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
logger := getReqLogger(rs.lgr, "GetUploadLocations")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found repo %s", repoPath)
|
||||
|
||||
org := req.RepoId.Org
|
||||
repoName := req.RepoId.RepoName
|
||||
tfds := parseTableFileDetails(req)
|
||||
|
||||
md, _ := metadata.FromIncomingContext(ctx)
|
||||
@@ -291,7 +307,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
var locs []*remotesapi.UploadLoc
|
||||
for _, tfd := range tfds {
|
||||
h := hash.New(tfd.Id)
|
||||
url, err := rs.getUploadUrl(logger, md, org, repoName, tfd)
|
||||
url, err := rs.getUploadUrl(logger, md, repoPath, tfd)
|
||||
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, "Failed to get upload Url.")
|
||||
@@ -306,7 +322,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
return &remotesapi.GetUploadLocsResponse{Locs: locs}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, org, repoName string, tfd *remotesapi.TableFileDetails) (string, error) {
|
||||
func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, repoPath string, tfd *remotesapi.TableFileDetails) (string, error) {
|
||||
fileID := hash.New(tfd.Id).String()
|
||||
params := url.Values{}
|
||||
params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
|
||||
@@ -315,7 +331,7 @@ func (rs *RemoteChunkStore) getUploadUrl(logger *logrus.Entry, md metadata.MD, o
|
||||
return (&url.URL{
|
||||
Scheme: "http",
|
||||
Host: rs.getHost(md),
|
||||
Path: fmt.Sprintf("%s/%s/%s", org, repoName, fileID),
|
||||
Path: fmt.Sprintf("%s/%s", repoPath, fileID),
|
||||
RawQuery: params.Encode(),
|
||||
}).String(), nil
|
||||
}
|
||||
@@ -324,18 +340,19 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
logger := getReqLogger(rs.lgr, "Rebase")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found %s", repoPath)
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred during processing of Rebace rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
|
||||
logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err)
|
||||
return nil, status.Errorf(codes.Internal, "failed to rebase: %v", err)
|
||||
}
|
||||
|
||||
@@ -346,7 +363,8 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
logger := getReqLogger(rs.lgr, "Root")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -355,7 +373,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
h, err := cs.Root(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred during processing of Root rpc of %s/%s details: %v", req.RepoId.Org, req.RepoId.RepoName, err)
|
||||
logger.Printf("error occurred during processing of Root rpc of %s details: %v", repoPath, err)
|
||||
return nil, status.Error(codes.Internal, "Failed to get root")
|
||||
}
|
||||
|
||||
@@ -366,13 +384,14 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
logger := getReqLogger(rs.lgr, "Commit")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found %s", repoPath)
|
||||
|
||||
//should validate
|
||||
updates := make(map[string]int)
|
||||
@@ -394,11 +413,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
ok, err = cs.Commit(ctx, currHash, lastHash)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred during processing of Commit of %s/%s last %s curr: %s details: %v", req.RepoId.Org, req.RepoId.RepoName, lastHash.String(), currHash.String(), err)
|
||||
logger.Printf("error occurred during processing of Commit of %s last %s curr: %s details: %v", repoPath, lastHash.String(), currHash.String(), err)
|
||||
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
|
||||
}
|
||||
|
||||
logger.Printf("committed %s/%s moved from %s -> %s", req.RepoId.Org, req.RepoId.RepoName, currHash.String(), lastHash.String())
|
||||
logger.Printf("committed %s moved from %s -> %s", repoPath, lastHash.String(), currHash.String())
|
||||
return &remotesapi.CommitResponse{Success: ok}, nil
|
||||
}
|
||||
|
||||
@@ -406,7 +425,8 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
|
||||
logger := getReqLogger(rs.lgr, "GetRepoMetadata")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getOrCreateStore(logger, req.RepoId, req.ClientRepoFormat.NbfVersion)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
@@ -432,13 +452,14 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
|
||||
logger := getReqLogger(rs.lgr, "ListTableFiles")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found repo %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found repo %s", repoPath)
|
||||
|
||||
root, tables, appendixTables, err := cs.Sources(ctx)
|
||||
|
||||
@@ -500,13 +521,14 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
logger := getReqLogger(rs.lgr, "AddTableFiles")
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
cs := rs.getStore(logger, req.RepoId)
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
logger.Printf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName)
|
||||
logger.Printf("found %s", repoPath)
|
||||
|
||||
// should validate
|
||||
updates := make(map[string]int)
|
||||
@@ -524,18 +546,15 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
return &remotesapi.AddTableFilesResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoId *remotesapi.RepoId) RemoteSrvStore {
|
||||
return rs.getOrCreateStore(logger, repoId, types.Format_Default.VersionString())
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore {
|
||||
return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString())
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoId *remotesapi.RepoId, nbfVerStr string) RemoteSrvStore {
|
||||
org := repoId.Org
|
||||
repoName := repoId.RepoName
|
||||
|
||||
cs, err := rs.csCache.Get(org, repoName, nbfVerStr)
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore {
|
||||
cs, err := rs.csCache.Get(repoPath, nbfVerStr)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("Failed to retrieve chunkstore for %s/%s\n", org, repoName)
|
||||
logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath)
|
||||
}
|
||||
|
||||
return cs
|
||||
|
||||
@@ -100,16 +100,16 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
tokens := strings.Split(path, "/")
|
||||
if len(tokens) != 3 {
|
||||
i := strings.LastIndex(path, "/")
|
||||
// a table file name is currently 32 characters, plus the '/' is 33.
|
||||
if i < 0 || len(path[i:]) != 33 {
|
||||
logger.Printf("response to: %v method: %v http response code: %v", req.RequestURI, req.Method, http.StatusNotFound)
|
||||
respWr.WriteHeader(http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
org := tokens[0]
|
||||
repo := tokens[1]
|
||||
file := tokens[2]
|
||||
filepath := path[:i]
|
||||
file := path[i+1:]
|
||||
|
||||
q := req.URL.Query()
|
||||
ncs := q.Get("num_chunks")
|
||||
@@ -149,7 +149,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
statusCode = writeTableFile(req.Context(), logger, fh.dbCache, org, repo, file, num_chunks, content_hash, uint64(content_length), req.Body)
|
||||
statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, num_chunks, content_hash, uint64(content_length), req.Body)
|
||||
}
|
||||
|
||||
if statusCode != -1 {
|
||||
@@ -248,7 +248,7 @@ func (u *uploadreader) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, org, repo, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) int {
|
||||
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) int {
|
||||
_, ok := hash.MaybeParse(fileId)
|
||||
if !ok {
|
||||
logger.Println(fileId, "is not a valid hash")
|
||||
@@ -257,9 +257,9 @@ func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache,
|
||||
|
||||
logger.Println(fileId, "is valid")
|
||||
|
||||
cs, err := dbCache.Get(org, repo, types.Format_Default.VersionString())
|
||||
cs, err := dbCache.Get(path, types.Format_Default.VersionString())
|
||||
if err != nil {
|
||||
logger.Println("failed to get", org+"/"+repo, "repository:", err.Error())
|
||||
logger.Println("failed to get", path, "repository:", err.Error())
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +58,6 @@ func init() {
|
||||
}
|
||||
|
||||
var ErrUploadFailed = errors.New("upload failed")
|
||||
var ErrInvalidDoltSpecPath = errors.New("invalid dolt spec path")
|
||||
|
||||
var globalHttpFetcher HTTPFetcher = &http.Client{}
|
||||
|
||||
@@ -104,8 +103,7 @@ type ConcurrencyParams struct {
|
||||
}
|
||||
|
||||
type DoltChunkStore struct {
|
||||
org string
|
||||
repoName string
|
||||
repoId *remotesapi.RepoId
|
||||
repoPath string
|
||||
repoToken *atomic.Value // string
|
||||
host string
|
||||
@@ -120,34 +118,33 @@ type DoltChunkStore struct {
|
||||
}
|
||||
|
||||
func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, path, host string, csClient remotesapi.ChunkStoreServiceClient) (*DoltChunkStore, error) {
|
||||
tokens := strings.Split(strings.Trim(path, "/"), "/")
|
||||
if len(tokens) != 2 {
|
||||
return nil, ErrInvalidDoltSpecPath
|
||||
}
|
||||
var repoId *remotesapi.RepoId
|
||||
|
||||
// todo:
|
||||
// this may just be a dolthub thing. Need to revisit how we do this.
|
||||
org := tokens[0]
|
||||
repoName := tokens[1]
|
||||
|
||||
metadata, err := csClient.GetRepoMetadata(ctx, &remotesapi.GetRepoMetadataRequest{
|
||||
RepoId: &remotesapi.RepoId{
|
||||
path = strings.Trim(path, "/")
|
||||
tokens := strings.Split(path, "/")
|
||||
if len(tokens) == 2 {
|
||||
org := tokens[0]
|
||||
repoName := tokens[1]
|
||||
repoId = &remotesapi.RepoId{
|
||||
Org: org,
|
||||
RepoName: repoName,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
metadata, err := csClient.GetRepoMetadata(ctx, &remotesapi.GetRepoMetadataRequest{
|
||||
RepoId: repoId,
|
||||
RepoPath: path,
|
||||
ClientRepoFormat: &remotesapi.ClientRepoFormat{
|
||||
NbfVersion: nbf.VersionString(),
|
||||
NbsVersion: nbs.StorageVersion,
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cs := &DoltChunkStore{
|
||||
org: org,
|
||||
repoName: repoName,
|
||||
repoId: repoId,
|
||||
repoPath: path,
|
||||
repoToken: new(atomic.Value),
|
||||
host: host,
|
||||
@@ -163,8 +160,7 @@ func NewDoltChunkStoreFromPath(ctx context.Context, nbf *types.NomsBinFormat, pa
|
||||
|
||||
func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
org: dcs.org,
|
||||
repoName: dcs.repoName,
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
@@ -180,8 +176,7 @@ func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore
|
||||
|
||||
func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
org: dcs.org,
|
||||
repoName: dcs.repoName,
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
@@ -198,8 +193,7 @@ func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
|
||||
|
||||
func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
org: dcs.org,
|
||||
repoName: dcs.repoName,
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
@@ -216,8 +210,7 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
|
||||
|
||||
func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams) *DoltChunkStore {
|
||||
return &DoltChunkStore{
|
||||
org: dcs.org,
|
||||
repoName: dcs.repoName,
|
||||
repoId: dcs.repoId,
|
||||
repoPath: dcs.repoPath,
|
||||
repoToken: new(atomic.Value),
|
||||
host: dcs.host,
|
||||
@@ -248,10 +241,7 @@ func (dcs *DoltChunkStore) getRepoId() (*remotesapi.RepoId, string) {
|
||||
if curToken != nil {
|
||||
token = curToken.(string)
|
||||
}
|
||||
return &remotesapi.RepoId{
|
||||
Org: dcs.org,
|
||||
RepoName: dcs.repoName,
|
||||
}, token
|
||||
return dcs.repoId, token
|
||||
}
|
||||
|
||||
type cacheStats struct {
|
||||
@@ -538,6 +528,12 @@ func (l *dlLocations) Add(resp *remotesapi.DownloadLoc) {
|
||||
}
|
||||
}
|
||||
|
||||
type RepoRequest interface {
|
||||
SetRepoId(*remotesapi.RepoId)
|
||||
SetRepoToken(string)
|
||||
SetRepoPath(string)
|
||||
}
|
||||
|
||||
func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (dlLocations, error) {
|
||||
ctx, span := tracer.Start(ctx, "remotestorage.getDLLocs", trace.WithAttributes(attribute.Int("num_hashes", len(hashes))))
|
||||
defer span.End()
|
||||
@@ -573,7 +569,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
|
||||
batchItr(len(hashesBytes), getLocsBatchSize, func(st, end int) (stop bool) {
|
||||
batch := hashesBytes[st:end]
|
||||
id, token := dcs.getRepoId()
|
||||
req := &remotesapi.GetDownloadLocsRequest{RepoId: id, ChunkHashes: batch, RepoToken: token, RepoPath: dcs.repoPath}
|
||||
req := &remotesapi.GetDownloadLocsRequest{RepoId: id, RepoPath: dcs.repoPath, RepoToken: token, ChunkHashes: batch}
|
||||
reqs = append(reqs, req)
|
||||
return false
|
||||
})
|
||||
@@ -809,10 +805,8 @@ func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
|
||||
|
||||
func (dcs *DoltChunkStore) refreshRepoMetadata(ctx context.Context) error {
|
||||
mdReq := &remotesapi.GetRepoMetadataRequest{
|
||||
RepoId: &remotesapi.RepoId{
|
||||
Org: dcs.org,
|
||||
RepoName: dcs.repoName,
|
||||
},
|
||||
RepoId: dcs.repoId,
|
||||
RepoPath: dcs.repoPath,
|
||||
ClientRepoFormat: &remotesapi.ClientRepoFormat{
|
||||
NbfVersion: dcs.nbf.VersionString(),
|
||||
NbsVersion: nbs.StorageVersion,
|
||||
@@ -1270,7 +1264,7 @@ func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
|
||||
}
|
||||
|
||||
id, token := dcs.getRepoId()
|
||||
dcs.logf("Adding Table files to repo: %s/%s -\n%s", id.Org, id.RepoName, debugStr)
|
||||
dcs.logf("Adding Table files to repo: %s -\n%s", dcs.repoPath, debugStr)
|
||||
atReq := &remotesapi.AddTableFilesRequest{
|
||||
RepoId: id,
|
||||
RepoToken: token,
|
||||
|
||||
@@ -29,12 +29,10 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dfunctions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/earl"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -513,15 +511,8 @@ func (p DoltDatabaseProvider) cloneDatabaseFromRemote(
|
||||
// TODO: this method only adds error handling. Remove?
|
||||
func getRemoteDb(ctx *sql.Context, r env.Remote, dialer dbfactory.GRPCDialProvider) (*doltdb.DoltDB, error) {
|
||||
ddb, err := r.GetRemoteDB(ctx, types.Format_Default, dialer)
|
||||
|
||||
if err != nil {
|
||||
bdr := errhand.BuildDError("error: failed to get remote db").AddCause(err)
|
||||
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
urlObj, _ := earl.Parse(r.Url)
|
||||
bdr.AddDetails("'%s' should be in the format 'organization/repo'", urlObj.Path)
|
||||
}
|
||||
|
||||
return nil, bdr.Build()
|
||||
}
|
||||
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
)
|
||||
@@ -94,10 +93,7 @@ func DoDoltPush(ctx *sql.Context, args []string) (int, error) {
|
||||
}
|
||||
remoteDB, err := sess.Provider().GetRemoteDB(ctx, dbData.Ddb, opts.Remote, true)
|
||||
if err != nil {
|
||||
if err == remotestorage.ErrInvalidDoltSpecPath {
|
||||
return 1, actions.HandleInvalidDoltSpecPathErr(opts.Remote.Name, opts.Remote.Url, err)
|
||||
}
|
||||
return 1, err
|
||||
return 1, actions.HandleInitRemoteStorageClientErr(opts.Remote.Name, opts.Remote.Url, err)
|
||||
}
|
||||
|
||||
tmpDir, err := dbData.Rsw.TempTableFilesDir()
|
||||
|
||||
@@ -32,9 +32,9 @@ type remotesrvStore struct {
|
||||
|
||||
var _ remotesrv.DBCache = remotesrvStore{}
|
||||
|
||||
func (s remotesrvStore) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
sess := dsess.DSessFromSess(s.ctx.Session)
|
||||
db, err := sess.Provider().Database(s.ctx, repo)
|
||||
db, err := sess.Provider().Database(s.ctx, path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -43,32 +43,28 @@ func NewLocalCSCache(filesys filesys.Filesys) *LocalCSCache {
|
||||
}
|
||||
}
|
||||
|
||||
func (cache *LocalCSCache) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
func (cache *LocalCSCache) Get(repopath, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
cache.mu.Lock()
|
||||
defer cache.mu.Unlock()
|
||||
|
||||
id := filepath.Join(org, repo)
|
||||
id := filepath.FromSlash(repopath)
|
||||
|
||||
if cs, ok := cache.dbs[id]; ok {
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
var newCS *nbs.NomsBlockStore
|
||||
if cache.fs != nil {
|
||||
err := cache.fs.MkDirs(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
path, err := cache.fs.Abs(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err := cache.fs.MkDirs(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
path, err := cache.fs.Abs(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
newCS, err = nbs.NewLocalStore(context.TODO(), nbfVerStr, path, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider())
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
newCS, err := nbs.NewLocalStore(context.TODO(), nbfVerStr, path, defaultMemTableSize, nbs.NewUnlimitedMemQuotaProvider())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cache.dbs[id] = newCS
|
||||
@@ -80,6 +76,6 @@ type SingletonCSCache struct {
|
||||
s remotesrv.RemoteSrvStore
|
||||
}
|
||||
|
||||
func (cache SingletonCSCache) Get(org, repo, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
func (cache SingletonCSCache) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
return cache.s, nil
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ func main() {
|
||||
*httpHostParam = fmt.Sprintf("%s:%d", *httpHostParam, *httpPortParam)
|
||||
} else {
|
||||
*httpPortParam = 80
|
||||
*httpHostParam = ":80"
|
||||
log.Println("'http-port' parameter not provided. Using default port 80")
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ teardown() {
|
||||
srv_pid=$!
|
||||
|
||||
cd ../
|
||||
dolt clone http://localhost:50051/ignored_named/remote repo1
|
||||
dolt clone http://localhost:50051/remote repo1
|
||||
cd repo1
|
||||
run dolt ls
|
||||
[[ "$output" =~ "vals" ]] || false
|
||||
@@ -62,7 +62,7 @@ SQL
|
||||
cd ../
|
||||
|
||||
# By cloning here, we have a near-at-hand way to wait for the server to be ready.
|
||||
dolt clone http://localhost:50051/ignored_named/remote cloned_remote
|
||||
dolt clone http://localhost:50051/remote cloned_remote
|
||||
|
||||
dolt sql-client -u root <<SQL
|
||||
create database created;
|
||||
@@ -73,7 +73,7 @@ call dolt_add('vals');
|
||||
call dolt_commit('-m', 'add some vals');
|
||||
SQL
|
||||
|
||||
dolt clone http://localhost:50051/ignored_named/created cloned_created
|
||||
dolt clone http://localhost:50051/created cloned_created
|
||||
cd cloned_created
|
||||
run dolt ls
|
||||
[[ "$output" =~ "vals" ]] || false
|
||||
@@ -93,7 +93,7 @@ SQL
|
||||
srv_pid=$!
|
||||
cd ../
|
||||
|
||||
dolt clone http://localhost:50051/test-org/remote remote_cloned
|
||||
dolt clone http://localhost:50051/remote remote_cloned
|
||||
|
||||
cd remote_cloned
|
||||
dolt sql -q 'insert into vals values (1), (2), (3), (4), (5);'
|
||||
@@ -110,7 +110,7 @@ SQL
|
||||
dolt sql-server --remotesapi-port 50051 &
|
||||
srv_pid=$!
|
||||
|
||||
dolt clone http://localhost:50051/test-org/remote_one remote_one_cloned
|
||||
dolt clone http://localhost:50051/remote_one remote_one_cloned
|
||||
|
||||
cd ../remote_two
|
||||
dolt init
|
||||
@@ -133,7 +133,7 @@ SQL
|
||||
cd ../../
|
||||
mkdir -p read_replica
|
||||
cd read_replica
|
||||
dolt clone http://127.0.0.1:50051/test-org/db
|
||||
dolt clone http://127.0.0.1:50051/db
|
||||
cd db
|
||||
dolt sql <<SQL
|
||||
set @@persist.dolt_read_replica_remote = 'origin';
|
||||
|
||||
Reference in New Issue
Block a user