Push and Pull v2 (#93)

This commit is contained in:
Brian Hendriks
2019-09-30 11:03:38 -07:00
committed by GitHub
43 changed files with 2200 additions and 433 deletions

View File

@@ -54,7 +54,6 @@ teardown() {
dolt remote add test-remote http://localhost:50051/test-org/test-repo
run dolt push test-remote master
[ "$status" -eq 0 ]
[ "$output" = "" ]
[ -d "$BATS_TMPDIR/remotes-$$/test-org/test-repo" ]
run dolt pull test-remote
[ "$status" -eq 0 ]
@@ -77,7 +76,6 @@ teardown() {
dolt checkout -b test-branch
run dolt push test-remote test-branch
[ "$status" -eq 0 ]
[ "$output" = "" ]
run dolt pull test-remote
[ "$status" -eq 0 ]
skip "Should say up to date not fast forward"
@@ -203,7 +201,6 @@ teardown() {
# [[ "$output" =~ "up to date" ]]
run dolt fetch
[ "$status" -eq 0 ]
[ "$output" = "" ]
run dolt merge origin/master
[ "$status" -eq 0 ]
[[ "$output" =~ "Fast-forward" ]]

View File

@@ -36,7 +36,7 @@ func testHomeDirFunc() (string, error) {
}
func createTestEnv() *env.DoltEnv {
initialDirs := []string{testHomeDir, filepath.Join(workingDir, dbfactory.DoltDir)}
initialDirs := []string{testHomeDir, filepath.Join(workingDir, dbfactory.DoltDir), filepath.Join(workingDir, dbfactory.DoltDataDir)}
fs := filesys.NewInMemFS(initialDirs, nil, workingDir)
dEnv := env.Load(context.Background(), testHomeDirFunc, fs, doltdb.InMemDoltDB)

View File

@@ -26,7 +26,6 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/ref"
"github.com/liquidata-inc/dolt/go/libraries/events"
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
"github.com/liquidata-inc/dolt/go/store/datas"
)
var fetchShortDesc = "Download objects and refs from another repository"
@@ -147,7 +146,7 @@ func fetchRefSpecs(ctx context.Context, dEnv *env.DoltEnv, rem env.Remote, refSp
remoteTrackRef := rs.DestRef(branchRef)
if remoteTrackRef != nil {
verr := fetchRemoteBranch(ctx, rem, srcDB, dEnv.DoltDB, branchRef, remoteTrackRef)
verr := fetchRemoteBranch(ctx, dEnv, rem, srcDB, dEnv.DoltDB, branchRef, remoteTrackRef)
if verr != nil {
return verr
@@ -159,7 +158,7 @@ func fetchRefSpecs(ctx context.Context, dEnv *env.DoltEnv, rem env.Remote, refSp
return nil
}
func fetchRemoteBranch(ctx context.Context, rem env.Remote, srcDB, destDB *doltdb.DoltDB, srcRef, destRef ref.DoltRef) errhand.VerboseError {
func fetchRemoteBranch(ctx context.Context, dEnv *env.DoltEnv, rem env.Remote, srcDB, destDB *doltdb.DoltDB, srcRef, destRef ref.DoltRef) errhand.VerboseError {
evt := events.GetEventFromContext(ctx)
evt.SetAttribute(eventsapi.AttributeID_ACTIVE_REMOTE_URL, rem.Url)
@@ -169,14 +168,9 @@ func fetchRemoteBranch(ctx context.Context, rem env.Remote, srcDB, destDB *doltd
if err != nil {
return errhand.BuildDError("error: unable to find '%s' on '%s'", srcRef.GetPath(), rem.Name).Build()
} else {
progChan := make(chan datas.PullProgress)
stopChan := make(chan struct{})
go progFunc(progChan, stopChan)
err = actions.Fetch(ctx, destRef, srcDB, destDB, cm, progChan)
close(progChan)
<-stopChan
wg, progChan, pullerEventCh := runProgFuncs()
err = actions.Fetch(ctx, dEnv, destRef, srcDB, destDB, cm, progChan, pullerEventCh)
stopProgFuncs(wg, progChan, pullerEventCh)
if err != nil {
return errhand.BuildDError("error: fetch failed").AddCause(err).Build()

View File

@@ -81,7 +81,7 @@ func pullRemoteBranch(ctx context.Context, dEnv *env.DoltEnv, r env.Remote, srcR
return errhand.BuildDError("error: failed to get remote db").AddCause(err).Build()
}
verr := fetchRemoteBranch(ctx, r, srcDB, dEnv.DoltDB, srcRef, destRef)
verr := fetchRemoteBranch(ctx, dEnv, r, srcDB, dEnv.DoltDB, srcRef, destRef)
if verr != nil {
return verr

View File

@@ -17,6 +17,7 @@ package commands
import (
"context"
"fmt"
"sync"
"time"
"github.com/dustin/go-humanize"
@@ -177,7 +178,7 @@ func Push(ctx context.Context, commandStr string, args []string, dEnv *env.DoltE
} else if src == ref.EmptyBranchRef {
verr = deleteRemoteBranch(ctx, dest, remoteRef, dEnv.DoltDB, destDB, remote)
} else {
verr = pushToRemoteBranch(ctx, src, dest, remoteRef, dEnv.DoltDB, destDB, remote)
verr = pushToRemoteBranch(ctx, dEnv, src, dest, remoteRef, dEnv.DoltDB, destDB, remote)
}
}
@@ -231,7 +232,7 @@ func deleteRemoteBranch(ctx context.Context, toDelete, remoteRef ref.DoltRef, lo
return nil
}
func pushToRemoteBranch(ctx context.Context, srcRef, destRef, remoteRef ref.DoltRef, localDB, remoteDB *doltdb.DoltDB, remote env.Remote) errhand.VerboseError {
func pushToRemoteBranch(ctx context.Context, dEnv *env.DoltEnv, srcRef, destRef, remoteRef ref.DoltRef, localDB, remoteDB *doltdb.DoltDB, remote env.Remote) errhand.VerboseError {
evt := events.GetEventFromContext(ctx)
evt.SetAttribute(eventsapi.AttributeID_ACTIVE_REMOTE_URL, remote.Url)
@@ -241,14 +242,9 @@ func pushToRemoteBranch(ctx context.Context, srcRef, destRef, remoteRef ref.Dolt
if err != nil {
return errhand.BuildDError("error: unable to find %v", srcRef.GetPath()).Build()
} else {
progChan := make(chan datas.PullProgress, 16)
stopChan := make(chan struct{})
go progFunc(progChan, stopChan)
err = actions.Push(ctx, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan)
close(progChan)
<-stopChan
wg, progChan, pullerEventCh := runProgFuncs()
err = actions.Push(ctx, dEnv, destRef.(ref.BranchRef), remoteRef.(ref.RemoteRef), localDB, remoteDB, cm, progChan, pullerEventCh)
stopProgFuncs(wg, progChan, pullerEventCh)
if err != nil {
if err == doltdb.ErrUpToDate {
@@ -269,7 +265,58 @@ func pushToRemoteBranch(ctx context.Context, srcRef, destRef, remoteRef ref.Dolt
return nil
}
func progFunc(progChan chan datas.PullProgress, stopChan chan struct{}) {
// pullerProgFunc renders table-file push/pull progress to the CLI as
// datas.PullerEvents arrive, redrawing in place via cli.DeleteAndPrint.
// It returns when pullerEventCh is closed. pos tracks the length of the
// last line printed so the next update can erase it.
func pullerProgFunc(pullerEventCh chan datas.PullerEvent) {
var pos int
for evt := range pullerEventCh {
switch evt.EventType {
case datas.NewLevelTWEvent:
// Events for tree level -1 carry no displayable detail; skip them
// (same guard in every TWEvent case below).
if evt.TWEventDetails.TreeLevel == -1 {
continue
}
// NOTE(review): this passes 0 rather than pos — it starts a fresh line
// instead of overwriting the previous one; confirm that is intended.
pos = cli.DeleteAndPrint(0, fmt.Sprintf("Tree Level: %d has %d new chunks. Determining how many are needed.", evt.TWEventDetails.TreeLevel, evt.TWEventDetails.ChunksInLevel))
case datas.DestDBHasTWEvent:
if evt.TWEventDetails.TreeLevel == -1 {
continue
}
// Message ends in \n, so reset pos: nothing on the current line to erase.
cli.DeleteAndPrint(pos, fmt.Sprintf("Tree Level: %d has %d new chunks of which %d already exist in the database. Buffering %d chunks.\n", evt.TWEventDetails.TreeLevel, evt.TWEventDetails.ChunksInLevel, evt.TWEventDetails.ChunksAlreadyHad, evt.TWEventDetails.ChunksInLevel-evt.TWEventDetails.ChunksAlreadyHad))
pos = 0
case datas.LevelUpdateTWEvent:
if evt.TWEventDetails.TreeLevel == -1 {
continue
}
toBuffer := evt.TWEventDetails.ChunksInLevel - evt.TWEventDetails.ChunksAlreadyHad
var percentBuffered float64
// Guard against division by zero when every chunk already existed.
if toBuffer > 0 {
percentBuffered = 100 * float64(evt.TWEventDetails.ChunksBuffered) / float64(toBuffer)
}
pos = cli.DeleteAndPrint(pos, fmt.Sprintf("Tree Level: %d. %.2f%% of new chunks buffered.", evt.TWEventDetails.TreeLevel, percentBuffered))
case datas.LevelDoneTWEvent:
if evt.TWEventDetails.TreeLevel == -1 {
continue
}
// Print a final 100% line for the level, then move to a new line.
_ = cli.DeleteAndPrint(pos, fmt.Sprintf("Tree Level: %d. %.2f%% of new chunks buffered.", evt.TWEventDetails.TreeLevel, 100.0))
pos = 0
cli.Println("")
case datas.StartUploadTableFile:
// NOTE(review): humanize.Bytes already appends a unit suffix (e.g. "12 MB"),
// so the literal " bytes" here likely double-prints units — verify.
pos = cli.DeleteAndPrint(pos, fmt.Sprintf("Uploading table file %d of %d. %s bytes", evt.TFEventDetails.TableFilesUploaded+1, evt.TFEventDetails.TableFileCount, humanize.Bytes(uint64(evt.TFEventDetails.CurrentFileSize))))
case datas.EndUpdateTableFile:
pos = cli.DeleteAndPrint(pos, fmt.Sprintf("Successfully uploaded %d of %d table files.", evt.TFEventDetails.TableFilesUploaded, evt.TFEventDetails.TableFileCount))
}
}
}
func progFunc(progChan chan datas.PullProgress) {
var latest datas.PullProgress
last := time.Now().UnixNano() - 1
lenPrinted := 0
@@ -301,7 +348,32 @@ func progFunc(progChan chan datas.PullProgress, stopChan chan struct{}) {
if lenPrinted > 0 {
cli.Println()
}
stopChan <- struct{}{}
}
// runProgFuncs starts the two progress-reporting goroutines — progFunc for
// chunk-level datas.PullProgress updates and pullerProgFunc for table-file
// datas.PullerEvents — and returns the WaitGroup tracking them plus the
// channels to feed them. Callers must shut them down with stopProgFuncs.
func runProgFuncs() (*sync.WaitGroup, chan datas.PullProgress, chan datas.PullerEvent) {
// Buffered so the push/fetch worker is not stalled by slow terminal output.
pullerEventCh := make(chan datas.PullerEvent, 128)
progChan := make(chan datas.PullProgress, 128)
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
progFunc(progChan)
}()
wg.Add(1)
go func() {
defer wg.Done()
pullerProgFunc(pullerEventCh)
}()
return wg, progChan, pullerEventCh
}
// stopProgFuncs closes both progress channels — which ends the range loops
// in progFunc and pullerProgFunc — and blocks until both goroutines started
// by runProgFuncs have exited.
func stopProgFuncs(wg *sync.WaitGroup, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) {
close(progChan)
close(pullerEventCh)
wg.Wait()
}
func bytesPerSec(bytes uint64, start time.Time) string {

View File

@@ -1262,6 +1262,100 @@ func (m *ListTableFilesResponse) GetTableFileInfo() []*TableFileInfo {
return nil
}
// AddTableFilesRequest is the protoc-generated request message for the
// ChunkStoreService.AddTableFiles RPC. Do not edit by hand.
type AddTableFilesRequest struct {
// Target repository and the client's repo format/version information.
RepoId *RepoId `protobuf:"bytes,1,opt,name=repo_id,json=repoId,proto3" json:"repo_id,omitempty"`
ClientRepoFormat *ClientRepoFormat `protobuf:"bytes,2,opt,name=client_repo_format,json=clientRepoFormat,proto3" json:"client_repo_format,omitempty"`
// Metadata for each table file being added.
ChunkTableInfo []*ChunkTableInfo `protobuf:"bytes,3,rep,name=chunk_table_info,json=chunkTableInfo,proto3" json:"chunk_table_info,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Standard protoc-generated plumbing for AddTableFilesRequest
// (reset, string form, descriptor index, and wire (un)marshal hooks).
func (m *AddTableFilesRequest) Reset() { *m = AddTableFilesRequest{} }
func (m *AddTableFilesRequest) String() string { return proto.CompactTextString(m) }
func (*AddTableFilesRequest) ProtoMessage() {}
func (*AddTableFilesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_702c187af9ca94ec, []int{26}
}
func (m *AddTableFilesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AddTableFilesRequest.Unmarshal(m, b)
}
func (m *AddTableFilesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AddTableFilesRequest.Marshal(b, m, deterministic)
}
func (m *AddTableFilesRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddTableFilesRequest.Merge(m, src)
}
func (m *AddTableFilesRequest) XXX_Size() int {
return xxx_messageInfo_AddTableFilesRequest.Size(m)
}
func (m *AddTableFilesRequest) XXX_DiscardUnknown() {
xxx_messageInfo_AddTableFilesRequest.DiscardUnknown(m)
}
var xxx_messageInfo_AddTableFilesRequest proto.InternalMessageInfo
// Generated nil-safe field accessors: each returns the field value, or the
// field type's zero value when the receiver is nil.
func (m *AddTableFilesRequest) GetRepoId() *RepoId {
if m != nil {
return m.RepoId
}
return nil
}
func (m *AddTableFilesRequest) GetClientRepoFormat() *ClientRepoFormat {
if m != nil {
return m.ClientRepoFormat
}
return nil
}
func (m *AddTableFilesRequest) GetChunkTableInfo() []*ChunkTableInfo {
if m != nil {
return m.ChunkTableInfo
}
return nil
}
// AddTableFilesResponse is the protoc-generated response message for the
// ChunkStoreService.AddTableFiles RPC. Do not edit by hand.
type AddTableFilesResponse struct {
// True when the server accepted the added table files.
Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
// Standard protoc-generated plumbing for AddTableFilesResponse
// (reset, string form, descriptor index, and wire (un)marshal hooks).
func (m *AddTableFilesResponse) Reset() { *m = AddTableFilesResponse{} }
func (m *AddTableFilesResponse) String() string { return proto.CompactTextString(m) }
func (*AddTableFilesResponse) ProtoMessage() {}
func (*AddTableFilesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_702c187af9ca94ec, []int{27}
}
func (m *AddTableFilesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_AddTableFilesResponse.Unmarshal(m, b)
}
func (m *AddTableFilesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_AddTableFilesResponse.Marshal(b, m, deterministic)
}
func (m *AddTableFilesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_AddTableFilesResponse.Merge(m, src)
}
func (m *AddTableFilesResponse) XXX_Size() int {
return xxx_messageInfo_AddTableFilesResponse.Size(m)
}
func (m *AddTableFilesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_AddTableFilesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_AddTableFilesResponse proto.InternalMessageInfo
// GetSuccess is the generated nil-safe accessor for Success; returns false
// when the receiver is nil.
func (m *AddTableFilesResponse) GetSuccess() bool {
if m != nil {
return m.Success
}
return false
}
func init() {
proto.RegisterType((*RepoId)(nil), "dolt.services.remotesapi.v1alpha1.RepoId")
proto.RegisterType((*HasChunksRequest)(nil), "dolt.services.remotesapi.v1alpha1.HasChunksRequest")
@@ -1289,6 +1383,8 @@ func init() {
proto.RegisterType((*ListTableFilesRequest)(nil), "dolt.services.remotesapi.v1alpha1.ListTableFilesRequest")
proto.RegisterType((*TableFileInfo)(nil), "dolt.services.remotesapi.v1alpha1.TableFileInfo")
proto.RegisterType((*ListTableFilesResponse)(nil), "dolt.services.remotesapi.v1alpha1.ListTableFilesResponse")
proto.RegisterType((*AddTableFilesRequest)(nil), "dolt.services.remotesapi.v1alpha1.AddTableFilesRequest")
proto.RegisterType((*AddTableFilesResponse)(nil), "dolt.services.remotesapi.v1alpha1.AddTableFilesResponse")
}
func init() {
@@ -1296,75 +1392,78 @@ func init() {
}
var fileDescriptor_702c187af9ca94ec = []byte{
// 1083 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x57, 0x5b, 0x6f, 0x1b, 0xc5,
0x17, 0xcf, 0x26, 0xfe, 0xfb, 0x72, 0x7c, 0x89, 0xb3, 0xfa, 0x37, 0xb5, 0x8c, 0x50, 0xd3, 0x95,
0x40, 0xa1, 0xa5, 0xde, 0xc6, 0xad, 0xd4, 0x16, 0x5e, 0x50, 0x42, 0x1b, 0x47, 0x0a, 0xa8, 0x4c,
0x0a, 0x25, 0x44, 0x68, 0xb5, 0x5e, 0x8f, 0xed, 0x15, 0xbb, 0x33, 0xdb, 0x9d, 0xd9, 0xf0, 0xc8,
0x13, 0x97, 0x17, 0xc4, 0x0b, 0xdf, 0x07, 0xf8, 0x64, 0xa0, 0xb9, 0xac, 0xb3, 0xeb, 0x1a, 0x75,
0x52, 0x59, 0xbc, 0xf9, 0x1c, 0xcf, 0x39, 0xe7, 0xf7, 0x3b, 0x33, 0xe7, 0xb2, 0x30, 0x9c, 0xd0,
0x88, 0xbb, 0x0c, 0xa7, 0x97, 0x61, 0x80, 0x99, 0x9b, 0xe2, 0x98, 0x72, 0xcc, 0xfc, 0x24, 0x74,
0x2f, 0x0f, 0xfc, 0x28, 0x99, 0xfb, 0x07, 0x6e, 0x30, 0xcf, 0xc8, 0x77, 0x8c, 0xd3, 0x14, 0x0f,
0x92, 0x94, 0x72, 0x6a, 0xdf, 0x16, 0x36, 0x83, 0xdc, 0x66, 0x70, 0x65, 0x33, 0xc8, 0x6d, 0x9c,
0x47, 0x50, 0x45, 0x38, 0xa1, 0x27, 0x13, 0xbb, 0x0b, 0x5b, 0x34, 0x9d, 0xf5, 0xac, 0x3d, 0x6b,
0xbf, 0x81, 0xc4, 0x4f, 0xfb, 0x1d, 0x68, 0xa4, 0x38, 0xa1, 0x1e, 0xf1, 0x63, 0xdc, 0xdb, 0x94,
0xfa, 0xba, 0x50, 0x7c, 0xee, 0xc7, 0xd8, 0x21, 0xd0, 0x1d, 0xf9, 0xec, 0x48, 0x86, 0x44, 0xf8,
0x55, 0x86, 0x19, 0xb7, 0x0f, 0xa1, 0x26, 0x0d, 0xc2, 0x89, 0x74, 0xd3, 0x1c, 0x7e, 0x30, 0x78,
0x23, 0x82, 0x81, 0x0a, 0x8f, 0xaa, 0xa9, 0x82, 0xb1, 0x0b, 0xd5, 0xb9, 0xcf, 0xe6, 0x98, 0xf5,
0x36, 0xf7, 0xb6, 0xf6, 0x5b, 0x48, 0x4b, 0xce, 0x5d, 0xd8, 0x29, 0xc4, 0x63, 0x09, 0x25, 0x0c,
0x8b, 0xc3, 0xfe, 0x98, 0x61, 0xc2, 0x7b, 0xd6, 0xde, 0xd6, 0xfe, 0xff, 0x90, 0x96, 0x9c, 0xc7,
0xd0, 0x1a, 0x71, 0x9e, 0x1c, 0x63, 0x2e, 0x0d, 0x04, 0xb7, 0x2c, 0x8d, 0x72, 0x6e, 0x59, 0x1a,
0xfd, 0x6b, 0x98, 0xe7, 0x00, 0xc8, 0x27, 0x33, 0xac, 0xec, 0x6c, 0xa8, 0x08, 0xbd, 0x34, 0x6c,
0x21, 0xf9, 0x5b, 0x58, 0xd2, 0xe9, 0x94, 0x61, 0x2e, 0x53, 0x52, 0x41, 0x5a, 0x12, 0xfa, 0x08,
0x93, 0x19, 0x9f, 0xf7, 0xb6, 0xf6, 0xac, 0xfd, 0x36, 0xd2, 0x92, 0x33, 0x5b, 0x60, 0x91, 0x8e,
0x57, 0x60, 0x79, 0x0a, 0xd5, 0x54, 0xfc, 0xa5, 0xb0, 0x34, 0x87, 0xf7, 0x4c, 0xb2, 0xb6, 0x00,
0x89, 0xb4, 0xb1, 0xf3, 0xa7, 0x05, 0xcd, 0x4f, 0xe9, 0xf7, 0x24, 0xa2, 0xfe, 0xe4, 0x94, 0x06,
0xf6, 0x29, 0xd4, 0xe7, 0x9c, 0x27, 0xde, 0x0c, 0x73, 0x7d, 0x1d, 0xae, 0x81, 0xe3, 0x62, 0xde,
0x46, 0x1b, 0xa8, 0x36, 0x57, 0xb2, 0xfd, 0x12, 0x3a, 0xb9, 0x37, 0x4f, 0x06, 0x94, 0xf4, 0xaf,
0xe5, 0x53, 0x62, 0x1e, 0x6d, 0xa0, 0xd6, 0xbc, 0x20, 0x1f, 0x02, 0xd4, 0x23, 0x1a, 0xf8, 0x3c,
0xa4, 0xc4, 0x79, 0x0f, 0x76, 0xc4, 0xd9, 0xe7, 0x94, 0xf1, 0x17, 0xfe, 0x38, 0xc2, 0xcf, 0xc2,
0x68, 0x45, 0xc2, 0x9c, 0xdf, 0x2d, 0x68, 0x7c, 0x99, 0xe4, 0x3c, 0xdf, 0x87, 0x6d, 0x2e, 0x0e,
0x7b, 0xd3, 0x30, 0xc2, 0x5e, 0xe1, 0xbe, 0xda, 0x3c, 0xf7, 0x31, 0x12, 0x17, 0x77, 0x06, 0x0d,
0xc9, 0x20, 0xa1, 0x8c, 0x6b, 0xf0, 0x0f, 0x0d, 0xc1, 0x97, 0x00, 0x8d, 0x36, 0x90, 0x4c, 0xac,
0x50, 0x96, 0xd0, 0xff, 0x00, 0xbb, 0xc7, 0x98, 0x17, 0xae, 0x60, 0xad, 0x85, 0x71, 0x1b, 0x5a,
0xb2, 0xc0, 0xbd, 0xd2, 0xbb, 0x6d, 0x4a, 0xdd, 0x48, 0x3d, 0xde, 0x6f, 0xe1, 0xe6, 0x6b, 0x00,
0x74, 0xa5, 0x1c, 0x42, 0x25, 0xa2, 0x01, 0x93, 0x75, 0xd2, 0x1c, 0x0e, 0x0c, 0xc2, 0x17, 0xdc,
0x20, 0x69, 0xeb, 0xfc, 0x64, 0xc1, 0xff, 0x8f, 0x31, 0x5f, 0x64, 0x7e, 0xad, 0xf4, 0xee, 0xc0,
0xce, 0xd2, 0x2d, 0x2e, 0x38, 0x6e, 0x97, 0xee, 0x11, 0x33, 0xe7, 0x1c, 0x6e, 0x2c, 0xe1, 0xd0,
0x2c, 0x3f, 0x29, 0xb1, 0xfc, 0xd0, 0x00, 0xc5, 0xc2, 0x89, 0xe6, 0x78, 0x06, 0x6d, 0x84, 0xc7,
0x3e, 0xc3, 0x6b, 0xe4, 0xe6, 0x74, 0xa1, 0x93, 0x3b, 0x55, 0x40, 0x9d, 0x2f, 0xa0, 0x89, 0x28,
0xe5, 0xeb, 0x0c, 0x72, 0x17, 0x5a, 0xca, 0xa5, 0xce, 0x85, 0xe8, 0xde, 0x94, 0xf2, 0x62, 0x41,
0xd4, 0x85, 0x42, 0xe4, 0xd0, 0x79, 0x0a, 0x1d, 0x59, 0xe1, 0xf2, 0x51, 0x9f, 0x90, 0x29, 0x5d,
0xd9, 0xea, 0x6e, 0x81, 0x7a, 0x5e, 0x5e, 0x40, 0x33, 0xa2, 0x6a, 0xa6, 0x8d, 0x40, 0xaa, 0x8e,
0x84, 0xc6, 0xf9, 0x6b, 0x13, 0xda, 0x47, 0x34, 0x8e, 0xc3, 0x75, 0x32, 0xb1, 0x7b, 0x50, 0x0b,
0xb2, 0x34, 0xc5, 0x3a, 0x64, 0x0b, 0xe5, 0xa2, 0x00, 0x19, 0xf9, 0x8c, 0xcb, 0x0e, 0xdb, 0x42,
0xf2, 0xb7, 0x7d, 0x01, 0x5d, 0x05, 0x52, 0x3d, 0x9f, 0x90, 0x4c, 0x69, 0xaf, 0x22, 0xef, 0xff,
0xc0, 0x20, 0x74, 0x39, 0x0b, 0xa8, 0x13, 0x94, 0xb3, 0xe2, 0x83, 0x1d, 0x44, 0x21, 0x26, 0xdc,
0x93, 0xac, 0xa6, 0x34, 0x8d, 0x7d, 0xde, 0xeb, 0x48, 0x66, 0x0f, 0x4c, 0xdc, 0x4b, 0x63, 0xc1,
0xef, 0x99, 0x34, 0x45, 0xdd, 0x60, 0x49, 0xe3, 0xdc, 0x81, 0x4e, 0x9e, 0x42, 0x7d, 0x73, 0x3d,
0xa8, 0xb1, 0x2c, 0x08, 0x30, 0x63, 0x32, 0x87, 0x75, 0x94, 0x8b, 0xce, 0x1f, 0x96, 0x6c, 0x31,
0xc2, 0xfa, 0x33, 0xcc, 0xfd, 0x89, 0xcf, 0xfd, 0x75, 0x26, 0xfe, 0x3f, 0x60, 0x7b, 0x21, 0x5b,
0x54, 0x99, 0x80, 0xa6, 0x7d, 0x0b, 0x9a, 0x64, 0x3c, 0xf5, 0x2e, 0x71, 0xca, 0x42, 0x4a, 0x74,
0xbf, 0x07, 0x32, 0x9e, 0x7e, 0xa5, 0x34, 0xea, 0x00, 0x5b, 0x1c, 0xd8, 0xcc, 0x0f, 0x30, 0x7d,
0xc0, 0x79, 0x01, 0xdd, 0x65, 0x08, 0x6b, 0xf0, 0x7a, 0x01, 0x37, 0x4e, 0xc3, 0x42, 0xff, 0x5f,
0x67, 0xdb, 0x73, 0xce, 0xa1, 0xbd, 0x70, 0x2c, 0x5f, 0xdc, 0x4d, 0xa8, 0xc9, 0x0e, 0xa8, 0x9d,
0x36, 0x50, 0x55, 0x88, 0x27, 0x13, 0xfb, 0x5d, 0x00, 0x92, 0xc5, 0x9e, 0x5a, 0xf2, 0x74, 0x2d,
0x36, 0x48, 0x16, 0xab, 0x95, 0x28, 0x9f, 0x92, 0x5b, 0x57, 0x53, 0xf2, 0x37, 0x0b, 0x76, 0x97,
0x81, 0x1b, 0xf4, 0x06, 0xfb, 0xeb, 0xd2, 0x3c, 0x95, 0xf5, 0xa4, 0xf6, 0x92, 0xfb, 0x06, 0xf4,
0x4a, 0x64, 0x0a, 0x13, 0x58, 0x88, 0xc3, 0xbf, 0x6b, 0xb0, 0x23, 0xe1, 0x9e, 0x89, 0x25, 0xf5,
0x4c, 0xf9, 0xb1, 0x7f, 0xb1, 0x60, 0x7b, 0xe9, 0x4d, 0xd8, 0x4f, 0x0c, 0x42, 0xad, 0x2e, 0x84,
0xfe, 0x47, 0x6f, 0x63, 0xaa, 0xf3, 0x72, 0x09, 0x8d, 0xc5, 0x92, 0x69, 0x9b, 0xbc, 0xf8, 0xe5,
0x15, 0xb8, 0xff, 0xf0, 0x7a, 0x46, 0x3a, 0xee, 0xaf, 0x6a, 0xb2, 0x16, 0x46, 0xae, 0xdc, 0x28,
0x98, 0x69, 0x1e, 0x56, 0xec, 0x1c, 0xa6, 0x79, 0x58, 0xb9, 0x2d, 0xfc, 0x6c, 0x81, 0x5d, 0x9c,
0xb0, 0x1a, 0xcd, 0x23, 0x33, 0x97, 0xaf, 0x2d, 0x08, 0xfd, 0xc7, 0xd7, 0x37, 0xd4, 0x48, 0x62,
0xf1, 0x7d, 0x22, 0x46, 0xa7, 0x7d, 0xdf, 0xa8, 0xb8, 0x0a, 0xa3, 0xbb, 0x7f, 0x70, 0x0d, 0x0b,
0x1d, 0x6e, 0x06, 0x15, 0x31, 0x44, 0x6d, 0x93, 0x05, 0xa9, 0x30, 0xc0, 0xfb, 0xae, 0xf1, 0xf9,
0x2b, 0x5e, 0xaa, 0xeb, 0x1b, 0xf1, 0x2a, 0xcd, 0x58, 0x23, 0x5e, 0x4b, 0x23, 0xe5, 0x47, 0x0b,
0x3a, 0xe5, 0x5e, 0x60, 0x9b, 0xdc, 0xc9, 0xca, 0xbe, 0xd7, 0x7f, 0xf2, 0x16, 0x96, 0x0a, 0xc7,
0xe1, 0xf9, 0x37, 0x2f, 0x67, 0x21, 0x9f, 0x67, 0xe3, 0x41, 0x40, 0x63, 0x37, 0x0a, 0x5f, 0x65,
0xa1, 0x28, 0xc0, 0x7b, 0x21, 0x09, 0x5c, 0xf9, 0x81, 0x3b, 0xa3, 0xee, 0x0c, 0x13, 0x57, 0x7e,
0xbf, 0xba, 0x6f, 0xfc, 0xe4, 0xfd, 0xf8, 0x4a, 0x37, 0xae, 0x4a, 0x9b, 0x07, 0xff, 0x04, 0x00,
0x00, 0xff, 0xff, 0xa2, 0xf7, 0x14, 0x4c, 0x29, 0x0f, 0x00, 0x00,
// 1136 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x58, 0xdd, 0x6f, 0x1b, 0x45,
0x10, 0xcf, 0xd9, 0xc1, 0x89, 0xc7, 0x1f, 0x71, 0x4e, 0x4d, 0x6a, 0x19, 0xa1, 0xa6, 0x2b, 0x81,
0x42, 0x4b, 0x7d, 0x8d, 0x5b, 0xa9, 0x2d, 0xbc, 0x40, 0x42, 0x1b, 0x47, 0x0a, 0xa8, 0x6c, 0x0a,
0x25, 0x44, 0xc8, 0x3a, 0x9f, 0xd7, 0xf6, 0x89, 0xf3, 0xae, 0x7b, 0xbb, 0x17, 0x1e, 0x91, 0x10,
0x5f, 0x2f, 0x88, 0x17, 0x9e, 0xf8, 0x67, 0x80, 0xff, 0x0c, 0xed, 0xc7, 0x39, 0x77, 0xae, 0xab,
0x6c, 0x2a, 0xd3, 0x37, 0xef, 0x64, 0x7f, 0x33, 0xbf, 0xdf, 0xec, 0xce, 0xec, 0xe4, 0xa0, 0x33,
0x60, 0x91, 0xf0, 0x38, 0x89, 0xcf, 0xc3, 0x80, 0x70, 0x2f, 0x26, 0x13, 0x26, 0x08, 0xf7, 0xa7,
0xa1, 0x77, 0xbe, 0xe7, 0x47, 0xd3, 0xb1, 0xbf, 0xe7, 0x05, 0xe3, 0x84, 0x7e, 0xc7, 0x05, 0x8b,
0x49, 0x7b, 0x1a, 0x33, 0xc1, 0xdc, 0x9b, 0x12, 0xd3, 0x4e, 0x31, 0xed, 0x0b, 0x4c, 0x3b, 0xc5,
0xa0, 0x07, 0x50, 0xc2, 0x64, 0xca, 0x8e, 0x06, 0x6e, 0x03, 0x8a, 0x2c, 0x1e, 0x35, 0x9d, 0x1d,
0x67, 0xb7, 0x8c, 0xe5, 0x4f, 0xf7, 0x6d, 0x28, 0xc7, 0x64, 0xca, 0x7a, 0xd4, 0x9f, 0x90, 0x66,
0x41, 0xd9, 0xd7, 0xa5, 0xe1, 0x73, 0x7f, 0x42, 0x10, 0x85, 0x46, 0xd7, 0xe7, 0x07, 0x2a, 0x24,
0x26, 0x2f, 0x12, 0xc2, 0x85, 0xbb, 0x0f, 0x6b, 0x0a, 0x10, 0x0e, 0x94, 0x9b, 0x4a, 0xe7, 0xfd,
0xf6, 0xa5, 0x0c, 0xda, 0x3a, 0x3c, 0x2e, 0xc5, 0x9a, 0xc6, 0x36, 0x94, 0xc6, 0x3e, 0x1f, 0x13,
0xde, 0x2c, 0xec, 0x14, 0x77, 0xab, 0xd8, 0xac, 0xd0, 0x6d, 0xd8, 0xcc, 0xc4, 0xe3, 0x53, 0x46,
0x39, 0x91, 0x9b, 0xfd, 0x3e, 0x27, 0x54, 0x34, 0x9d, 0x9d, 0xe2, 0xee, 0x5b, 0xd8, 0xac, 0xd0,
0x43, 0xa8, 0x76, 0x85, 0x98, 0x1e, 0x12, 0xa1, 0x00, 0x52, 0x5b, 0x12, 0x47, 0xa9, 0xb6, 0x24,
0x8e, 0x5e, 0x19, 0xe6, 0x29, 0x00, 0xf6, 0xe9, 0x88, 0x68, 0x9c, 0x0b, 0xab, 0xd2, 0xae, 0x80,
0x55, 0xac, 0x7e, 0x4b, 0x24, 0x1b, 0x0e, 0x39, 0x11, 0x2a, 0x25, 0xab, 0xd8, 0xac, 0xa4, 0x3d,
0x22, 0x74, 0x24, 0xc6, 0xcd, 0xe2, 0x8e, 0xb3, 0x5b, 0xc3, 0x66, 0x85, 0x46, 0x33, 0x2e, 0xca,
0xf1, 0x02, 0x2e, 0x8f, 0xa1, 0x14, 0xcb, 0x3f, 0x69, 0x2e, 0x95, 0xce, 0x1d, 0x9b, 0xac, 0xcd,
0x48, 0x62, 0x03, 0x46, 0xff, 0x38, 0x50, 0xf9, 0x94, 0x7d, 0x4f, 0x23, 0xe6, 0x0f, 0x8e, 0x59,
0xe0, 0x1e, 0xc3, 0xfa, 0x58, 0x88, 0x69, 0x6f, 0x44, 0x84, 0x39, 0x0e, 0xcf, 0xc2, 0x71, 0x36,
0x6f, 0xdd, 0x15, 0xbc, 0x36, 0xd6, 0x6b, 0xf7, 0x39, 0xd4, 0x53, 0x6f, 0x3d, 0x15, 0x50, 0xc9,
0xbf, 0x92, 0x4f, 0xc5, 0xb9, 0xbb, 0x82, 0xab, 0xe3, 0xcc, 0x7a, 0x1f, 0x60, 0x3d, 0x62, 0x81,
0x2f, 0x42, 0x46, 0xd1, 0xbb, 0xb0, 0x29, 0xf7, 0x3e, 0x65, 0x5c, 0x3c, 0xf3, 0xfb, 0x11, 0x79,
0x12, 0x46, 0x0b, 0x12, 0x86, 0xfe, 0x74, 0xa0, 0xfc, 0xe5, 0x34, 0xd5, 0xf9, 0x1e, 0x6c, 0x08,
0xb9, 0xb9, 0x37, 0x0c, 0x23, 0xd2, 0xcb, 0x9c, 0x57, 0x4d, 0xa4, 0x3e, 0xba, 0xf2, 0xe0, 0x4e,
0xa0, 0xac, 0x14, 0x4c, 0x19, 0x17, 0x86, 0xfc, 0x7d, 0x4b, 0xf2, 0x39, 0x42, 0xdd, 0x15, 0xac,
0x12, 0x2b, 0x8d, 0x39, 0xf6, 0x3f, 0xc0, 0xf6, 0x21, 0x11, 0x99, 0x23, 0x58, 0x6a, 0x61, 0xdc,
0x84, 0xaa, 0x2a, 0xf0, 0x5e, 0xee, 0xde, 0x56, 0x94, 0xad, 0xab, 0x2f, 0xef, 0xb7, 0x70, 0xfd,
0x25, 0x02, 0xa6, 0x52, 0xf6, 0x61, 0x35, 0x62, 0x01, 0x57, 0x75, 0x52, 0xe9, 0xb4, 0x2d, 0xc2,
0x67, 0xdc, 0x60, 0x85, 0x45, 0xbf, 0x38, 0x70, 0xed, 0x90, 0x88, 0x59, 0xe6, 0x97, 0x2a, 0xef,
0x16, 0x6c, 0xce, 0x9d, 0xe2, 0x4c, 0xe3, 0x46, 0xee, 0x1c, 0x09, 0x47, 0xa7, 0xb0, 0x35, 0xc7,
0xc3, 0xa8, 0xfc, 0x38, 0xa7, 0xf2, 0x03, 0x0b, 0x16, 0x33, 0x27, 0x46, 0xe3, 0x09, 0xd4, 0x30,
0xe9, 0xfb, 0x9c, 0x2c, 0x51, 0x1b, 0x6a, 0x40, 0x3d, 0x75, 0xaa, 0x89, 0xa2, 0x2f, 0xa0, 0x82,
0x19, 0x13, 0xcb, 0x0c, 0x72, 0x1b, 0xaa, 0xda, 0xa5, 0xc9, 0x85, 0xec, 0xde, 0x8c, 0x89, 0x6c,
0x41, 0xac, 0x4b, 0x83, 0xcc, 0x21, 0x7a, 0x0c, 0x75, 0x55, 0xe1, 0xea, 0x52, 0x1f, 0xd1, 0x21,
0x5b, 0xd8, 0xea, 0x6e, 0x80, 0xbe, 0x5e, 0xbd, 0x80, 0x25, 0x54, 0xd7, 0x4c, 0x0d, 0x83, 0x32,
0x1d, 0x48, 0x0b, 0xfa, 0xb7, 0x00, 0xb5, 0x03, 0x36, 0x99, 0x84, 0xcb, 0x54, 0xe2, 0x36, 0x61,
0x2d, 0x48, 0xe2, 0x98, 0x98, 0x90, 0x55, 0x9c, 0x2e, 0x25, 0xc9, 0xc8, 0xe7, 0x42, 0x75, 0xd8,
0x2a, 0x56, 0xbf, 0xdd, 0x33, 0x68, 0x68, 0x92, 0xfa, 0xfa, 0x84, 0x74, 0xc8, 0x9a, 0xab, 0xea,
0xfc, 0xf7, 0x2c, 0x42, 0xe7, 0xb3, 0x80, 0xeb, 0x41, 0x3e, 0x2b, 0x3e, 0xb8, 0x41, 0x14, 0x12,
0x2a, 0x7a, 0x4a, 0xd5, 0x90, 0xc5, 0x13, 0x5f, 0x34, 0xeb, 0x4a, 0xd9, 0x3d, 0x1b, 0xf7, 0x0a,
0x2c, 0xf5, 0x3d, 0x51, 0x50, 0xdc, 0x08, 0xe6, 0x2c, 0xe8, 0x16, 0xd4, 0xd3, 0x14, 0x9a, 0x93,
0x6b, 0xc2, 0x1a, 0x4f, 0x82, 0x80, 0x70, 0xae, 0x72, 0xb8, 0x8e, 0xd3, 0x25, 0xfa, 0xdb, 0x51,
0x2d, 0x46, 0xa2, 0x3f, 0x23, 0xc2, 0x1f, 0xf8, 0xc2, 0x5f, 0x66, 0xe2, 0xdf, 0x80, 0xda, 0x33,
0xd5, 0xa2, 0xf2, 0x02, 0x8c, 0xec, 0x1b, 0x50, 0xa1, 0xfd, 0x61, 0xef, 0x9c, 0xc4, 0x3c, 0x64,
0xd4, 0xf4, 0x7b, 0xa0, 0xfd, 0xe1, 0x57, 0xda, 0xa2, 0x37, 0xf0, 0xd9, 0x86, 0x42, 0xba, 0x81,
0x9b, 0x0d, 0xe8, 0x19, 0x34, 0xe6, 0x29, 0x2c, 0xc1, 0xeb, 0x19, 0x6c, 0x1d, 0x87, 0x99, 0xfe,
0xbf, 0xcc, 0xb6, 0x87, 0x4e, 0xa1, 0x36, 0x73, 0xac, 0x6e, 0xdc, 0x75, 0x58, 0x53, 0x1d, 0xd0,
0x38, 0x2d, 0xe3, 0x92, 0x5c, 0x1e, 0x0d, 0xdc, 0x77, 0x00, 0x68, 0x32, 0xe9, 0xe9, 0x21, 0xcf,
0xd4, 0x62, 0x99, 0x26, 0x13, 0x3d, 0x12, 0xa5, 0xaf, 0x64, 0xf1, 0xe2, 0x95, 0xfc, 0xc3, 0x81,
0xed, 0x79, 0xe2, 0x16, 0xbd, 0xc1, 0xfd, 0x3a, 0xf7, 0x9e, 0xaa, 0x7a, 0xd2, 0x73, 0xc9, 0x5d,
0x0b, 0x79, 0x39, 0x31, 0x99, 0x17, 0x58, 0x2e, 0xd1, 0x5f, 0x05, 0xb8, 0xf6, 0xc9, 0x60, 0xf0,
0xbf, 0x64, 0xf2, 0x15, 0x97, 0xb7, 0xb0, 0xc4, 0xcb, 0xbb, 0xb0, 0xd5, 0x14, 0x97, 0xd4, 0x6a,
0xd0, 0x1e, 0x6c, 0xcd, 0xe5, 0xe6, 0xb2, 0x76, 0xd0, 0xf9, 0xa9, 0x0c, 0x9b, 0xca, 0xeb, 0x89,
0x1c, 0xfa, 0x4f, 0x74, 0x70, 0xf7, 0x37, 0x07, 0x36, 0xe6, 0x6a, 0xcc, 0x7d, 0x64, 0xc1, 0x6f,
0x71, 0x63, 0x69, 0x7d, 0xf8, 0x3a, 0x50, 0x43, 0xfd, 0x1c, 0xca, 0xb3, 0xa1, 0xdd, 0xb5, 0x39,
0x84, 0xf9, 0x7f, 0x29, 0x5a, 0xf7, 0xaf, 0x06, 0x32, 0x71, 0x7f, 0xd7, 0x93, 0x4a, 0x66, 0x84,
0x51, 0x13, 0x1a, 0xb7, 0xcd, 0xc3, 0x82, 0x19, 0xce, 0x36, 0x0f, 0x0b, 0xa7, 0xaf, 0x5f, 0x1d,
0x70, 0xb3, 0x13, 0x8b, 0x61, 0xf3, 0xc0, 0xce, 0xe5, 0x4b, 0x03, 0x57, 0xeb, 0xe1, 0xd5, 0x81,
0x86, 0xc9, 0x44, 0xfe, 0xbf, 0x27, 0x47, 0x11, 0xf7, 0xae, 0x55, 0x89, 0x65, 0x46, 0xa1, 0xd6,
0xde, 0x15, 0x10, 0x26, 0xdc, 0x08, 0x56, 0xe5, 0x50, 0xe2, 0xda, 0x0c, 0x9c, 0x99, 0x81, 0xa8,
0xe5, 0x59, 0xef, 0xbf, 0xd0, 0xa5, 0x5f, 0x51, 0x2b, 0x5d, 0xb9, 0x99, 0xc5, 0x4a, 0xd7, 0xdc,
0x13, 0xfd, 0xb3, 0x03, 0xf5, 0x7c, 0x6f, 0x75, 0x6d, 0xce, 0x64, 0xe1, 0x3b, 0xd2, 0x7a, 0xf4,
0x1a, 0x48, 0xc3, 0xe3, 0x47, 0x07, 0x6a, 0xb9, 0xae, 0x61, 0x75, 0xa7, 0x16, 0xf5, 0x60, 0xab,
0x3b, 0xb5, 0xb0, 0x41, 0xed, 0x9f, 0x7e, 0xf3, 0x7c, 0x14, 0x8a, 0x71, 0xd2, 0x6f, 0x07, 0x6c,
0xe2, 0x45, 0xe1, 0x8b, 0x24, 0x94, 0x5d, 0xe0, 0x4e, 0x48, 0x03, 0x4f, 0x7d, 0xb5, 0x18, 0x31,
0x6f, 0x44, 0xa8, 0xa7, 0x3e, 0x4a, 0x78, 0x97, 0x7e, 0xc7, 0xf8, 0xe8, 0xc2, 0xd6, 0x2f, 0x29,
0xcc, 0xbd, 0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0xac, 0x42, 0x71, 0x29, 0xfe, 0x10, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@@ -1390,6 +1489,7 @@ type ChunkStoreServiceClient interface {
Root(ctx context.Context, in *RootRequest, opts ...grpc.CallOption) (*RootResponse, error)
Commit(ctx context.Context, in *CommitRequest, opts ...grpc.CallOption) (*CommitResponse, error)
ListTableFiles(ctx context.Context, in *ListTableFilesRequest, opts ...grpc.CallOption) (*ListTableFilesResponse, error)
AddTableFiles(ctx context.Context, in *AddTableFilesRequest, opts ...grpc.CallOption) (*AddTableFilesResponse, error)
}
type chunkStoreServiceClient struct {
@@ -1472,6 +1572,15 @@ func (c *chunkStoreServiceClient) ListTableFiles(ctx context.Context, in *ListTa
return out, nil
}
// AddTableFiles is the generated client stub: it issues the unary
// AddTableFiles RPC over the wrapped grpc.ClientConn and returns the
// decoded response or the RPC error.
func (c *chunkStoreServiceClient) AddTableFiles(ctx context.Context, in *AddTableFilesRequest, opts ...grpc.CallOption) (*AddTableFilesResponse, error) {
out := new(AddTableFilesResponse)
err := c.cc.Invoke(ctx, "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/AddTableFiles", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ChunkStoreServiceServer is the server API for ChunkStoreService service.
type ChunkStoreServiceServer interface {
GetRepoMetadata(context.Context, *GetRepoMetadataRequest) (*GetRepoMetadataResponse, error)
@@ -1485,6 +1594,7 @@ type ChunkStoreServiceServer interface {
Root(context.Context, *RootRequest) (*RootResponse, error)
Commit(context.Context, *CommitRequest) (*CommitResponse, error)
ListTableFiles(context.Context, *ListTableFilesRequest) (*ListTableFilesResponse, error)
AddTableFiles(context.Context, *AddTableFilesRequest) (*AddTableFilesResponse, error)
}
// UnimplementedChunkStoreServiceServer can be embedded to have forward compatible implementations.
@@ -1515,6 +1625,9 @@ func (*UnimplementedChunkStoreServiceServer) Commit(ctx context.Context, req *Co
func (*UnimplementedChunkStoreServiceServer) ListTableFiles(ctx context.Context, req *ListTableFilesRequest) (*ListTableFilesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListTableFiles not implemented")
}
// AddTableFiles on the forward-compatibility stub always fails with
// codes.Unimplemented; real servers override it.
func (*UnimplementedChunkStoreServiceServer) AddTableFiles(ctx context.Context, req *AddTableFilesRequest) (*AddTableFilesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddTableFiles not implemented")
}
func RegisterChunkStoreServiceServer(s *grpc.Server, srv ChunkStoreServiceServer) {
s.RegisterService(&_ChunkStoreService_serviceDesc, srv)
@@ -1664,6 +1777,24 @@ func _ChunkStoreService_ListTableFiles_Handler(srv interface{}, ctx context.Cont
return interceptor(ctx, in, info, handler)
}
// _ChunkStoreService_AddTableFiles_Handler is the generated server-side
// dispatch shim: it decodes the request, then either calls the service
// implementation directly or routes the call through the registered
// unary interceptor.
func _ChunkStoreService_AddTableFiles_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddTableFilesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ChunkStoreServiceServer).AddTableFiles(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dolt.services.remotesapi.v1alpha1.ChunkStoreService/AddTableFiles",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ChunkStoreServiceServer).AddTableFiles(ctx, req.(*AddTableFilesRequest))
}
return interceptor(ctx, in, info, handler)
}
var _ChunkStoreService_serviceDesc = grpc.ServiceDesc{
ServiceName: "dolt.services.remotesapi.v1alpha1.ChunkStoreService",
HandlerType: (*ChunkStoreServiceServer)(nil),
@@ -1700,6 +1831,10 @@ var _ChunkStoreService_serviceDesc = grpc.ServiceDesc{
MethodName: "ListTableFiles",
Handler: _ChunkStoreService_ListTableFiles_Handler,
},
{
MethodName: "AddTableFiles",
Handler: _ChunkStoreService_AddTableFiles_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "dolt/services/remotesapi/v1alpha1/chunkstore.proto",

View File

@@ -43,6 +43,8 @@ const (
creationBranch = "create"
MasterBranch = "master"
CommitStructName = "Commit"
defaultChunksPerTF = 256 * 1024
)
// LocalDirDoltDB stores the db in the current directory
@@ -622,26 +624,50 @@ func (ddb *DoltDB) DeleteBranch(ctx context.Context, dref ref.DoltRef) error {
// PushChunks initiates a push into a database from the source database given, at the commit given. Pull progress is
// communicated over the provided channel.
func (ddb *DoltDB) PushChunks(ctx context.Context, srcDB *DoltDB, cm *Commit, progChan chan datas.PullProgress) error {
// PushChunks (new v2 signature added by this commit): tempDir is where the
// table-file Puller stages data; pullerEventCh receives datas.PullerEvents
// for progress reporting.
func (ddb *DoltDB) PushChunks(ctx context.Context, tempDir string, srcDB *DoltDB, cm *Commit, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) error {
rf, err := types.NewRef(cm.commitSt, ddb.db.Format())
if err != nil {
return err
}
// NOTE(review): the next line appears to be the pre-change implementation
// retained in this diff view (deleted by this commit in favor of the
// Puller path below) — this rendering shows removed and added lines together.
return datas.Pull(ctx, srcDB.db, ddb.db, rf, progChan)
// Use the table-file Puller when both databases support it; otherwise fall
// back to chunk-at-a-time datas.Pull.
if datas.CanUsePuller(srcDB.db) && datas.CanUsePuller(ddb.db) {
puller, err := datas.NewPuller(ctx, tempDir, defaultChunksPerTF, srcDB.db, ddb.db, rf.TargetHash(), pullerEventCh)
// ErrDBUpToDate means there is nothing to push; treat as success.
if err == datas.ErrDBUpToDate {
return nil
} else if err != nil {
return err
}
return puller.Pull(ctx)
} else {
return datas.Pull(ctx, srcDB.db, ddb.db, rf, progChan)
}
}
// PullChunks initiates a pull into a database from the source database given, at the commit given. Progress is
// communicated over the provided channel.
func (ddb *DoltDB) PullChunks(ctx context.Context, srcDB *DoltDB, cm *Commit, progChan chan datas.PullProgress) error {
// PullChunks (new v2 signature added by this commit): tempDir is where the
// table-file Puller stages data; pullerEventCh receives datas.PullerEvents
// for progress reporting.
func (ddb *DoltDB) PullChunks(ctx context.Context, tempDir string, srcDB *DoltDB, cm *Commit, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) error {
rf, err := types.NewRef(cm.commitSt, ddb.db.Format())
if err != nil {
return err
}
// NOTE(review): the next line appears to be the pre-change implementation
// retained in this diff view (deleted by this commit in favor of the
// Puller path below) — this rendering shows removed and added lines together.
return datas.PullWithoutBatching(ctx, srcDB.db, ddb.db, rf, progChan)
if datas.CanUsePuller(srcDB.db) && datas.CanUsePuller(ddb.db) {
// NOTE(review): PushChunks passes the defaultChunksPerTF constant
// (declared as 256 * 1024) here, but this path hard-codes 256*1024 —
// consider using the constant for consistency.
puller, err := datas.NewPuller(ctx, tempDir, 256*1024, srcDB.db, ddb.db, rf.TargetHash(), pullerEventCh)
// ErrDBUpToDate means there is nothing to pull; treat as success.
if err == datas.ErrDBUpToDate {
return nil
} else if err != nil {
return err
}
return puller.Pull(ctx)
} else {
return datas.PullWithoutBatching(ctx, srcDB.db, ddb.db, rf, progChan)
}
}
func (ddb *DoltDB) Clone(ctx context.Context, destDB *DoltDB, eventCh chan<- datas.TableFileEvent) error {

View File

@@ -18,9 +18,9 @@ import (
"context"
"errors"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/ref"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/ref"
"github.com/liquidata-inc/dolt/go/store/datas"
)
@@ -31,7 +31,7 @@ var ErrCantFF = errors.New("can't fast forward merge")
// the given commit via a fast forward merge. If this is the case, an attempt will be made to update the branch in the
// destination db to the given commit via fast forward move. If that succeeds the tracking branch is updated in the
// source db.
func Push(ctx context.Context, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan datas.PullProgress) error {
func Push(ctx context.Context, dEnv *env.DoltEnv, destRef ref.BranchRef, remoteRef ref.RemoteRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) error {
canFF, err := srcDB.CanFastForward(ctx, remoteRef, commit)
if err != nil {
@@ -40,7 +40,7 @@ func Push(ctx context.Context, destRef ref.BranchRef, remoteRef ref.RemoteRef, s
return ErrCantFF
}
err = destDB.PushChunks(ctx, srcDB, commit, progChan)
err = destDB.PushChunks(ctx, dEnv.TempTableFilesDir(), srcDB, commit, progChan, pullerEventCh)
if err != nil {
return err
@@ -83,8 +83,8 @@ func DeleteRemoteBranch(ctx context.Context, targetRef ref.BranchRef, remoteRef
return nil
}
func Fetch(ctx context.Context, destRef ref.DoltRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan datas.PullProgress) error {
err := destDB.PullChunks(ctx, srcDB, commit, progChan)
func Fetch(ctx context.Context, dEnv *env.DoltEnv, destRef ref.DoltRef, srcDB, destDB *doltdb.DoltDB, commit *doltdb.Commit, progChan chan datas.PullProgress, pullerEventCh chan datas.PullerEvent) error {
err := destDB.PullChunks(ctx, dEnv.TempTableFilesDir(), srcDB, commit, progChan, pullerEventCh)
if err != nil {
return err

View File

@@ -18,8 +18,10 @@ import (
"context"
"crypto/tls"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc"
@@ -43,6 +45,7 @@ const (
DefaultMetricsPort = "443"
DefaultRemotesApiHost = "doltremoteapi.dolthub.com"
DefaultRemotesApiPort = "443"
tempTablesDir = "temptf"
)
var ErrPreexistingDoltDir = errors.New(".dolt dir already exists")
@@ -84,6 +87,29 @@ func Load(ctx context.Context, hdp HomeDirProvider, fs filesys.Filesys, urlStr s
hdp,
}
if dbLoadErr == nil && dEnv.HasDoltDir() {
if !dEnv.HasDoltTempTableDir() {
err := os.Mkdir(dEnv.TempTableFilesDir(), os.ModePerm)
dEnv.DBLoadError = err
} else {
// fire and forget cleanup routine. Will delete as many old temp files as it can during the main commands execution.
// The process will not wait for this to finish so this may not always complete.
go func() {
_ = fs.Iter(dEnv.TempTableFilesDir(), true, func(path string, size int64, isDir bool) (stop bool) {
if !isDir {
lm, exists := fs.LastModified(path)
if exists && time.Now().Sub(lm) > (time.Hour*24) {
_ = fs.DeleteFile(path)
}
}
return false
})
}()
}
}
dbfactory.InitializeFactories(dEnv)
return dEnv
@@ -98,6 +124,12 @@ func (dEnv *DoltEnv) HasDoltDataDir() bool {
return dEnv.hasDoltDataDir("./")
}
// HasDoltTempTableDir returns true if the directory used for temporary table
// files exists on this environment's filesystem.
func (dEnv *DoltEnv) HasDoltTempTableDir() bool {
	exists, _ := dEnv.FS.Exists(dEnv.TempTableFilesDir())
	return exists
}
// GetDoltDir returns the path to the .dolt directory
func (dEnv *DoltEnv) GetDoltDir() string {
if !dEnv.HasDoltDataDir() {
@@ -579,3 +611,7 @@ func (dEnv *DoltEnv) GetDefaultRemote() (Remote, errhand.VerboseError) {
func (dEnv *DoltEnv) GetUserHomeDir() (string, error) {
return getHomeDir(dEnv.hdp)
}
// TempTableFilesDir returns the path of the directory (inside the .dolt directory)
// where temporary table files are written.
func (dEnv *DoltEnv) TempTableFilesDir() string {
	return filepath.Join(dEnv.GetDoltDir(), tempTablesDir)
}

View File

@@ -45,7 +45,9 @@ func createTestEnv(isInitialized bool, hasLocalConfig bool) *DoltEnv {
if isInitialized {
doltDir := filepath.Join(workingDir, dbfactory.DoltDir)
doltDataDir := filepath.Join(workingDir, dbfactory.DoltDataDir)
initialDirs = append(initialDirs, doltDir)
initialDirs = append(initialDirs, doltDataDir)
hashStr := hash.Hash{}.String()
masterRef := ref.NewBranchRef("master")

View File

@@ -22,20 +22,20 @@ import (
// chunkCache is an interface used for caching chunks
type chunkCache interface {
// Put puts a slice of chunks into the cache.
Put(c []chunks.Chunk)
Put(c []chunks.Chunkable)
	// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty
	// is put in its place
Get(h hash.HashSet) map[hash.Hash]chunks.Chunk
Get(h hash.HashSet) map[hash.Hash]chunks.Chunkable
// Has takes a set of hashes and returns the set of hashes that the cache currently does not have in it.
Has(h hash.HashSet) (absent hash.HashSet)
	// PutChunk puts a single chunk in the cache. true is returned in the event that the chunk was cached successfully
	// and false is returned if that chunk is already in the cache.
PutChunk(chunk *chunks.Chunk) bool
PutChunk(chunk chunks.Chunkable) bool
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunk
GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunkable
}

View File

@@ -28,12 +28,12 @@ import (
"time"
"github.com/cenkalti/backoff"
"github.com/golang/snappy"
eventsapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/eventsapi/v1alpha1"
remotesapi "github.com/liquidata-inc/dolt/go/gen/proto/dolt/services/remotesapi/v1alpha1"
"github.com/liquidata-inc/dolt/go/libraries/events"
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/hash"
"github.com/liquidata-inc/dolt/go/store/nbs"
@@ -118,6 +118,7 @@ func NewDoltChunkStore(ctx context.Context, nbf *types.NomsBinFormat, org, repoN
NbsVersion: nbs.StorageVersion,
},
})
if err != nil {
counter.Inc()
return nil, err
@@ -159,9 +160,51 @@ func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk,
}
}
// GetMany gets the Chunks with |hashes| from the store by delegating to GetManyCompressed
// and converting each Chunkable back to a plain chunks.Chunk before forwarding it on
// |foundChunks|. Any non-present chunks are silently ignored.
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
	ae := atomicerr.New()
	wg := &sync.WaitGroup{}
	// Buffered so the producer (GetManyCompressed) is not lock-stepped with the
	// conversion goroutine below.
	foundCmp := make(chan chunks.Chunkable, 1024)

	wg.Add(1)
	go func() {
		defer wg.Done()

		var err error
		for chable := range foundCmp {
			// After the first conversion failure, keep receiving so the channel
			// drains and the sender never blocks.
			if err != nil {
				continue // drain
			}

			var c chunks.Chunk
			c, err = chable.ToChunk()

			if ae.SetIfError(err) {
				continue
			}

			foundChunks <- &c
		}
	}()

	err := dcs.GetManyCompressed(ctx, hashes, foundCmp)
	// Closing foundCmp ends the goroutine's range loop; wait for it to finish
	// before inspecting the accumulated error.
	close(foundCmp)
	wg.Wait()

	if err != nil {
		return err
	}

	if err := ae.Get(); err != nil {
		return err
	}

	return nil
}
// GetMany gets the Chunks with |hashes| from the store. On return, |foundChunks| will have been fully sent all chunks
// which have been found. Any non-present chunks will silently be ignored.
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
func (dcs *DoltChunkStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
hashToChunk := dcs.cache.Get(hashes)
notCached := make([]hash.Hash, 0, len(hashes))
@@ -171,7 +214,7 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
if c.IsEmpty() {
notCached = append(notCached, h)
} else {
foundChunks <- &c
foundChunks <- c
}
}
@@ -280,7 +323,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (m
return resourceToUrlAndRanges, nil
}
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan *chunks.Chunk) error {
func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.HashSet, notCached []hash.Hash, foundChunks chan chunks.Chunkable) error {
// get the locations where the chunks can be downloaded from
resourceToUrlAndRanges, err := dcs.getDLLocs(ctx, notCached)
@@ -291,7 +334,7 @@ func (dcs *DoltChunkStore) readChunksAndCache(ctx context.Context, hashes hash.H
var wg sync.WaitGroup
// channel to receive chunks on
chunkChan := make(chan *chunks.Chunk, 128)
chunkChan := make(chan chunks.Chunkable, 128)
// start a go routine to receive the downloaded chunks on
wg.Add(1)
@@ -354,7 +397,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
hashSl, byteSl := HashSetToSlices(notCached)
absent := make(hash.HashSet)
var found []chunks.Chunk
var found []chunks.Chunkable
var err error
batchItr(len(hashSl), maxHasManyBatchSize, func(st, end int) (stop bool) {
@@ -425,7 +468,7 @@ func (dcs *DoltChunkStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha
// to Flush(). Put may be called concurrently with other calls to Put(),
// Get(), GetMany(), Has() and HasMany().
func (dcs *DoltChunkStore) Put(ctx context.Context, c chunks.Chunk) error {
dcs.cache.Put([]chunks.Chunk{c})
dcs.cache.Put([]chunks.Chunkable{c})
return nil
}
@@ -545,7 +588,13 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
}
chnks := make([]chunks.Chunk, 0, len(hashToChunk))
for _, ch := range hashToChunk {
for _, chable := range hashToChunk {
ch, err := chable.ToChunk()
if err != nil {
return nil, err
}
chnks = append(chnks, ch)
}
@@ -589,7 +638,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
data := hashToData[h]
switch typedLoc := loc.Location.(type) {
case *remotesapi.UploadLoc_HttpPost:
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, data)
err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, bytes.NewBuffer(data))
default:
break
}
@@ -603,17 +652,31 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context) (map[hash.Hash]int,
return hashToCount, nil
}
func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostTableFile, data []byte) error {
//resp, err := http(post.Url, "application/octet-stream", bytes.NewBuffer(data))
req, err := http.NewRequest(http.MethodPut, post.Url, bytes.NewBuffer(data))
// Sizer is implemented by readers that know, up front, the total number of bytes
// they will yield; httpPostUpload uses it to set Content-Length on upload requests.
type Sizer interface {
	Size() int64
}
func (dcs *DoltChunkStore) httpPostUpload(ctx context.Context, hashBytes []byte, post *remotesapi.HttpPostTableFile, rd io.Reader) error {
req, err := http.NewRequest(http.MethodPut, post.Url, rd)
if err != nil {
return err
}
if sizer, ok := rd.(Sizer); ok {
req.ContentLength = sizer.Size()
}
var resp *http.Response
op := func() error {
var err error
resp, err = dcs.httpFetcher.Do(req.WithContext(ctx))
if err == nil {
defer func() {
_ = resp.Body.Close()
}()
}
return processHttpResp(resp, err)
}
@@ -681,15 +744,15 @@ const (
// creates work functions for each download and executes them in parallel. The work functions write downloaded chunks
// to chunkChan
func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, resourceToUrlAndRanges map[string]urlAndRanges, chunkChan chan *chunks.Chunk) error {
var allChunks []chunks.Chunk
func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, resourceToUrlAndRanges map[string]urlAndRanges, chunkChan chan chunks.Chunkable) error {
var allChunks []chunks.Chunkable
aggLocs := aggregateDownloads(chunkAggDistance, resourceToUrlAndRanges)
// loop over all the aggLocs that need to be downloaded and create a work function for each
var work []func() error
for _, loc := range aggLocs {
var err error
var chnks []chunks.Chunk
var chnks []chunks.Chunkable
switch typedLoc := loc.Location.(type) {
case *remotesapi.DownloadLoc_HttpGet:
panic("deprecated")
@@ -714,7 +777,7 @@ func (dcs *DoltChunkStore) downloadChunks(ctx context.Context, resourceToUrlAndR
// getRangeDownloadFunc returns a work function that does the downloading of one or more chunks and writes those chunks
// to the chunkChan
func (dcs *DoltChunkStore) getRangeDownloadFunc(ctx context.Context, urlStr string, ranges []*remotesapi.RangeChunk, chunkChan chan *chunks.Chunk) func() error {
func (dcs *DoltChunkStore) getRangeDownloadFunc(ctx context.Context, urlStr string, ranges []*remotesapi.RangeChunk, chunkChan chan chunks.Chunkable) func() error {
numRanges := len(ranges)
offset := ranges[0].Offset
length := ranges[numRanges-1].Offset - offset + uint64(ranges[numRanges-1].Length)
@@ -730,15 +793,14 @@ func (dcs *DoltChunkStore) getRangeDownloadFunc(ctx context.Context, urlStr stri
// are then decoded to chunks and written to the chunkChan
for _, r := range ranges {
chunkStart := r.Offset - offset
chunkEnd := chunkStart + uint64(r.Length) - 4
chunkBytes, err := snappy.Decode(nil, comprData[chunkStart:chunkEnd])
chunkEnd := chunkStart + uint64(r.Length)
cmpChnk, err := nbs.NewCompressedChunk(hash.New(r.Hash), comprData[chunkStart:chunkEnd])
if err != nil {
return err
}
chunk := chunks.NewChunk(chunkBytes)
chunkChan <- &chunk
chunkChan <- cmpChnk
}
return nil
@@ -768,6 +830,13 @@ func rangeDownloadWithRetries(ctx context.Context, fetcher HTTPFetcher, offset,
var resp *http.Response
resp, err = fetcher.Do(req.WithContext(ctx))
if err == nil {
defer func() {
_ = resp.Body.Close()
}()
}
respErr := processHttpResp(resp, err)
if respErr != nil {
@@ -810,7 +879,7 @@ func collapseBuffers(allBufs [][]byte, length uint64) []byte {
return collapsed
}
func (dcs *DoltChunkStore) getDownloadWorkForLoc(ctx context.Context, getRange *remotesapi.HttpGetRange, chunkChan chan *chunks.Chunk) []func() error {
func (dcs *DoltChunkStore) getDownloadWorkForLoc(ctx context.Context, getRange *remotesapi.HttpGetRange, chunkChan chan chunks.Chunkable) []func() error {
var work []func() error
rangeCount := len(getRange.Ranges)
@@ -822,9 +891,57 @@ func (dcs *DoltChunkStore) getDownloadWorkForLoc(ctx context.Context, getRange *
return []func() error{dcs.getRangeDownloadFunc(ctx, getRange.Url, getRange.Ranges, chunkChan)}
}
// NewSink still needs to be implemented in order to write to a DoltChunkStore using the TableFileStore interface
func (dcs *DoltChunkStore) NewSink(ctx context.Context, fileId string, numChunks int) (nbs.WriteCloserWithContext, error) {
panic("Not implemented")
// WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
// It asks the remote for an upload location for fileId, streams the file bytes there, then
// registers the table file (and its chunk count) with the remote via AddTableFiles.
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader) error {
	fileIdBytes := hash.Parse(fileId)
	req := &remotesapi.GetUploadLocsRequest{RepoId: dcs.getRepoId(), TableFileHashes: [][]byte{fileIdBytes[:]}}
	resp, err := dcs.csClient.GetUploadLocations(ctx, req)

	if err != nil {
		return err
	}

	// Exactly one hash was requested, so anything other than one location is a
	// protocol violation.
	if len(resp.Locs) != 1 {
		return errors.New("unexpected upload location count")
	}

	loc := resp.Locs[0]

	switch typedLoc := loc.Location.(type) {
	case *remotesapi.UploadLoc_HttpPost:
		err = dcs.httpPostUpload(ctx, loc.TableFileHash, typedLoc.HttpPost, rd)

		if err != nil {
			return err
		}
	default:
		return errors.New("unsupported upload location")
	}

	chnkTblInfo := []*remotesapi.ChunkTableInfo{
		{Hash: fileIdBytes[:], ChunkCount: uint32(numChunks)},
	}

	atReq := &remotesapi.AddTableFilesRequest{
		RepoId:         dcs.getRepoId(),
		ChunkTableInfo: chnkTblInfo,
		ClientRepoFormat: &remotesapi.ClientRepoFormat{
			NbfVersion: dcs.nbf.VersionString(),
			NbsVersion: nbs.StorageVersion,
		},
	}

	atResp, err := dcs.csClient.AddTableFiles(ctx, atReq)

	if err != nil {
		// Fix: the RPC invoked here is AddTableFiles; the error was previously
		// mislabeled as "UpdateManifest", which misdirects debugging.
		return NewRpcError(err, "AddTableFiles", dcs.host, atReq)
	}

	if !atResp.Success {
		return errors.New("update table files failed")
	}

	return nil
}
// Sources retrieves the current root hash, and a list of all the table files

View File

@@ -24,20 +24,20 @@ import (
// mapChunkCache is a ChunkCache implementation that stores everything in an in memory map.
type mapChunkCache struct {
mu *sync.Mutex
hashToChunk map[hash.Hash]chunks.Chunk
toFlush map[hash.Hash]chunks.Chunk
hashToChunk map[hash.Hash]chunks.Chunkable
toFlush map[hash.Hash]chunks.Chunkable
}
func newMapChunkCache() *mapChunkCache {
return &mapChunkCache{
&sync.Mutex{},
make(map[hash.Hash]chunks.Chunk),
make(map[hash.Hash]chunks.Chunk),
make(map[hash.Hash]chunks.Chunkable),
make(map[hash.Hash]chunks.Chunkable),
}
}
// Put puts a slice of chunks into the cache.
func (mcc *mapChunkCache) Put(chnks []chunks.Chunk) {
func (mcc *mapChunkCache) Put(chnks []chunks.Chunkable) {
mcc.mu.Lock()
defer mcc.mu.Unlock()
@@ -61,8 +61,8 @@ func (mcc *mapChunkCache) Put(chnks []chunks.Chunk) {
// Get gets a map of hash to chunk for a set of hashes. In the event that a chunk is not in the cache, chunks.Empty.
// is put in it's place
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]chunks.Chunk {
hashToChunk := make(map[hash.Hash]chunks.Chunk)
func (mcc *mapChunkCache) Get(hashes hash.HashSet) map[hash.Hash]chunks.Chunkable {
hashToChunk := make(map[hash.Hash]chunks.Chunkable)
mcc.mu.Lock()
defer mcc.mu.Unlock()
@@ -96,14 +96,14 @@ func (mcc *mapChunkCache) Has(hashes hash.HashSet) (absent hash.HashSet) {
// PutChunk puts a single chunk in the cache. true returns in the event that the chunk was cached successfully
// and false is returned if that chunk is already is the cache.
func (mcc *mapChunkCache) PutChunk(ch *chunks.Chunk) bool {
func (mcc *mapChunkCache) PutChunk(ch chunks.Chunkable) bool {
mcc.mu.Lock()
defer mcc.mu.Unlock()
h := ch.Hash()
if existing, ok := mcc.hashToChunk[h]; !ok || existing.IsEmpty() {
mcc.hashToChunk[h] = *ch
mcc.toFlush[h] = *ch
mcc.hashToChunk[h] = ch
mcc.toFlush[h] = ch
return true
}
@@ -112,8 +112,8 @@ func (mcc *mapChunkCache) PutChunk(ch *chunks.Chunk) bool {
// GetAndClearChunksToFlush gets a map of hash to chunk which includes all the chunks that were put in the cache
// between the last time GetAndClearChunksToFlush was called and now.
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunk {
newToFlush := make(map[hash.Hash]chunks.Chunk)
func (mcc *mapChunkCache) GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunkable {
newToFlush := make(map[hash.Hash]chunks.Chunkable)
mcc.mu.Lock()
defer mcc.mu.Unlock()

View File

@@ -21,13 +21,14 @@ import (
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/hash"
)
func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []chunks.Chunk) {
chks := make([]chunks.Chunk, n)
func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []chunks.Chunkable) {
chks := make([]chunks.Chunkable, n)
hashes := make(hash.HashSet)
for i := 0; i < n; i++ {
size := int(rng.Int31n(99) + 1)
@@ -37,7 +38,7 @@ func genRandomChunks(rng *rand.Rand, n int) (hash.HashSet, []chunks.Chunk) {
}
chk := chunks.NewChunk(bytes)
chks[i] = chk
chks[i] = &chk
hashes[chk.Hash()] = struct{}{}
}
@@ -81,10 +82,20 @@ func TestMapChunkCache(t *testing.T) {
absent = mapChunkCache.Has(joinedHashes)
assert.True(t, reflect.DeepEqual(absent, moreHashes), "unexpected absent hashset (seed %d)", seed)
assert.False(t, mapChunkCache.PutChunk(&chks[0]), "existing chunk should return false (seed %d)", seed)
assert.True(t, mapChunkCache.PutChunk(&moreChks[0]), "new chunk should return true (seed %d)", seed)
c, err := chks[0].ToChunk()
require.NoError(t, err)
assert.False(t, mapChunkCache.PutChunk(&c), "existing chunk should return false (seed %d)", seed)
c, err = moreChks[0].ToChunk()
require.NoError(t, err)
assert.True(t, mapChunkCache.PutChunk(&c), "new chunk should return true (seed %d)", seed)
toFlush = mapChunkCache.GetAndClearChunksToFlush()
assert.True(t, reflect.DeepEqual(toFlush, map[hash.Hash]chunks.Chunk{moreChks[0].Hash(): moreChks[0]}), "Missing or unexpected chunks to flush (seed %d)", seed)
expected := map[hash.Hash]chunks.Chunkable{moreChks[0].Hash(): moreChks[0]}
eq := reflect.DeepEqual(toFlush, expected)
assert.True(t, eq, "Missing or unexpected chunks to flush (seed %d)", seed)
}

View File

@@ -29,21 +29,21 @@ var noopChunkCache = &noopChunkCacheImpl{}
type noopChunkCacheImpl struct {
}
func (*noopChunkCacheImpl) Put(chnks []chunks.Chunk) {
func (*noopChunkCacheImpl) Put(chnks []chunks.Chunkable) {
}
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]chunks.Chunk {
return make(map[hash.Hash]chunks.Chunk)
func (*noopChunkCacheImpl) Get(hashes hash.HashSet) map[hash.Hash]chunks.Chunkable {
return make(map[hash.Hash]chunks.Chunkable)
}
func (*noopChunkCacheImpl) Has(hashes hash.HashSet) (absent hash.HashSet) {
return hashes
}
func (*noopChunkCacheImpl) PutChunk(ch *chunks.Chunk) bool {
func (*noopChunkCacheImpl) PutChunk(ch chunks.Chunkable) bool {
return true
}
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunk {
func (*noopChunkCacheImpl) GetAndClearChunksToFlush() map[hash.Hash]chunks.Chunkable {
panic("noopChunkCache does not support GetAndClearChunksToFlush().")
}

View File

@@ -136,3 +136,16 @@ func (c RetryingChunkStoreServiceClient) ListTableFiles(ctx context.Context, in
return resp, err
}
// AddTableFiles calls AddTableFiles on the underlying client, retrying with the
// shared backoff policy on retriable gRPC errors (mirrors the other Retrying* wrappers).
func (c RetryingChunkStoreServiceClient) AddTableFiles(ctx context.Context, in *remotesapi.AddTableFilesRequest, opts ...grpc.CallOption) (*remotesapi.AddTableFilesResponse, error) {
	var resp *remotesapi.AddTableFilesResponse
	op := func() error {
		var err error
		resp, err = c.client.AddTableFiles(ctx, in, opts...)
		return processGrpcErr(err)
	}

	err := backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))

	return resp, err
}

View File

@@ -45,6 +45,10 @@ func (c Chunk) Data() []byte {
return c.data
}
// ToChunk implements the Chunkable conversion for Chunk itself: a Chunk is
// already uncompressed, so it returns itself and never errors.
func (c Chunk) ToChunk() (Chunk, error) {
	return c, nil
}
func (c Chunk) IsEmpty() bool {
return len(c.data) == 0
}

View File

@@ -28,6 +28,19 @@ import (
"github.com/liquidata-inc/dolt/go/store/hash"
)
// Chunkable is an interface for working with data that can be converted into a chunk. Two implementations are
// chunks.Chunk itself and nbs.CompressedChunk, which produces the chunk from its compressed representation.
type Chunkable interface {
	// ToChunk gets a chunks.Chunk from the Chunkable.
	ToChunk() (Chunk, error)

	// Hash returns the hash of the Chunk.
	Hash() hash.Hash

	// IsEmpty returns true if the chunk contains no data.
	IsEmpty() bool
}
// ChunkStore is the core storage abstraction in noms. We can put data
// anyplace we have a ChunkStore implementation for.
type ChunkStore interface {
@@ -40,6 +53,10 @@ type ChunkStore interface {
// found. Any non-present chunks will silently be ignored.
GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error
	// GetManyCompressed gets the Chunkable objects with |hashes| from the store. On return, |foundChunks| will have been
	// fully sent all chunks which have been found. Any non-present chunks will silently be ignored.
GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error
// Returns true iff the value at the address |h| is contained in the
// store
Has(ctx context.Context, h hash.Hash) (bool, error)

View File

@@ -26,6 +26,7 @@ import (
"sync"
"github.com/liquidata-inc/dolt/go/store/constants"
"github.com/liquidata-inc/dolt/go/store/d"
"github.com/liquidata-inc/dolt/go/store/hash"
)
@@ -39,12 +40,18 @@ type MemoryStorage struct {
data map[hash.Hash]Chunk
rootHash hash.Hash
mu sync.RWMutex
version string
}
// NewView vends a MemoryStoreView backed by this MemoryStorage. It's
// initialized with the currently "persisted" root.
func (ms *MemoryStorage) NewView() ChunkStore {
return &MemoryStoreView{storage: ms, rootHash: ms.rootHash}
version := ms.version
if version == "" {
version = constants.NomsVersion
}
return &MemoryStoreView{storage: ms, rootHash: ms.rootHash, version: version}
}
// Get retrieves the Chunk with the Hash h, returning EmptyChunk if it's not
@@ -109,6 +116,7 @@ type MemoryStoreView struct {
pending map[hash.Hash]Chunk
rootHash hash.Hash
mu sync.RWMutex
version string
storage *MemoryStorage
}
@@ -138,6 +146,22 @@ func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, fou
return nil
}
// GetManyCompressed sends each present chunk for |hashes| on |foundChunks|. The in-memory
// store keeps plain Chunks (which satisfy Chunkable), so no compressed form is involved;
// absent hashes are silently skipped.
func (ms *MemoryStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan Chunkable) error {
	for h := range hashes {
		c, err := ms.Get(ctx, h)

		if err != nil {
			return err
		}

		if !c.IsEmpty() {
			foundChunks <- c
		}
	}

	return nil
}
func (ms *MemoryStoreView) Has(ctx context.Context, h hash.Hash) (bool, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
@@ -161,7 +185,7 @@ func (ms *MemoryStoreView) HasMany(ctx context.Context, hashes hash.HashSet) (ha
}
func (ms *MemoryStoreView) Version() string {
return constants.NomsVersion
return ms.version
}
func (ms *MemoryStoreView) Put(ctx context.Context, c Chunk) error {

View File

@@ -68,6 +68,11 @@ func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundC
return s.ChunkStore.GetMany(ctx, hashes, foundChunks)
}
// GetManyCompressed counts the requested hashes toward the view's read statistics,
// then delegates to the wrapped ChunkStore.
func (s *TestStoreView) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan Chunkable) error {
	s.Reads += len(hashes)
	return s.ChunkStore.GetManyCompressed(ctx, hashes, foundCmpChunks)
}
func (s *TestStoreView) Has(ctx context.Context, h hash.Hash) (bool, error) {
s.Hases++
return s.ChunkStore.Has(ctx, h)

View File

@@ -159,9 +159,6 @@ func runCat(ctx context.Context, args []string) int {
//Want a clean db every loop
sp, _ := spec.ForDatabase("mem")
db := sp.GetDatabase(ctx)
value, err := types.DecodeValue(chunk, db)
d.PanicIfError(err)
fmt.Printf(" chunk[%d].raw.len: %d\n", cidx, len(currCD.compressed))
@@ -179,12 +176,32 @@ func runCat(ctx context.Context, args []string) int {
}
if !catNoShow {
value, err := types.DecodeValue(chunk, db)
if err != nil {
fmt.Println(" error reading value (Could be a format issue).")
continue
}
fmt.Printf(" chunk[%d].value.kind: %s\n", cidx, value.Kind())
fmt.Printf(" chunk[%d].value:\n\n", cidx)
printValue(ctx, os.Stdout, value, filepath.Dir(chunkFile)+"::#"+b32Hash)
fmt.Println()
}
refIdx := 0
err = types.WalkRefs(chunk, db.Format(), func(ref types.Ref) error {
if refIdx == 0 {
fmt.Printf(" chunk[%d] references chunks:\n", cidx)
}
fmt.Printf(" Ref Hash: %s\n", ref.TargetHash().String())
refIdx++
return nil
})
d.PanicIfError(err)
fmt.Println()
}
@@ -289,6 +306,7 @@ func parseChunks(bytes []byte, pos int, sizes []int) (int, []chunkData) {
var cd []chunkData
for i := len(sizes) - 1; i >= 0; i-- {
uncompressed, err := snappy.Decode(nil, chunkBytes[i])
d.PanicIfError(err)
cd = append(cd, chunkData{
compressed: chunkBytes[i],

View File

@@ -26,6 +26,8 @@ import (
"context"
"io"
"github.com/liquidata-inc/dolt/go/store/nbs"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/types"
)
@@ -139,3 +141,12 @@ type Database interface {
func NewDatabase(cs chunks.ChunkStore) Database {
return newDatabase(cs)
}
// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database
// into another. The Puller requires the backing ChunkStore to implement nbs.TableFileStore,
// which not all Databases do yet.
func CanUsePuller(db Database) bool {
	_, isTableFileStore := db.chunkStore().(nbs.TableFileStore)
	return isTableFileStore
}

View File

@@ -108,44 +108,49 @@ func clone(ctx context.Context, srcTS, sinkTS nbs.TableFileStore, eventCh chan<-
// should parallelize at some point
for _, tblFile := range tblFiles {
rd, err := tblFile.Open()
if err != nil {
return err
}
wr, err := sinkTS.NewSink(ctx, tblFile.FileID(), tblFile.NumChunks())
if err != nil {
return err
}
if eventCh != nil {
eventCh <- TableFileEvent{DownloadStart, []nbs.TableFile{tblFile}}
}
_, err = io.Copy(wr, rd)
err := func() (err error) {
var rd io.ReadCloser
rd, err = tblFile.Open()
if err != nil {
if eventCh != nil {
eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
if err != nil {
if eventCh != nil {
eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
}
return err
}
return err
}
defer func() {
closeErr := rd.Close()
err = wr.Close(ctx)
if err == nil {
err = closeErr
}
}()
if err != nil {
if eventCh != nil {
eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
err = sinkTS.WriteTableFile(ctx, tblFile.FileID(), tblFile.NumChunks(), rd)
if err != nil {
if eventCh != nil {
eventCh <- TableFileEvent{DownloadFailed, []nbs.TableFile{tblFile}}
}
return err
}
return err
}
if eventCh != nil {
eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}}
}
if eventCh != nil {
eventCh <- TableFileEvent{DownloadSuccess, []nbs.TableFile{tblFile}}
return nil
}()
if err != nil {
return err
}
}

View File

@@ -421,9 +421,15 @@ func (ttfs *TestTableFileStore) Sources(ctx context.Context) (hash.Hash, []nbs.T
return ttfs.root, tblFiles, nil
}
func (ttfs *TestTableFileStore) NewSink(ctx context.Context, fileID string, numChunks int) (nbs.WriteCloserWithContext, error) {
tblFile := &TestTableFileWriter{fileID, numChunks, bytes.NewBuffer(nil), ttfs}
return tblFile, nil
// WriteTableFile copies the table file data from rd into a new in-memory test
// table file writer registered against this store, then closes the writer.
func (ttfs *TestTableFileStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader) error {
	wr := &TestTableFileWriter{fileId, numChunks, bytes.NewBuffer(nil), ttfs}

	if _, err := io.Copy(wr, rd); err != nil {
		return err
	}

	return wr.Close(ctx)
}
func (ttfs *TestTableFileStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {

414
go/store/datas/puller.go Normal file
View File

@@ -0,0 +1,414 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datas
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/hash"
"github.com/liquidata-inc/dolt/go/store/nbs"
"github.com/liquidata-inc/dolt/go/store/types"
)
// FileReaderWithSize wraps an *os.File together with a pre-computed total size in
// bytes, so consumers can learn the reader's length without stat-ing the file.
type FileReaderWithSize struct {
	*os.File
	size int64
}

// Size returns the total size in bytes recorded for the underlying file.
func (rd FileReaderWithSize) Size() int64 {
	return rd.size
}

// ErrDBUpToDate is the error code returned from NewPuller in the event that there is no work to do.
var ErrDBUpToDate = errors.New("the database does not need to be pulled as it's already up to date")

const (
	// maxChunkWorkers bounds concurrent chunk processing.
	// NOTE(review): its use is outside this excerpt — confirm against the pull loop.
	maxChunkWorkers = 2
)

// FilledWriters store CmpChunkTableWriter that have been filled and are ready to be flushed. In the future will likely
// add the md5 of the data to this structure to be used to verify table upload calls.
type FilledWriters struct {
	wr *nbs.CmpChunkTableWriter
}

// CmpChnkAndRefs holds a CompressedChunk and all of its references.
type CmpChnkAndRefs struct {
	cmpChnk nbs.CompressedChunk
	refs    map[hash.Hash]int
}
// Puller is used to sync data between two Databases.
type Puller struct {
	// fmt is the noms binary format of the source database (see NewPuller).
	fmt *types.NomsBinFormat

	// srcDB is pulled from; sinkDB is written to.
	srcDB  Database
	sinkDB Database

	// rootChunkHash is the chunk being synced; it must exist in src and not in sink (see NewPuller).
	rootChunkHash hash.Hash
	// downloaded tracks chunk hashes already handled.
	// NOTE(review): its use is outside this excerpt — confirm against the pull loop.
	downloaded hash.HashSet

	// wr is the in-progress compressed-chunk table writer.
	wr *nbs.CmpChunkTableWriter
	// tempDir is where completed table files are staged (see processCompletedTables).
	tempDir string
	// chunksPerTF is the target chunk count per generated table file (callers pass
	// defaultChunksPerTF for pushes and 256*1024 for pulls).
	chunksPerTF int

	// eventCh receives PullerEvent progress notifications; required — NewPuller panics on nil.
	eventCh chan PullerEvent
}
type PullerEventType int
const (
NewLevelTWEvent PullerEventType = iota
DestDBHasTWEvent
LevelUpdateTWEvent
LevelDoneTWEvent
StartUploadTableFile
EndUpdateTableFile
)
type TreeWalkEventDetails struct {
TreeLevel int
ChunksInLevel int
ChunksAlreadyHad int
ChunksBuffered int
ChildrenFound int
TableFilesGenerated int
}
type TableFileEventDetails struct {
TableFileCount int
TableFilesUploaded int
CurrentFileSize int64
}
type PullerEvent struct {
EventType PullerEventType
TWEventDetails TreeWalkEventDetails
TFEventDetails TableFileEventDetails
}
func NewTWPullerEvent(et PullerEventType, details *TreeWalkEventDetails) PullerEvent {
return PullerEvent{EventType: et, TWEventDetails: *details}
}
// NewTFPullerEvent builds a PullerEvent carrying a snapshot of the supplied table-file details.
func NewTFPullerEvent(et PullerEventType, details *TableFileEventDetails) PullerEvent {
	evt := PullerEvent{EventType: et}
	evt.TFEventDetails = *details
	return evt
}
// NewPuller creates a new Puller instance to do the syncing. If a nil puller is returned without error that means
// that there is nothing to pull and the sinkDB is already up to date.
func NewPuller(ctx context.Context, tempDir string, chunksPerTF int, srcDB, sinkDB Database, rootChunkHash hash.Hash, eventCh chan PullerEvent) (*Puller, error) {
	if eventCh == nil {
		panic("eventCh is required")
	}

	// Sanity check: the requested root must exist in the source.
	srcHasRoot, err := srcDB.chunkStore().Has(ctx, rootChunkHash)
	if err != nil {
		return nil, err
	}
	if !srcHasRoot {
		return nil, errors.New("not found")
	}

	// If the sink already has the root chunk there is nothing to pull.
	sinkHasRoot, err := sinkDB.chunkStore().Has(ctx, rootChunkHash)
	if err != nil {
		return nil, err
	}
	if sinkHasRoot {
		return nil, ErrDBUpToDate
	}

	srcVer, sinkVer := srcDB.chunkStore().Version(), sinkDB.chunkStore().Version()
	if srcVer != sinkVer {
		return nil, fmt.Errorf("cannot pull from src to sink; src version is %v and sink version is %v", srcVer, sinkVer)
	}

	tblWr, err := nbs.NewCmpChunkTableWriter()
	if err != nil {
		return nil, err
	}

	return &Puller{
		fmt:           srcDB.Format(),
		srcDB:         srcDB,
		sinkDB:        sinkDB,
		rootChunkHash: rootChunkHash,
		downloaded:    hash.HashSet{},
		tempDir:       tempDir,
		wr:            tblWr,
		chunksPerTF:   chunksPerTF,
		eventCh:       eventCh,
	}, nil
}
// processCompletedTables drains completedTables, finalizing each filled CmpChunkTableWriter
// to a temp file, and then uploads the temp files to the sink database in reverse order.
// The first error encountered is recorded in ae; once set, remaining channel items are
// drained without processing so the producer never blocks.
func (p *Puller) processCompletedTables(ctx context.Context, ae *atomicerr.AtomicError, completedTables <-chan FilledWriters) {
	type tempTblFile struct {
		id        string
		path      string
		numChunks int
	}

	var tblFiles []tempTblFile

	var err error
	for tblFile := range completedTables {
		if err != nil {
			continue // drain the channel so the producer doesn't block
		}

		var id string
		id, err = tblFile.wr.Finish()

		if ae.SetIfError(err) {
			continue
		}

		path := filepath.Join(p.tempDir, id)
		err = tblFile.wr.FlushToFile(path)

		if ae.SetIfError(err) {
			continue
		}

		tblFiles = append(tblFiles, tempTblFile{id, path, tblFile.wr.Size()})
	}

	if ae.IsSet() {
		return
	}

	details := &TableFileEventDetails{TableFileCount: len(tblFiles)}

	// Write tables in reverse order so that on a partial success, it will still be true that if a db has a chunk, it
	// also has all of that chunks references.
	for i := len(tblFiles) - 1; i >= 0; i-- {
		tmpTblFile := tblFiles[i]

		fi, err := os.Stat(tmpTblFile.path)

		if ae.SetIfError(err) {
			return
		}

		f, err := os.Open(tmpTblFile.path)

		if ae.SetIfError(err) {
			return
		}

		details.CurrentFileSize = fi.Size()
		p.eventCh <- NewTFPullerEvent(StartUploadTableFile, details)

		fWithSize := FileReaderWithSize{f, fi.Size()}
		err = p.sinkDB.chunkStore().(nbs.TableFileStore).WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, fWithSize)

		// BUG FIX: the opened file was never closed, leaking one file descriptor per
		// uploaded table file. Close it now that the upload call has returned.
		// NOTE(review): assumes WriteTableFile has fully consumed the reader by the time
		// it returns — confirm it does not retain the reader asynchronously.
		closeErr := f.Close()
		if err == nil {
			err = closeErr
		}

		// Best-effort cleanup of the temp file; removal failure is not fatal.
		go func() {
			_ = os.Remove(tmpTblFile.path)
		}()

		if ae.SetIfError(err) {
			return
		}

		details.TableFilesUploaded++
		p.eventCh <- NewTFPullerEvent(EndUpdateTableFile, details)
	}
}
// Pull executes the sync operation: it walks the chunk tree level by level starting at the
// root, downloads chunks the sink does not have into table files, and uploads them.
// Progress is reported on p.eventCh.
func (p *Puller) Pull(ctx context.Context) error {
	twDetails := &TreeWalkEventDetails{TreeLevel: -1}

	leaves := make(hash.HashSet)
	absent := make(hash.HashSet)
	absent.Insert(p.rootChunkHash)

	ae := atomicerr.New()
	wg := &sync.WaitGroup{}
	completedTables := make(chan FilledWriters, 8)

	wg.Add(1)
	go func() {
		defer wg.Done()
		p.processCompletedTables(ctx, ae, completedTables)
	}()

	for len(absent) > 0 {
		limitToNewChunks(absent, p.downloaded)

		chunksInLevel := len(absent)
		twDetails.ChunksInLevel = chunksInLevel
		p.eventCh <- NewTWPullerEvent(NewLevelTWEvent, twDetails)

		var err error
		absent, err = p.sinkDB.chunkStore().HasMany(ctx, absent)

		// BUG FIX: this error was previously ignored; a failed HasMany call could leave
		// absent in an undefined state and silently corrupt the walk.
		if err != nil {
			// NOTE(review): early returns leave completedTables unclosed and the
			// processCompletedTables goroutine running, matching the pre-existing
			// behavior of the getCmp error path below — consider a deferred cleanup.
			return err
		}

		twDetails.ChunksAlreadyHad = chunksInLevel - len(absent)
		p.eventCh <- NewTWPullerEvent(DestDBHasTWEvent, twDetails)

		if len(absent) > 0 {
			leaves, absent, err = p.getCmp(ctx, twDetails, leaves, absent, completedTables)

			if err != nil {
				return err
			}
		}
	}

	// Flush the final, partially-filled table file, if any.
	if p.wr.Size() > 0 {
		completedTables <- FilledWriters{p.wr}
	}

	close(completedTables)
	wg.Wait()
	return ae.Get()
}
// limitToNewChunks removes from absent any hash that has already been downloaded. It
// iterates over whichever set is smaller and probes the larger one, so the work is
// proportional to the smaller set.
func limitToNewChunks(absent hash.HashSet, downloaded hash.HashSet) {
	iterSet, probeSet := absent, downloaded
	if len(downloaded) < len(absent) {
		iterSet, probeSet = downloaded, absent
	}

	for h := range iterSet {
		if probeSet.Has(h) {
			absent.Remove(h)
		}
	}
}
// getCmp downloads the compressed chunks in batch from the source, buffers them into the
// current table file writer (rotating to a new writer each time chunksPerTF is reached),
// and walks non-leaf chunks to discover the next level of the tree. It returns the leaves
// and the full chunk set of the next level.
func (p *Puller) getCmp(ctx context.Context, twDetails *TreeWalkEventDetails, leaves, batch hash.HashSet, completedTables chan FilledWriters) (hash.HashSet, hash.HashSet, error) {
	found := make(chan chunks.Chunkable, 4096)
	processed := make(chan CmpChnkAndRefs, 4096)

	ae := atomicerr.New()
	go func() {
		defer close(found)
		err := p.srcDB.chunkStore().GetManyCompressed(ctx, batch, found)
		ae.SetIfError(err)
	}()

	batchSize := len(batch)
	// NOTE(review): a worker count used to be computed here from batchSize and
	// maxChunkWorkers, but no worker pool was ever spawned; the dead computation
	// has been removed.

	go func() {
		defer close(processed)
		for chable := range found {
			if ae.IsSet() {
				break
			}

			cmpChnk, ok := chable.(nbs.CompressedChunk)

			if !ok {
				ae.SetIfError(errors.New("requires an nbs.CompressedChunk"))
				break
			}

			p.downloaded.Insert(cmpChnk.H)

			if leaves.Has(cmpChnk.H) {
				// Leaves have no references to walk; pass them straight through.
				processed <- CmpChnkAndRefs{cmpChnk: cmpChnk}
			} else {
				chnk, err := chable.ToChunk()

				if ae.SetIfError(err) {
					return
				}

				refs := make(map[hash.Hash]int)
				err = types.WalkRefs(chnk, p.fmt, func(r types.Ref) error {
					refs[r.TargetHash()] = int(r.Height())
					return nil
				})

				// BUG FIX: the WalkRefs error was previously ignored, which could
				// silently drop child references and produce an incomplete pull.
				if ae.SetIfError(err) {
					return
				}

				processed <- CmpChnkAndRefs{cmpChnk: cmpChnk, refs: refs}
			}
		}
	}()

	var err error
	var maxHeight int
	nextLeaves := make(hash.HashSet, batchSize)
	nextLevel := make(hash.HashSet, batchSize)

	twDetails.ChunksBuffered = 0
	for cmpAndRef := range processed {
		if err != nil {
			// drain to prevent deadlock
			continue
		}

		twDetails.ChunksBuffered++

		if twDetails.ChunksBuffered%1000 == 0 {
			p.eventCh <- NewTWPullerEvent(LevelUpdateTWEvent, twDetails)
		}

		err = p.wr.AddCmpChunk(cmpAndRef.cmpChnk)

		// BUG FIX: the AddCmpChunk error was previously never surfaced — it could be
		// overwritten by the writer rotation below, and the function would later fail
		// with a generic "failed to get all chunks." instead of the real error.
		if ae.SetIfError(err) {
			continue
		}

		if p.wr.Size() >= p.chunksPerTF {
			completedTables <- FilledWriters{p.wr}
			p.wr, err = nbs.NewCmpChunkTableWriter()

			if ae.SetIfError(err) {
				continue
			}
		}

		for h, height := range cmpAndRef.refs {
			nextLevel.Insert(h)
			twDetails.ChildrenFound++

			// Children at height 1 are leaves of the next level.
			if height == 1 {
				nextLeaves.Insert(h)
			}

			if height > maxHeight {
				maxHeight = height
			}
		}
	}

	if err := ae.Get(); err != nil {
		return nil, nil, err
	}

	if twDetails.ChunksBuffered != len(batch) {
		return nil, nil, errors.New("failed to get all chunks.")
	}

	p.eventCh <- NewTWPullerEvent(LevelDoneTWEvent, twDetails)

	twDetails.TreeLevel = maxHeight
	return nextLeaves, nextLevel, nil
}

View File

@@ -0,0 +1,514 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datas
import (
"context"
"encoding/json"
"errors"
"os"
"path/filepath"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/liquidata-inc/dolt/go/store/nbs"
"github.com/liquidata-inc/dolt/go/store/types"
"github.com/liquidata-inc/dolt/go/store/util/clienttest"
)
// mustTuple unwraps a (Tuple, error) pair, panicking on error. Test-only convenience for
// building tuple literals inline.
func mustTuple(tpl types.Tuple, err error) types.Tuple {
	if err == nil {
		return tpl
	}
	panic(err)
}
// addTableValues sets key/value pairs in the named table within the root map m. If the
// table already exists the pairs are merged on top of its current map; otherwise a new
// map is built from them. The table map is persisted via writeValAndGetRef and the
// updated root map (table name -> table ref) is returned.
func addTableValues(ctx context.Context, vrw types.ValueReadWriter, m types.Map, tableName string, alternatingKeyVals ...types.Value) (types.Map, error) {
	val, ok, err := m.MaybeGet(ctx, types.String(tableName))
	if err != nil {
		return types.EmptyMap, err
	}
	var tblMap types.Map
	if ok {
		// Existing table: apply the key/value pairs on top of the current table map.
		mv, err := val.(types.Ref).TargetValue(ctx, vrw)
		if err != nil {
			return types.EmptyMap, err
		}
		me := mv.(types.Map).Edit()
		for i := 0; i < len(alternatingKeyVals); i += 2 {
			me.Set(alternatingKeyVals[i], alternatingKeyVals[i+1])
		}
		tblMap, err = me.Map(ctx)
		if err != nil {
			return types.EmptyMap, err
		}
	} else {
		// New table: build a fresh map directly from the key/value pairs.
		tblMap, err = types.NewMap(ctx, vrw, alternatingKeyVals...)
		if err != nil {
			return types.EmptyMap, err
		}
	}
	// Persist the table map and point the root map's entry at the new ref.
	tblRef, err := writeValAndGetRef(ctx, vrw, tblMap)
	if err != nil {
		return types.EmptyMap, err
	}
	me := m.Edit()
	me.Set(types.String(tableName), tblRef)
	return me.Map(ctx)
}
// deleteTableValues removes the given keys from the named table within the root map m and
// returns the updated root map. Deleting from a table that does not exist is an error;
// an empty key list is a no-op.
func deleteTableValues(ctx context.Context, vrw types.ValueReadWriter, m types.Map, tableName string, keys ...types.Value) (types.Map, error) {
	if len(keys) == 0 {
		return m, nil
	}

	tblKey := types.String(tableName)
	val, found, err := m.MaybeGet(ctx, tblKey)
	if err != nil {
		return types.EmptyMap, err
	}
	if !found {
		return types.EmptyMap, errors.New("can't delete from table that wasn't created")
	}

	tblVal, err := val.(types.Ref).TargetValue(ctx, vrw)
	if err != nil {
		return types.EmptyMap, err
	}

	tblEdit := tblVal.(types.Map).Edit()
	for _, key := range keys {
		tblEdit.Remove(key)
	}

	updatedTbl, err := tblEdit.Map(ctx)
	if err != nil {
		return types.EmptyMap, err
	}

	updatedRef, err := writeValAndGetRef(ctx, vrw, updatedTbl)
	if err != nil {
		return types.EmptyMap, err
	}

	rootEdit := m.Edit()
	rootEdit.Set(tblKey, updatedRef)
	return rootEdit.Map(ctx)
}
// tempDirDB creates a Database backed by an nbs local store in a fresh uniquely-named
// directory under the system temp dir.
func tempDirDB(ctx context.Context) (Database, error) {
	dbDir := filepath.Join(os.TempDir(), uuid.New().String())
	if err := os.MkdirAll(dbDir, os.ModePerm); err != nil {
		return nil, err
	}

	cs, err := nbs.NewLocalStore(ctx, types.Format_Default.VersionString(), dbDir, clienttest.DefaultMemTableSize)
	if err != nil {
		return nil, err
	}

	return NewDatabase(cs), nil
}
// TestPuller commits a series of table deltas to a source database, then for each resulting
// commit pulls that commit's root into a fresh sink database and verifies the data arrived
// intact.
func TestPuller(t *testing.T) {
	deltas := []struct {
		name       string
		sets       map[string][]types.Value
		deletes    map[string][]types.Value
		tblDeletes []string
	}{
		{
			"empty",
			map[string][]types.Value{},
			map[string][]types.Value{},
			[]string{},
		},
		{
			"employees",
			map[string][]types.Value{
				"employees": {
					mustTuple(types.NewTuple(types.Format_Default, types.String("Hendriks"), types.String("Brian"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Software Engineer"), types.Int(39))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Sehn"), types.String("Timothy"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("CEO"), types.Int(39))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Son"), types.String("Aaron"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Software Engineer"), types.Int(36))),
				},
			},
			map[string][]types.Value{},
			[]string{},
		},
		{
			"ip to country",
			map[string][]types.Value{
				"ip_to_country": {
					types.String("5.183.230.1"), types.String("BZ"),
					types.String("5.180.188.1"), types.String("AU"),
					types.String("2.56.9.244"), types.String("GB"),
					types.String("20.175.7.56"), types.String("US"),
				},
			},
			map[string][]types.Value{},
			[]string{},
		},
		{
			"more ips",
			map[string][]types.Value{
				"ip_to_country": {
					types.String("20.175.193.85"), types.String("US"),
					types.String("5.196.110.191"), types.String("FR"),
					types.String("4.14.242.160"), types.String("CA"),
				},
			},
			map[string][]types.Value{},
			[]string{},
		},
		{
			"more employees",
			map[string][]types.Value{
				"employees": {
					mustTuple(types.NewTuple(types.Format_Default, types.String("Jesuele"), types.String("Matt"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Software Engineer"), types.NullValue)),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Wilkins"), types.String("Daylon"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Software Engineer"), types.NullValue)),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Katie"), types.String("McCulloch"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Software Engineer"), types.NullValue)),
				},
			},
			map[string][]types.Value{},
			[]string{},
		},
		{
			"delete ips table",
			map[string][]types.Value{},
			map[string][]types.Value{},
			[]string{"ip_to_country"},
		},
		{
			"delete some employees",
			map[string][]types.Value{},
			map[string][]types.Value{
				"employees": {
					mustTuple(types.NewTuple(types.Format_Default, types.String("Hendriks"), types.String("Brian"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Sehn"), types.String("Timothy"))),
					mustTuple(types.NewTuple(types.Format_Default, types.String("Son"), types.String("Aaron"))),
				},
			},
			[]string{},
		},
	}

	ctx := context.Background()
	db, err := tempDirDB(ctx)
	require.NoError(t, err)
	ds, err := db.GetDataset(ctx, "ds")
	require.NoError(t, err)
	rootMap, err := types.NewMap(ctx, db)
	require.NoError(t, err)

	parent := types.EmptySet
	states := map[string]types.Ref{}
	for _, delta := range deltas {
		// Apply this delta's sets, deletes, and table drops on top of the previous root.
		for tbl, sets := range delta.sets {
			rootMap, err = addTableValues(ctx, db, rootMap, tbl, sets...)
			require.NoError(t, err)
		}

		for tbl, dels := range delta.deletes {
			rootMap, err = deleteTableValues(ctx, db, rootMap, tbl, dels...)
			require.NoError(t, err)
		}

		me := rootMap.Edit()
		for _, tbl := range delta.tblDeletes {
			me.Remove(types.String(tbl))
		}
		rootMap, err = me.Map(ctx)
		require.NoError(t, err)

		commitOpts := CommitOptions{Parents: parent}
		ds, err = db.Commit(ctx, ds, rootMap, commitOpts)
		require.NoError(t, err)

		r, ok, err := ds.MaybeHeadRef()
		require.NoError(t, err)
		require.True(t, ok)

		parent, err = types.NewSet(ctx, db, r)
		require.NoError(t, err)

		states[delta.name] = r
	}

	// One more commit containing a large table so the pull spans multiple table files.
	tbl, err := makeABigTable(ctx, db)
	require.NoError(t, err)

	tblRef, err := writeValAndGetRef(ctx, db, tbl)
	require.NoError(t, err)

	me := rootMap.Edit()
	me.Set(types.String("big_table"), tblRef)
	rootMap, err = me.Map(ctx)
	require.NoError(t, err)

	commitOpts := CommitOptions{Parents: parent}
	ds, err = db.Commit(ctx, ds, rootMap, commitOpts)
	require.NoError(t, err)

	r, ok, err := ds.MaybeHeadRef()
	require.NoError(t, err)
	require.True(t, ok)

	states["add big table"] = r

	for k, rootRef := range states {
		t.Run(k, func(t *testing.T) {
			eventCh := make(chan PullerEvent, 128)
			go func() {
				// Log progress events until the channel is closed after Pull completes.
				for evt := range eventCh {
					var details interface{}
					switch evt.EventType {
					case NewLevelTWEvent, DestDBHasTWEvent, LevelUpdateTWEvent:
						details = evt.TWEventDetails
					default:
						details = evt.TFEventDetails
					}

					jsonBytes, err := json.Marshal(details)
					if err == nil {
						t.Logf("event_type: %d details: %s\n", evt.EventType, string(jsonBytes))
					}
				}
			}()

			sinkdb, err := tempDirDB(ctx)
			require.NoError(t, err)

			tmpDir := filepath.Join(os.TempDir(), uuid.New().String())
			err = os.MkdirAll(tmpDir, os.ModePerm)
			require.NoError(t, err)

			plr, err := NewPuller(ctx, tmpDir, 128, db, sinkdb, rootRef.TargetHash(), eventCh)
			require.NoError(t, err)

			err = plr.Pull(ctx)
			close(eventCh)
			require.NoError(t, err)

			sinkDS, err := sinkdb.GetDataset(ctx, "ds")
			// BUG FIX: this error was previously overwritten by the FastForward call
			// below before ever being checked (and its require.NoError was duplicated).
			require.NoError(t, err)

			sinkDS, err = sinkdb.FastForward(ctx, sinkDS, rootRef)
			require.NoError(t, err)

			sinkRootRef, ok, err := sinkDS.MaybeHeadRef()
			require.NoError(t, err)
			require.True(t, ok)

			eq, err := pullerRefEquality(ctx, rootRef, sinkRootRef, db, sinkdb)
			require.NoError(t, err)
			assert.True(t, eq)
		})
	}
}
// makeABigTable builds a map with 256K tuple rows so that pulling it exercises multi-level
// chunk trees and multiple table files.
func makeABigTable(ctx context.Context, db Database) (types.Map, error) {
	m, err := types.NewMap(ctx, db)

	if err != nil {
		// BUG FIX: this previously returned a nil error, silently hiding the failure
		// and handing an empty map back to the caller.
		return types.EmptyMap, err
	}

	me := m.Edit()

	for i := 0; i < 256*1024; i++ {
		tpl, err := types.NewTuple(db.Format(), types.UUID(uuid.New()), types.String(uuid.New().String()), types.Float(float64(i)))

		if err != nil {
			return types.EmptyMap, err
		}

		me.Set(types.Int(i), tpl)
	}

	return me.Map(ctx)
}
// pullerRefEquality verifies that the commit referenced by expected in srcDB and the commit
// referenced by actual in sinkDB have equal parents and equal table contents.
func pullerRefEquality(ctx context.Context, expected, actual types.Ref, srcDB, sinkDB Database) (bool, error) {
	expectedVal, err := expected.TargetValue(ctx, srcDB)
	if err != nil {
		return false, err
	}

	actualVal, err := actual.TargetValue(ctx, sinkDB)
	// BUG FIX: this error — and the two parentsAndTables errors below — were previously
	// assigned but never checked, so a failed read could cause a nil-value type assertion
	// panic instead of a clean error.
	if err != nil {
		return false, err
	}

	exPs, exTbls, err := parentsAndTables(expectedVal.(types.Struct))
	if err != nil {
		return false, err
	}

	actPs, actTbls, err := parentsAndTables(actualVal.(types.Struct))
	if err != nil {
		return false, err
	}

	if !exPs.Equals(actPs) {
		return false, nil
	}

	// Compare every expected table's contents against the corresponding actual table.
	err = exTbls.IterAll(ctx, func(key, exVal types.Value) error {
		actVal, ok, err := actTbls.MaybeGet(ctx, key)
		if err != nil {
			return err
		}
		if !ok {
			return errors.New("Missing table " + string(key.(types.String)))
		}
		exMapVal, err := exVal.(types.Ref).TargetValue(ctx, srcDB)
		if err != nil {
			return err
		}
		actMapVal, err := actVal.(types.Ref).TargetValue(ctx, sinkDB)
		if err != nil {
			return err
		}
		return errIfNotEqual(ctx, exMapVal.(types.Map), actMapVal.(types.Map))
	})
	if err != nil {
		return false, err
	}

	return exTbls.Equals(actTbls), nil
}
// errNotEqual is the sentinel returned by errIfNotEqual when the two maps differ.
var errNotEqual = errors.New("not equal")

// errIfNotEqual walks two maps in parallel and returns errNotEqual if their key/value
// sequences differ (including differing sizes); it returns nil when they are identical.
// Iterator errors are returned unchanged.
func errIfNotEqual(ctx context.Context, ex, act types.Map) error {
	exItr, err := ex.Iterator(ctx)
	if err != nil {
		return err
	}
	actItr, err := act.Iterator(ctx)
	if err != nil {
		return err
	}
	for {
		exK, exV, err := exItr.Next(ctx)
		if err != nil {
			return err
		}
		actK, actV, err := actItr.Next(ctx)
		if err != nil {
			return err
		}
		// Both iterators exhausted simultaneously: the maps matched.
		if actK == nil && exK == nil {
			break
		} else if exK == nil || actK == nil {
			// One map ran out before the other: different sizes.
			return errNotEqual
		}
		// NOTE(review): a nil value alongside a non-nil key is tolerated here as long
		// as both sides agree — confirm whether the iterator can actually yield that.
		if exV == nil && actV == nil {
			continue
		} else if exV == nil || actV == nil {
			return errNotEqual
		}
		if !exK.Equals(actK) || !exV.Equals(actV) {
			return errNotEqual
		}
	}
	return nil
}
// parentsAndTables extracts the "parents" set and the "value" (table-name -> table-ref)
// map from a commit struct.
func parentsAndTables(cm types.Struct) (types.Set, types.Map, error) {
	ps, ok, err := cm.MaybeGet("parents")
	if err != nil {
		return types.EmptySet, types.EmptyMap, err
	}
	if !ok {
		// BUG FIX: previously returned a nil error here, silently yielding empty
		// results for a malformed commit struct.
		return types.EmptySet, types.EmptyMap, errors.New("commit struct is missing field 'parents'")
	}
	tbls, ok, err := cm.MaybeGet("value")
	if err != nil {
		return types.EmptySet, types.EmptyMap, err
	}
	if !ok {
		return types.EmptySet, types.EmptyMap, errors.New("commit struct is missing field 'value'")
	}
	return ps.(types.Set), tbls.(types.Map), nil
}
// writeValAndGetRef returns a Ref for val, writing val to vrw first if it is not already
// present in the store.
func writeValAndGetRef(ctx context.Context, vrw types.ValueReadWriter, val types.Value) (types.Ref, error) {
	r, err := types.NewRef(val, vrw.Format())
	if err != nil {
		return types.Ref{}, err
	}

	existing, err := r.TargetValue(ctx, vrw)
	if err != nil {
		return types.Ref{}, err
	}

	if existing != nil {
		// Already stored; nothing to write.
		return r, nil
	}

	if _, err = vrw.WriteValue(ctx, val); err != nil {
		return types.Ref{}, err
	}

	return r, nil
}

View File

@@ -50,6 +50,10 @@ func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found
panic("not impl")
}
// GetManyCompressed is not implemented for fileBlockStore; it exists only to satisfy the
// chunk store interface and panics if called.
func (fb fileBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
	panic("not impl")
}
func (fb fileBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
panic("not impl")
}

View File

@@ -43,6 +43,10 @@ func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, found
panic("not impl")
}
// GetManyCompressed is not implemented for nullBlockStore; it exists only to satisfy the
// chunk store interface and panics if called.
func (nb nullBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundChunks chan chunks.Chunkable) error {
	panic("not impl")
}
func (nb nullBlockStore) Has(ctx context.Context, h hash.Hash) (bool, error) {
panic("not impl")
}

289
go/store/nbs/byte_sink.go Normal file
View File

@@ -0,0 +1,289 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"errors"
"io"
"os"
"path/filepath"
"sync"
"github.com/google/uuid"
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
)
// flushSinkToFile flushes the contents of sink to a newly created (or truncated) file at
// path, returning any write or close error.
func flushSinkToFile(sink ByteSink, path string) (err error) {
	var f *os.File
	if f, err = os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm); err != nil {
		return err
	}

	// Surface the Close error only when the flush itself succeeded.
	defer func() {
		if closeErr := f.Close(); err == nil {
			err = closeErr
		}
	}()

	return sink.Flush(f)
}
// A ByteSink is an interface for writing bytes which can later be flushed to a writer.
// NOTE(review): BufferedFileByteSink supports at most one Flush or FlushToFile call
// (it closes its internal channel), so callers should treat sinks as single-use after
// flushing.
type ByteSink interface {
	io.Writer

	// Flush writes all the data that was written to the ByteSink to the supplied writer
	Flush(wr io.Writer) error

	// FlushToFile writes all the data that was written to the ByteSink to a file at the given path
	FlushToFile(path string) error
}
// ErrBufferFull is returned by FixedBufferByteSink.Write when the data written would
// exceed the capacity of the fixed buffer.
var ErrBufferFull = errors.New("buffer full")

// FixedBufferByteSink is a ByteSink implementation with a buffer whose size will not change. Writing more
// data than the fixed buffer can hold will result in an error.
type FixedBufferByteSink struct {
	buff []byte
	pos  uint64
}

// NewFixedBufferTableSink creates a FixedBufferTableSink which will use the supplied buffer
func NewFixedBufferTableSink(buff []byte) *FixedBufferByteSink {
	if len(buff) == 0 {
		panic("must provide a buffer")
	}

	return &FixedBufferByteSink{buff: buff}
}

// Write writes a byte array to the sink.
func (sink *FixedBufferByteSink) Write(src []byte) (int, error) {
	n := len(src)
	avail := sink.buff[sink.pos:]

	// Reject the whole write if it does not fit; no partial writes.
	if len(avail) < n {
		return 0, ErrBufferFull
	}

	copy(avail, src)
	sink.pos += uint64(n)
	return n, nil
}
// Flush writes all the data that was written to the ByteSink to the supplied writer.
// Only the written prefix (buff[:pos]) is emitted, not the whole buffer.
func (sink *FixedBufferByteSink) Flush(wr io.Writer) error {
	return iohelp.WriteAll(wr, sink.buff[:sink.pos])
}

// FlushToFile writes all the data that was written to the ByteSink to a file at the given path
func (sink *FixedBufferByteSink) FlushToFile(path string) (err error) {
	return flushSinkToFile(sink, path)
}
// BlockBufferByteSink allocates blocks of data with a given block size to store the bytes written to the sink. New
// blocks are allocated as needed in order to handle all the data of the Write calls.
type BlockBufferByteSink struct {
	blockSize int
	pos       uint64
	blocks    [][]byte
}

// NewBlockBufferTableSink creates a BlockBufferByteSink with the provided block size.
func NewBlockBufferTableSink(blockSize int) *BlockBufferByteSink {
	return &BlockBufferByteSink{
		blockSize: blockSize,
		pos:       0,
		blocks:    [][]byte{make([]byte, 0, blockSize)},
	}
}

// Write writes a byte array to the sink.
func (sink *BlockBufferByteSink) Write(src []byte) (int, error) {
	n := len(src)
	last := len(sink.blocks) - 1
	block := sink.blocks[last]
	space := cap(block) - len(block)

	if space >= n {
		// Fits entirely within the current block.
		sink.blocks[last] = append(block, src...)
	} else {
		// Top off the current block (if it has room), then start a new block with
		// the remainder. append grows the new block past blockSize when needed.
		if space > 0 {
			sink.blocks[last] = append(block, src[:space]...)
		}
		next := make([]byte, 0, sink.blockSize)
		sink.blocks = append(sink.blocks, append(next, src[space:]...))
	}

	sink.pos += uint64(n)
	return n, nil
}
// Flush writes all the data that was written to the ByteSink to the supplied writer.
// Blocks are written in allocation order, which matches write order.
func (sink *BlockBufferByteSink) Flush(wr io.Writer) (err error) {
	return iohelp.WriteAll(wr, sink.blocks...)
}

// FlushToFile writes all the data that was written to the ByteSink to a file at the given path
func (sink *BlockBufferByteSink) FlushToFile(path string) (err error) {
	return flushSinkToFile(sink, path)
}
// BufferedFileByteSink is a ByteSink implementation that buffers some amount of data before it passes it
// to a background writing thread to be flushed to a file.
type BufferedFileByteSink struct {
	blockSize    int
	pos          uint64
	currentBlock []byte // partially filled block; always length < capacity (or nil)
	writeCh      chan []byte
	ae           *atomicerr.AtomicError
	wg           *sync.WaitGroup
	wr           io.WriteCloser
	path         string // temp file the background goroutine streams blocks into
}

// NewBufferedFileByteSink creates a BufferedFileByteSink that buffers writes into blocks of
// blockSize bytes, handing filled blocks to a background goroutine (over a channel with
// chBufferSize slots) which streams them to a temp file.
func NewBufferedFileByteSink(blockSize, chBufferSize int) (*BufferedFileByteSink, error) {
	path := filepath.Join(os.TempDir(), uuid.New().String()+".tf")
	f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)

	if err != nil {
		return nil, err
	}

	sink := &BufferedFileByteSink{
		blockSize: blockSize,
		// BUG FIX: this was make([]byte, blockSize), which produced a block already
		// "full" of blockSize zero bytes — flushing an unwritten sink emitted those
		// zeros, and the first Write silently discarded the block. The buffer must
		// start empty: length 0, capacity blockSize (matching NewBlockBufferTableSink).
		currentBlock: make([]byte, 0, blockSize),
		writeCh:      make(chan []byte, chBufferSize),
		ae:           atomicerr.New(),
		wg:           &sync.WaitGroup{},
		wr:           f,
		path:         path,
	}

	sink.wg.Add(1)
	go func() {
		defer sink.wg.Done()
		sink.backgroundWrite()
	}()

	return sink, nil
}
// Write writes a byte array to the sink.
func (sink *BufferedFileByteSink) Write(src []byte) (int, error) {
	srcLen := len(src)
	remaining := cap(sink.currentBlock) - len(sink.currentBlock)
	if remaining >= srcLen {
		// src fits in the current block.
		sink.currentBlock = append(sink.currentBlock, src...)

		if remaining == srcLen {
			// The block is now exactly full; ship it to the background writer and
			// lazily allocate the next block on the following Write.
			sink.writeCh <- sink.currentBlock
			sink.currentBlock = nil
		}
	} else {
		// Top off the current block (if it has room), ship it, and start a new block
		// with the rest of src. If the remainder exceeds blockSize, append simply
		// grows the new block beyond blockSize.
		if remaining > 0 {
			sink.currentBlock = append(sink.currentBlock, src[:remaining]...)
			sink.writeCh <- sink.currentBlock
		}

		newBlock := make([]byte, 0, sink.blockSize)
		newBlock = append(newBlock, src[remaining:]...)
		sink.currentBlock = newBlock
	}

	sink.pos += uint64(srcLen)
	return srcLen, nil
}
// backgroundWrite drains writeCh, appending each block to the underlying temp file. Once
// the channel is closed (by Flush or FlushToFile) it closes the file. After the first
// write error, remaining blocks are drained without writing so producers never block;
// errors are recorded in sink.ae.
func (sink *BufferedFileByteSink) backgroundWrite() {
	var err error
	for buff := range sink.writeCh {
		if err != nil {
			continue // drain the channel so Write/Flush don't block
		}

		err = iohelp.WriteAll(sink.wr, buff)
		sink.ae.SetIfError(err)
	}

	err = sink.wr.Close()
	sink.ae.SetIfError(err)
}
// Flush writes all the data that was written to the ByteSink to the supplied writer.
// It ships the final partial block, closes the write channel, waits for the background
// writer to finish the temp file, then copies the temp file's contents into wr.
// NOTE(review): the sink is single-use — calling Flush or FlushToFile a second time
// would close an already-closed channel and panic.
func (sink *BufferedFileByteSink) Flush(wr io.Writer) (err error) {
	toWrite := len(sink.currentBlock)
	if toWrite > 0 {
		sink.writeCh <- sink.currentBlock[:toWrite]
	}

	close(sink.writeCh)
	sink.wg.Wait()

	if err := sink.ae.Get(); err != nil {
		return err
	}

	var f *os.File
	f, err = os.Open(sink.path)

	if err != nil {
		return err
	}

	// Surface the Close error only when the copy itself succeeded.
	defer func() {
		closeErr := f.Close()

		if err == nil {
			err = closeErr
		}
	}()

	_, err = io.Copy(wr, f)
	return err
}
// FlushToFile writes all the data that was written to the ByteSink to a file at the given path.
// It ships the final partial block, closes the write channel, waits for the background
// writer, then renames the temp file into place.
// NOTE(review): os.Rename fails if path is on a different filesystem than os.TempDir —
// confirm callers only pass destinations on the same volume. Like Flush, this may be
// called at most once.
func (sink *BufferedFileByteSink) FlushToFile(path string) (err error) {
	toWrite := len(sink.currentBlock)
	if toWrite > 0 {
		sink.writeCh <- sink.currentBlock[:toWrite]
	}

	close(sink.writeCh)
	sink.wg.Wait()

	if err := sink.ae.Get(); err != nil {
		return err
	}

	return os.Rename(sink.path, path)
}

View File

@@ -0,0 +1,118 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package nbs
import (
"bytes"
"io/ioutil"
"os"
"path/filepath"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
func TestBlockBufferTableSink(t *testing.T) {
createSink := func() ByteSink {
return NewBlockBufferTableSink(128)
}
suite.Run(t, &TableSinkSuite{createSink, t})
}
func TestFixedBufferTableSink(t *testing.T) {
createSink := func() ByteSink {
return NewFixedBufferTableSink(make([]byte, 32*1024))
}
suite.Run(t, &TableSinkSuite{createSink, t})
}
func TestBufferedFileByteSink(t *testing.T) {
createSink := func() ByteSink {
sink, err := NewBufferedFileByteSink(4*1024, 16)
require.NoError(t, err)
return sink
}
suite.Run(t, &TableSinkSuite{createSink, t})
}
// TableSinkSuite runs a common set of write/flush tests against whichever ByteSink
// implementation sinkFactory produces.
type TableSinkSuite struct {
	sinkFactory func() ByteSink
	t           *testing.T
}

// SetT stores the current *testing.T (called by the testify suite runner).
func (suite *TableSinkSuite) SetT(t *testing.T) {
	suite.t = t
}

// T returns the current *testing.T (required by the testify suite runner).
func (suite *TableSinkSuite) T() *testing.T {
	return suite.t
}
// writeToSink writes 32 repetitions of the 64-byte sequence 0..63 (2048 bytes total) to
// the sink, returning the first write error.
func writeToSink(sink ByteSink) error {
	payload := make([]byte, 64)
	for i := range payload {
		payload[i] = byte(i)
	}

	for i := 0; i < 32; i++ {
		if _, err := sink.Write(payload); err != nil {
			return err
		}
	}

	return nil
}
// verifyContents asserts that bytes holds the 2048-byte pattern produced by writeToSink:
// the sequence 0..63 repeated 32 times.
func verifyContents(t *testing.T, bytes []byte) {
	for i, actual := range bytes[:64*32] {
		assert.Equal(t, byte(i%64), actual)
	}
}
// TestWriteAndFlush writes the standard pattern and verifies Flush reproduces it in an
// in-memory buffer.
func (suite *TableSinkSuite) TestWriteAndFlush() {
	sink := suite.sinkFactory()
	require.NoError(suite.t, writeToSink(sink))

	bb := bytes.NewBuffer(nil)
	assert.NoError(suite.t, sink.Flush(bb))
	verifyContents(suite.t, bb.Bytes())
}

// TestWriteAndFlushToFile writes the standard pattern and verifies FlushToFile reproduces
// it on disk.
func (suite *TableSinkSuite) TestWriteAndFlushToFile() {
	sink := suite.sinkFactory()
	require.NoError(suite.t, writeToSink(sink))

	dest := filepath.Join(os.TempDir(), uuid.New().String())
	require.NoError(suite.t, sink.FlushToFile(dest))

	data, err := ioutil.ReadFile(dest)
	require.NoError(suite.t, err)
	verifyContents(suite.t, data)
}

View File

@@ -20,107 +20,13 @@ import (
"errors"
"hash"
"io"
"os"
"sort"
"github.com/golang/snappy"
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
)
// A ByteSink is an interface for writing bytes which can later be flushed to a writer
type ByteSink interface {
io.Writer
// Flush writes all the data that was written to the ByteSink to the supplied writer
Flush(wr io.Writer) error
}
// ErrBuffFull used by the FixedBufferSink when the data written is larger than the buffer allocated.
var ErrBufferFull = errors.New("buffer full")
// FixedBufferByteSink is a ByteSink implementation with a buffer whose size will not change. Writing more
// data than the fixed buffer can hold will result in an error
type FixedBufferByteSink struct {
buff []byte
pos uint64
}
// NewFixedBufferTableSink creates a FixedBufferTableSink which will use the supplied buffer
func NewFixedBufferTableSink(buff []byte) *FixedBufferByteSink {
if len(buff) == 0 {
panic("must provide a buffer")
}
return &FixedBufferByteSink{buff: buff}
}
// Write writes a byte array to the sink.
func (sink *FixedBufferByteSink) Write(src []byte) (int, error) {
dest := sink.buff[sink.pos:]
destLen := len(dest)
srcLen := len(src)
if destLen < srcLen {
return 0, ErrBufferFull
}
copy(dest, src)
sink.pos += uint64(srcLen)
return srcLen, nil
}
// Flush writes all the data that was written to the ByteSink to the supplied writer
func (sink *FixedBufferByteSink) Flush(wr io.Writer) error {
return iohelp.WriteAll(wr, sink.buff[:sink.pos])
}
// BlockBufferByteSink allocates blocks of data with a given block size to store the bytes written to the sink. New
// blocks are allocated as needed in order to handle all the data of the Write calls.
type BlockBufferByteSink struct {
blockSize int
pos uint64
blocks [][]byte
}
// NewBlockBufferTableSink creates a BlockBufferByteSink with the provided block size.
func NewBlockBufferTableSink(blockSize int) *BlockBufferByteSink {
block := make([]byte, 0, blockSize)
return &BlockBufferByteSink{blockSize, 0, [][]byte{block}}
}
// Write writes a byte array to the sink.
func (sink *BlockBufferByteSink) Write(src []byte) (int, error) {
srcLen := len(src)
currBlockIdx := len(sink.blocks) - 1
currBlock := sink.blocks[currBlockIdx]
remaining := cap(currBlock) - len(currBlock)
if remaining >= srcLen {
currBlock = append(currBlock, src...)
sink.blocks[currBlockIdx] = currBlock
} else {
if remaining > 0 {
currBlock = append(currBlock, src[:remaining]...)
sink.blocks[currBlockIdx] = currBlock
}
newBlock := make([]byte, 0, sink.blockSize)
newBlock = append(newBlock, src[remaining:]...)
sink.blocks = append(sink.blocks, newBlock)
}
sink.pos += uint64(srcLen)
return srcLen, nil
}
// Flush writes all the data that was written to the ByteSink to the supplied writer
func (sink *BlockBufferByteSink) Flush(wr io.Writer) (err error) {
return iohelp.WriteAll(wr, sink.blocks...)
}
const defaultTableSinkBlockSize = 2 * 1024 * 1024
const defaultChBufferSize = 32 * 1024
// ErrNotFinished is an error returned by a CmpChunkTableWriter when a call to Flush* is called before Finish is called
var ErrNotFinished = errors.New("not finished")
@@ -138,8 +44,19 @@ type CmpChunkTableWriter struct {
}
// NewCmpChunkTableWriter creates a new CmpChunkTableWriter instance with a default ByteSink
func NewCmpChunkTableWriter() *CmpChunkTableWriter {
return &CmpChunkTableWriter{NewBlockBufferTableSink(defaultTableSinkBlockSize), 0, 0, nil, nil}
func NewCmpChunkTableWriter() (*CmpChunkTableWriter, error) {
s, err := NewBufferedFileByteSink(defaultTableSinkBlockSize, defaultChBufferSize)
if err != nil {
return nil, err
}
return &CmpChunkTableWriter{s, 0, 0, nil, nil}, nil
}
// Size returns the number of compressed chunks that have been added
func (tw *CmpChunkTableWriter) Size() int {
return len(tw.prefixes)
}
// AddCmpChunk adds a compressed chunk
@@ -210,20 +127,7 @@ func (tw *CmpChunkTableWriter) FlushToFile(path string) error {
return ErrNotFinished
}
f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
err = tw.sink.Flush(f)
if err != nil {
_ = f.Close()
return err
}
return f.Close()
return tw.sink.FlushToFile(path)
}
// Flush can be called after Finish in order to write the data out to the writer provided.

View File

@@ -22,46 +22,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/liquidata-inc/dolt/go/store/atomicerr"
"github.com/liquidata-inc/dolt/go/store/chunks"
"github.com/liquidata-inc/dolt/go/store/hash"
)
func TestBlockBufferTableSink(t *testing.T) {
suite.Run(t, &TableSinkSuite{sink: NewBlockBufferTableSink(128)})
}
// TestFixedBufferTableSink runs the shared ByteSink suite against a
// fixed-buffer sink backed by a 32 KiB preallocated slice.
func TestFixedBufferTableSink(t *testing.T) {
	backing := make([]byte, 32*1024)
	suite.Run(t, &TableSinkSuite{sink: NewFixedBufferTableSink(backing)})
}
// TableSinkSuite is a testify suite that exercises a ByteSink implementation;
// the concrete sink under test is injected at construction.
type TableSinkSuite struct {
	// sink is the ByteSink implementation being exercised.
	sink ByteSink
	// t is the current *testing.T, stored via SetT for assertions.
	t *testing.T
}
// SetT stores the current *testing.T so suite assertions report against it.
func (suite *TableSinkSuite) SetT(t *testing.T) {
	suite.t = t
}
// T returns the *testing.T previously supplied via SetT.
func (suite *TableSinkSuite) T() *testing.T {
	return suite.t
}
// TestWrite writes a 64-byte ascending pattern to the sink 32 times and
// asserts that every write succeeds.
func (suite *TableSinkSuite) TestWrite() {
	payload := make([]byte, 64)
	for i := range payload {
		payload[i] = byte(i)
	}

	for n := 0; n < 32; n++ {
		_, err := suite.sink.Write(payload)
		assert.NoError(suite.t, err)
	}
}
func TestCmpChunkTableWriter(t *testing.T) {
// Put some chunks in a table file and get the buffer back which contains the table file data
ctx := context.Background()
@@ -82,7 +48,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
ae := atomicerr.New()
wg := &sync.WaitGroup{}
reqs := toGetRecords(hashes)
found := make(chan CompressedChunk, 128)
found := make(chan chunks.Chunkable, 128)
go func() {
defer close(found)
@@ -91,9 +57,10 @@ func TestCmpChunkTableWriter(t *testing.T) {
}()
// for all the chunks we find, write them using the compressed writer
tw := NewCmpChunkTableWriter()
tw, err := NewCmpChunkTableWriter()
require.NoError(t, err)
for cmpChnk := range found {
err = tw.AddCmpChunk(cmpChnk)
err = tw.AddCmpChunk(cmpChnk.(CompressedChunk))
require.NoError(t, err)
}

View File

@@ -153,7 +153,7 @@ func (mt *memTable) getMany(ctx context.Context, reqs []getRecord, foundChunks c
return remaining
}
func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (mt *memTable) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
panic("not implemented")
}

View File

@@ -277,7 +277,7 @@ func (crg chunkReaderGroup) getMany(ctx context.Context, reqs []getRecord, found
return true
}
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (crg chunkReaderGroup) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
for _, haver := range crg {
remaining := haver.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, stats)

View File

@@ -134,7 +134,7 @@ func (ccs *persistingChunkSource) getMany(ctx context.Context, reqs []getRecord,
return cr.getMany(ctx, reqs, foundChunks, wg, ae, stats)
}
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
cr := ccs.getReader()
if cr == nil {
@@ -266,7 +266,7 @@ func (ecs emptyChunkSource) getMany(ctx context.Context, reqs []getRecord, found
return true
}
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
return true
}

View File

@@ -410,7 +410,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
})
}
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan CompressedChunk) error {
func (nbs *NomsBlockStore) GetManyCompressed(ctx context.Context, hashes hash.HashSet, foundCmpChunks chan chunks.Chunkable) error {
return nbs.getManyWithFunc(ctx, hashes, func(ctx context.Context, cr chunkReader, reqs []getRecord, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
return cr.getManyCompressed(ctx, reqs, foundCmpChunks, wg, ae, nbs.stats)
})
@@ -932,63 +932,53 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []TableFile,
return contents.GetRoot(), tableFiles, nil
}
// NewSink returns a writer for a new table file. When the writer is closed the table file is persisted
func (nbs *NomsBlockStore) NewSink(ctx context.Context, fileId string, numChunks int) (WriteCloserWithContext, error) {
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader) error {
fsPersister, ok := nbs.p.(*fsTablePersister)
if !ok {
return nil, errors.New("Not implemented")
return errors.New("Not implemented")
}
path := filepath.Join(fsPersister.dir, fileId)
f, err := os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
err := func() (err error) {
var f *os.File
f, err = os.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.ModePerm)
if err != nil {
return err
}
defer func() {
closeErr := f.Close()
if err == nil {
err = closeErr
}
}()
_, err = io.Copy(f, rd)
return err
}()
if err != nil {
return nil, err
return err
}
return NomsBlockStoreTableSink{fileId, numChunks, f, nbs}, nil
fileIdHash, ok := hash.MaybeParse(fileId)
if !ok {
return errors.New("invalid base32 encoded hash: " + fileId)
}
_, err = nbs.UpdateManifest(ctx, map[hash.Hash]uint32{fileIdHash: uint32(numChunks)})
return err
}
// SetRootChunk changes the root chunk hash from the previous value to the new root.
func (nbs *NomsBlockStore) SetRootChunk(ctx context.Context, root, previous hash.Hash) error {
return nbs.updateManifest(ctx, root, previous)
}
// NomsBlockStoreTableSink is an implementation of a WriteCloserWithContext which expects a table file to be
// written to it, and when closed will persist the table file, and update the manifest of the associated nbs
type NomsBlockStoreTableSink struct {
	// fileId is the table file's name — expected to be a base32 hash (see Close).
	fileId string
	// numChunks is the chunk count recorded in the manifest for this file.
	numChunks int
	// wr receives the raw table-file bytes; set to nil after Close.
	wr io.WriteCloser
	// nbs is the block store whose manifest is updated when the sink closes.
	nbs *NomsBlockStore
}
// Write forwards p to the underlying writer, returning the byte count and any
// write error — this sink adds no buffering of its own.
func (ts NomsBlockStoreTableSink) Write(p []byte) (n int, err error) {
	n, err = ts.wr.Write(p)
	return n, err
}
// Close closes the writer, and persists it in the manifest of the associated nbs
func (ts NomsBlockStoreTableSink) Close(ctx context.Context) error {
	// NOTE(review): the receiver is a value, so the `ts.wr = nil` below mutates
	// a copy — a second Close on the same stored sink value will NOT hit this
	// guard. Confirm callers Close exactly once, or move to a pointer receiver
	// (which would change how the WriteCloserWithContext interface is satisfied).
	if ts.wr == nil {
		return errors.New("already closed")
	}
	// Close the underlying writer first; skip the manifest update on failure.
	err := ts.wr.Close()
	ts.wr = nil
	if err != nil {
		return err
	}
	// The file id doubles as the table file's address in the manifest.
	fileIdHash, ok := hash.MaybeParse(ts.fileId)
	if !ok {
		return errors.New("invalid base32 encoded hash: " + ts.fileId)
	}
	// Record the new table file (hash -> chunk count) in the store's manifest.
	_, err = ts.nbs.UpdateManifest(ctx, map[hash.Hash]uint32{fileIdHash: uint32(ts.numChunks)})
	return err
}

View File

@@ -15,6 +15,7 @@
package nbs
import (
"bytes"
"context"
"fmt"
"io/ioutil"
@@ -50,13 +51,7 @@ func TestNBSAsTableFileStore(t *testing.T) {
data, addr, err := buildTable(chunkData)
fileID := addr.String()
fileToData[fileID] = data
sink, err := st.NewSink(ctx, fileID, i+1)
require.NoError(t, err)
n, err := sink.Write(data)
require.NoError(t, err)
assert.Equal(t, n, len(data))
err = sink.Close(ctx)
err = st.WriteTableFile(ctx, fileID, i+1, bytes.NewReader(data))
require.NoError(t, err)
}
@@ -76,6 +71,9 @@ func TestNBSAsTableFileStore(t *testing.T) {
data, err := ioutil.ReadAll(rd)
require.NoError(t, err)
err = rd.Close()
require.NoError(t, err)
assert.Equal(t, expected, data)
}
}

View File

@@ -228,7 +228,7 @@ type chunkReader interface {
hasMany(addrs []hasRecord) (bool, error)
get(ctx context.Context, h addr, stats *Stats) ([]byte, error)
getMany(ctx context.Context, reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool
extract(ctx context.Context, chunks chan<- extractRecord) error
count() (uint32, error)
uncompressedLen() (uint64, error)
@@ -249,7 +249,7 @@ type chunkReadPlanner interface {
ctx context.Context,
reqs []getRecord,
offsetRecords offsetRecSlice,
foundCmpChunks chan CompressedChunk,
foundCmpChunks chan chunks.Chunkable,
wg *sync.WaitGroup,
ae *atomicerr.AtomicError,
stats *Stats,
@@ -280,21 +280,13 @@ type TableFile interface {
Open() (io.ReadCloser, error)
}
// WriteCloserWithContext is an interface which extends io.Writer and has a Close method that takes a context
type WriteCloserWithContext interface {
	io.Writer
	// Close closes the writer. Implementations may perform persistence work on
	// Close (e.g. manifest updates), which is why a context is accepted.
	Close(ctx context.Context) error
}
// TableFileStore is an interface for interacting with table files directly
type TableFileStore interface {
// Sources retrieves the current root hash, and a list of all the table files
Sources(ctx context.Context) (hash.Hash, []TableFile, error)
// NewSink returns a writer for a new table file. When the writer is closed the table file is persisted
NewSink(ctx context.Context, fileId string, numChunks int) (WriteCloserWithContext, error)
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
WriteTableFile(ctx context.Context, fileId string, numChunks int, rd io.Reader) error
// SetRootChunk changes the root chunk hash from the previous value to the new root.
SetRootChunk(ctx context.Context, root, previous hash.Hash) error

View File

@@ -37,12 +37,20 @@ import (
"github.com/liquidata-inc/dolt/go/store/hash"
)
// CompressedChunk represents a chunk of data in a table file which is still compressed via snappy. CompressedChunk
// implements chunks.Chunkable
type CompressedChunk struct {
H hash.Hash
// H is the hash of the chunk
H hash.Hash
// FullCompressedChunk is the entirety of the compressed chunk data including the crc
FullCompressedChunk []byte
CompressedData []byte
// CompressedData is just the snappy encoded byte buffer that stores the chunk data
CompressedData []byte
}
// NewCompressedChunk creates a CompressedChunk
func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
dataLen := uint64(len(buff)) - checksumSize
@@ -56,7 +64,8 @@ func NewCompressedChunk(h hash.Hash, buff []byte) (CompressedChunk, error) {
return CompressedChunk{H: h, FullCompressedChunk: buff, CompressedData: compressedData}, nil
}
func (cmp CompressedChunk) Decompress() (chunks.Chunk, error) {
// ToChunk snappy decodes the compressed data and returns a chunks.Chunk
func (cmp CompressedChunk) ToChunk() (chunks.Chunk, error) {
data, err := snappy.Decode(nil, cmp.CompressedData)
if err != nil {
@@ -66,6 +75,17 @@ func (cmp CompressedChunk) Decompress() (chunks.Chunk, error) {
return chunks.NewChunk(data), nil
}
// Hash returns the hash of the data (the H field); together with IsEmpty this
// is presumably what satisfies chunks.Chunkable — see the type comment.
func (cmp CompressedChunk) Hash() hash.Hash {
	return cmp.H
}
// IsEmpty returns true if the chunk contains no data (no snappy-encoded bytes).
func (cmp CompressedChunk) IsEmpty() bool {
	return len(cmp.CompressedData) == 0
}
// ErrInvalidTableFile is an error returned when a table file is corrupt or invalid.
var ErrInvalidTableFile = errors.New("invalid or corrupt table file")
type tableIndex struct {
@@ -337,7 +357,7 @@ func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, er
return nil, errors.New("failed to get data")
}
chnk, err := cmp.Decompress()
chnk, err := cmp.ToChunk()
if err != nil {
return nil, err
@@ -363,7 +383,7 @@ func (tr tableReader) readCompressedAtOffsets(
readStart, readEnd uint64,
reqs []getRecord,
offsets offsetRecSlice,
foundCmpChunks chan CompressedChunk,
foundCmpChunks chan chunks.Chunkable,
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error {
@@ -381,7 +401,7 @@ func (tr tableReader) readAtOffsets(
stats *Stats,
) error {
return tr.readAtOffsetsWithCB(ctx, readStart, readEnd, reqs, offsets, stats, func(cmp CompressedChunk) error {
chk, err := cmp.Decompress()
chk, err := cmp.ToChunk()
if err != nil {
return err
@@ -457,7 +477,7 @@ func (tr tableReader) getMany(
tr.getManyAtOffsets(ctx, reqs, offsetRecords, foundChunks, wg, ae, stats)
return remaining
}
func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
// Pass #1: Iterate over |reqs| and |tr.prefixes| (both sorted by address) and build the set
// of table locations which must be read in order to satisfy the getMany operation.
offsetRecords, remaining := tr.findOffsets(reqs)
@@ -465,15 +485,7 @@ func (tr tableReader) getManyCompressed(ctx context.Context, reqs []getRecord, f
return remaining
}
func (tr tableReader) getManyCompressedAtOffsets(
ctx context.Context,
reqs []getRecord,
offsetRecords offsetRecSlice,
foundCmpChunks chan CompressedChunk,
wg *sync.WaitGroup,
ae *atomicerr.AtomicError,
stats *Stats,
) {
func (tr tableReader) getManyCompressedAtOffsets(ctx context.Context, reqs []getRecord, offsetRecords offsetRecSlice, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) {
tr.getManyAtOffsetsWithReadFunc(ctx, reqs, offsetRecords, wg, ae, stats, func(
ctx context.Context,
readStart, readEnd uint64,
@@ -533,10 +545,12 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc(
var batch offsetRecSlice
var readStart, readEnd uint64
for i, rec := range offsetRecords {
for i := 0; i < len(offsetRecords); {
if ae.IsSet() {
break
}
rec := offsetRecords[i]
length := tr.lengths[rec.ordinal]
if batch == nil {
@@ -544,12 +558,14 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc(
batch[0] = offsetRecords[i]
readStart = rec.offset
readEnd = readStart + uint64(length)
i++
continue
}
if newReadEnd, canRead := canReadAhead(rec, tr.lengths[rec.ordinal], readStart, readEnd, tr.blockSize); canRead {
batch = append(batch, rec)
readEnd = newReadEnd
i++
continue
}
@@ -718,7 +734,7 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
return err
}
chnk, err := cmp.Decompress()
chnk, err := cmp.ToChunk()
if err != nil {
return err

View File

@@ -163,7 +163,7 @@ func (ts tableSet) getMany(ctx context.Context, reqs []getRecord, foundChunks ch
return f(ts.novel) && f(ts.upstream)
}
func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan CompressedChunk, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foundCmpChunks chan chunks.Chunkable, wg *sync.WaitGroup, ae *atomicerr.AtomicError, stats *Stats) bool {
f := func(css chunkSources) bool {
for _, haver := range css {
if ae.IsSet() {
@@ -173,7 +173,9 @@ func (ts tableSet) getManyCompressed(ctx context.Context, reqs []getRecord, foun
if rp, ok := haver.(chunkReadPlanner); ok {
offsets, remaining := rp.findOffsets(reqs)
rp.getManyCompressedAtOffsets(ctx, reqs, offsets, foundCmpChunks, wg, ae, stats)
if len(offsets) > 0 {
rp.getManyCompressedAtOffsets(ctx, reqs, offsets, foundCmpChunks, wg, ae, stats)
}
if !remaining {
return false

View File

@@ -65,13 +65,14 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
indices := make([]int32, len(absent))
logger(fmt.Sprintf("missing chunks: %v", indices))
n := 0
for h := range absent {
indices[n] = int32(hashToIndex[h])
n++
}
//logger(fmt.Sprintf("missing chunks: %v", indices))
resp := &remotesapi.HasChunksResponse{
Absent: indices,
}
@@ -302,6 +303,35 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
return resp, nil
}
// AddTableFiles updates the remote manifest with new table files without modifying the root hash.
// The table files themselves are assumed to have been uploaded already; this
// RPC only records their (hash, chunk count) pairs in the manifest.
func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.AddTableFilesRequest) (*remotesapi.AddTableFilesResponse, error) {
	// Tag the logger and store lookup with this RPC's name (was copy-pasted
	// as "Commit", which misattributed these log lines).
	logger := getReqLogger("GRPC", "AddTableFiles")
	defer func() { logger("finished") }()

	cs := rs.getStore(req.RepoId, "AddTableFiles")

	if cs == nil {
		return nil, status.Error(codes.Internal, "Could not get chunkstore")
	}

	logger(fmt.Sprintf("found %s/%s", req.RepoId.Org, req.RepoId.RepoName))

	// TODO: validate req.ChunkTableInfo (and ClientRepoFormat) before
	// mutating the manifest.
	updates := make(map[hash.Hash]uint32, len(req.ChunkTableInfo))
	for _, cti := range req.ChunkTableInfo {
		updates[hash.New(cti.Hash)] = cti.ChunkCount
	}

	_, err := cs.UpdateManifest(ctx, updates)

	if err != nil {
		logger(fmt.Sprintf("error occurred updating the manifest: %s", err.Error()))
		return nil, status.Error(codes.Internal, "manifest update error")
	}

	return &remotesapi.AddTableFilesResponse{Success: true}, nil
}
func (rs *RemoteChunkStore) getStore(repoId *remotesapi.RepoId, rpcName string) *nbs.NomsBlockStore {
org := repoId.Org
repoName := repoId.RepoName

View File

@@ -152,6 +152,16 @@ func readFile(logger func(string), org, repo, fileId string, writer io.Writer) i
return http.StatusInternalServerError
}
defer func() {
err := f.Close()
if err != nil {
logger(fmt.Sprintf("Close failed. file: %s, err: %v", path, err))
} else {
logger("Close Successful")
}
}()
n, err := io.Copy(writer, f)
if err != nil {
@@ -215,24 +225,34 @@ func readLocalRange(logger func(string), org, repo, fileId string, offset, lengt
f, err := os.Open(path)
if err != nil {
logger(fmt.Sprintf("Failed to open %s", path))
logger(fmt.Sprintf("Failed to open %s: %v", path, err))
return nil, http.StatusInternalServerError
}
defer func() {
err := f.Close()
if err != nil {
logger(fmt.Sprintf("Close failed. file: %s, err: %v", path, err))
} else {
logger("Close Successful")
}
}()
logger(fmt.Sprintf("Successfully opened file"))
pos, err := f.Seek(int64(offset), 0)
if err != nil {
logger(fmt.Sprintf("Failed to seek to %d", offset))
logger(fmt.Sprintf("Failed to seek to %d: %v", offset, err))
return nil, http.StatusInternalServerError
}
logger(fmt.Sprintf("Seek succeeded. Current position is %d", pos))
diff := int64(offset) - pos
diff := offset - pos
data, err := iohelp.ReadNBytes(f, int(diff+int64(length)))
if err != nil {
logger(fmt.Sprintf("Failed to read %d bytes", diff+int64(length)))
logger(fmt.Sprintf("Failed to read %d bytes: %v", diff+length, err))
return nil, http.StatusInternalServerError
}

View File

@@ -37,6 +37,8 @@ service ChunkStoreService {
rpc Commit(CommitRequest) returns (CommitResponse);
rpc ListTableFiles(ListTableFilesRequest) returns (ListTableFilesResponse);
rpc AddTableFiles(AddTableFilesRequest) returns (AddTableFilesResponse);
}
message RepoId {
@@ -169,4 +171,14 @@ message TableFileInfo {
message ListTableFilesResponse {
bytes root_hash = 1;
repeated TableFileInfo table_file_info = 2;
}
// AddTableFilesRequest asks the remote to record already-uploaded table files
// in its manifest without changing the root hash.
message AddTableFilesRequest {
// repo_id identifies the repository whose manifest is updated.
RepoId repo_id = 1;
// client_repo_format — presumably the client's storage format for
// compatibility checks; the server handler does not validate it yet.
ClientRepoFormat client_repo_format = 2;
// chunk_table_info lists each table file's hash and chunk count.
repeated ChunkTableInfo chunk_table_info = 3;
}
// AddTableFilesResponse reports whether the manifest update succeeded.
message AddTableFilesResponse {
bool success = 1;
}