From 8f5900b6e07d6a1cb5b071f81e938085bf4daddf Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 17 Sep 2025 20:28:23 +0000 Subject: [PATCH 01/20] Update the BlobStore.Get method to return the total object size. Not tested. --- go/store/blobstore/blobstore.go | 25 ++++++++++++++++++++++-- go/store/blobstore/blobstore_test.go | 2 +- go/store/blobstore/gcs.go | 9 +++++---- go/store/blobstore/inmem.go | 7 ++++--- go/store/blobstore/local.go | 15 +++++++------- go/store/blobstore/oci.go | 23 +++++++++++++++++----- go/store/blobstore/oss.go | 29 ++++++++++++++++------------ go/store/nbs/bs_manifest.go | 2 +- go/store/nbs/bs_persister.go | 8 ++++---- 9 files changed, 81 insertions(+), 39 deletions(-) diff --git a/go/store/blobstore/blobstore.go b/go/store/blobstore/blobstore.go index 4f8a1995c6..2ff5a97259 100644 --- a/go/store/blobstore/blobstore.go +++ b/go/store/blobstore/blobstore.go @@ -18,6 +18,8 @@ import ( "bytes" "context" "io" + "strconv" + "strings" ) // Blobstore is an interface for storing and retrieving blobs of data by key @@ -29,7 +31,7 @@ type Blobstore interface { Exists(ctx context.Context, key string) (ok bool, err error) // Get returns a byte range of from the blob keyed by |key|, and the latest store version. - Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, version string, err error) + Get(ctx context.Context, key string, br BlobRange) (rc io.ReadCloser, size uint64, version string, err error) // Put creates a new blob from |reader| keyed by |key|, it returns the latest store version. Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (version string, err error) @@ -44,7 +46,7 @@ type Blobstore interface { // GetBytes is a utility method calls bs.Get and handles reading the data from the returned // io.ReadCloser and closing it. func GetBytes(ctx context.Context, bs Blobstore, key string, br BlobRange) ([]byte, string, error) { - rc, ver, err := bs.Get(ctx, key, br) + rc, _, ver, err := bs.Get(ctx, key, br) if err != nil || rc == nil { return nil, ver, err @@ -65,3 +67,22 @@ func PutBytes(ctx context.Context, bs Blobstore, key string, data []byte) (strin reader := bytes.NewReader(data) return bs.Put(ctx, key, int64(len(data)), reader) } + +// parseContentRangeSize extracts the total size from a Content-Range header. +// Expected format: "bytes start-end/total" e.g., "bytes 0-1023/1234567" +// Returns 0 if the header is malformed or not present. 
+func parseContentRangeSize(contentRange string) uint64 { + if contentRange == "" { + return 0 + } + i := strings.Index(contentRange, "/") + if i == -1 { + return 0 + } + sizeStr := contentRange[i+1:] + size, err := strconv.ParseUint(sizeStr, 10, 64) + if err != nil { + return 0 + } + return size +} diff --git a/go/store/blobstore/blobstore_test.go b/go/store/blobstore/blobstore_test.go index b61382150f..62f8110001 100644 --- a/go/store/blobstore/blobstore_test.go +++ b/go/store/blobstore/blobstore_test.go @@ -453,7 +453,7 @@ func testConcatenate(t *testing.T, bs Blobstore, cnt int) { var off int64 for i := range blobs { length := int64(len(blobs[i].data)) - rdr, _, err := bs.Get(ctx, composite, BlobRange{ + rdr, _, _, err := bs.Get(ctx, composite, BlobRange{ offset: off, length: length, }) diff --git a/go/store/blobstore/gcs.go b/go/store/blobstore/gcs.go index f6bffcb732..f2db79c5d2 100644 --- a/go/store/blobstore/gcs.go +++ b/go/store/blobstore/gcs.go @@ -73,7 +73,7 @@ func (bs *GCSBlobstore) Exists(ctx context.Context, key string) (bool, error) { // Get retrieves an io.reader for the portion of a blob specified by br along with // its version -func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) { +func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { absKey := path.Join(bs.prefix, key) oh := bs.bucket.Object(absKey) var reader *storage.Reader @@ -89,15 +89,16 @@ func (bs *GCSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.R } if err == storage.ErrObjectNotExist { - return nil, "", NotFound{"gs://" + path.Join(bs.bucketName, absKey)} + return nil, 0, "", NotFound{"gs://" + path.Join(bs.bucketName, absKey)} } else if err != nil { - return nil, "", err + return nil, 0, "", err } attrs := reader.Attrs generation := attrs.Generation + size := uint64(attrs.Size) - return reader, fmtGeneration(generation), nil + return reader, size, fmtGeneration(generation), nil } func writeObj(writer *storage.Writer, reader io.Reader) (string, error) { diff --git a/go/store/blobstore/inmem.go b/go/store/blobstore/inmem.go index 7be7b21195..9cf213260a 100644 --- a/go/store/blobstore/inmem.go +++ b/go/store/blobstore/inmem.go @@ -61,12 +61,13 @@ func (bs *InMemoryBlobstore) Path() string { // Get retrieves an io.reader for the portion of a blob specified by br along with // its version -func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) { +func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { bs.mutex.RLock() defer bs.mutex.RUnlock() if val, ok := bs.blobs[key]; ok { if ver, ok := bs.versions[key]; ok && ver != "" { + size := uint64(len(val)) var byteRange []byte if br.isAllRange() { byteRange = val @@ -79,13 +80,13 @@ func (bs *InMemoryBlobstore) Get(ctx context.Context, key string, br BlobRange) } } - return newByteSliceReadCloser(byteRange), ver, nil + return newByteSliceReadCloser(byteRange), size, ver, nil } panic("Blob without version, or with invalid version, should no be possible.") } - return nil, "", NotFound{key} + return nil, 0, "", NotFound{key} } // Put sets the blob and the version for a key diff --git a/go/store/blobstore/local.go b/go/store/blobstore/local.go index fd1f33f45c..2bf2229a3b 100644 --- a/go/store/blobstore/local.go +++ b/go/store/blobstore/local.go @@ -81,29 +81,30 @@ func (bs *LocalBlobstore) Path() string { // Get 
retrieves an io.reader for the portion of a blob specified by br along with // its version -func (bs *LocalBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) { +func (bs *LocalBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { path := filepath.Join(bs.RootDir, key) + bsExt f, err := os.Open(path) if err != nil { if os.IsNotExist(err) { - return nil, "", NotFound{key} + return nil, 0, "", NotFound{key} } - return nil, "", err + return nil, 0, "", err } info, err := f.Stat() if err != nil { - return nil, "", err + return nil, 0, "", err } ver := info.ModTime().String() + size := uint64(info.Size()) rc, err := readCloserForFileRange(f, br) if err != nil { _ = f.Close() - return nil, "", err + return nil, 0, "", err } - return rc, ver, nil + return rc, size, ver, nil } func readCloserForFileRange(f *os.File, br BlobRange) (io.ReadCloser, error) { @@ -188,7 +189,7 @@ func (bs *LocalBlobstore) CheckAndPut(ctx context.Context, expectedVersion, key defer lck.Unlock() - rc, ver, err := bs.Get(ctx, key, BlobRange{}) + rc, _, ver, err := bs.Get(ctx, key, BlobRange{}) if err != nil { if !IsNotFoundError(err) { diff --git a/go/store/blobstore/oci.go b/go/store/blobstore/oci.go index 86a0c60145..3b10c189e1 100644 --- a/go/store/blobstore/oci.go +++ b/go/store/blobstore/oci.go @@ -118,7 +118,7 @@ func (bs *OCIBlobstore) Exists(ctx context.Context, key string) (bool, error) { } // Get retrieves an io.reader for the portion of a blob specified by br along with its version -func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, string, error) { +func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { absKey := path.Join(bs.prefix, key) req := objectstorage.GetObjectRequest{ NamespaceName: &bs.namespace, @@ -136,19 +136,32 @@ func (bs *OCIBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.R if serr, ok := common.IsServiceError(err); ok { // handle not found code if serr.GetHTTPStatusCode() == 404 { - return nil, "", NotFound{"oci://" + path.Join(bs.bucketName, absKey)} + return nil, 0, "", NotFound{"oci://" + path.Join(bs.bucketName, absKey)} } } - return nil, "", err + return nil, 0, "", err + } + + var size uint64 + // Try to get total size from Content-Range header first (for range requests) + if res.RawResponse != nil && res.RawResponse.Header != nil { + contentRange := res.RawResponse.Header.Get("Content-Range") + if contentRange != "" { + size = parseContentRangeSize(contentRange) + } + } + // Fall back to Content-Length if no Content-Range (full object request) + if size == 0 && res.ContentLength != nil { + size = uint64(*res.ContentLength) } // handle negative offset and positive length if br.offset < 0 && br.length > 0 { lr := io.LimitReader(res.Content, br.length) - return io.NopCloser(lr), fmtstr(res.ETag), nil + return io.NopCloser(lr), size, fmtstr(res.ETag), nil } - return res.Content, fmtstr(res.ETag), nil + return res.Content, size, fmtstr(res.ETag), nil } // Put sets the blob and the version for a key diff --git a/go/store/blobstore/oss.go b/go/store/blobstore/oss.go index 0600d4f65d..6b548407b1 100644 --- a/go/store/blobstore/oss.go +++ b/go/store/blobstore/oss.go @@ -67,31 +67,36 @@ func (ob *OSSBlobstore) Exists(_ context.Context, key string) (bool, error) { return ob.bucket.IsObjectExist(ob.absKey(key)) } -func (ob *OSSBlobstore) Get(ctx context.Context, key string, br BlobRange) 
(io.ReadCloser, string, error) { +func (ob *OSSBlobstore) Get(ctx context.Context, key string, br BlobRange) (io.ReadCloser, uint64, string, error) { absKey := ob.absKey(key) meta, err := ob.bucket.GetObjectMeta(absKey) if isNotFoundErr(err) { - return nil, "", NotFound{"oss://" + path.Join(ob.bucketName, absKey)} + return nil, 0, "", NotFound{"oss://" + path.Join(ob.bucketName, absKey)} + } + + totalSize, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64) + if err != nil { + return nil, 0, "", err } if br.isAllRange() { reader, err := ob.bucket.GetObject(absKey) if err != nil { - return nil, "", err + return nil, 0, "", err } - return reader, ob.getVersion(meta), nil + return reader, uint64(totalSize), ob.getVersion(meta), nil } - size, err := strconv.ParseInt(meta.Get(oss.HTTPHeaderContentLength), 10, 64) + + posBr := br.positiveRange(totalSize) + + var responseHeaders http.Header + reader, err := ob.bucket.GetObject(absKey, oss.Range(posBr.offset, posBr.offset+posBr.length-1), oss.GetResponseHeader(&responseHeaders)) if err != nil { - return nil, "", err + return nil, 0, "", err } - posBr := br.positiveRange(size) - reader, err := ob.bucket.GetObject(absKey, oss.Range(posBr.offset, posBr.offset+posBr.length-1)) - if err != nil { - return nil, "", err - } - return reader, ob.getVersion(meta), nil + + return reader, uint64(totalSize), ob.getVersion(meta), nil } func (ob *OSSBlobstore) Put(ctx context.Context, key string, totalSize int64, reader io.Reader) (string, error) { diff --git a/go/store/nbs/bs_manifest.go b/go/store/nbs/bs_manifest.go index ce8452ecf2..f5de40060f 100644 --- a/go/store/nbs/bs_manifest.go +++ b/go/store/nbs/bs_manifest.go @@ -36,7 +36,7 @@ func (bsm blobstoreManifest) Name() string { } func manifestVersionAndContents(ctx context.Context, bs blobstore.Blobstore) (string, manifestContents, error) { - reader, ver, err := bs.Get(ctx, manifestFile, blobstore.AllRange) + reader, _, ver, err := bs.Get(ctx, manifestFile, blobstore.AllRange) if err != nil { return "", manifestContents{}, err diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 349836fe59..8d33592a1c 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -160,7 +160,7 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk l := int64(off) rng := blobstore.NewBlobRange(0, l) - rdr, _, err := bsp.bs.Get(ctx, cs.hash().String(), rng) + rdr, _, _, err := bsp.bs.Get(ctx, cs.hash().String(), rng) if err != nil { return "", err } @@ -260,14 +260,14 @@ func (bsTRA *bsTableReaderAt) clone() (tableReaderAt, error) { } func (bsTRA *bsTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) { - rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange) + rc, _, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange) return rc, err } // ReadAtWithStats is the bsTableReaderAt implementation of the tableReaderAt interface func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error) { br := blobstore.NewBlobRange(off, int64(len(p))) - rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, br) + rc, _, _, err := bsTRA.bs.Get(ctx, bsTRA.key, br) if err != nil { return 0, err @@ -294,7 +294,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { index, err := loadTableIndex(ctx, 
stats, chunkCount, q, func(p []byte) error { - rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0)) + rc, _, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0)) if err != nil { return err } From 171072beca41d749a3c54da946e33e3f7b55ff9c Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 27 Aug 2025 17:00:10 -0700 Subject: [PATCH 02/20] Add support for archive conjoin in blobstore persister --- go/store/nbs/bs_persister.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 8d33592a1c..3eed97241a 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -94,15 +94,22 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour sized = append(sized, sourceWithSize{src, src.currentSize()}) } - // Currently, archive tables are not supported in blobstorePersister. + archiveFound := false for _, s := range sized { _, ok := s.source.(archiveChunkSource) if ok { - return nil, nil, errors.New("archive tables not supported in blobstorePersister") + archiveFound = true + break } } - plan, err := planTableConjoin(sized, stats) + var plan compactionPlan + var err error + if archiveFound { + plan, err = planArchiveConjoin(sized, stats) + } else { + plan, err = planTableConjoin(sized, stats) + } if err != nil { return nil, nil, err } From 3e33cf2427ae9b3538d7e160f7bcdfbd2b91abcb Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Tue, 2 Sep 2025 11:37:29 -0700 Subject: [PATCH 03/20] WIP --- go/store/nbs/archive_reader.go | 3 +-- go/store/nbs/bs_persister.go | 6 +++--- go/store/nbs/store.go | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/go/store/nbs/archive_reader.go b/go/store/nbs/archive_reader.go index 4f49581387..b7bd37414c 100644 --- a/go/store/nbs/archive_reader.go +++ b/go/store/nbs/archive_reader.go @@ -77,8 +77,7 @@ func (f archiveFooter) actualFooterSize() uint64 { return archiveFooterSize } -// dataSpan returns the span of the data section of the archive. This is not generally used directly since we usually -// read individual spans for each chunk. +// dataSpan returns the span of the data section of the archive. This is used during conjoin. func (f archiveFooter) dataSpan() byteSpan { return byteSpan{offset: 0, length: f.fileSize - f.actualFooterSize() - uint64(f.metadataSize) - uint64(f.indexSize)} } diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 3eed97241a..fd5440cde1 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -114,7 +114,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour return nil, nil, err } - name := plan.name.String() + name := plan.name.String() + plan.suffix // conjoin must contiguously append the chunk records of |sources|, but the raw content // of each source contains a chunk index in the tail. 
Blobstore does not expose a range @@ -214,7 +214,7 @@ func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, n return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount) } - off := int64(tableTailOffset(fileSz, chunkCount)) + off := int64(tableTailOffset(fileSz, chunkCount)) // NM4 - this is very table specific lr := io.LimitReader(r, off) // check if we can Put concurrently @@ -224,7 +224,7 @@ func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, n if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, off, lr); err != nil { return err } - if _, err := bsp.bs.Put(ctx, name+tableTailExt, int64(fileSz), r); err != nil { + if _, err := bsp.bs.Put(ctx, name+tableTailExt, int64(fileSz-uint64(off)), r); err != nil { // NM4 - not sure about this but seems like we should only write what remains. return err } } else { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index b4d84a3987..740be7b8e3 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1686,7 +1686,7 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { - return errors.New("Not implemented") + return errors.New("runtime error: table file persister required for WriteTableFile") } r, sz, err := getRd() From d5cc26aa26807cf8aa914ee749139ceecd7dcc38 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 17 Sep 2025 13:29:38 -0700 Subject: [PATCH 04/20] WIP - will conflict --- go/store/nbs/bs_persister.go | 36 ++++++++++++++++++++++--- go/store/nbs/no_conjoin_bs_persister.go | 2 +- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index fd5440cde1..4143355068 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -143,7 +143,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour return emptyChunkSource{}, nil, err } - cs, err := newBSChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats) + cs, err := newBSTableChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats) return cs, func() {}, err } @@ -185,7 +185,17 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk // Open a table named |name|, containing |chunkCount| chunks. func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { - return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) + cs, err := newBSTableChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) + if err == nil { + return cs, nil + } + + if errors.Is(err, ErrTableFileNotFound) || errors.Is(err, blobstore.NotFound{}) { + // See if there is a darc file. 
+ + } + + return nil, err } func (bsp *blobstorePersister) Exists(ctx context.Context, name string, chunkCount uint32, stats *Stats) (bool, error) { @@ -299,7 +309,27 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off return totalRead, nil } -func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { +func newBSArchiveChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, stats *Stats) (cs chunkSource, err error) { + rc, sz, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(archiveFooterSize), 0)) + if err != nil { + return nil, err + } + defer rc.Close() + + footer := make([]byte, archiveFooterSize) + _, err = io.ReadFull(rc, footer) + if err != nil { + return nil, err + } + + aRdr, err := newArchiveReaderFromFooter(ctx, &bsTableReaderAt{key: name.String(), bs: bs}, name, sz, footer, stats) + if err != nil { + return emptyChunkSource{}, err + } + return archiveChunkSource{aRdr, ""}, nil +} + +func newBSTableChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { rc, _, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0)) if err != nil { diff --git a/go/store/nbs/no_conjoin_bs_persister.go b/go/store/nbs/no_conjoin_bs_persister.go index 900c2f2590..45cfca6ffe 100644 --- a/go/store/nbs/no_conjoin_bs_persister.go +++ b/go/store/nbs/no_conjoin_bs_persister.go @@ -74,7 +74,7 @@ func (bsp *noConjoinBlobstorePersister) ConjoinAll(ctx context.Context, sources // Open a table named |name|, containing |chunkCount| chunks. func (bsp *noConjoinBlobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCount uint32, stats *Stats) (chunkSource, error) { - return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) + return newBSTableChunkSource(ctx, bsp.bs, name, chunkCount, bsp.q, stats) } func (bsp *noConjoinBlobstorePersister) Exists(ctx context.Context, name string, chunkCount uint32, stats *Stats) (bool, error) { From 5280bb0d56cccfa343862f1b66d7d10ba8d7a7b9 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 17 Sep 2025 16:43:11 -0700 Subject: [PATCH 05/20] WIP - stuck --- go/store/datas/pull/pull_table_file_writer.go | 4 +-- go/store/nbs/archive_reader.go | 6 ++-- go/store/nbs/aws_table_persister.go | 2 +- go/store/nbs/bs_persister.go | 31 ++++++++----------- go/store/nbs/file_table_persister.go | 2 +- go/store/nbs/gc_copier.go | 7 ++++- go/store/nbs/journal.go | 4 +-- go/store/nbs/no_conjoin_bs_persister.go | 7 +---- go/store/nbs/store.go | 8 ++--- go/store/nbs/table_persister.go | 6 ++-- 10 files changed, 37 insertions(+), 40 deletions(-) diff --git a/go/store/datas/pull/pull_table_file_writer.go b/go/store/datas/pull/pull_table_file_writer.go index 6b17760bd9..b925ed6b2c 100644 --- a/go/store/datas/pull/pull_table_file_writer.go +++ b/go/store/datas/pull/pull_table_file_writer.go @@ -371,12 +371,12 @@ func (w *PullTableFileWriter) uploadTempTableFile(ctx context.Context, tmpTblFil if uploaded != 0 { // A retry. We treat it as if what was already uploaded was rebuffered. 
- atomic.AddUint64(&w.bufferedSendBytes, uint64(uploaded)) + atomic.AddUint64(&w.bufferedSendBytes, uploaded) uploaded = 0 } fWithStats := countingReader{countingReader{rc, &uploaded}, &w.finishedSendBytes} - return fWithStats, uint64(fileSize), nil + return fWithStats, fileSize, nil }) } diff --git a/go/store/nbs/archive_reader.go b/go/store/nbs/archive_reader.go index b7bd37414c..7d6da9e0f8 100644 --- a/go/store/nbs/archive_reader.go +++ b/go/store/nbs/archive_reader.go @@ -192,7 +192,7 @@ func newArchiveReaderFromFooter(ctx context.Context, reader tableReaderAt, name return archiveReader{}, errors.New("runtime error: invalid footer.") } - ftr, err := buildFooter(name, fileSz, footer) + ftr, err := buildArchiveFooter(name, fileSz, footer) if err != nil { return archiveReader{}, err } @@ -369,10 +369,10 @@ func loadFooter(ctx context.Context, reader ReaderAtWithStats, name hash.Hash, f if err != nil { return } - return buildFooter(name, fileSize, buf) + return buildArchiveFooter(name, fileSize, buf) } -func buildFooter(name hash.Hash, fileSize uint64, buf []byte) (f archiveFooter, err error) { +func buildArchiveFooter(name hash.Hash, fileSize uint64, buf []byte) (f archiveFooter, err error) { f.formatVersion = buf[afrVersionOffset] f.fileSignature = string(buf[afrSigOffset:]) // Verify File Signature diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 2b6aa97793..4a4280b844 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -111,7 +111,7 @@ func (s3p awsTablePersister) Exists(ctx context.Context, name string, _ uint32, return s3or.objectExistsInChunkSource(ctx, name, stats) } -func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error { +func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, _ uint64) error { return s3p.multipartUpload(ctx, r, fileSz, fileId) } diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 4143355068..4e5578a21d 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "time" @@ -191,8 +190,11 @@ func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCo } if errors.Is(err, ErrTableFileNotFound) || errors.Is(err, blobstore.NotFound{}) { - // See if there is a darc file. 
- + source, err := newBSArchiveChunkSource(ctx, bsp.bs, name, stats) + if err != nil { + return nil, err + } + return source, nil } return nil, err @@ -218,23 +220,19 @@ func (bsp *blobstorePersister) Path() string { return "" } -func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, chunkCount uint32) error { - // sanity check file size - if fileSz < indexSize(chunkCount)+footerSize { - return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount) - } +func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, splitOffset uint64) error { + lr := io.LimitReader(r, int64(splitOffset)) - off := int64(tableTailOffset(fileSz, chunkCount)) // NM4 - this is very table specific - lr := io.LimitReader(r, off) + indexLen := int64(fileSz - splitOffset) // check if we can Put concurrently rr, ok := r.(io.ReaderAt) if !ok { // sequentially write chunk records then tail - if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, off, lr); err != nil { + if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil { return err } - if _, err := bsp.bs.Put(ctx, name+tableTailExt, int64(fileSz-uint64(off)), r); err != nil { // NM4 - not sure about this but seems like we should only write what remains. + if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil { return err } } else { @@ -242,15 +240,12 @@ func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, n // see BufferedFileByteSink in byte_sink.go eg, ectx := errgroup.WithContext(ctx) eg.Go(func() error { - buf := make([]byte, indexSize(chunkCount)+footerSize) - if _, err := rr.ReadAt(buf, off); err != nil { - return err - } - _, err := bsp.bs.Put(ectx, name+tableTailExt, int64(len(buf)), bytes.NewBuffer(buf)) + srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen) + _, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr) return err }) eg.Go(func() error { - _, err := bsp.bs.Put(ectx, name+tableRecordsExt, off, lr) + _, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr) return err }) if err := eg.Wait(); err != nil { diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index 25a39084f2..f2122c7400 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -114,7 +114,7 @@ func (ftp *fsTablePersister) Path() string { return ftp.dir } -func (ftp *fsTablePersister) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error { +func (ftp *fsTablePersister) CopyTableFile(_ context.Context, r io.Reader, fileId string, _ uint64, _ uint64) error { tn, f, err := func() (n string, cleanup func(), err error) { ftp.removeMu.Lock() var temp *os.File diff --git a/go/store/nbs/gc_copier.go b/go/store/nbs/gc_copier.go index ecf9687937..de592834a8 100644 --- a/go/store/nbs/gc_copier.go +++ b/go/store/nbs/gc_copier.go @@ -136,7 +136,12 @@ func (gcc *gcCopier) copyTablesToDir(ctx context.Context) (ts []tableSpec, err e defer r.Close() sz := gcc.writer.FullLength() - err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, uint32(gcc.writer.ChunkCount())) + dataSplit, err := gcc.writer.ChunkDataLength() + if err != nil { + return nil, fmt.Errorf("gc_copier, ChunkDataLength() error: %w", err) + } + + err = gcc.tfp.CopyTableFile(ctx, r, filename, sz, dataSplit) if err != nil { return nil, fmt.Errorf("gc_copier, CopyTableFile error: %w", err) } diff 
--git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index f815760475..a7707f3acc 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -320,11 +320,11 @@ func (j *ChunkJournal) Path() string { return filepath.Dir(j.path) } -func (j *ChunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error { +func (j *ChunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error { if j.backing.readOnly() { return errReadOnlyManifest } - return j.persister.CopyTableFile(ctx, r, fileId, fileSz, chunkCount) + return j.persister.CopyTableFile(ctx, r, fileId, fileSz, splitOffset) } // Name implements manifest. diff --git a/go/store/nbs/no_conjoin_bs_persister.go b/go/store/nbs/no_conjoin_bs_persister.go index 45cfca6ffe..90fc9fbdb9 100644 --- a/go/store/nbs/no_conjoin_bs_persister.go +++ b/go/store/nbs/no_conjoin_bs_persister.go @@ -97,12 +97,7 @@ func (bsp *noConjoinBlobstorePersister) Path() string { return "" } -func (bsp *noConjoinBlobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, chunkCount uint32) error { - // sanity check file size - if fileSz < indexSize(chunkCount)+footerSize { - return fmt.Errorf("table file size %d too small for chunk count %d", fileSz, chunkCount) - } - +func (bsp *noConjoinBlobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, _ uint64) error { _, err := bsp.bs.Put(ctx, name, int64(fileSz), r) return err } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 740be7b8e3..51cafd87f2 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1681,20 +1681,20 @@ func (nbs *NomsBlockStore) Path() (string, bool) { return "", false } -// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore -func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { +// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore - NM4 rename to WriteStorageFile ??? +func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, numChunks int, contentHash []byte, getRd chunks.ReaderCallBack) error { valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { return errors.New("runtime error: table file persister required for WriteTableFile") } - r, sz, err := getRd() + r, sz, splitOffset, err := getRd() if err != nil { return err } defer r.Close() - return tfp.CopyTableFile(ctx, r, fileName, sz, uint32(numChunks)) + return tfp.CopyTableFile(ctx, r, fileName, sz, splitOffset) } // AddTableFilesToManifest adds table files to the manifest diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index b9d6559ca8..fe75143b4e 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -74,8 +74,10 @@ type tablePersister interface { type tableFilePersister interface { tablePersister - // CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore. - CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, chunkCount uint32) error + // CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore. |splitOffset| is + // the offset in bytes within the file where the file is split between data and the index. 
This is only used for the + // blob store persister. + CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error // Path returns the file system path. Use CopyTableFile instead of Path to // copy a file to the TableFileStore. Path cannot be removed because it's used From d40055bb8b152c314a0d8c8b6c466c01548a9945 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Thu, 18 Sep 2025 15:29:42 -0700 Subject: [PATCH 06/20] Update archive-inspect to show the split offset --- go/cmd/dolt/commands/admin/archive_inspect.go | 3 +-- go/store/nbs/archive_inspect.go | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/go/cmd/dolt/commands/admin/archive_inspect.go b/go/cmd/dolt/commands/admin/archive_inspect.go index 1bced6329b..0ab4f9d139 100644 --- a/go/cmd/dolt/commands/admin/archive_inspect.go +++ b/go/cmd/dolt/commands/admin/archive_inspect.go @@ -112,12 +112,11 @@ func (cmd ArchiveInspectCmd) Exec(ctx context.Context, commandStr string, args [ cli.Printf("File size: %d bytes\n", inspector.FileSize()) cli.Printf("Format version: %d\n", inspector.FormatVersion()) cli.Printf("File signature: %s\n", inspector.FileSignature()) - cli.Println() - cli.Printf("Chunk count: %d\n", inspector.ChunkCount()) cli.Printf("Byte span count: %d\n", inspector.ByteSpanCount()) cli.Printf("Index size: %d bytes\n", inspector.IndexSize()) cli.Printf("Metadata size: %d bytes\n", inspector.MetadataSize()) + cli.Printf("Split offset: %d bytes\n", inspector.SplitOffset()) // Display metadata if present if inspector.MetadataSize() > 0 { diff --git a/go/store/nbs/archive_inspect.go b/go/store/nbs/archive_inspect.go index 7ee993b3b4..79432c69bf 100644 --- a/go/store/nbs/archive_inspect.go +++ b/go/store/nbs/archive_inspect.go @@ -153,6 +153,10 @@ func (ai *ArchiveInspector) FileSize() uint64 { return ai.reader.footer.fileSize } +func (ai *ArchiveInspector) SplitOffset() uint64 { + return ai.reader.footer.dataSpan().length +} + // ByteSpanCount returns the number of byte spans in the archive func (ai *ArchiveInspector) ByteSpanCount() uint32 { return ai.reader.footer.byteSpanCount From 2731c2e7552909787c648fa02853a9dcb2362507 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 6 Oct 2025 17:34:18 -0700 Subject: [PATCH 07/20] Shove the split offset into the tableFile struct --- .../doltcore/remotestorage/chunk_store.go | 4 ++++ go/store/chunks/tablefilestore.go | 6 +++++ go/store/nbs/store.go | 23 ++++++++++++------- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 5aa77efd56..0c76da6ea8 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -1195,6 +1195,10 @@ func (drtf DoltRemoteTableFile) NumChunks() int { return int(drtf.info.NumChunks) } +func (drtf DoltRemoteTableFile) SplitOffset() uint64 { + return drtf.info.SplitOffset +} + var ErrRemoteTableFileGet = errors.New("HTTP GET for remote table file failed") func sanitizeSignedUrl(url string) string { diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index de7f246351..4da439f31e 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -38,6 +38,12 @@ type TableFile interface { // NumChunks returns the number of chunks in a table file NumChunks() int + // SplitOffset returns the split offset of the storage file. 
In table files, this is generally determined by calculating
+	// the index size based on the number of chunks, then subtracting that from the total file size. Archive files do not
+	// have a deterministic way to calculate the split offset, so we either need to be told the offset or read the footer
+	// of the file to determine the index size then calculate the split offset. Passing the offset around simplifies that.
+	SplitOffset() uint64
+
 	// Open returns an io.ReadCloser which can be used to read the bytes of a
 	// table file. It also returns the content length of the table file.
 	Open(ctx context.Context) (io.ReadCloser, uint64, error)
diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go
index 51cafd87f2..d46b8ecee3 100644
--- a/go/store/nbs/store.go
+++ b/go/store/nbs/store.go
@@ -1526,9 +1526,10 @@ func (nbs *NomsBlockStore) StatsSummary() string {
 
 // tableFile is our implementation of TableFile.
 type tableFile struct {
-	info   TableSpecInfo
-	open   func(ctx context.Context) (io.ReadCloser, uint64, error)
-	suffix string
+	info        TableSpecInfo
+	open        func(ctx context.Context) (io.ReadCloser, uint64, error)
+	suffix      string
+	splitOffset uint64
 }
 
 // LocationPrefix
@@ -1550,6 +1551,8 @@ func (tf tableFile) NumChunks() int {
 	return int(tf.info.GetChunkCount())
 }
 
+func (tf tableFile) SplitOffset() uint64 { return tf.splitOffset }
+
 // Open returns an io.ReadCloser which can be used to read the bytes of a table file and the content length in bytes.
 func (tf tableFile) Open(ctx context.Context) (io.ReadCloser, uint64, error) {
 	return tf.open(ctx)
@@ -1576,7 +1579,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.Tab
 	if err != nil {
 		return hash.Hash{}, nil, nil, err
 	}
-	
+
 	appendixTableFiles, err := getTableFiles(css, contents, contents.NumAppendixSpecs(), func(mc manifestContents, idx int) tableSpec {
 		return mc.getAppendixSpec(idx)
 	})
@@ -1612,9 +1615,12 @@ func getTableFiles(css map[hash.Hash]chunkSource, contents manifestContents, num
 }
 
 func newTableFile(cs chunkSource, info tableSpec) tableFile {
-	s := ""
-	if _, ok := cs.(archiveChunkSource); ok {
-		s = ArchiveFileSuffix
+	sfx := ""
+	dataOffset := uint64(0)
+	if a, ok := cs.(archiveChunkSource); ok {
+		sfx = ArchiveFileSuffix
+		dataSpan := a.aRdr.footer.dataSpan()
+		dataOffset = dataSpan.length
 	}
 
 	return tableFile{
@@ -1626,7 +1632,8 @@ func newTableFile(cs chunkSource, info tableSpec) tableFile {
 			}
 			return r, s, nil
 		},
-		suffix: s,
+		suffix:      sfx,
+		splitOffset: dataOffset,
 	}
 }
 
From c720aef1adeca75a9270bc1be98e4bd9cf8c9e03 Mon Sep 17 00:00:00 2001
From: Neil Macneale IV
Date: Thu, 18 Sep 2025 16:00:27 -0700
Subject: [PATCH 08/20] WIP - checkpoint.
push directy to gcs works --- go/libraries/doltcore/remotesrv/grpc.go | 1 + go/libraries/doltcore/remotesrv/http.go | 21 +++++++++++++------ .../doltcore/remotestorage/chunk_store.go | 13 ++++++++---- go/store/chunks/tablefilestore.go | 2 +- go/store/datas/pull/clone.go | 3 ++- go/store/datas/pull/pull_table_file_writer.go | 4 ++-- go/store/nbs/bs_persister.go | 8 +++---- go/store/nbs/generational_chunk_store.go | 4 ++-- go/store/nbs/journal.go | 6 ++++-- go/store/nbs/nbs_metrics_wrapper.go | 4 ++-- go/store/nbs/store.go | 8 +++---- go/store/nbs/table_persister.go | 7 ++++--- .../remotesapi/v1alpha1/chunkstore.proto | 1 + 13 files changed, 51 insertions(+), 31 deletions(-) diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index 7636c11652..6f74d86663 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -425,6 +425,7 @@ func (rs *RemoteChunkStore) getUploadUrl(md metadata.MD, repoPath string, tfd *r fileID := hash.New(tfd.Id).String() + tfd.Suffix params := url.Values{} params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks))) + // NM4. params.Add("split_offset", strconv.Itoa(int(tfd.SplitOffset))) params.Add("content_length", strconv.Itoa(int(tfd.ContentLength))) params.Add("content_hash", base64.RawURLEncoding.EncodeToString(tfd.ContentHash)) return &url.URL{ diff --git a/go/libraries/doltcore/remotesrv/http.go b/go/libraries/doltcore/remotesrv/http.go index 3737d71f22..da177f3485 100644 --- a/go/libraries/doltcore/remotesrv/http.go +++ b/go/libraries/doltcore/remotesrv/http.go @@ -140,13 +140,22 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { respWr.WriteHeader(http.StatusBadRequest) return } - num_chunks, err := strconv.Atoi(ncs) + numChunks, err := strconv.Atoi(ncs) if err != nil { logger = logger.WithField("status", http.StatusBadRequest) logger.WithError(err).Warn("bad request: num_chunks parameter did not parse") respWr.WriteHeader(http.StatusBadRequest) return } + + splitOffset, err := strconv.ParseUint(q.Get("split_offset"), 10, 64) + if err != nil { + logger = logger.WithField("status", http.StatusBadRequest) + logger.WithError(err).Warn("bad request: split_offset parameter did not parse") + respWr.WriteHeader(http.StatusBadRequest) + return + } + cls := q.Get("content_length") if cls == "" { logger = logger.WithField("status", http.StatusBadRequest) @@ -154,7 +163,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { respWr.WriteHeader(http.StatusBadRequest) return } - content_length, err := strconv.Atoi(cls) + contentLength, err := strconv.Atoi(cls) if err != nil { logger = logger.WithField("status", http.StatusBadRequest) logger.WithError(err).Warn("bad request: content_length parameter did not parse") @@ -168,7 +177,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { respWr.WriteHeader(http.StatusBadRequest) return } - content_hash, err := base64.RawURLEncoding.DecodeString(chs) + contentHash, err := base64.RawURLEncoding.DecodeString(chs) if err != nil { logger = logger.WithField("status", http.StatusBadRequest) logger.WithError(err).Warn("bad request: content_hash parameter did not parse") @@ -176,7 +185,7 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { return } - logger, statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, num_chunks, content_hash, uint64(content_length), req.Body) + logger, statusCode = 
writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, splitOffset, numChunks, contentHash, uint64(contentLength), req.Body) } if statusCode != -1 { @@ -286,7 +295,7 @@ func (u *uploadreader) Close() error { return nil } -func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) { +func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, path, fileId string, splitOffset uint64, numChunks int, contentHash []byte, contentLength uint64, body io.ReadCloser) (*logrus.Entry, int) { if !validateFileName(fileId) { logger = logger.WithField("status", http.StatusBadRequest) logger.Warnf("%s is not a valid hash", fileId) @@ -300,7 +309,7 @@ func writeTableFile(ctx context.Context, logger *logrus.Entry, dbCache DBCache, return logger, http.StatusInternalServerError } - err = cs.WriteTableFile(ctx, fileId, numChunks, contentHash, func() (io.ReadCloser, uint64, error) { + err = cs.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, func() (io.ReadCloser, uint64, error) { reader := body size := contentLength return &uploadreader{ diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 0c76da6ea8..dda5fe1d34 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -915,6 +915,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has chnks = append(chnks, ch) } + hashToSplitOffset := make(map[hash.Hash]uint64) hashToCount := make(map[hash.Hash]int) hashToData := make(map[hash.Hash][]byte) hashToContentHash := make(map[hash.Hash][]byte) @@ -928,6 +929,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has } h := hash.Parse(name) + hashToSplitOffset[h] = uint64(len(data)) hashToData[h] = data hashToCount[h] = len(chnks) @@ -938,7 +940,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has for h, contentHash := range hashToContentHash { // Tables created on this path are always starting from memory tables and ending up as noms table files. // As a result, the suffix is always empty. - err := dcs.uploadTableFileWithRetries(ctx, h, "", uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) { + err := dcs.uploadTableFileWithRetries(ctx, h, "", hashToSplitOffset[h], uint64(hashToCount[h]), contentHash, func() (io.ReadCloser, uint64, error) { data := hashToData[h] return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil }) @@ -950,7 +952,9 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has return hashToCount, nil } -func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error { +// NM4 - This method is called from two paths. One is the WriteTableFile function, which works with noms tables and archives, +// and the other is the uploadChunks function, which works with noms chunks and chunk tables. 
+func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, _ uint64, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error { op := func() error { body, contentLength, err := getContent() if err != nil { @@ -963,6 +967,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table ContentHash: tableFileContentHash, NumChunks: numChunks, Suffix: suffix, + // NM4 - SplitOffset: splitOffset, } dcs.logf("getting upload location for file %s", tableFileId.String()) @@ -1066,7 +1071,7 @@ func (dcs *DoltChunkStore) SupportedOperations() chunks.TableFileStoreOps { } // WriteTableFile reads a table file from the provided reader and writes it to the chunk store. -func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { +func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { suffix := "" if strings.HasSuffix(fileId, nbs.ArchiveFileSuffix) { suffix = nbs.ArchiveFileSuffix @@ -1074,7 +1079,7 @@ func (dcs *DoltChunkStore) WriteTableFile(ctx context.Context, fileId string, nu } fileIdBytes := hash.Parse(fileId) - return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, uint64(numChunks), contentHash, getRd) + return dcs.uploadTableFileWithRetries(ctx, fileIdBytes, suffix, splitOffset, uint64(numChunks), contentHash, getRd) } // AddTableFilesToManifest adds table files to the manifest diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index 4da439f31e..bf7ff3788a 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -71,7 +71,7 @@ type TableFileStore interface { Size(ctx context.Context) (uint64, error) // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore. 
- WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error + WriteTableFile(ctx context.Context, fileId string, splitOffSet uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error // AddTableFilesToManifest adds table files to the manifest AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs GetAddrsCurry) error diff --git a/go/store/datas/pull/clone.go b/go/store/datas/pull/clone.go index 2d8351bab4..988c7d9cd0 100644 --- a/go/store/datas/pull/clone.go +++ b/go/store/datas/pull/clone.go @@ -138,7 +138,8 @@ func clone(ctx context.Context, srcTS, sinkTS chunks.TableFileStore, sinkCS chun } report(TableFileEvent{EventType: DownloadStart, TableFiles: []chunks.TableFile{tblFile}}) - err = sinkTS.WriteTableFile(ctx, tblFile.FileID()+tblFile.LocationSuffix(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) { + + err = sinkTS.WriteTableFile(ctx, tblFile.FileID()+tblFile.LocationSuffix(), tblFile.SplitOffset(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) { rd, contentLength, err := tblFile.Open(ctx) if err != nil { return nil, 0, err diff --git a/go/store/datas/pull/pull_table_file_writer.go b/go/store/datas/pull/pull_table_file_writer.go index b925ed6b2c..2c398290cf 100644 --- a/go/store/datas/pull/pull_table_file_writer.go +++ b/go/store/datas/pull/pull_table_file_writer.go @@ -79,7 +79,7 @@ type PullTableFileWriterConfig struct { } type DestTableFileStore interface { - WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error + WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error AddTableFilesToManifest(ctx context.Context, fileIdToNumChunks map[string]int, getAddrs chunks.GetAddrsCurry) error } @@ -363,7 +363,7 @@ func (w *PullTableFileWriter) uploadTempTableFile(ctx context.Context, tmpTblFil // already upload bytes. 
var uploaded uint64 - return w.cfg.DestStore.WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) { + return w.cfg.DestStore.WriteTableFile(ctx, tmpTblFile.id, tmpTblFile.chunksLen, tmpTblFile.numChunks, tmpTblFile.contentHash, func() (io.ReadCloser, uint64, error) { rc, err := tmpTblFile.read.Reader() if err != nil { return nil, 0, err diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 4e5578a21d..ed8ec9638a 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -189,7 +189,7 @@ func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCo return cs, nil } - if errors.Is(err, ErrTableFileNotFound) || errors.Is(err, blobstore.NotFound{}) { + if blobstore.IsNotFoundError(err) { source, err := newBSArchiveChunkSource(ctx, bsp.bs, name, stats) if err != nil { return nil, err @@ -200,7 +200,7 @@ func (bsp *blobstorePersister) Open(ctx context.Context, name hash.Hash, chunkCo return nil, err } -func (bsp *blobstorePersister) Exists(ctx context.Context, name string, chunkCount uint32, stats *Stats) (bool, error) { +func (bsp *blobstorePersister) Exists(ctx context.Context, name string, _ uint32, _ *Stats) (bool, error) { return bsp.bs.Exists(ctx, name) } @@ -305,7 +305,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off } func newBSArchiveChunkSource(ctx context.Context, bs blobstore.Blobstore, name hash.Hash, stats *Stats) (cs chunkSource, err error) { - rc, sz, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(archiveFooterSize), 0)) + rc, sz, _, err := bs.Get(ctx, name.String()+ArchiveFileSuffix, blobstore.NewBlobRange(-int64(archiveFooterSize), 0)) if err != nil { return nil, err } @@ -317,7 +317,7 @@ func newBSArchiveChunkSource(ctx context.Context, bs blobstore.Blobstore, name h return nil, err } - aRdr, err := newArchiveReaderFromFooter(ctx, &bsTableReaderAt{key: name.String(), bs: bs}, name, sz, footer, stats) + aRdr, err := newArchiveReaderFromFooter(ctx, &bsTableReaderAt{key: name.String() + ArchiveFileSuffix, bs: bs}, name, sz, footer, stats) if err != nil { return emptyChunkSource{}, err } diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 2321d47e06..82c0f3c451 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -418,8 +418,8 @@ func (gcs *GenerationalNBS) Size(ctx context.Context) (uint64, error) { } // WriteTableFile will read a table file from the provided reader and write it to the new gen TableFileStore -func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { - return gcs.newGen.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd) +func (gcs *GenerationalNBS) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { + return gcs.newGen.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, getRd) } // AddTableFilesToManifest adds table files to the manifest of the newgen cs diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index a7707f3acc..04e6ea2eb4 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -320,11 +320,13 @@ func (j *ChunkJournal) Path() string { return filepath.Dir(j.path) } -func (j *ChunkJournal) CopyTableFile(ctx 
context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error { +func (j *ChunkJournal) CopyTableFile(ctx context.Context, r io.Reader, fileId string, _ uint64, _ uint64) error { if j.backing.readOnly() { return errReadOnlyManifest } - return j.persister.CopyTableFile(ctx, r, fileId, fileSz, splitOffset) + // we are always using an fsTablePersister, and know that implementation ignores the fileSz and splitOffset. + // Should this ever change in the future, those parameters should be passed through. + return j.persister.CopyTableFile(ctx, r, fileId, 0, 0) } // Name implements manifest. diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index ad877f01be..ded3799722 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -52,8 +52,8 @@ func (nbsMW *NBSMetricWrapper) Size(ctx context.Context) (uint64, error) { } // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore -func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { - return nbsMW.nbs.WriteTableFile(ctx, fileId, numChunks, contentHash, getRd) +func (nbsMW *NBSMetricWrapper) WriteTableFile(ctx context.Context, fileId string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { + return nbsMW.nbs.WriteTableFile(ctx, fileId, splitOffset, numChunks, contentHash, getRd) } // AddTableFilesToManifest adds table files to the manifest diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index d46b8ecee3..f1d53d38a6 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1579,7 +1579,7 @@ func (nbs *NomsBlockStore) Sources(ctx context.Context) (hash.Hash, []chunks.Tab if err != nil { return hash.Hash{}, nil, nil, err } - + appendixTableFiles, err := getTableFiles(css, contents, contents.NumAppendixSpecs(), func(mc manifestContents, idx int) tableSpec { return mc.getAppendixSpec(idx) }) @@ -1688,15 +1688,15 @@ func (nbs *NomsBlockStore) Path() (string, bool) { return "", false } -// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore - NM4 rename to WriteStorageFile ??? -func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, numChunks int, contentHash []byte, getRd chunks.ReaderCallBack) error { +// WriteTableFile will read a table file from the provided reader and write it to the TableFileStore +func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, _ int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error { valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { return errors.New("runtime error: table file persister required for WriteTableFile") } - r, sz, splitOffset, err := getRd() + r, sz, err := getRd() if err != nil { return err } diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index fe75143b4e..334a98c884 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -74,9 +74,10 @@ type tablePersister interface { type tableFilePersister interface { tablePersister - // CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore. |splitOffset| is - // the offset in bytes within the file where the file is split between data and the index. 
This is only used for the - // blob store persister. + // CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore. + // + // |splitOffset| is the offset in bytes within the file where the file is split between data and the index/footer. + // This is only used for the blob store persister, as it stores the data and index/footer in separate blobs. CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error // Path returns the file system path. Use CopyTableFile instead of Path to diff --git a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto index 7215695211..b091f3a758 100644 --- a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto +++ b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto @@ -137,6 +137,7 @@ message GetDownloadLocsResponse { string repo_token = 2; } +// NM4 - probably need to add the splitOffset. message TableFileDetails { bytes id = 1; uint64 content_length = 2; From 876f9eac1c6479e911ac341f6ec52a934f2277ec Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Thu, 18 Sep 2025 18:00:31 -0700 Subject: [PATCH 09/20] Add the split_offset field to TableFileDetails --- .../services/remotesapi/v1alpha1/chunkstore.pb.go | 13 +++++++++++-- .../services/remotesapi/v1alpha1/chunkstore.proto | 2 +- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go index 43feeac206..c284f83d9a 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go @@ -861,6 +861,7 @@ type TableFileDetails struct { ContentHash []byte `protobuf:"bytes,3,opt,name=content_hash,json=contentHash,proto3" json:"content_hash,omitempty"` NumChunks uint64 `protobuf:"varint,4,opt,name=num_chunks,json=numChunks,proto3" json:"num_chunks,omitempty"` Suffix string `protobuf:"bytes,5,opt,name=suffix,proto3" json:"suffix,omitempty"` + SplitOffset uint64 `protobuf:"varint,6,opt,name=split_offset,json=splitOffset,proto3" json:"split_offset,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -930,6 +931,13 @@ func (x *TableFileDetails) GetSuffix() string { return "" } +func (x *TableFileDetails) GetSplitOffset() uint64 { + if x != nil { + return x.SplitOffset + } + return 0 +} + type GetUploadLocsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` RepoId *RepoId `protobuf:"bytes,1,opt,name=repo_id,json=repoId,proto3" json:"repo_id,omitempty"` @@ -2196,14 +2204,15 @@ const file_dolt_services_remotesapi_v1alpha1_chunkstore_proto_rawDesc = "" + "\x17GetDownloadLocsResponse\x12B\n" + "\x04locs\x18\x01 \x03(\v2..dolt.services.remotesapi.v1alpha1.DownloadLocR\x04locs\x12\x1d\n" + "\n" + - "repo_token\x18\x02 \x01(\tR\trepoToken\"\xa3\x01\n" + + "repo_token\x18\x02 \x01(\tR\trepoToken\"\xc6\x01\n" + "\x10TableFileDetails\x12\x0e\n" + "\x02id\x18\x01 \x01(\fR\x02id\x12%\n" + "\x0econtent_length\x18\x02 \x01(\x04R\rcontentLength\x12!\n" + "\fcontent_hash\x18\x03 \x01(\fR\vcontentHash\x12\x1d\n" + "\n" + "num_chunks\x18\x04 \x01(\x04R\tnumChunks\x12\x16\n" + - "\x06suffix\x18\x05 \x01(\tR\x06suffix\"\xa9\x02\n" + + "\x06suffix\x18\x05 \x01(\tR\x06suffix\x12!\n" + + "\fsplit_offset\x18\x06 \x01(\x04R\vsplitOffset\"\xa9\x02\n" + "\x14GetUploadLocsRequest\x12B\n" + "\arepo_id\x18\x01 
\x01(\v2).dolt.services.remotesapi.v1alpha1.RepoIdR\x06repoId\x12.\n" + "\x11table_file_hashes\x18\x02 \x03(\fB\x02\x18\x01R\x0ftableFileHashes\x12a\n" + diff --git a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto index b091f3a758..bb63e8b1c7 100644 --- a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto +++ b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto @@ -137,13 +137,13 @@ message GetDownloadLocsResponse { string repo_token = 2; } -// NM4 - probably need to add the splitOffset. message TableFileDetails { bytes id = 1; uint64 content_length = 2; bytes content_hash = 3; uint64 num_chunks = 4; string suffix = 5; + uint64 split_offset = 6; } message GetUploadLocsRequest { From 8246ad09af03362860106ecbdef18c57e871bf6d Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Thu, 18 Sep 2025 18:08:07 -0700 Subject: [PATCH 10/20] URL query parameters for split_offset. Not backwards compatible yet --- go/libraries/doltcore/remotesrv/grpc.go | 2 +- go/libraries/doltcore/remotestorage/chunk_store.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go index 6f74d86663..581ed33850 100644 --- a/go/libraries/doltcore/remotesrv/grpc.go +++ b/go/libraries/doltcore/remotesrv/grpc.go @@ -425,7 +425,7 @@ func (rs *RemoteChunkStore) getUploadUrl(md metadata.MD, repoPath string, tfd *r fileID := hash.New(tfd.Id).String() + tfd.Suffix params := url.Values{} params.Add("num_chunks", strconv.Itoa(int(tfd.NumChunks))) - // NM4. params.Add("split_offset", strconv.Itoa(int(tfd.SplitOffset))) + params.Add("split_offset", strconv.Itoa(int(tfd.SplitOffset))) params.Add("content_length", strconv.Itoa(int(tfd.ContentLength))) params.Add("content_hash", base64.RawURLEncoding.EncodeToString(tfd.ContentHash)) return &url.URL{ diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index dda5fe1d34..b2012009e0 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -954,7 +954,7 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has // NM4 - This method is called from two paths. One is the WriteTableFile function, which works with noms tables and archives, // and the other is the uploadChunks function, which works with noms chunks and chunk tables. 
-func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, _ uint64, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error { +func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, splitOffset uint64, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error { op := func() error { body, contentLength, err := getContent() if err != nil { @@ -967,7 +967,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table ContentHash: tableFileContentHash, NumChunks: numChunks, Suffix: suffix, - // NM4 - SplitOffset: splitOffset, + SplitOffset: splitOffset, } dcs.logf("getting upload location for file %s", tableFileId.String()) From de7aeba8d6481af56bafb10634d6266cef76882e Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Thu, 18 Sep 2025 18:24:08 -0700 Subject: [PATCH 11/20] Update test interfaces --- go/store/datas/pull/pull_table_file_writer_test.go | 6 +++--- go/store/nbs/store_test.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/store/datas/pull/pull_table_file_writer_test.go b/go/store/datas/pull/pull_table_file_writer_test.go index f62f4e67dc..3ca647d02f 100644 --- a/go/store/datas/pull/pull_table_file_writer_test.go +++ b/go/store/datas/pull/pull_table_file_writer_test.go @@ -349,7 +349,7 @@ type noopTableFileDestStore struct { writeCalled atomic.Uint32 } -func (s *noopTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { +func (s *noopTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { if s.writeDelay > 0 { time.Sleep(s.writeDelay) } @@ -373,7 +373,7 @@ type testDataTableFileDestStore struct { doneWriteTableFile chan struct{} } -func (s *testDataTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { +func (s *testDataTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { s.atWriteTableFile <- struct{}{} <-s.doWriteTableFile defer func() { @@ -400,7 +400,7 @@ type errTableFileDestStore struct { addCalled int } -func (s *errTableFileDestStore) WriteTableFile(ctx context.Context, id string, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { +func (s *errTableFileDestStore) WriteTableFile(ctx context.Context, id string, splitOffset uint64, numChunks int, contentHash []byte, getRd func() (io.ReadCloser, uint64, error)) error { rd, _, _ := getRd() if rd != nil { rd.Close() diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 45ac2d1dcc..d5e2790ceb 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -78,7 +78,7 @@ func writeLocalTableFiles(t *testing.T, st *NomsBlockStore, numTableFiles, seed fileID := addr.String() fileToData[fileID] = data fileIDToNumChunks[fileID] = i + 1 - err = st.WriteTableFile(ctx, fileID, i+1, nil, func() (io.ReadCloser, uint64, error) { + err = st.WriteTableFile(ctx, fileID, 0, i+1, nil, func() (io.ReadCloser, uint64, error) { return io.NopCloser(bytes.NewReader(data)), uint64(len(data)), nil }) 
require.NoError(t, err) From dc2f91bebe7eb18a504978f17737715909933ae2 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 6 Oct 2025 16:47:02 -0700 Subject: [PATCH 12/20] Add split offset to the TableFileInfo proto spec --- .../remotesapi/v1alpha1/chunkstore.pb.go | 20 +++++++++++++------ .../remotesapi/v1alpha1/chunkstore.proto | 1 + 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go index c284f83d9a..dae648a740 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go @@ -21,13 +21,12 @@ package remotesapi import ( - reflect "reflect" - sync "sync" - unsafe "unsafe" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" + unsafe "unsafe" ) const ( @@ -1743,6 +1742,7 @@ type TableFileInfo struct { Url string `protobuf:"bytes,3,opt,name=url,proto3" json:"url,omitempty"` RefreshAfter *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=refresh_after,json=refreshAfter,proto3" json:"refresh_after,omitempty"` RefreshRequest *RefreshTableFileUrlRequest `protobuf:"bytes,5,opt,name=refresh_request,json=refreshRequest,proto3" json:"refresh_request,omitempty"` + SplitOffset uint64 `protobuf:"varint,6,opt,name=split_offset,json=splitOffset,proto3" json:"split_offset,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -1812,6 +1812,13 @@ func (x *TableFileInfo) GetRefreshRequest() *RefreshTableFileUrlRequest { return nil } +func (x *TableFileInfo) GetSplitOffset() uint64 { + if x != nil { + return x.SplitOffset + } + return 0 +} + type RefreshTableFileUrlRequest struct { state protoimpl.MessageState `protogen:"open.v1"` RepoId *RepoId `protobuf:"bytes,1,opt,name=repo_id,json=repoId,proto3" json:"repo_id,omitempty"` @@ -2279,14 +2286,15 @@ const file_dolt_services_remotesapi_v1alpha1_chunkstore_proto_rawDesc = "" + "\rappendix_only\x18\x02 \x01(\bB\x02\x18\x01R\fappendixOnly\x12\x1d\n" + "\n" + "repo_token\x18\x03 \x01(\tR\trepoToken\x12\x1b\n" + - "\trepo_path\x18\x04 \x01(\tR\brepoPath\"\x82\x02\n" + + "\trepo_path\x18\x04 \x01(\tR\brepoPath\"\xa5\x02\n" + "\rTableFileInfo\x12\x17\n" + "\afile_id\x18\x01 \x01(\tR\x06fileId\x12\x1d\n" + "\n" + "num_chunks\x18\x02 \x01(\rR\tnumChunks\x12\x10\n" + "\x03url\x18\x03 \x01(\tR\x03url\x12?\n" + "\rrefresh_after\x18\x04 \x01(\v2\x1a.google.protobuf.TimestampR\frefreshAfter\x12f\n" + - "\x0frefresh_request\x18\x05 \x01(\v2=.dolt.services.remotesapi.v1alpha1.RefreshTableFileUrlRequestR\x0erefreshRequest\"\xb5\x01\n" + + "\x0frefresh_request\x18\x05 \x01(\v2=.dolt.services.remotesapi.v1alpha1.RefreshTableFileUrlRequestR\x0erefreshRequest\x12!\n" + + "\fsplit_offset\x18\x06 \x01(\x04R\vsplitOffset\"\xb5\x01\n" + "\x1aRefreshTableFileUrlRequest\x12B\n" + "\arepo_id\x18\x01 \x01(\v2).dolt.services.remotesapi.v1alpha1.RepoIdR\x06repoId\x12\x17\n" + "\afile_id\x18\x02 \x01(\tR\x06fileId\x12\x1d\n" + diff --git a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto index bb63e8b1c7..78ea6f3faf 100644 --- a/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto +++ b/proto/dolt/services/remotesapi/v1alpha1/chunkstore.proto @@ -268,6 +268,7 @@ message TableFileInfo { string url 
= 3; google.protobuf.Timestamp refresh_after = 4; RefreshTableFileUrlRequest refresh_request = 5; + uint64 split_offset = 6; } message RefreshTableFileUrlRequest { From 18e9fcbec1e2c3ebf812c91d4000f80686dfb92a Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 6 Oct 2025 16:47:33 -0700 Subject: [PATCH 13/20] Make the split_offset query string arg optional --- go/libraries/doltcore/remotesrv/http.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/go/libraries/doltcore/remotesrv/http.go b/go/libraries/doltcore/remotesrv/http.go index da177f3485..5e0daffa69 100644 --- a/go/libraries/doltcore/remotesrv/http.go +++ b/go/libraries/doltcore/remotesrv/http.go @@ -148,14 +148,6 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { return } - splitOffset, err := strconv.ParseUint(q.Get("split_offset"), 10, 64) - if err != nil { - logger = logger.WithField("status", http.StatusBadRequest) - logger.WithError(err).Warn("bad request: split_offset parameter did not parse") - respWr.WriteHeader(http.StatusBadRequest) - return - } - cls := q.Get("content_length") if cls == "" { logger = logger.WithField("status", http.StatusBadRequest) @@ -185,6 +177,19 @@ func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { return } + // splitOffset is not required to allow for backwards compatibility with older clients. + splitOffset := uint64(0) + splitQstr := q.Get("split_offset") + if splitQstr != "" { + splitOffset, err = strconv.ParseUint(splitQstr, 10, 64) + if err != nil { + logger = logger.WithField("status", http.StatusBadRequest) + logger.WithError(err).Warn("bad request: split_offset parameter did not parse") + respWr.WriteHeader(http.StatusBadRequest) + return + } + } + logger, statusCode = writeTableFile(req.Context(), logger, fh.dbCache, filepath, file, splitOffset, numChunks, contentHash, uint64(contentLength), req.Body) } From ad1cf174fa825841c62e3ea7e8782c16915ee0a1 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Mon, 6 Oct 2025 17:45:02 -0700 Subject: [PATCH 14/20] More clarity in gDoc --- go/libraries/doltcore/remotestorage/chunk_store.go | 2 -- go/store/chunks/tablefilestore.go | 13 +++++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index b2012009e0..912c50d99d 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -952,8 +952,6 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has return hashToCount, nil } -// NM4 - This method is called from two paths. One is the WriteTableFile function, which works with noms tables and archives, -// and the other is the uploadChunks function, which works with noms chunks and chunk tables. 
func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, tableFileId hash.Hash, suffix string, splitOffset uint64, numChunks uint64, tableFileContentHash []byte, getContent func() (io.ReadCloser, uint64, error)) error { op := func() error { body, contentLength, err := getContent() diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index bf7ff3788a..4ff95d4515 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -38,10 +38,15 @@ type TableFile interface { // NumChunks returns the number of chunks in a table file NumChunks() int - // SplitOffset returns the split offset of the storage file. In table files, this is generally determine by calculating - // the index size based on the number of chunks, then subtracting that from the total file size. Archive files do not - // have a deterministic way to calculate the split offset, so we either need to be told the offset or read the footer - // of the file to determine the index size then calculate the split offset. Passing the offset around similfies that. + // SplitOffset returns the byte offset from the beginning of the storage file where we transition from data to index. + // + // In table files, this is generally determined by calculating the index size based on the number of chunks, then + // subtracting that from the total file size. + // Archive files do not have a deterministic way to calculate the split offset, so we either need to be told the + // offset or read the footer of the file to determine the index size then calculate the split offset. + // + // Passing the offset around similfies this. It is meaningful for both current storage types, though we will probably + // keep the table file's chunk count method around for a while. 
SplitOffset() uint64 // Open returns an io.ReadCloser which can be used to read the bytes of a From 405946676fd0ed5504a66bda6a81e3f31ddaee0f Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 8 Oct 2025 11:15:49 -0700 Subject: [PATCH 15/20] more fixes to blobstore file resolution --- go/store/nbs/bs_persister.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index ed8ec9638a..1ece2f6546 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -142,12 +142,18 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour return emptyChunkSource{}, nil, err } - cs, err := newBSTableChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats) + var cs chunkSource + if archiveFound { + cs, err = newBSArchiveChunkSource(ctx, bsp.bs, plan.name, stats) + } else { + cs, err = newBSTableChunkSource(ctx, bsp.bs, plan.name, plan.chunkCount, bsp.q, stats) + } + return cs, func() {}, err } func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunkSource) (name string, err error) { - name = cs.hash().String() + tableRecordsExt + name = cs.hash().String() + cs.suffix() + tableRecordsExt // first check if we created this sub-object on Persist() ok, err := bsp.bs.Exists(ctx, name) if err != nil { From f84a15c515c5cc509502d38e7e31b69595aec2e0 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Wed, 8 Oct 2025 16:39:09 -0700 Subject: [PATCH 16/20] Create the *.darc.records file when it doesn't exist --- go/store/nbs/bs_persister.go | 38 +++++++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 1ece2f6546..75329cb378 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -164,9 +164,19 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk // otherwise create the sub-object from |table| // (requires a round-trip for remote blobstores) + if cs.suffix() == ArchiveFileSuffix { + err = bsp.hotCreateArchiveRecords(ctx, cs) + } else { + err = bsp.hotCreateTableRecords(ctx, cs) + } + + return name, err +} + +func (bsp *blobstorePersister) hotCreateTableRecords(ctx context.Context, cs chunkSource) error { cnt, err := cs.count() if err != nil { - return "", err + return err } off := tableTailOffset(cs.currentSize(), cnt) l := int64(off) @@ -174,7 +184,7 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk rdr, _, _, err := bsp.bs.Get(ctx, cs.hash().String(), rng) if err != nil { - return "", err + return err } defer func() { if cerr := rdr.Close(); cerr != nil { @@ -182,10 +192,28 @@ func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunk } }() - if _, err = bsp.bs.Put(ctx, name, l, rdr); err != nil { - return "", err + _, err = bsp.bs.Put(ctx, cs.hash().String()+tableRecordsExt, l, rdr) + return err +} + +func (bsp *blobstorePersister) hotCreateArchiveRecords(ctx context.Context, cs chunkSource) error { + arch, ok := cs.(archiveChunkSource) + if !ok { + return errors.New("runtime error: hotCreateArchiveRecords expected archiveChunkSource") } - return name, nil + + dataLen := int64(arch.aRdr.footer.dataSpan().length) + rng := blobstore.NewBlobRange(0, dataLen) + + rdr, _, _, err := bsp.bs.Get(ctx, arch.hash().String()+ArchiveFileSuffix, rng) + if err != nil { + return err + } + defer rdr.Close() + + key := arch.hash().String() + 
ArchiveFileSuffix + tableRecordsExt + _, err = bsp.bs.Put(ctx, key, dataLen, rdr) + return err } // Open a table named |name|, containing |chunkCount| chunks. From 1ca2727ba553065254d0a3fe1756ebf7b3ef5f2e Mon Sep 17 00:00:00 2001 From: macneale4 Date: Thu, 9 Oct 2025 00:19:04 +0000 Subject: [PATCH 17/20] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- .../dolt/services/remotesapi/v1alpha1/chunkstore.pb.go | 7 ++++--- go/store/datas/pull/clone.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go index dae648a740..88783fe62c 100644 --- a/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go +++ b/go/gen/proto/dolt/services/remotesapi/v1alpha1/chunkstore.pb.go @@ -21,12 +21,13 @@ package remotesapi import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" ) const ( diff --git a/go/store/datas/pull/clone.go b/go/store/datas/pull/clone.go index 988c7d9cd0..bced6cbab3 100644 --- a/go/store/datas/pull/clone.go +++ b/go/store/datas/pull/clone.go @@ -138,7 +138,7 @@ func clone(ctx context.Context, srcTS, sinkTS chunks.TableFileStore, sinkCS chun } report(TableFileEvent{EventType: DownloadStart, TableFiles: []chunks.TableFile{tblFile}}) - + err = sinkTS.WriteTableFile(ctx, tblFile.FileID()+tblFile.LocationSuffix(), tblFile.SplitOffset(), tblFile.NumChunks(), nil, func() (io.ReadCloser, uint64, error) { rd, contentLength, err := tblFile.Open(ctx) if err != nil { From cb605d3ecc889f864c622b2629548326657551d4 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV <46170177+macneale4@users.noreply.github.com> Date: Fri, 10 Oct 2025 10:24:40 -0700 Subject: [PATCH 18/20] Update go/store/chunks/tablefilestore.go Co-authored-by: Aaron Son --- go/store/chunks/tablefilestore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/store/chunks/tablefilestore.go b/go/store/chunks/tablefilestore.go index 4ff95d4515..c1490f1509 100644 --- a/go/store/chunks/tablefilestore.go +++ b/go/store/chunks/tablefilestore.go @@ -45,7 +45,7 @@ type TableFile interface { // Archive files do not have a deterministic way to calculate the split offset, so we either need to be told the // offset or read the footer of the file to determine the index size then calculate the split offset. // - // Passing the offset around similfies this. It is meaningful for both current storage types, though we will probably + // Passing the offset around simplifies this. It is meaningful for both current storage types, though we will probably // keep the table file's chunk count method around for a while. 
SplitOffset() uint64 From b71dc7561e63f895a25d2a4bbafe72813045f347 Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Fri, 10 Oct 2025 10:57:37 -0700 Subject: [PATCH 19/20] Cope with splitOffset being unset --- go/store/nbs/bs_persister.go | 66 ++++++++++++++++++--------------- go/store/nbs/store.go | 9 ++++- go/store/nbs/table_persister.go | 3 +- 3 files changed, 46 insertions(+), 32 deletions(-) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 75329cb378..a62b349e9e 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -255,41 +255,47 @@ func (bsp *blobstorePersister) Path() string { } func (bsp *blobstorePersister) CopyTableFile(ctx context.Context, r io.Reader, name string, fileSz uint64, splitOffset uint64) error { - lr := io.LimitReader(r, int64(splitOffset)) + if splitOffset > 0 { + lr := io.LimitReader(r, int64(splitOffset)) - indexLen := int64(fileSz - splitOffset) + indexLen := int64(fileSz - splitOffset) - // check if we can Put concurrently - rr, ok := r.(io.ReaderAt) - if !ok { - // sequentially write chunk records then tail - if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil { - return err - } - if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil { - return err + // check if we can Put concurrently + rr, ok := r.(io.ReaderAt) + if !ok { + // sequentially write chunk records then tail + if _, err := bsp.bs.Put(ctx, name+tableRecordsExt, int64(splitOffset), lr); err != nil { + return err + } + if _, err := bsp.bs.Put(ctx, name+tableTailExt, indexLen, r); err != nil { + return err + } + } else { + // on the push path, we expect to Put concurrently + // see BufferedFileByteSink in byte_sink.go + eg, ectx := errgroup.WithContext(ctx) + eg.Go(func() error { + srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen) + _, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr) + return err + }) + eg.Go(func() error { + _, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr) + return err + }) + if err := eg.Wait(); err != nil { + return err + } } + + // finally concatenate into the complete table + _, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}) + return err } else { - // on the push path, we expect to Put concurrently - // see BufferedFileByteSink in byte_sink.go - eg, ectx := errgroup.WithContext(ctx) - eg.Go(func() error { - srdr := io.NewSectionReader(rr, int64(splitOffset), indexLen) - _, err := bsp.bs.Put(ectx, name+tableTailExt, indexLen, srdr) - return err - }) - eg.Go(func() error { - _, err := bsp.bs.Put(ectx, name+tableRecordsExt, int64(splitOffset), lr) - return err - }) - if err := eg.Wait(); err != nil { - return err - } + // no split offset, just copy the whole table. We will create the records object on demand if needed. 
+ _, err := bsp.bs.Put(ctx, name, int64(fileSz), r) + return err } - - // finally concatenate into the complete table - _, err := bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}) - return err } type bsTableReaderAt struct { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index f1d53d38a6..82ef562dec 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -29,6 +29,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -1689,7 +1690,7 @@ func (nbs *NomsBlockStore) Path() (string, bool) { } // WriteTableFile will read a table file from the provided reader and write it to the TableFileStore -func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, _ int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error { +func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, splitOffset uint64, numChunks int, _ []byte, getRd func() (io.ReadCloser, uint64, error)) error { valctx.ValidateContext(ctx) tfp, ok := nbs.persister.(tableFilePersister) if !ok { @@ -1701,6 +1702,12 @@ func (nbs *NomsBlockStore) WriteTableFile(ctx context.Context, fileName string, return err } defer r.Close() + + if splitOffset == 0 && !strings.HasSuffix(fileName, ArchiveFileSuffix) { + splitOffset = tableTailOffset(sz, uint32(numChunks)) + } + + // CopyTableFile can cope with a 0 splitOffset. return tfp.CopyTableFile(ctx, r, fileName, sz, splitOffset) } diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index 334a98c884..b547719320 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -77,7 +77,8 @@ type tableFilePersister interface { // CopyTableFile copies the table file with the given fileId from the reader to the TableFileStore. // // |splitOffset| is the offset in bytes within the file where the file is split between data and the index/footer. - // This is only used for the blob store persister, as it stores the data and index/footer in separate blobs. + // This is only used for the blob store persister, as it stores the data and index/footer in separate blobs. In the + // event that splitOffset is 0, the blob store persister will upload the entire file as a single blob. CopyTableFile(ctx context.Context, r io.Reader, fileId string, fileSz uint64, splitOffset uint64) error // Path returns the file system path. 
Use CopyTableFile instead of Path to From d2016889296ff6bf87f466077177432057995eec Mon Sep 17 00:00:00 2001 From: Neil Macneale IV Date: Fri, 10 Oct 2025 11:50:51 -0700 Subject: [PATCH 20/20] mem table WriteChunks to return splitOffset --- .../doltcore/remotestorage/chunk_store.go | 4 +- go/store/nbs/aws_table_persister.go | 2 +- go/store/nbs/bs_persister.go | 15 +------ go/store/nbs/cmp_chunk_table_writer_test.go | 2 +- go/store/nbs/file_table_persister.go | 2 +- go/store/nbs/mem_table.go | 40 ++++++++++--------- go/store/nbs/mem_table_test.go | 19 ++++----- go/store/nbs/no_conjoin_bs_persister.go | 2 +- go/store/nbs/root_tracker_test.go | 2 +- go/store/nbs/table_writer.go | 2 + 10 files changed, 40 insertions(+), 50 deletions(-) diff --git a/go/libraries/doltcore/remotestorage/chunk_store.go b/go/libraries/doltcore/remotestorage/chunk_store.go index 912c50d99d..df99be4d9a 100644 --- a/go/libraries/doltcore/remotestorage/chunk_store.go +++ b/go/libraries/doltcore/remotestorage/chunk_store.go @@ -922,14 +922,14 @@ func (dcs *DoltChunkStore) uploadChunks(ctx context.Context, hashToChunk map[has // structuring so this can be done as multiple files in the future. { - name, data, err := nbs.WriteChunks(chnks) + name, data, splitOffset, err := nbs.WriteChunks(chnks) if err != nil { return map[hash.Hash]int{}, err } h := hash.Parse(name) - hashToSplitOffset[h] = uint64(len(data)) + hashToSplitOffset[h] = splitOffset hashToData[h] = data hashToCount[h] = len(chnks) diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index 4a4280b844..ed7e94dccc 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -136,7 +136,7 @@ func (s3p awsTablePersister) key(k string) string { } func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { - name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) + name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index a62b349e9e..cc281736ef 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -45,7 +45,7 @@ var _ tableFilePersister = &blobstorePersister{} // Persist makes the contents of mt durable. Chunks already present in // |haver| may be dropped in the process. func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { - address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) + address, data, splitOffset, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } @@ -58,7 +58,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver name := address.String() // persist this table in two parts to facilitate later conjoins - records, tail := splitTableParts(data, chunkCount) + records, tail := data[:splitOffset], data[splitOffset:] // first write table records and tail (index+footer) as separate blobs eg, ectx := errgroup.WithContext(ctx) @@ -395,17 +395,6 @@ func newBSTableChunkSource(ctx context.Context, bs blobstore.Blobstore, name has return &chunkSourceAdapter{tr, name}, nil } -// splitTableParts separates a table into chunk records and meta data. 
-// -// +----------------------+-------+--------+ -// table format: | Chunk Record 0 ... N | Index | Footer | -// +----------------------+-------+--------+ -func splitTableParts(data []byte, count uint32) (records, tail []byte) { - o := tableTailOffset(uint64(len(data)), count) - records, tail = data[:o], data[o:] - return -} - func tableTailOffset(size uint64, count uint32) uint64 { return size - (indexSize(count) + footerSize) } diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go index c8e3623112..c4f85cc5bd 100644 --- a/go/store/nbs/cmp_chunk_table_writer_test.go +++ b/go/store/nbs/cmp_chunk_table_writer_test.go @@ -32,7 +32,7 @@ func TestCmpChunkTableWriter(t *testing.T) { // Put some chunks in a table file and get the buffer back which contains the table file data ctx := context.Background() - expectedId, buff, err := WriteChunks(testMDChunks) + expectedId, buff, _, err := WriteChunks(testMDChunks) require.NoError(t, err) // Setup a TableReader to read compressed chunks out of diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index f2122c7400..54a09049ce 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -95,7 +95,7 @@ func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch t1 := time.Now() defer stats.PersistLatency.SampleTimeSince(t1) - name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) + name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index 2efdeb96d9..ac838a794b 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -41,7 +41,9 @@ const ( chunkNotAdded ) -func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) { +// WriteChunks writes the provided chunks to a newly created memory table and returns the name and data of the resulting +// table. 
+func WriteChunks(chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) { var size uint64 for _, chunk := range chunks { size += uint64(len(chunk.Data())) @@ -52,26 +54,25 @@ func WriteChunks(chunks []chunks.Chunk) (string, []byte, error) { return writeChunksToMT(mt, chunks) } -func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (string, []byte, error) { +func writeChunksToMT(mt *memTable, chunks []chunks.Chunk) (name string, data []byte, splitOffset uint64, err error) { for _, chunk := range chunks { res := mt.addChunk(chunk.Hash(), chunk.Data()) if res == chunkNotAdded { - return "", nil, errors.New("didn't create this memory table with enough space to add all the chunks") + return "", nil, 0, errors.New("didn't create this memory table with enough space to add all the chunks") } } var stats Stats - name, data, count, _, err := mt.write(nil, nil, &stats) - + h, data, splitOffset, count, _, err := mt.write(nil, nil, &stats) if err != nil { - return "", nil, err + return "", nil, 0, err } if count != uint32(len(chunks)) { - return "", nil, errors.New("didn't write everything") + return "", nil, 0, errors.New("didn't write everything") } - return name.String(), data, nil + return h.String(), data, splitOffset, nil } type memTable struct { @@ -81,7 +82,7 @@ type memTable struct { pendingRefs []hasRecord getChildAddrs []chunks.GetAddrsCb maxData uint64 - totalData uint64 + totalData uint64 // size of uncompressed data in chunks } func newMemTable(memTableSize uint64) *memTable { @@ -220,11 +221,11 @@ func (mt *memTable) extract(ctx context.Context, chunks chan<- extractRecord) er return nil } -func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, count uint32, gcb gcBehavior, err error) { +func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name hash.Hash, data []byte, splitOffset uint64, chunkCount uint32, gcb gcBehavior, err error) { gcb = gcBehavior_Continue numChunks := uint64(len(mt.order)) if numChunks == 0 { - return hash.Hash{}, nil, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks") + return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, fmt.Errorf("mem table cannot write with zero chunks") } maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData) // todo: memory quota @@ -235,10 +236,10 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name sort.Sort(hasRecordByPrefix(mt.order)) // hasMany() requires addresses to be sorted. 
_, gcb, err = haver.hasMany(mt.order, keeper) if err != nil { - return hash.Hash{}, nil, 0, gcBehavior_Continue, err + return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err } if gcb != gcBehavior_Continue { - return hash.Hash{}, nil, 0, gcb, err + return hash.Hash{}, nil, 0, 0, gcb, err } sort.Sort(hasRecordByOrder(mt.order)) // restore "insertion" order for write @@ -248,23 +249,24 @@ func (mt *memTable) write(haver chunkReader, keeper keeperF, stats *Stats) (name if !addr.has { h := addr.a tw.addChunk(*h, mt.chunks[*h]) - count++ + chunkCount++ } } tableSize, name, err := tw.finish() - if err != nil { - return hash.Hash{}, nil, 0, gcBehavior_Continue, err + return hash.Hash{}, nil, 0, 0, gcBehavior_Continue, err } - if count > 0 { + splitOffset = tableSize - (indexSize(chunkCount) + footerSize) + + if chunkCount > 0 { stats.BytesPerPersist.Sample(uint64(tableSize)) stats.CompressedChunkBytesPerPersist.Sample(uint64(tw.totalCompressedData)) stats.UncompressedChunkBytesPerPersist.Sample(uint64(tw.totalUncompressedData)) - stats.ChunksPerPersist.Sample(uint64(count)) + stats.ChunksPerPersist.Sample(uint64(chunkCount)) } - return name, buff[:tableSize], count, gcBehavior_Continue, nil + return name, buff[:tableSize], splitOffset, chunkCount, gcBehavior_Continue, nil } func (mt *memTable) close() error { diff --git a/go/store/nbs/mem_table_test.go b/go/store/nbs/mem_table_test.go index 3e97be099c..dfd0d99119 100644 --- a/go/store/nbs/mem_table_test.go +++ b/go/store/nbs/mem_table_test.go @@ -64,20 +64,17 @@ func mustChunk(chunk chunks.Chunk, err error) chunks.Chunk { } func TestWriteChunks(t *testing.T) { - name, data, err := WriteChunks(testMDChunks) - if err != nil { - t.Error(err) - } + name, data, splitOffSet, err := WriteChunks(testMDChunks) + require.NoError(t, err) + // Size of written data is stable so long as we don't change testMDChunks + assert.Equal(t, uint64(845), splitOffSet) + assert.Equal(t, 1089, len(data)) dir, err := os.MkdirTemp("", "write_chunks_test") - if err != nil { - t.Error(err) - } + require.NoError(t, err) err = os.WriteFile(dir+name, data, os.ModePerm) - if err != nil { - t.Error(err) - } + require.NoError(t, err) } func TestMemTableAddHasGetChunk(t *testing.T) { @@ -169,7 +166,7 @@ func TestMemTableWrite(t *testing.T) { defer tr2.close() assert.True(tr2.has(computeAddr(chunks[2]), nil)) - _, data, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{}) + _, data, _, count, _, err := mt.write(chunkReaderGroup{tr1, tr2}, nil, &Stats{}) require.NoError(t, err) assert.Equal(uint32(1), count) diff --git a/go/store/nbs/no_conjoin_bs_persister.go b/go/store/nbs/no_conjoin_bs_persister.go index 90fc9fbdb9..1668edce43 100644 --- a/go/store/nbs/no_conjoin_bs_persister.go +++ b/go/store/nbs/no_conjoin_bs_persister.go @@ -40,7 +40,7 @@ var _ tableFilePersister = &noConjoinBlobstorePersister{} // Persist makes the contents of mt durable. Chunks already present in // |haver| may be dropped in the process. 
func (bsp *noConjoinBlobstorePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, keeper keeperF, stats *Stats) (chunkSource, gcBehavior, error) { - address, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) + address, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } else if gcb != gcBehavior_Continue { diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go index a98292dff7..2857bd2034 100644 --- a/go/store/nbs/root_tracker_test.go +++ b/go/store/nbs/root_tracker_test.go @@ -510,7 +510,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c return emptyChunkSource{}, gcBehavior_Continue, nil } - name, data, chunkCount, gcb, err := mt.write(haver, keeper, stats) + name, data, _, chunkCount, gcb, err := mt.write(haver, keeper, stats) if err != nil { return emptyChunkSource{}, gcBehavior_Continue, err } else if gcb != gcBehavior_Continue { diff --git a/go/store/nbs/table_writer.go b/go/store/nbs/table_writer.go index e04a3e9928..7f029a328c 100644 --- a/go/store/nbs/table_writer.go +++ b/go/store/nbs/table_writer.go @@ -123,6 +123,8 @@ func (tw *tableWriter) addChunk(h hash.Hash, data []byte) bool { return true } +// finish completed table, writing the index and footer. Returns the total length of the table file and the hash used +// to identify the table. func (tw *tableWriter) finish() (tableFileLength uint64, blockAddr hash.Hash, err error) { err = tw.writeIndex()