Merge pull request #9939 from dolthub/macneale4/blobstore-conjoin

blobstore push/pull and conjoin
This commit is contained in:
Neil Macneale IV
2025-10-15 10:49:01 -07:00
committed by GitHub
36 changed files with 378 additions and 198 deletions

View File

@@ -112,12 +112,11 @@ func (cmd ArchiveInspectCmd) Exec(ctx context.Context, commandStr string, args [
cli.Printf("File size: %d bytes\n", inspector.FileSize())
cli.Printf("Format version: %d\n", inspector.FormatVersion())
cli.Printf("File signature: %s\n", inspector.FileSignature())
cli.Println()
cli.Printf("Chunk count: %d\n", inspector.ChunkCount())
cli.Printf("Byte span count: %d\n", inspector.ByteSpanCount())
cli.Printf("Index size: %d bytes\n", inspector.IndexSize())
cli.Printf("Metadata size: %d bytes\n", inspector.MetadataSize())
cli.Printf("Split offset: %d bytes\n", inspector.SplitOffset())
// Display metadata if present
if inspector.MetadataSize() > 0 {

View File

@@ -861,6 +861,7 @@ type TableFileDetails struct {
ContentHash []byte `protobuf:"bytes,3,opt,name=content_hash,json=contentHash,proto3" json:"content_hash,omitempty"`
NumChunks uint64 `protobuf:"varint,4,opt,name=num_chunks,json=numChunks,proto3" json:"num_chunks,omitempty"`
Suffix string `protobuf:"bytes,5,opt,name=suffix,proto3" json:"suffix,omitempty"`
SplitOffset uint64 `protobuf:"varint,6,opt,name=split_offset,json=splitOffset,proto3" json:"split_offset,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -930,6 +931,13 @@ func (x *TableFileDetails) GetSuffix() string {
return ""
}
func (x *TableFileDetails) GetSplitOffset() uint64 {
if x != nil {
return x.SplitOffset
}
return 0
}
type GetUploadLocsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
RepoId *RepoId `protobuf:"bytes,1,opt,name=repo_id,json=repoId,proto3" json:"repo_id,omitempty"`
@@ -1735,6 +1743,7 @@ type TableFileInfo struct {
Url string `protobuf:"bytes,3,opt,name=url,proto3" json:"url,omitempty"`
RefreshAfter *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=refresh_after,json=refreshAfter,proto3" json:"refresh_after,omitempty"`
RefreshRequest *RefreshTableFileUrlRequest `protobuf:"bytes,5,opt,name=refresh_request,json=refreshRequest,proto3" json:"refresh_request,omitempty"`
SplitOffset uint64 `protobuf:"varint,6,opt,name=split_offset,json=splitOffset,proto3" json:"split_offset,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1804,6 +1813,13 @@ func (x *TableFileInfo) GetRefreshRequest() *RefreshTableFileUrlRequest {
return nil
}
func (x *TableFileInfo) GetSplitOffset() uint64 {
if x != nil {
return x.SplitOffset
}
return 0
}
type RefreshTableFileUrlRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
RepoId *RepoId `protobuf:"bytes,1,opt,name=repo_id,json=repoId,proto3" json:"repo_id,omitempty"`
@@ -2196,14 +2212,15 @@ const file_dolt_services_remotesapi_v1alpha1_chunkstore_proto_rawDesc = "" +
"\x17GetDownloadLocsResponse\x12B\n" +
"\x04locs\x18\x01 \x03(\v2..dolt.services.remotesapi.v1alpha1.DownloadLocR\x04locs\x12\x1d\n" +
"\n" +
"repo_token\x18\x02 \x01(\tR\trepoToken\"\xa3\x01\n" +
"repo_token\x18\x02 \x01(\tR\trepoToken\"\xc6\x01\n" +
"\x10TableFileDetails\x12\x0e\n" +
"\x02id\x18\x01 \x01(\fR\x02id\x12%\n" +
"\x0econtent_length\x18\x02 \x01(\x04R\rcontentLength\x12!\n" +
"\fcontent_hash\x18\x03 \x01(\fR\vcontentHash\x12\x1d\n" +
"\n" +
"num_chunks\x18\x04 \x01(\x04R\tnumChunks\x12\x16\n" +
"\x06suffix\x18\x05 \x01(\tR\x06suffix\"\xa9\x02\n" +
"\x06suffix\x18\x05 \x01(\tR\x06suffix\x12!\n" +
"\fsplit_offset\x18\x06 \x01(\x04R\vsplitOffset\"\xa9\x02\n" +
"\x14GetUploadLocsRequest\x12B\n" +
"\arepo_id\x18\x01 \x01(\v2).dolt.services.remotesapi.v1alpha1.RepoIdR\x06repoId\x12.\n" +
"\x11table_file_hashes\x18\x02 \x03(\fB\x02\x18\x01R\x0ftableFileHashes\x12a\n" +
@@ -2270,14 +2287,15 @@ const file_dolt_services_remotesapi_v1alpha1_chunkstore_proto_rawDesc = "" +
"\rappendix_only\x18\x02 \x01(\bB\x02\x18\x01R\fappendixOnly\x12\x1d\n" +
"\n" +
"repo_token\x18\x03 \x01(\tR\trepoToken\x12\x1b\n" +
"\trepo_path\x18\x04 \x01(\tR\brepoPath\"\x82\x02\n" +
"\trepo_path\x18\x04 \x01(\tR\brepoPath\"\xa5\x02\n" +
"\rTableFileInfo\x12\x17\n" +
"\afile_id\x18\x01 \x01(\tR\x06fileId\x12\x1d\n" +
"\n" +
"num_chunks\x18\x02 \x01(\rR\tnumChunks\x12\x10\n" +
"\x03url\x18\x03 \x01(\tR\x03url\x12?\n" +
"\rrefresh_after\x18\x04 \x01(\v2\x1a.google.protobuf.TimestampR\frefreshAfter\x12f\n" +
"\x0frefresh_request\x18\x05 \x01(\v2=.dolt.services.remotesapi.v1alpha1.RefreshTableFileUrlRequestR\x0erefreshRequest\"\xb5\x01\n" +
"\x0frefresh_request\x18\x05 \x01(\v2=.dolt.services.remotesapi.v1alpha1.RefreshTableFileUrlRequestR\x0erefreshRequest\x12!\n" +
"\fsplit_offset\x18\x06 \x01(\x04R\vsplitOffset\"\xb5\x01\n" +
"\x1aRefreshTableFileUrlRequest\x12B\n" +
"\arepo_id\x18\x01 \x01(\v2).dolt.services.remotesapi.v1alpha1.RepoIdR\x06repoId\x12\x17\n" +
"\afile_id\x18\x02 \x01(\tR\x06fileId\x12\x1d\n" +

View File

@@ -425,6 +425,7 @@ func (rs *RemoteChunkStore) getUploadUrl(md metadata.MD, repoPath string, tfd *r
fileID := hash.New(tfd.Id).String() + tfd.Suffix
params := url.Values{}
params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks)))
params.Add("split_offset", strconv.Itoa(int(tfd.SplitOffset)))
params.Add("content_length", strconv.Itoa(int(tfd.ContentLength)))
params.Add("content_hash", base64.RawURLEncoding.EncodeToString(tfd.ContentHash))
return &url.URL{
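For illustration only (hypothetical values, not taken from this diff): the signed upload URL now carries split_offset alongside the existing parameters, and an older client that omits it is still accepted because the file handler below defaults it to 0.
// e.g. https://<host>/<repoPath>/<fileID>?num_chunks=3&split_offset=845&content_length=1089&content_hash=<base64url-hash>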

View File

@@ -140,13 +140,14 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusBadRequest)
return
}
num_chunks, err := strconv.Atoi(ncs)
numChunks, err := strconv.Atoi(ncs)
if err != nil {
logger = logger.WithField("status", http.StatusBadRequest)
logger.WithError(err).Warn("bad request: num_chunks parameter did not parse")
respWr.WriteHeader(http.StatusBadRequest)
return
}
cls := q.Get("content_length")
if cls == "" {
logger = logger.WithField("status", http.StatusBadRequest)
@@ -154,7 +155,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusBadRequest)
return
}
content_length, err := strconv.Atoi(cls)
contentLength, err := strconv.Atoi(cls)
if err != nil {
logger = logger.WithField("status", http.StatusBadRequest)
logger.WithError(err).Warn("bad request: content_length parameter did not parse")
@@ -168,7 +169,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
respWr.WriteHeader(http.StatusBadRequest)
return
}
content_hash, err := base64.RawURLEncoding.DecodeString(chs)
contentHash, err := base64.RawURLEncoding.DecodeString(chs)
if err != nil {
logger = logger.WithField("status", http.StatusBadRequest)
logger.WithError(err).Warn("bad request: content_hash parameter did not parse")
@@ -176,7 +177,20 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
return
}
logger, statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, num_chunks, content_hash, uint64(content_length), req.Body)
// splitOffset is not required, for backwards compatibility with older clients that do not send it.
splitOffset := uint64(0)
splitQstr := q.Get("split_offset")
if splitQstr != "" {
splitOffset, err = strconv.ParseUint(splitQstr, 10, 64)
if err != nil {
logger = logger.WithField("status", http.StatusBadRequest)
logger.WithError(err).Warn("bad request: split_offset parameter did not parse")
respWr.WriteHeader(http.StatusBadRequest)
return
}
}
logger, statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, splitOffset, numChunks, contentHash, uint64(contentLength), req.Body)
}
if statusCode != -1 {
@@ -286,7 +300,7 @@ func (u *uploadreader) Close() error {
return nil
}
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) {
func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, splitOffset uint64, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) {
if !validateFileName(fileId) {
logger = logger.WithField("status", http.StatusBadRequest)
logger.Warnf("%s is not a valid hash", fileId)
@@ -300,7 +314,7 @@ func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache,
return logger, http.StatusInternalServerError
}
err = cs.WriteTableFile(ctx, fileId, numChunks, contentHash, func() (io.ReadCloser, uint64, error) {
err = cs.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, func() (io.ReadCloser, uint64, error) {
reader := body
size := contentLength
return &uploadreader{

View File

@@ -915,19 +915,21 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
chnks = append(chnks, ch)
}
hashToSplitOffset := make(map[hash.Hash]uint64)
hashToCount := make(map[hash.Hash]int)
hashToData := make(map[hash.Hash][]byte)
hashToContentHash := make(map[hash.Hash][]byte)
// structuring so this can be done as multiple files in the future.
{
name, data, err := nbs.WriteChunks(chnks)
name, data, splitOffset, err := nbs.WriteChunks(chnks)
if err != nil {
return map[hash.Hash]int{}, err
}
h := hash.Parse(name)
hashToSplitOffset[h] = splitOffset
hashToData[h] = data
hashToCount[h] = len(chnks)
@@ -938,7 +940,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
for h, contentHash := range hashToContentHash {
// Tables created on this path are always starting from memory tables and ending up as noms table files.
// As a result, the suffix is always empty.
err := dcs.uploadTableFileWithRetries(ctx, h, "", uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
err := dcs.uploadTableFileWithRetries(ctx, h, "", hashToSplitOffset[h], uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) {
data := hashToData[h]
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
})
@@ -950,7 +952,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has
return hashToCount, nil
}
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, splitOffset uint64, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error {
op := func() error {
body, contentLength, err := getContent()
if err != nil {
@@ -963,6 +965,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
ContentHash: tableFileContentHash,
NumChunks: numChunks,
Suffix: suffix,
SplitOffset: splitOffset,
}
dcs.logf("getting upload location for file %s", tableFileId.String())
@@ -1066,7 +1069,7 @@ func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps {
}
// WriteTableFile reads a table file from the provided reader and writes it to the chunk store.
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
suffix := ""
if strings.HasSuffix(fileId, nbs.ArchiveFileSuffix) {
suffix = nbs.ArchiveFileSuffix
@@ -1074,7 +1077,7 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu
}
fileIdBytes := hash.Parse(fileId)
return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, uint64(numChunks), contentHash, getRd)
return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, splitOffset, uint64(numChunks), contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest
@@ -1195,6 +1198,10 @@ func (drtf DoltRemoteTableFile) NumChunks() int {
return int(drtf.info.NumChunks)
}
func (drtf DoltRemoteTableFile) SplitOffset() uint64 {
return drtf.info.SplitOffset
}
var ErrRemoteTableFileGet = errors.New("HTTP GET for remote table file failed")
func sanitizeSignedUrl(url string) string {

View File

@@ -18,6 +18,8 @@ import (
"bytes"
"context"
"io"
"strconv"
"strings"
)
// Blobstore is an interface for storing and retrieving blobs of data by key
@@ -29,7 +31,7 @@ type Blobstore interface {
Exists(ctx context.Context, key string) (ok bool, err error)
// Get returns a byte range from the blob keyed by |key|, along with the total blob size and the latest store version.
Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, version string, err error)
Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, size uint64, version string, err error)
// Put creates a new blob from |reader| keyed by |key|, it returns the latest store version.
Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (version string, err error)
@@ -44,7 +46,7 @@ type Blobstore interface {
// GetBytes is a utility method that calls bs.Get and handles reading the data from the returned
// io.ReadCloser and closing it.
func GetBytes(ctx context.Context, bs Blobstore, key string, br BlobRange) ([]byte, string, error) {
rc, ver, err := bs.Get(ctx, key, br)
rc, _, ver, err := bs.Get(ctx, key, br)
if err != nil || rc == nil {
return nil, ver, err
@@ -65,3 +67,22 @@ func PutBytes(ctx context.Context, bs Blobstore, key string, data []byte) (strin
reader := bytes.NewReader(data)
return bs.Put(ctx, key, int64(len(data)), reader)
}
// parseContentRangeSize extracts the total size from a Content-Range header.
// Expected format: "bytes start-end/total" e.g., "bytes 0-1023/1234567"
// Returns 0 if the header is malformed or not present.
func parseContentRangeSize(contentRange string) uint64 {
if contentRange == "" {
return 0
}
i := strings.Index(contentRange, "/")
if i == -1 {
return 0
}
sizeStr := contentRange[i+1:]
size, err := strconv.ParseUint(sizeStr, 10, 64)
if err != nil {
return 0
}
return size
}
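A quick illustration of the helper above (hypothetical inputs): the total size after the slash is what populates the new size return value of Get in stores that serve ranged reads, such as the OCI blobstore.
// sketch of expected behavior of parseContentRangeSize:
//   parseContentRangeSize("bytes 0-1023/1234567") == 1234567
//   parseContentRangeSize("bytes 0-1023/*")       == 0   // unknown total, ParseUint fails
//   parseContentRangeSize("")                     == 0   // header absent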

View File

@@ -453,7 +453,7 @@ func testConcatenate(t *testing.T, bs Blobstore, cnt int) {
var off int64
for i := range blobs {
length := int64(len(blobs[i].data))
rdr, _, err := bs.Get(ctx, composite, BlobRange{
rdr, _, _, err := bs.Get(ctx, composite, BlobRange{
offset: off,
length: length,
})

View File

@@ -73,7 +73,7 @@ func (bs *GCSBlobstore) Exists(ctx context.Context, key string) (bool, error) {
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) {
absKey := path.Join(bs.prefix, key)
oh := bs.bucket.Object(absKey)
var reader *storage.Reader
@@ -89,15 +89,16 @@ func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.R
}
if err == storage.ErrObjectNotExist {
return nil, "", NotFound{"gs://" + path.Join(bs.bucketName, absKey)}
return nil, 0, "", NotFound{"gs://" + path.Join(bs.bucketName, absKey)}
} else if err != nil {
return nil, "", err
return nil, 0, "", err
}
attrs := reader.Attrs
generation := attrs.Generation
size := uint64(attrs.Size)
return reader, fmtGeneration(generation), nil
return reader, size, fmtGeneration(generation), nil
}
func writeObj(writer *storage.Writer, reader io.Reader) (string, error) {

View File

@@ -61,12 +61,13 @@ func (bs *InMemoryBlobstore) Path() string {
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) {
bs.mutex.RLock()
defer bs.mutex.RUnlock()
if val, ok := bs.blobs[key]; ok {
if ver, ok := bs.versions[key]; ok && ver != "" {
size := uint64(len(val))
var byteRange []byte
if br.isAllRange() {
byteRange = val
@@ -79,13 +80,13 @@ func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange)
}
}
return newByteSliceReadCloser(byteRange), ver, nil
return newByteSliceReadCloser(byteRange), size, ver, nil
}
panic("Blob without version, or with invalid version, should no be possible.")
}
return nil, "", NotFound{key}
return nil, 0, "", NotFound{key}
}
// Put sets the blob and the version for a key

View File

@@ -81,29 +81,30 @@ func (bs *LocalBlobstore) Path() string {
// Get retrieves an io.reader for the portion of a blob specified by br along with
// its version
func (bs *LocalBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
func (bs *LocalBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) {
path := filepath.Join(bs.RootDir, key) + bsExt
f, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, "", NotFound{key}
return nil, 0, "", NotFound{key}
}
return nil, "", err
return nil, 0, "", err
}
info, err := f.Stat()
if err != nil {
return nil, "", err
return nil, 0, "", err
}
ver := info.ModTime().String()
size := uint64(info.Size())
rc, err := readCloserForFileRange(f, br)
if err != nil {
_ = f.Close()
return nil, "", err
return nil, 0, "", err
}
return rc, ver, nil
return rc, size, ver, nil
}
func readCloserForFileRange(f *os.File, br BlobRange) (io.ReadCloser, error) {
@@ -188,7 +189,7 @@ func (bs *LocalBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key
defer lck.Unlock()
rc, ver, err := bs.Get(ctx, key, BlobRange{})
rc, _, ver, err := bs.Get(ctx, key, BlobRange{})
if err != nil {
if !IsNotFoundError(err) {

View File

@@ -118,7 +118,7 @@ func (bs *OCIBlobstore) Exists(ctx context.Context, key string) (bool, error) {
}
// Get retrieves an io.reader for the portion of a blob specified by br along with its version
func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) {
absKey := path.Join(bs.prefix, key)
req := objectstorage.GetObjectRequest{
NamespaceName: &bs.namespace,
@@ -136,19 +136,32 @@ func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.R
if serr, ok := common.IsServiceError(err); ok {
// handle not found code
if serr.GetHTTPStatusCode() == 404 {
return nil, "", NotFound{"oci://" + path.Join(bs.bucketName, absKey)}
return nil, 0, "", NotFound{"oci://" + path.Join(bs.bucketName, absKey)}
}
}
return nil, "", err
return nil, 0, "", err
}
var size uint64
// Try to get total size from Content-Range header first (for range requests)
if res.RawResponse != nil && res.RawResponse.Header != nil {
contentRange := res.RawResponse.Header.Get("Content-Range")
if contentRange != "" {
size = parseContentRangeSize(contentRange)
}
}
// Fall back to Content-Length if no Content-Range (full object request)
if size == 0 && res.ContentLength != nil {
size = uint64(*res.ContentLength)
}
// handle negative offset and positive length
if br.offset < 0 && br.length > 0 {
lr := io.LimitReader(res.Content, br.length)
return io.NopCloser(lr), fmtstr(res.ETag), nil
return io.NopCloser(lr), size, fmtstr(res.ETag), nil
}
return res.Content, fmtstr(res.ETag), nil
return res.Content, size, fmtstr(res.ETag), nil
}
// Put sets the blob and the version for a key

View File

@@ -67,31 +67,36 @@ func (ob *OSSBlobstore) Exists(_ context.Context, key string) (bool, error) {
return ob.bucket.IsObjectExist(ob.absKey(key))
}
func (ob *OSSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) {
func (ob *OSSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) {
absKey := ob.absKey(key)
meta, err := ob.bucket.GetObjectMeta(absKey)
if isNotFoundErr(err) {
return nil, "", NotFound{"oss://" + path.Join(ob.bucketName, absKey)}
return nil, 0, "", NotFound{"oss://" + path.Join(ob.bucketName, absKey)}
}
totalSize, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64)
if err != nil {
return nil, 0, "", err
}
if br.isAllRange() {
reader, err := ob.bucket.GetObject(absKey)
if err != nil {
return nil, "", err
return nil, 0, "", err
}
return reader, ob.getVersion(meta), nil
return reader, uint64(totalSize), ob.getVersion(meta), nil
}
size, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64)
posBr := br.positiveRange(totalSize)
var responseHeaders http.Header
reader, err := ob.bucket.GetObject(absKey, oss.Range(posBr.offset, posBr.offset+posBr.length-1), oss.GetResponseHeader(&responseHeaders))
if err != nil {
return nil, "", err
return nil, 0, "", err
}
posBr := br.positiveRange(size)
reader, err := ob.bucket.GetObject(absKey, oss.Range(posBr.offset, posBr.offset+posBr.length-1))
if err != nil {
return nil, "", err
}
return reader, ob.getVersion(meta), nil
return reader, uint64(totalSize), ob.getVersion(meta), nil
}
func (ob *OSSBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) {

View File

@@ -38,6 +38,17 @@ type TableFile interface {
// NumChunks returns the number of chunks in a table file
NumChunks() int
// SplitOffset returns the byte offset from the beginning of the storage file where we transition from data to index.
//
// In table files, this is generally determined by calculating the index size based on the number of chunks, then
// subtracting that from the total file size.
// Archive files do not have a deterministic way to calculate the split offset, so we either need to be told the
// offset or read the footer of the file to determine the index size and then calculate the split offset.
//
// Passing the offset around simplifies this. It is meaningful for both current storage types, though we will probably
// keep the table file's chunk count method around for a while.
SplitOffset() uint64
// Open returns an io.ReadCloser which can be used to read the bytes of a
// table file. It also returns the content length of the table file.
Open(ctx context.Context) (io.ReadCloser, uint64, error)
@@ -65,7 +76,7 @@ type TableFileStore interface {
Size(ctx context.Context) (uint64, error)
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore.
WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
// AddTableFilesToManifest adds table files to the manifest
AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs GetAddrsCurry) error
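As a sketch of the relationship described in the SplitOffset comment above (assuming the nbs package's indexSize and footerSize helpers, which this diff uses elsewhere; the function name here is illustrative): for classic noms table files the offset is derived from the file size and chunk count, exactly as memTable.write and tableTailOffset do later in this change, while archive files report their footer's data-span length instead.
// illustrative only: split offset for a classic noms table file
func classicTableSplitOffset(fileSize uint64, chunkCount uint32) uint64 {
	// everything at or beyond this offset is the index followed by the footer
	return fileSize - (indexSize(chunkCount) + footerSize)
}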

View File

@@ -138,7 +138,8 @@ func clone(ctx context.Context, srcTS, sinkTS chunks.TableFileStore, sinkCS chun
}
report(TableFileEvent{EventType: DownloadStart, TableFiles: []chunks.TableFile{tblFile}})
err = sinkTS.WriteTableFile(ctx, tblFile.FileID()+tblFile.LocationSuffix(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) {
err = sinkTS.WriteTableFile(ctx, tblFile.FileID()+tblFile.LocationSuffix(), tblFile.SplitOffset(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) {
rd, contentLength, err := tblFile.Open(ctx)
if err != nil {
return nil, 0, err

View File

@@ -79,7 +79,7 @@ type PullTableFileWriterConfig struct {
}
type DestTableFileStore interface {
WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error
AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs chunks.GetAddrsCurry) error
}
@@ -363,7 +363,7 @@ func (w *PullTableFileWriter) uploadTempTableFile(ctx context.Context, tmpTblFil
// already upload bytes.
var uploaded uint64
return w.cfg.DestStore.WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
return w.cfg.DestStore.WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.chunksLen, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) {
rc, err := tmpTblFile.read.Reader()
if err != nil {
return nil, 0, err
@@ -371,12 +371,12 @@ func (w *PullTableFileWriter) uploadTempTableFile(ctx context.Context, tmpTblFil
if uploaded != 0 {
// A retry. We treat it as if what was already uploaded was rebuffered.
atomic.AddUint64(&w.bufferedSendBytes, uint64(uploaded))
atomic.AddUint64(&w.bufferedSendBytes, uploaded)
uploaded = 0
}
fWithStats := countingReader{countingReader{rc, &uploaded}, &w.finishedSendBytes}
return fWithStats, uint64(fileSize), nil
return fWithStats, fileSize, nil
})
}

View File

@@ -349,7 +349,7 @@ type noopTableFileDestStore struct {
writeCalled atomic.Uint32
}
func (s *noopTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
func (s *noopTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
if s.writeDelay > 0 {
time.Sleep(s.writeDelay)
}
@@ -373,7 +373,7 @@ type testDataTableFileDestStore struct {
doneWriteTableFile chan struct{}
}
func (s *testDataTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
func (s *testDataTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
s.atWriteTableFile <- struct{}{}
<-s.doWriteTableFile
defer func() {
@@ -400,7 +400,7 @@ type errTableFileDestStore struct {
addCalled int
}
func (s *errTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
func (s *errTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
rd, _, _ := getRd()
if rd != nil {
rd.Close()

View File

@@ -153,6 +153,10 @@ func (ai *ArchiveInspector) FileSize() uint64 {
return ai.reader.footer.fileSize
}
func (ai *ArchiveInspector) SplitOffset() uint64 {
return ai.reader.footer.dataSpan().length
}
// ByteSpanCount returns the number of byte spans in the archive
func (ai *ArchiveInspector) ByteSpanCount() uint32 {
return ai.reader.footer.byteSpanCount

View File

@@ -77,8 +77,7 @@ func (f archiveFooter) actualFooterSize() uint64 {
return archiveFooterSize
}
// dataSpan returns the span of the data section of the archive. This is not generally used directly since we usually
// read individual spans for each chunk.
// dataSpan returns the span of the data section of the archive. This is used during conjoin.
func (f archiveFooter) dataSpan() byteSpan {
return byteSpan{offset: 0, length: f.fileSize - f.actualFooterSize() - uint64(f.metadataSize) - uint64(f.indexSize)}
}
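To make the arithmetic concrete (hypothetical sizes, not values from this PR): an archive of fileSize 10,000 bytes with a 40-byte footer, 60 bytes of metadata, and a 900-byte index yields
// dataSpan() == byteSpan{offset: 0, length: 10000 - 40 - 60 - 900} == {0, 9000}
// which is also the value ArchiveInspector.SplitOffset() reports for that file.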
@@ -193,7 +192,7 @@ func newArchiveReaderFromFooter(ctx context.Context, reader tableReaderAt, name
return archiveReader{}, errors.New("runtime error: invalid footer.")
}
ftr, err := buildFooter(name, fileSz, footer)
ftr, err := buildArchiveFooter(name, fileSz, footer)
if err != nil {
return archiveReader{}, err
}
@@ -370,10 +369,10 @@ func loadFooter(ctx context.Context, reader ReaderAtWithStats, name hash.Hash, f
if err != nil {
return
}
return buildFooter(name, fileSize, buf)
return buildArchiveFooter(name, fileSize, buf)
}
func buildFooter(name hash.Hash, fileSize uint64, buf []byte) (f archiveFooter, err error) {
func buildArchiveFooter(name hash.Hash, fileSize uint64, buf []byte) (f archiveFooter, err error) {
f.formatVersion = buf[afrVersionOffset]
f.fileSignature = string(buf[afrSigOffset:])
// Verify File Signature

View File

@@ -111,7 +111,7 @@ func (s3p awsTablePersister) Exists(ctx context.Context, name string, _ uint32,
return s3or.objectExistsInChunkSource(ctx, name, stats)
}
func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, _ uint64) error {
return s3p.multipartUpload(ctx, r, fileSz, fileId)
}
@@ -136,7 +136,7 @@ func (s3p awsTablePersister) key(k string) string {
}
func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}

View File

@@ -36,7 +36,7 @@ func (bsm blobstoreManifest) Name() string {
}
func manifestVersionAndContents(ctx context.Context, bs blobstore.Blobstore) (string, manifestContents, error) {
reader, ver, err := bs.Get(ctx, manifestFile, blobstore.AllRange)
reader, _, ver, err := bs.Get(ctx, manifestFile, blobstore.AllRange)
if err != nil {
return "", manifestContents{}, err

View File

@@ -18,7 +18,6 @@ import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
@@ -46,7 +45,7 @@ var _ tableFilePersister = &blobstorePersister{}
// Persist makes the contents of mt durable. Chunks already present in
// |haver| may be dropped in the process.
func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
address, data, splitOffset, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
@@ -59,7 +58,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
name := address.String()
// persist this table in two parts to facilitate later conjoins
records, tail := splitTableParts(data, chunkCount)
records, tail := data[:splitOffset], data[splitOffset:]
// first write table records and tail (index+footer) as separate blobs
eg, ectx := errgroup.WithContext(ctx)
@@ -94,20 +93,27 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
sized = append(sized, sourceWithSize{src, src.currentSize()})
}
// Currently, archive tables are not supported in blobstorePersister.
archiveFound := false
for _, s := range sized {
_, ok := s.source.(archiveChunkSource)
if ok {
return nil, nil, errors.New("archive tables not supported in blobstorePersister")
archiveFound = true
break
}
}
plan, err := planTableConjoin(sized, stats)
var plan compactionPlan
var err error
if archiveFound {
plan, err = planArchiveConjoin(sized, stats)
} else {
plan, err = planTableConjoin(sized, stats)
}
if err != nil {
return nil, nil, err
}
name := plan.name.String()
name := plan.name.String() + plan.suffix
// conjoin must contiguously append the chunk records of |sources|, but the raw content
// of each source contains a chunk index in the tail. Blobstore does not expose a range
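A sketch of how the pieces appear to fit together (the full method body is not shown in this hunk, so treat this as an outline rather than the implementation):
// 1. for each source, getRecordsSubObject yields the key <hash><suffix> + tableRecordsExt,
//    creating that sub-object on demand for tables that were stored as a single blob
// 2. a fresh index/footer tail for the conjoined table is written as its own blob
// 3. bs.Concatenate stitches the records blobs and the new tail into the final table,
//    so the chunk data itself is never re-downloaded or re-uploaded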
@@ -136,12 +142,18 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
return emptyChunkSource{}, nil, err
}
cs, err := newBSChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats)
var cs chunkSource
if archiveFound {
cs, err = newBSArchiveChunkSource(ctx, bsp.bs, plan.name, stats)
} else {
cs, err = newBSTableChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats)
}
return cs, func() {}, err
}
func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunkSource) (name string, err error) {
name = cs.hash().String() + tableRecordsExt
name = cs.hash().String() + cs.suffix() + tableRecordsExt
// first check if we created this sub-object on Persist()
ok, err := bsp.bs.Exists(ctx, name)
if err != nil {
@@ -152,17 +164,27 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk
// otherwise create the sub-object from |table|
// (requires a round-trip for remote blobstores)
if cs.suffix() == ArchiveFileSuffix {
err = bsp.hotCreateArchiveRecords(ctx, cs)
} else {
err = bsp.hotCreateTableRecords(ctx, cs)
}
return name, err
}
func (bsp *blobstorePersister) hotCreateTableRecords(ctx context.Context, cs chunkSource) error {
cnt, err := cs.count()
if err != nil {
return "", err
return err
}
off := tableTailOffset(cs.currentSize(), cnt)
l := int64(off)
rng := blobstore.NewBlobRange(0, l)
rdr, _, err := bsp.bs.Get(ctx, cs.hash().String(), rng)
rdr, _, _, err := bsp.bs.Get(ctx, cs.hash().String(), rng)
if err != nil {
return "", err
return err
}
defer func() {
if cerr := rdr.Close(); cerr != nil {
@@ -170,18 +192,49 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk
}
}()
if _, err = bsp.bs.Put(ctx, name, l, rdr); err != nil {
return "", err
_, err = bsp.bs.Put(ctx, cs.hash().String()+tableRecordsExt, l, rdr)
return err
}
func (bsp *blobstorePersister) hotCreateArchiveRecords(ctx context.Context, cs chunkSource) error {
arch, ok := cs.(archiveChunkSource)
if !ok {
return errors.New("runtime error: hotCreateArchiveRecords expected archiveChunkSource")
}
return name, nil
dataLen := int64(arch.aRdr.footer.dataSpan().length)
rng := blobstore.NewBlobRange(0, dataLen)
rdr, _, _, err := bsp.bs.Get(ctx, arch.hash().String()+ArchiveFileSuffix, rng)
if err != nil {
return err
}
defer rdr.Close()
key := arch.hash().String() + ArchiveFileSuffix + tableRecordsExt
_, err = bsp.bs.Put(ctx, key, dataLen, rdr)
return err
}
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
cs, err := newBSTableChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
if err == nil {
return cs, nil
}
if blobstore.IsNotFoundError(err) {
source, err := newBSArchiveChunkSource(ctx, bsp.bs, name, stats)
if err != nil {
return nil, err
}
return source, nil
}
return nil, err
}
func (bsp *blobstorePersister) Exists(ctx context.Context, name string, chunkCount uint32, stats *Stats) (bool, error) {
func (bsp *blobstorePersister) Exists(ctx context.Context, name string, _ uint32, _ *Stats) (bool, error) {
return bsp.bs.Exists(ctx, name)
}
@@ -201,49 +254,48 @@ func (bsp *blobstorePersister) Path() string {
return ""
}
func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, chunkCount uint32) error {
// sanity check file size
if fileSz < indexSize(chunkCount)+footerSize {
return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount)
}
func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, splitOffset uint64) error {
if splitOffset > 0 {
lr := io.LimitReader(r, int64(splitOffset))
off := int64(tableTailOffset(fileSz, chunkCount))
lr := io.LimitReader(r, off)
indexLen := int64(fileSz - splitOffset)
// check if we can Put concurrently
rr, ok := r.(io.ReaderAt)
if !ok {
// sequentially write chunk records then tail
if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, off, lr); err != nil {
return err
}
if _, err := bsp.bs.Put(ctx, name+tableTailExt, int64(fileSz), r); err != nil {
return err
}
} else {
// on the push path, we expect to Put concurrently
// see BufferedFileByteSink in byte_sink.go
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error {
buf := make([]byte, indexSize(chunkCount)+footerSize)
if _, err := rr.ReadAt(buf, off); err != nil {
// check if we can Put concurrently
rr, ok := r.(io.ReaderAt)
if !ok {
// sequentially write chunk records then tail
if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil {
return err
}
if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil {
return err
}
} else {
// on the push path, we expect to Put concurrently
// see BufferedFileByteSink in byte_sink.go
eg, ectx := errgroup.WithContext(ctx)
eg.Go(func() error {
srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen)
_, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr)
return err
})
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr)
return err
})
if err := eg.Wait(); err != nil {
return err
}
_, err := bsp.bs.Put(ectx, name+tableTailExt, int64(len(buf)), bytes.NewBuffer(buf))
return err
})
eg.Go(func() error {
_, err := bsp.bs.Put(ectx, name+tableRecordsExt, off, lr)
return err
})
if err := eg.Wait(); err != nil {
return err
}
}
// finally concatenate into the complete table
_, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
return err
// finally concatenate into the complete table
_, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt})
return err
} else {
// no split offset, just copy the whole table. We will create the records object on demand if needed.
_, err := bsp.bs.Put(ctx, name, int64(fileSz), r)
return err
}
}
type bsTableReaderAt struct {
@@ -260,14 +312,14 @@ func (bsTRA *bsTableReaderAt) clone() (tableReaderAt, error) {
}
func (bsTRA *bsTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange)
rc, _, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange)
return rc, err
}
// ReadAtWithStats is the bsTableReaderAt implementation of the tableReaderAt interface
func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error) {
br := blobstore.NewBlobRange(off, int64(len(p)))
rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, br)
rc, _, _, err := bsTRA.bs.Get(ctx, bsTRA.key, br)
if err != nil {
return 0, err
@@ -292,9 +344,29 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
return totalRead, nil
}
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
func newBSArchiveChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, stats *Stats) (cs chunkSource, err error) {
rc, sz, _, err := bs.Get(ctx, name.String()+ArchiveFileSuffix, blobstore.NewBlobRange(-int64(archiveFooterSize), 0))
if err != nil {
return nil, err
}
defer rc.Close()
footer := make([]byte, archiveFooterSize)
_, err = io.ReadFull(rc, footer)
if err != nil {
return nil, err
}
aRdr, err := newArchiveReaderFromFooter(ctx, &bsTableReaderAt{key: name.String() + ArchiveFileSuffix, bs: bs}, name, sz, footer, stats)
if err != nil {
return emptyChunkSource{}, err
}
return archiveChunkSource{aRdr, ""}, nil
}
func newBSTableChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
rc, _, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
if err != nil {
return err
}
@@ -323,17 +395,6 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Has
return &chunkSourceAdapter{tr, name}, nil
}
// splitTableParts separates a table into chunk records and meta data.
//
// +----------------------+-------+--------+
// table format: | Chunk Record 0 ... N | Index | Footer |
// +----------------------+-------+--------+
func splitTableParts(data []byte, count uint32) (records, tail []byte) {
o := tableTailOffset(uint64(len(data)), count)
records, tail = data[:o], data[o:]
return
}
func tableTailOffset(size uint64, count uint32) uint64 {
return size - (indexSize(count) + footerSize)
}

View File

@@ -32,7 +32,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
// Put some chunks in a table file and get the buffer back which contains the table file data
ctx := context.Background()
expectedId, buff, err := WriteChunks(testMDChunks)
expectedId, buff, _, err := WriteChunks(testMDChunks)
require.NoError(t, err)
// Setup a TableReader to read compressed chunks out of

View File

@@ -95,7 +95,7 @@ func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
t1 := time.Now()
defer stats.PersistLatency.SampleTimeSince(t1)
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
}
@@ -114,7 +114,7 @@ func (ftp *fsTablePersister) Path() string {
return ftp.dir
}
func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
func (ftp *fsTablePersister) CopyTableFile(_ context.Context, r io.Reader, fileId string, _ uint64, _ uint64) error {
tn, f, err := func() (n string, cleanup func(), err error) {
ftp.removeMu.Lock()
var temp *os.File

View File

@@ -136,7 +136,12 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err e
defer r.Close()
sz := gcc.writer.FullLength()
err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount()))
dataSplit, err := gcc.writer.ChunkDataLength()
if err != nil {
return nil, fmt.Errorf("gc_copier, ChunkDataLength() error: %w", err)
}
err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, dataSplit)
if err != nil {
return nil, fmt.Errorf("gc_copier, CopyTableFile error: %w", err)
}

View File

@@ -418,8 +418,8 @@ func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) {
}
// WriteTableFile will read a table file from the provided reader and write it to the new gen TableFileStore
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd)
func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return gcs.newGen.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest of the newgen cs

View File

@@ -320,11 +320,13 @@ func (j *ChunkJournal) Path() string {
return filepath.Dir(j.path)
}
func (j *ChunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error {
func (j *ChunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, _ uint64, _ uint64) error {
if j.backing.readOnly() {
return errReadOnlyManifest
}
return j.persister.CopyTableFile(ctx, r, fileId, fileSz, chunkCount)
// we are always using an fsTablePersister, and know that implementation ignores the fileSz and splitOffset.
// Should this ever change in the future, those parameters should be passed through.
return j.persister.CopyTableFile(ctx, r, fileId, 0, 0)
}
// Name implements manifest.

View File

@@ -41,7 +41,9 @@ const (
chunkNotAdded
)
func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
// WriteChunks writes the provided chunks to a newly created memory table and returns the name, data, and split offset
// of the resulting table.
func WriteChunks(chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) {
var size uint64
for _, chunk := range chunks {
size += uint64(len(chunk.Data()))
@@ -52,26 +54,25 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) {
return writeChunksToMT(mt, chunks)
}
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) {
func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) {
for _, chunk := range chunks {
res := mt.addChunk(chunk.Hash(), chunk.Data())
if res == chunkNotAdded {
return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks")
return "", nil, 0, errors.New("didn't create this memory table with enough space to add all the chunks")
}
}
var stats Stats
name, data, count, _, err := mt.write(nil, nil, &stats)
h, data, splitOffset, count, _, err := mt.write(nil, nil, &stats)
if err != nil {
return "", nil, err
return "", nil, 0, err
}
if count != uint32(len(chunks)) {
return "", nil, errors.New("didn't write everything")
return "", nil, 0, errors.New("didn't write everything")
}
return name.String(), data, nil
return h.String(), data, splitOffset, nil
}
type memTable struct {
@@ -81,7 +82,7 @@ type memTable struct {
pendingRefs []hasRecord
getChildAddrs []chunks.GetAddrsCb
maxData uint64
totalData uint64
totalData uint64 // size of uncompressed data in chunks
}
func newMemTable(memTableSize uint64) *memTable {
@@ -220,11 +221,11 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er
return nil
}
func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, count uint32, gcb gcBehavior, err error) {
func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, splitOffset uint64, chunkCount uint32, gcb gcBehavior, err error) {
gcb = gcBehavior_Continue
numChunks := uint64(len(mt.order))
if numChunks == 0 {
return hash.Hash{}, nil, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks")
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks")
}
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
// todo: memory quota
@@ -235,10 +236,10 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name
sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted.
_, gcb, err = haver.hasMany(mt.order, keeper)
if err != nil {
return hash.Hash{}, nil, 0, gcBehavior_Continue, err
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err
}
if gcb != gcBehavior_Continue {
return hash.Hash{}, nil, 0, gcb, err
return hash.Hash{}, nil, 0, 0, gcb, err
}
sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write
@@ -248,23 +249,24 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name
if !addr.has {
h := addr.a
tw.addChunk(*h, mt.chunks[*h])
count++
chunkCount++
}
}
tableSize, name, err := tw.finish()
if err != nil {
return hash.Hash{}, nil, 0, gcBehavior_Continue, err
return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err
}
if count > 0 {
splitOffset = tableSize - (indexSize(chunkCount) + footerSize)
if chunkCount > 0 {
stats.BytesPerPersist.Sample(uint64(tableSize))
stats.CompressedChunkBytesPerPersist.Sample(uint64(tw.totalCompressedData))
stats.UncompressedChunkBytesPerPersist.Sample(uint64(tw.totalUncompressedData))
stats.ChunksPerPersist.Sample(uint64(count))
stats.ChunksPerPersist.Sample(uint64(chunkCount))
}
return name, buff[:tableSize], count, gcBehavior_Continue, nil
return name, buff[:tableSize], splitOffset, chunkCount, gcBehavior_Continue, nil
}
func (mt *memTable) close() error {

View File

@@ -64,20 +64,17 @@ func mustChunk(chunk chunks.Chunk, err error) chunks.Chunk {
}
func TestWriteChunks(t *testing.T) {
name, data, err := WriteChunks(testMDChunks)
if err != nil {
t.Error(err)
}
name, data, splitOffset, err := WriteChunks(testMDChunks)
require.NoError(t, err)
// Size of written data is stable so long as we don't change testMDChunks
assert.Equal(t, uint64(845), splitOffset)
assert.Equal(t, 1089, len(data))
dir, err := os.MkdirTemp("", "write_chunks_test")
if err != nil {
t.Error(err)
}
require.NoError(t, err)
err = os.WriteFile(dir+name, data, os.ModePerm)
if err != nil {
t.Error(err)
}
require.NoError(t, err)
}
func TestMemTableAddHasGetChunk(t *testing.T) {
@@ -169,7 +166,7 @@ func TestMemTableWrite(t *testing.T) {
defer tr2.close()
assert.True(tr2.has(computeAddr(chunks[2]), nil))
_, data, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{})
_, data, _, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{})
require.NoError(t, err)
assert.Equal(uint32(1), count)

View File

@@ -52,8 +52,8 @@ func (nbsMW *NBSMetricWrapper) Size(ctx context.Context) (uint64, error) {
}
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd)
func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
return nbsMW.nbs.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, getRd)
}
// AddTableFilesToManifest adds table files to the manifest

View File

@@ -40,7 +40,7 @@ var _ tableFilePersister = &noConjoinBlobstorePersister{}
// Persist makes the contents of mt durable. Chunks already present in
// |haver| may be dropped in the process.
func (bsp *noConjoinBlobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) {
address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
address, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
} else if gcb != gcBehavior_Continue {
@@ -74,7 +74,7 @@ func (bsp *noConjoinBlobstorePersister) ConjoinAll(ctx context.Context, sources
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
return newBSTableChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats)
}
func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name string, chunkCount uint32, stats *Stats) (bool, error) {
@@ -97,12 +97,7 @@ func (bsp *noConjoinBlobstorePersister) Path() string {
return ""
}
func (bsp *noConjoinBlobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, chunkCount uint32) error {
// sanity check file size
if fileSz < indexSize(chunkCount)+footerSize {
return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount)
}
func (bsp *noConjoinBlobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, _ uint64) error {
_, err := bsp.bs.Put(ctx, name, int64(fileSz), r)
return err
}

View File

@@ -510,7 +510,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
return emptyChunkSource{}, gcBehavior_Continue, nil
}
name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats)
name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats)
if err != nil {
return emptyChunkSource{}, gcBehavior_Continue, err
} else if gcb != gcBehavior_Continue {

View File

@@ -29,6 +29,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
@@ -1526,9 +1527,10 @@ func (nbs *NomsBlockStore) StatsSummary() string {
// tableFile is our implementation of TableFile.
type tableFile struct {
info TableSpecInfo
open func(ctx context.Context) (io.ReadCloser, uint64, error)
suffix string
info TableSpecInfo
open func(ctx context.Context) (io.ReadCloser, uint64, error)
suffix string
splitOffset uint64
}
// LocationPrefix
@@ -1550,6 +1552,8 @@ func (tf tableFile) NumChunks() int {
return int(tf.info.GetChunkCount())
}
func (tf tableFile) SplitOffset() uint64 { return tf.splitOffset }
// Open returns an io.ReadCloser which can be used to read the bytes of a table file, along with the table file's content length in bytes.
func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
return tf.open(ctx)
@@ -1612,9 +1616,12 @@ func getTableFiles(css map[hash.Hash]chunkSource, contents manifestContents, num
}
func newTableFile(cs chunkSource, info tableSpec) tableFile {
s := ""
if _, ok := cs.(archiveChunkSource); ok {
s = ArchiveFileSuffix
sfx := ""
dataOffset := uint64(0)
if a, ok := cs.(archiveChunkSource); ok {
sfx = ArchiveFileSuffix
dataSpan := a.aRdr.footer.dataSpan()
dataOffset = dataSpan.length
}
return tableFile{
@@ -1626,7 +1633,8 @@ func newTableFile(cs chunkSource, info tableSpec) tableFile {
}
return r, s, nil
},
suffix: s,
suffix: sfx,
splitOffset: dataOffset,
}
}
@@ -1682,11 +1690,11 @@ func (nbs *NomsBlockStore) Path() (string, bool) {
}
// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error {
func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error {
valctx.ValidateContext(ctx)
tfp, ok := nbs.persister.(tableFilePersister)
if !ok {
return errors.New("Not implemented")
return errors.New("runtime error: table file persister required for WriteTableFile")
}
r, sz, err := getRd()
@@ -1694,7 +1702,13 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string,
return err
}
defer r.Close()
return tfp.CopyTableFile(ctx, r, fileName, sz, uint32(numChunks))
if splitOffset == 0 && !strings.HasSuffix(fileName, ArchiveFileSuffix) {
splitOffset = tableTailOffset(sz, uint32(numChunks))
}
// CopyTableFile can cope with a 0 splitOffset.
return tfp.CopyTableFile(ctx, r, fileName, sz, splitOffset)
}
// AddTableFilesToManifest adds table files to the manifest

View File

@@ -78,7 +78,7 @@ func writeLocalTableFiles(t *testing.T, st *NomsBlockStore, numTableFiles, seed
fileID := addr.String()
fileToData[fileID] = data
fileIDToNumChunks[fileID] = i + 1
err = st.WriteTableFile(ctx, fileID, i+1, nil, func() (io.ReadCloser, uint64, error) {
err = st.WriteTableFile(ctx, fileID, 0, i+1, nil, func() (io.ReadCloser, uint64, error) {
return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil
})
require.NoError(t, err)

View File

@@ -75,7 +75,11 @@ type tableFilePersister interface {
tablePersister
// CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore.
CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error
//
// |splitOffset| is the offset in bytes within the file where the file is split between data and the index/footer.
// This is only used for the blob store persister, as it stores the data and index/footer in separate blobs. In the
// event that splitOffset is 0, the blob store persister will upload the entire file as a single blob.
CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error
// Path returns the file system path. Use CopyTableFile instead of Path to
// copy a file to the TableFileStore. Path cannot be removed because it's used
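For illustration (a hedged sketch; tfp, rd, rd2, fileId, and ctx are assumed variables, and the sizes reuse the 1089/845 figures from the memtable test earlier in this diff):
// blobstore persister: records blob + tail blob, then Concatenate into the table
err := tfp.CopyTableFile(ctx, rd, fileId, 1089, 845)
// splitOffset == 0: the whole file is uploaded as a single blob, and the
// records sub-object is created on demand later if a conjoin needs it
err = tfp.CopyTableFile(ctx, rd2, fileId, 1089, 0)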

View File

@@ -123,6 +123,8 @@ func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool {
return true
}
// finish completes the table by writing the index and footer. It returns the total length of the table file and the
// hash used to identify the table.
func (tw *tableWriter) finish() (tableFileLength uint64, blockAddr hash.Hash, err error) {
err = tw.writeIndex()

View File

@@ -143,6 +143,7 @@ message TableFileDetails {
bytes content_hash = 3;
uint64 num_chunks = 4;
string suffix = 5;
uint64 split_offset = 6;
}
message GetUploadLocsRequest {
@@ -267,6 +268,7 @@ message TableFileInfo {
string url = 3;
google.protobuf.Timestamp refresh_after = 4;
RefreshTableFileUrlRequest refresh_request = 5;
uint64 split_offset = 6;
}
message RefreshTableFileUrlRequest {