Mirror of https://github.com/dolthub/dolt.git, synced 2025-12-30 08:50:01 -06:00
proto,go/.../remotestorage: Add repo_token support.
doltremoteapi can return an opaque token that makes accessing the repository cheaper than looking it up by RepoId every time. This updates the protos and the remotestorage implementation to cache that token and send it on future accesses of the same repository.
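The client-side pattern is small: hold the last token the server returned in an atomic cell, read it when building each request, and overwrite it whenever a response carries a fresh token. A minimal, self-contained sketch of that pattern (the tokenCache type is illustrative, not part of the change):

package main

import (
	"fmt"
	"sync/atomic"
)

// tokenCache sketches the caching used in this commit: an atomic.Value
// holding the server-issued opaque token, safe for concurrent readers
// and writers.
type tokenCache struct {
	v atomic.Value // string
}

// get returns the cached token, or "" if the server has not issued one yet.
func (c *tokenCache) get() string {
	if t := c.v.Load(); t != nil {
		return t.(string)
	}
	return ""
}

// put records a token from a response; empty tokens are ignored.
func (c *tokenCache) put(token string) {
	if token != "" {
		c.v.Store(token)
	}
}

func main() {
	var c tokenCache
	fmt.Printf("before: %q\n", c.get()) // before: ""
	c.put("opaque-token-from-server")
	fmt.Printf("after: %q\n", c.get()) // after: "opaque-token-from-server"
}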
@@ -23,13 +23,12 @@
 package eventsapi
 
 import (
-	reflect "reflect"
-	sync "sync"
-
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
 	durationpb "google.golang.org/protobuf/types/known/durationpb"
 	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
+	reflect "reflect"
+	sync "sync"
 )
 
 const (
@@ -100,6 +99,7 @@ type ClientEventMetric struct {
 	unknownFields protoimpl.UnknownFields
 
 	// Types that are assignable to MetricOneof:
+	//
 	// *ClientEventMetric_Duration
 	// *ClientEventMetric_Count
 	MetricOneof isClientEventMetric_MetricOneof `protobuf_oneof:"metric_oneof"`
@@ -8,7 +8,6 @@ package eventsapi
 
 import (
 	context "context"
-
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
@@ -23,11 +23,10 @@
 package eventsapi
 
 import (
-	reflect "reflect"
-	sync "sync"
-
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
 )
 
 const (
File diff suppressed because it is too large
@@ -8,7 +8,6 @@ package remotesapi
 
 import (
 	context "context"
-
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
@@ -21,11 +21,10 @@
 package remotesapi
 
 import (
-	reflect "reflect"
-	sync "sync"
-
 	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
 	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
 )
 
 const (
@@ -8,7 +8,6 @@ package remotesapi
 
 import (
 	context "context"
-
 	grpc "google.golang.org/grpc"
 	codes "google.golang.org/grpc/codes"
 	status "google.golang.org/grpc/status"
@@ -106,6 +106,7 @@ type ConcurrencyParams struct {
 type DoltChunkStore struct {
 	org       string
 	repoName  string
+	repoToken *atomic.Value // string
 	host      string
 	csClient  remotesapi.ChunkStoreServiceClient
 	cache     ChunkCache
@@ -147,32 +148,42 @@ func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoN
 		return nil, err
 	}
 
-	return &DoltChunkStore{
+	cs := &DoltChunkStore{
 		org:         org,
 		repoName:    repoName,
+		repoToken:   new(atomic.Value),
 		host:        host,
 		csClient:    csClient,
 		cache:       newMapChunkCache(),
 		metadata:    metadata,
 		nbf:         nbf,
 		httpFetcher: globalHttpFetcher,
-		concurrency: defaultConcurrency}, nil
+		concurrency: defaultConcurrency,
+	}
+	return cs, nil
 }
 
 func (dcs *DoltChunkStore) WithHTTPFetcher(fetcher HTTPFetcher) *DoltChunkStore {
 	return &DoltChunkStore{
-		org:      dcs.org,
-		repoName: dcs.repoName,
-		host:     dcs.host,
-		csClient: dcs.csClient,
-		cache: dcs.cache, metadata: dcs.metadata, nbf: dcs.nbf, httpFetcher: fetcher, concurrency: dcs.concurrency,
-		stats: dcs.stats}
+		org:         dcs.org,
+		repoName:    dcs.repoName,
+		repoToken:   new(atomic.Value),
+		host:        dcs.host,
+		csClient:    dcs.csClient,
+		cache:       dcs.cache,
+		metadata:    dcs.metadata,
+		nbf:         dcs.nbf,
+		httpFetcher: fetcher,
+		concurrency: dcs.concurrency,
+		stats:       dcs.stats,
+	}
 }
 func (dcs *DoltChunkStore) WithNoopChunkCache() *DoltChunkStore {
 	return &DoltChunkStore{
 		org:         dcs.org,
 		repoName:    dcs.repoName,
+		repoToken:   new(atomic.Value),
 		host:        dcs.host,
 		csClient:    dcs.csClient,
 		cache:       noopChunkCache,
@@ -189,6 +200,7 @@ func (dcs *DoltChunkStore) WithChunkCache(cache ChunkCache) *DoltChunkStore {
 	return &DoltChunkStore{
 		org:         dcs.org,
 		repoName:    dcs.repoName,
+		repoToken:   new(atomic.Value),
 		host:        dcs.host,
 		csClient:    dcs.csClient,
 		cache:       cache,
@@ -205,6 +217,7 @@ func (dcs *DoltChunkStore) WithDownloadConcurrency(concurrency ConcurrencyParams
 	return &DoltChunkStore{
 		org:         dcs.org,
 		repoName:    dcs.repoName,
+		repoToken:   new(atomic.Value),
 		host:        dcs.host,
 		csClient:    dcs.csClient,
 		cache:       dcs.cache,
@@ -227,11 +240,16 @@ func (dcs *DoltChunkStore) logf(fmt string, args ...interface{}) {
 	}
 }
 
-func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId {
+func (dcs *DoltChunkStore) getRepoId() (*remotesapi.RepoId, string) {
+	var token string
+	curToken := dcs.repoToken.Load()
+	if curToken != nil {
+		token = curToken.(string)
+	}
 	return &remotesapi.RepoId{
 		Org:      dcs.org,
 		RepoName: dcs.repoName,
-	}
+	}, token
 }
 
 type cacheStats struct {
@@ -552,7 +570,8 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
 	hashesBytes := HashesToSlices(hashes)
 	batchItr(len(hashesBytes), getLocsBatchSize, func(st, end int) (stop bool) {
 		batch := hashesBytes[st:end]
-		req := &remotesapi.GetDownloadLocsRequest{RepoId: dcs.getRepoId(), ChunkHashes: batch}
+		id, token := dcs.getRepoId()
+		req := &remotesapi.GetDownloadLocsRequest{RepoId: id, ChunkHashes: batch, RepoToken: token}
 		reqs = append(reqs, req)
 		return false
 	})
@@ -582,6 +601,9 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
 			}
 			return NewRpcError(err, "StreamDownloadLocations", dcs.host, reqs[completedReqs])
 		}
+		if resp.RepoToken != "" {
+			dcs.repoToken.Store(resp.RepoToken)
+		}
 		select {
 		case resCh <- resp.Locs:
 			completedReqs += 1
@@ -692,7 +714,8 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 		currByteSl := byteSl[st:end]
 
 		// send a request to the remote api to determine which chunks the remote api already has
-		req := &remotesapi.HasChunksRequest{RepoId: dcs.getRepoId(), Hashes: currByteSl}
+		id, token := dcs.getRepoId()
+		req := &remotesapi.HasChunksRequest{RepoId: id, RepoToken: token, Hashes: currByteSl}
 		var resp *remotesapi.HasChunksResponse
 		resp, err = dcs.csClient.HasChunks(ctx, req)
 		if err != nil {
@@ -700,6 +723,10 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
 			return true
 		}
 
+		if resp.RepoToken != "" {
+			dcs.repoToken.Store(resp.RepoToken)
+		}
+
 		numAbsent := len(resp.Absent)
 		sort.Slice(resp.Absent, func(i, j int) bool {
 			return resp.Absent[i] < resp.Absent[j]
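As the sorting above suggests, the `absent` field of the response is a list of indices into the request's `hashes`, so a caller recovers the missing chunk hashes by indexing back into what it sent. A hedged sketch of that decoding step (absentHashes is a hypothetical helper; the index interpretation is an assumption read from this diff):

// absentHashes is a hypothetical helper: it reads each entry of
// resp.Absent as an index into req.Hashes (an assumption based on the
// usage in this diff) and collects the hashes the remote reports missing.
func absentHashes(req *remotesapi.HasChunksRequest, resp *remotesapi.HasChunksResponse) [][]byte {
	missing := make([][]byte, 0, len(resp.Absent))
	for _, idx := range resp.Absent {
		missing = append(missing, req.Hashes[idx])
	}
	return missing
}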
@@ -764,13 +791,17 @@ func (dcs *DoltChunkStore) Version() string {
 // Rebase brings this ChunkStore into sync with the persistent storage's
 // current root.
 func (dcs *DoltChunkStore) Rebase(ctx context.Context) error {
-	req := &remotesapi.RebaseRequest{RepoId: dcs.getRepoId()}
-	_, err := dcs.csClient.Rebase(ctx, req)
-
+	id, token := dcs.getRepoId()
+	req := &remotesapi.RebaseRequest{RepoId: id, RepoToken: token}
+	resp, err := dcs.csClient.Rebase(ctx, req)
 	if err != nil {
 		return NewRpcError(err, "Rebase", dcs.host, req)
 	}
 
+	if resp.RepoToken != "" {
+		dcs.repoToken.Store(resp.RepoToken)
+	}
+
 	return dcs.refreshRepoMetadata(ctx)
 }
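The token plumbing stays invisible to callers: code that wants the remote's latest root simply rebases and reads. A hedged sketch of that call pattern (syncToRemoteRoot is a hypothetical helper, not part of this change):

// syncToRemoteRoot is a hypothetical helper: Rebase refreshes the store
// against the remote (caching any repo_token the server returns along
// the way), then Root reads the now-current root hash.
func syncToRemoteRoot(ctx context.Context, dcs *DoltChunkStore) (hash.Hash, error) {
	if err := dcs.Rebase(ctx); err != nil {
		return hash.Hash{}, err
	}
	return dcs.Root(ctx)
}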
@@ -789,6 +820,9 @@ func (dcs *DoltChunkStore) refreshRepoMetadata(ctx context.Context) error {
 	if err != nil {
 		return NewRpcError(err, "GetRepoMetadata", dcs.host, mdReq)
 	}
+	if metadata.RepoToken != "" {
+		dcs.repoToken.Store(metadata.RepoToken)
+	}
 	dcs.metadata = metadata
 	return nil
 }
@@ -796,13 +830,17 @@ func (dcs *DoltChunkStore) refreshRepoMetadata(ctx context.Context) error {
 // Root returns the root of the database as of the time the ChunkStore
 // was opened or the most recent call to Rebase.
 func (dcs *DoltChunkStore) Root(ctx context.Context) (hash.Hash, error) {
-	req := &remotesapi.RootRequest{RepoId: dcs.getRepoId()}
+	id, token := dcs.getRepoId()
+	req := &remotesapi.RootRequest{RepoId: id, RepoToken: token}
 	resp, err := dcs.csClient.Root(ctx, req)
-
 	if err != nil {
 		return hash.Hash{}, NewRpcError(err, "Root", dcs.host, req)
 	}
 
+	if resp.RepoToken != "" {
+		dcs.repoToken.Store(resp.RepoToken)
+	}
+
 	return hash.New(resp.RootHash), nil
 }
@@ -821,8 +859,9 @@ func (dcs *DoltChunkStore) Commit(ctx context.Context, current, last hash.Hash)
 		chnkTblInfo = append(chnkTblInfo, &remotesapi.ChunkTableInfo{Hash: h[:], ChunkCount: uint32(cnt)})
 	}
 
+	id, _ := dcs.getRepoId()
 	req := &remotesapi.CommitRequest{
-		RepoId:         dcs.getRepoId(),
+		RepoId:         id,
 		Current:        current[:],
 		Last:           last[:],
 		ChunkTableInfo: chnkTblInfo,
@@ -928,7 +967,8 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
 	}
 
 	dcs.logf("getting upload location for file %s", tableFileId.String())
-	req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), TableFileDetails: []*remotesapi.TableFileDetails{tbfd}}
+	id, token := dcs.getRepoId()
+	req := &remotesapi.GetUploadLocsRequest{RepoId: id, RepoToken: token, TableFileDetails: []*remotesapi.TableFileDetails{tbfd}}
 	resp, err := dcs.csClient.GetUploadLocations(ctx, req)
 	if err != nil {
@@ -936,6 +976,10 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
 		}
 	}
 
+	if resp.RepoToken != "" {
+		dcs.repoToken.Store(resp.RepoToken)
+	}
+
 	if len(resp.Locs) != 1 {
 		return NewRpcError(errors.New("unexpected upload location count"), "GetUploadLocations", dcs.host, req)
 	}
@@ -1221,9 +1265,11 @@ func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
 		chnkTblInfo = append(chnkTblInfo, &remotesapi.ChunkTableInfo{Hash: fileIdBytes[:], ChunkCount: uint32(numChunks)})
 	}
 
-	dcs.logf("Adding Table files to repo: %s/%s -\n%s", dcs.getRepoId().Org, dcs.getRepoId().RepoName, debugStr)
+	id, token := dcs.getRepoId()
+	dcs.logf("Adding Table files to repo: %s/%s -\n%s", id.Org, id.RepoName, debugStr)
 	atReq := &remotesapi.AddTableFilesRequest{
-		RepoId:         dcs.getRepoId(),
+		RepoId:         id,
+		RepoToken:      token,
 		ChunkTableInfo: chnkTblInfo,
 		ClientRepoFormat: &remotesapi.ClientRepoFormat{
 			NbfVersion: dcs.nbf.VersionString(),
@@ -1232,11 +1278,14 @@ func (dcs *DoltChunkStore) AddTableFilesToManifest(ctx context.Context, fileIdTo
 	}
 
 	atResp, err := dcs.csClient.AddTableFiles(ctx, atReq)
-
 	if err != nil {
 		return NewRpcError(err, "AddTableFiles", dcs.host, atReq)
 	}
 
+	if atResp.RepoToken != "" {
+		dcs.repoToken.Store(atResp.RepoToken)
+	}
+
 	if !atResp.Success {
 		return errors.New("update table files failed")
 	}
@@ -1252,11 +1301,15 @@ func (dcs *DoltChunkStore) PruneTableFiles(ctx context.Context) error {
 // Sources retrieves the current root hash, a list of all the table files (which may include appendix table files)
 // and a list of only appendix table files
 func (dcs *DoltChunkStore) Sources(ctx context.Context) (hash.Hash, []nbs.TableFile, []nbs.TableFile, error) {
-	req := &remotesapi.ListTableFilesRequest{RepoId: dcs.getRepoId()}
+	id, token := dcs.getRepoId()
+	req := &remotesapi.ListTableFilesRequest{RepoId: id, RepoToken: token}
 	resp, err := dcs.csClient.ListTableFiles(ctx, req)
 	if err != nil {
 		return hash.Hash{}, nil, nil, NewRpcError(err, "ListTableFiles", dcs.host, req)
 	}
+	if resp.RepoToken != "" {
+		dcs.repoToken.Store(resp.RepoToken)
+	}
 	sourceFiles := getTableFiles(dcs, resp.TableFileInfo)
 	appendixFiles := getTableFiles(dcs, resp.AppendixTableFileInfo)
 	return hash.New(resp.RootHash), sourceFiles, appendixFiles, nil
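A hedged sketch of a Sources caller (listTableFiles is hypothetical, and FileID is assumed from the nbs.TableFile interface):

// listTableFiles is a hypothetical caller of Sources: it fetches the
// remote root plus both table-file listings and prints a summary.
// FileID() is assumed here from the nbs.TableFile interface.
func listTableFiles(ctx context.Context, dcs *DoltChunkStore) error {
	root, files, appendices, err := dcs.Sources(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("root %s: %d table files, %d appendix files\n", root.String(), len(files), len(appendices))
	for _, f := range files {
		fmt.Println(f.FileID())
	}
	return nil
}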
@@ -51,6 +51,16 @@ service ChunkStoreService {
   rpc AddTableFiles(AddTableFilesRequest) returns (AddTableFilesResponse);
 }
 
+// RepoId is how repositories are represented on dolthub, for example
+// `dolthub/us-housing-prices-v2` has org = dolthub, repo_name =
+// us-housing-prices-v2.
+//
+// All methods of ChunkStoreService targeting a repository take a RepoId. Many
+// will also take a `repo_token`, which can be provided in addition to a
+// RepoId. `repo_token`s can be returned from repository-accessing RPCs on
+// ChunkStoreService, and providing them instead of RepoId for future calls to
+// ChunkStoreService can be more efficient than providing RepoId itself.
+// `repo_token`s are fully opaque to the client.
 message RepoId {
   string org = 1;
   string repo_name = 2;
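The intended round trip, in Go terms: a first call identifies the repository by RepoId, the response may carry a repo_token, and later calls send the token back so the server can take the cheaper path. A hedged sketch against the generated client (fetchMetadataTwice is hypothetical; method and field names are taken from the generated code in this diff):

// fetchMetadataTwice is a hypothetical example of the repo_token round
// trip. The first GetRepoMetadata call identifies the repository by
// RepoId only; if the response carries a repo_token, the second call
// sends it back alongside the RepoId.
func fetchMetadataTwice(ctx context.Context, client remotesapi.ChunkStoreServiceClient, id *remotesapi.RepoId) error {
	first, err := client.GetRepoMetadata(ctx, &remotesapi.GetRepoMetadataRequest{RepoId: id})
	if err != nil {
		return err
	}
	_, err = client.GetRepoMetadata(ctx, &remotesapi.GetRepoMetadataRequest{
		RepoId:    id,
		RepoToken: first.RepoToken,
	})
	return err
}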
@@ -59,10 +69,14 @@ message RepoId {
 message HasChunksRequest {
   RepoId repo_id = 1;
   repeated bytes hashes = 2;
+
+  string repo_token = 3;
 }
 
 message HasChunksResponse {
   repeated int32 absent = 1;
+
+  string repo_token = 2;
 }
 
 message HttpGetChunk {
@@ -104,10 +118,14 @@ message UploadLoc {
 message GetDownloadLocsRequest {
   RepoId repo_id = 1;
   repeated bytes chunk_hashes = 2;
+
+  string repo_token = 3;
 }
 
 message GetDownloadLocsResponse {
   repeated DownloadLoc locs = 1;
+
+  string repo_token = 2;
 }
 
 message TableFileDetails {
@@ -120,25 +138,35 @@ message GetUploadLocsRequest {
   RepoId repo_id = 1;
   repeated bytes table_file_hashes = 2 [deprecated = true];
   repeated TableFileDetails table_file_details = 3;
+
+  string repo_token = 4;
 }
 
 message GetUploadLocsResponse {
   repeated UploadLoc locs = 1;
+
+  string repo_token = 2;
 }
 
 message RebaseRequest {
   RepoId repo_id = 1;
+  string repo_token = 2;
 }
 
 message RebaseResponse {
+  string repo_token = 1;
 }
 
 message RootRequest {
   RepoId repo_id = 1;
+
+  string repo_token = 2;
 }
 
 message RootResponse {
   bytes root_hash = 1;
+
+  string repo_token = 2;
 }
 
 message ChunkTableInfo {
@@ -161,6 +189,8 @@ message CommitResponse {
 message GetRepoMetadataRequest {
   RepoId repo_id = 1;
   ClientRepoFormat client_repo_format = 14;
+
+  string repo_token = 2;
 }
 
 message GetRepoMetadataResponse {
@@ -173,6 +203,8 @@ message GetRepoMetadataResponse {
   // Approximate number of bytes required for storage of all
   // currently-referenced repository table files.
   uint64 storage_size = 3;
+
+  string repo_token = 4;
 }
 
 message ClientRepoFormat {
@@ -183,6 +215,8 @@ message ClientRepoFormat {
 message ListTableFilesRequest {
   RepoId repo_id = 1;
   bool appendix_only = 2 [deprecated = true];
+
+  string repo_token = 3;
 }
 
 message TableFileInfo {
@@ -196,17 +230,23 @@ message TableFileInfo {
 message RefreshTableFileUrlRequest {
   RepoId repo_id = 1;
   string file_id = 2;
+
+  string repo_token = 3;
 }
 
 message RefreshTableFileUrlResponse {
   string url = 1;
   google.protobuf.Timestamp refresh_after = 2;
+
+  string repo_token = 3;
 }
 
 message ListTableFilesResponse {
   bytes root_hash = 1;
   repeated TableFileInfo table_file_info = 2;
   repeated TableFileInfo appendix_table_file_info = 3;
+
+  string repo_token = 4;
 }
 
 enum ManifestAppendixOption {
@@ -226,8 +266,11 @@ message AddTableFilesRequest {
   // are removed from the manifest specs. If `append_option` is `APPEND`, then the
   // supplied table files are added to the appendix and to specs.
   ManifestAppendixOption appendix_option = 4;
+
+  string repo_token = 5;
 }
 
 message AddTableFilesResponse {
   bool success = 1;
+  string repo_token = 2;
 }