Merge pull request #8810 from dolthub/aaron/nbs-iterate-all-chunks-stats

[no-release-notes] go: nbs/store: Small cleanups to iterateAllChunks.
This commit is contained in:
Aaron Son
2025-02-05 09:55:49 -08:00
committed by GitHub
9 changed files with 43 additions and 56 deletions

View File

@@ -177,7 +177,7 @@ func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgrou
}, keeper, stats)
}
func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error {
addrCount := uint32(len(acs.aRdr.prefixes))
for i := uint32(0); i < addrCount; i++ {
var h hash.Hash

View File

@@ -716,6 +716,6 @@ func (tcs *testChunkSource) currentSize() uint64 {
panic("never used")
}
func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
// iterateAllChunks is not exercised by this test double; panic to surface
// any accidental use. The *Stats parameter is accepted only to satisfy the
// chunkSource interface.
func (tcs *testChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error {
panic("never used")
}

View File

@@ -17,7 +17,6 @@ package nbs
import (
"context"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -54,7 +53,3 @@ func (csa chunkSourceAdapter) clone() (chunkSource, error) {
}
return &chunkSourceAdapter{tr, csa.h}, nil
}
// iterateAllChunks is intentionally unimplemented for chunkSourceAdapter;
// reaching it is a programming error. (This stub is removed by the commit —
// the adapter presumably inherits the tableReader implementation instead;
// verify against the full file.)
func (csa chunkSourceAdapter) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
panic("unimplemented")
}

View File

@@ -94,6 +94,6 @@ func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}
func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk)) error {
// iterateAllChunks is a no-op for emptyChunkSource: there are no chunks to
// visit, so the callback is never invoked and no stats are recorded.
func (ecs emptyChunkSource) iterateAllChunks(_ context.Context, _ func(chunks.Chunk), _ *Stats) error {
return nil
}

View File

@@ -30,7 +30,6 @@ import (
"path/filepath"
"time"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -170,49 +169,6 @@ func (ftr *fileTableReader) hash() hash.Hash {
return ftr.h
}
// iterateAllChunks scans every chunk in this file-backed table in index
// order, converts each compressed record to a chunks.Chunk, and passes it to
// cb. Cancellation is checked between chunks, so an aborted context stops the
// scan early; any read or decode error aborts the scan as well. Errors raised
// inside the callback are not observed here — the caller must manage those
// out of band.
func (ftr *fileTableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
count := ftr.idx.chunkCount()
rdr, err := ftr.tableReader.r.Reader(ctx)
if err != nil {
return err
}
// The per-chunk reads below need random access (readNFrom takes an
// offset), so the reader must support seeking.
skr, ok := rdr.(io.ReadSeeker)
if !ok {
return errors.New("runtime error: reader does not support seeking")
}
for i := uint32(0); i < count; i++ {
// Honor cancellation between chunks; a partial scan surfaces as an error.
if ctx.Err() != nil {
return ctx.Err()
}
var h hash.Hash
// indexEntry fills h with the chunk's address and returns its
// offset/length within the table file.
ie, err := ftr.idx.indexEntry(i, &h)
if err != nil {
return err
}
// Read exactly the chunk's compressed payload from its recorded offset.
readNBytes, err := readNFrom(skr, ie.Offset(), ie.Length())
if err != nil {
return err
}
cchk, err := NewCompressedChunk(h, readNBytes)
if err != nil {
return err
}
// Materialize the uncompressed chunk before handing it to the callback.
chk, err := cchk.ToChunk()
if err != nil {
return err
}
cb(chk)
}
return nil
}
// Close releases the resources held by the embedded tableReader.
func (ftr *fileTableReader) Close() error {
return ftr.tableReader.close()
}

View File

@@ -230,7 +230,7 @@ func (s journalChunkSource) close() error {
return nil
}
func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error {
func (s journalChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), _ *Stats) error {
var finalErr error
// TODO - a less time consuming lock is possible here. Using s.journal.snapshot and processJournalRecords()

View File

@@ -1979,7 +1979,7 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error {
func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error {
for _, v := range nbs.tables.novel {
err := v.iterateAllChunks(ctx, cb)
err := v.iterateAllChunks(ctx, cb, nbs.stats)
if err != nil {
return err
}
@@ -1988,7 +1988,7 @@ func (nbs *NomsBlockStore) IterateAllChunks(ctx context.Context, cb func(chunk c
}
}
for _, v := range nbs.tables.upstream {
err := v.iterateAllChunks(ctx, cb)
err := v.iterateAllChunks(ctx, cb, nbs.stats)
if err != nil {
return err
}

View File

@@ -265,7 +265,7 @@ type chunkSource interface {
// If there is a failure reading the chunk, the error will be returned - note that this can happen in the middle of
// the scan, and will likely mean that the scan didn't complete. Note that errors returned by this method are not
// related to the callback - if the callback discovers an error, it must manage that out of band.
iterateAllChunks(context.Context, func(chunk chunks.Chunk)) error
iterateAllChunks(context.Context, func(chunk chunks.Chunk), *Stats) error
}
type chunkSources []chunkSource

View File

@@ -754,3 +754,39 @@ func (tr tableReader) clone() (tableReader, error) {
blockSize: tr.blockSize,
}, nil
}
// iterateAllChunks visits every chunk in the table in index order, converting
// each compressed record into a chunks.Chunk and invoking cb with it. Reads
// go through ReadAtWithStats so the supplied stats are updated as the table
// is scanned. The context is checked between chunks so long scans can be
// cancelled; any read or decode error aborts the scan (which therefore may
// end mid-table). Callback errors are not observed here and must be handled
// by the caller out of band.
func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk), stats *Stats) error {
count := tr.idx.chunkCount()
for i := uint32(0); i < count; i++ {
// Honor cancellation between chunks.
if ctx.Err() != nil {
return ctx.Err()
}
var h hash.Hash
// indexEntry fills h with the chunk's address and yields its
// offset/length within the table.
ie, err := tr.idx.indexEntry(i, &h)
if err != nil {
return err
}
// Read exactly the chunk's compressed payload at its recorded offset.
res := make([]byte, ie.Length())
n, err := tr.r.ReadAtWithStats(ctx, res, int64(ie.Offset()), stats)
if err != nil {
return err
}
// Guard against a short read that did not surface as an error.
if uint32(n) != ie.Length() {
return errors.New("failed to read all data")
}
cchk, err := NewCompressedChunk(h, res)
if err != nil {
return err
}
// Materialize the uncompressed chunk before handing it to the callback.
chk, err := cchk.ToChunk()
if err != nil {
return err
}
cb(chk)
}
return nil
}