mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-11 01:55:08 -05:00
Cope with splitOffset being unset
This commit is contained in:
@@ -255,41 +255,47 @@ func (bsp *blobstorePersister) Path() string {
|
||||
}
|
||||
|
||||
func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, splitOffset uint64) error {
|
||||
lr := io.LimitReader(r, int64(splitOffset))
|
||||
if splitOffset > 0 {
|
||||
lr := io.LimitReader(r, int64(splitOffset))
|
||||
|
||||
indexLen := int64(fileSz - splitOffset)
|
||||
indexLen := int64(fileSz - splitOffset)
|
||||
|
||||
// check if we can Put concurrently
|
||||
rr, ok := r.(io.ReaderAt)
|
||||
if !ok {
|
||||
// sequentially write chunk records then tail
|
||||
if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil {
|
||||
return err
|
||||
// check if we can Put concurrently
|
||||
rr, ok := r.(io.ReaderAt)
|
||||
if !ok {
|
||||
// sequentially write chunk records then tail
|
||||
if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// on the push path, we expect to Put concurrently
|
||||
// see BufferedFileByteSink in byte_sink.go
|
||||
eg, ectx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen)
|
||||
_, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr)
|
||||
return err
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// finally concatenate into the complete table
|
||||
_, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
|
||||
return err
|
||||
} else {
|
||||
// on the push path, we expect to Put concurrently
|
||||
// see BufferedFileByteSink in byte_sink.go
|
||||
eg, ectx := errgroup.WithContext(ctx)
|
||||
eg.Go(func() error {
|
||||
srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen)
|
||||
_, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr)
|
||||
return err
|
||||
})
|
||||
eg.Go(func() error {
|
||||
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr)
|
||||
return err
|
||||
})
|
||||
if err := eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
// no split offset, just copy the whole table. We will create the records object on demand if needed.
|
||||
_, err := bsp.bs.Put(ctx, name, int64(fileSz), r)
|
||||
return err
|
||||
}
|
||||
|
||||
// finally concatenate into the complete table
|
||||
_, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
|
||||
return err
|
||||
}
|
||||
|
||||
type bsTableReaderAt struct {
|
||||
|
||||
@@ -29,6 +29,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -1689,7 +1690,7 @@ func (nbs *NomsBlockStore) Path() (string, bool) {
|
||||
}
|
||||
|
||||
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
|
||||
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, _ int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error {
|
||||
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error {
|
||||
valctx.ValidateContext(ctx)
|
||||
tfp, ok := nbs.persister.(tableFilePersister)
|
||||
if !ok {
|
||||
@@ -1701,6 +1702,12 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string,
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
if splitOffset == 0 && !strings.HasSuffix(fileName, ArchiveFileSuffix) {
|
||||
splitOffset = tableTailOffset(sz, uint32(numChunks))
|
||||
}
|
||||
|
||||
// CopyTableFile can cope with a 0 splitOffset.
|
||||
return tfp.CopyTableFile(ctx, r, fileName, sz, splitOffset)
|
||||
}
|
||||
|
||||
|
||||
@@ -77,7 +77,8 @@ type tableFilePersister interface {
|
||||
// CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore.
|
||||
//
|
||||
// |splitOffset| is the offset in bytes within the file where the file is split between data and the index/footer.
|
||||
// This is only used for the blob store persister, as it stores the data and index/footer in separate blobs.
|
||||
// This is only used for the blob store persister, as it stores the data and index/footer in separate blobs. In the
|
||||
// event that splitOffset is 0, the blob store persister will upload the entire file as a single blob.
|
||||
CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error
|
||||
|
||||
// Path returns the file system path. Use CopyTableFile instead of Path to
|
||||
|
||||
Reference in New Issue
Block a user