proto,go: chunkstore: Sketch out refreshing table file URLs on fetch.

This commit is contained in:
Aaron Son
2021-05-06 11:32:50 -07:00
parent 3fa5d642b0
commit 8faae88d55
4 changed files with 623 additions and 366 deletions
File diff suppressed because it is too large Load Diff
@@ -33,6 +33,7 @@ type ChunkStoreServiceClient interface {
Root(ctx context.Context, in *RootRequest, opts ...grpc.CallOption) (*RootResponse, error)
Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error)
ListTableFiles(ctx context.Context, in *ListTableFilesRequest, opts ...grpc.CallOption) (*ListTableFilesResponse, error)
RefreshTableFileUrl(ctx context.Context, in *RefreshTableFileUrlRequest, opts ...grpc.CallOption) (*RefreshTableFileUrlResponse, error)
AddTableFiles(ctx context.Context, in *AddTableFilesRequest, opts ...grpc.CallOption) (*AddTableFilesResponse, error)
}
@@ -147,6 +148,15 @@ func (c *chunkStoreServiceClient) ListTableFiles(ctx context.Context, in *ListTa
return out, nil
}
func (c *chunkStoreServiceClient) RefreshTableFileUrl(ctx context.Context, in *RefreshTableFileUrlRequest, opts ...grpc.CallOption) (*RefreshTableFileUrlResponse, error) {
out := new(RefreshTableFileUrlResponse)
err := c.cc.Invoke(ctx, "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/RefreshTableFileUrl", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *chunkStoreServiceClient) AddTableFiles(ctx context.Context, in *AddTableFilesRequest, opts ...grpc.CallOption) (*AddTableFilesResponse, error) {
out := new(AddTableFilesResponse)
err := c.cc.Invoke(ctx, "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/AddTableFiles", in, out, opts...)
@@ -176,6 +186,7 @@ type ChunkStoreServiceServer interface {
Root(context.Context, *RootRequest) (*RootResponse, error)
Commit(context.Context, *CommitRequest) (*CommitResponse, error)
ListTableFiles(context.Context, *ListTableFilesRequest) (*ListTableFilesResponse, error)
RefreshTableFileUrl(context.Context, *RefreshTableFileUrlRequest) (*RefreshTableFileUrlResponse, error)
AddTableFiles(context.Context, *AddTableFilesRequest) (*AddTableFilesResponse, error)
mustEmbedUnimplementedChunkStoreServiceServer()
}
@@ -211,6 +222,9 @@ func (*UnimplementedChunkStoreServiceServer) Commit(context.Context, *CommitRequ
func (*UnimplementedChunkStoreServiceServer) ListTableFiles(context.Context, *ListTableFilesRequest) (*ListTableFilesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListTableFiles not implemented")
}
func (*UnimplementedChunkStoreServiceServer) RefreshTableFileUrl(context.Context, *RefreshTableFileUrlRequest) (*RefreshTableFileUrlResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RefreshTableFileUrl not implemented")
}
func (*UnimplementedChunkStoreServiceServer) AddTableFiles(context.Context, *AddTableFilesRequest) (*AddTableFilesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddTableFiles not implemented")
}
@@ -390,6 +404,24 @@ func _ChunkStoreService_ListTableFiles_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
func _ChunkStoreService_RefreshTableFileUrl_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RefreshTableFileUrlRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ChunkStoreServiceServer).RefreshTableFileUrl(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/RefreshTableFileUrl",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ChunkStoreServiceServer).RefreshTableFileUrl(ctx, req.(*RefreshTableFileUrlRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ChunkStoreService_AddTableFiles_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddTableFilesRequest)
if err := dec(in); err != nil {
@@ -444,6 +476,10 @@ var _ChunkStoreService_serviceDesc = grpc.ServiceDesc{
MethodName: "ListTableFiles",
Handler: _ChunkStoreService_ListTableFiles_Handler,
},
{
MethodName: "RefreshTableFileUrl",
Handler: _ChunkStoreService_RefreshTableFileUrl_Handler,
},
{
MethodName: "AddTableFiles",
Handler: _ChunkStoreService_AddTableFiles_Handler,
@@ -1067,6 +1067,14 @@ func sanitizeSignedUrl(url string) string {
// Open returns an io.ReadCloser which can be used to read the bytes of a table file.
func (drtf DoltRemoteTableFile) Open(ctx context.Context) (io.ReadCloser, error) {
if drtf.info.RefreshAfter != nil && drtf.info.RefreshAfter.AsTime().After(time.Now()) {
resp, err := drtf.dcs.csClient.RefreshTableFileUrl(ctx, drtf.info.RefreshRequest)
if err == nil {
drtf.info.Url = resp.Url
drtf.info.RefreshAfter = resp.RefreshAfter
}
}
req, err := http.NewRequestWithContext(ctx, http.MethodGet, drtf.info.Url, nil)
if err != nil {
return nil, err
@@ -16,6 +16,8 @@ syntax = "proto3";
package dolt.services.remotesapi.v1alpha1;
import "google/protobuf/timestamp.proto";
option go_package = "github.com/dolthub/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1;remotesapi";
service ChunkStoreService {
@@ -44,6 +46,8 @@ service ChunkStoreService {
rpc ListTableFiles(ListTableFilesRequest) returns (ListTableFilesResponse);
rpc RefreshTableFileUrl(RefreshTableFileUrlRequest) returns (RefreshTableFileUrlResponse);
rpc AddTableFiles(AddTableFilesRequest) returns (AddTableFilesResponse);
}
@@ -183,6 +187,18 @@ message TableFileInfo {
string file_id = 1;
uint32 num_chunks = 2;
string url = 3;
google.protobuf.Timestamp refresh_after = 4;
RefreshTableFileUrlRequest refresh_request = 5;
}
message RefreshTableFileUrlRequest {
RepoId repo_id = 1;
string file_id = 2;
}
message RefreshTableFileUrlResponse {
string url = 1;
google.protobuf.Timestamp refresh_after = 2;
}
message ListTableFilesResponse {