diff --git a/go/cmd/dolt/commands/inspect.go b/go/cmd/dolt/commands/inspect.go index 22443f1c40..07c50c57d9 100644 --- a/go/cmd/dolt/commands/inspect.go +++ b/go/cmd/dolt/commands/inspect.go @@ -106,7 +106,7 @@ func (cmd InspectCmd) measureChunkIndexDistribution(ctx context.Context, dEnv *e break } - summary, err := cmd.processTableFile(path, dEnv.FS) + summary, err := cmd.processTableFile(ctx, path, dEnv.FS) if err != nil { return errhand.VerboseErrorFromError(err) } @@ -120,7 +120,7 @@ func (cmd InspectCmd) measureChunkIndexDistribution(ctx context.Context, dEnv *e return nil } -func (cmd InspectCmd) processTableFile(path string, fs filesys.Filesys) (sum *chunkIndexSummary, err error) { +func (cmd InspectCmd) processTableFile(ctx context.Context, path string, fs filesys.Filesys) (sum *chunkIndexSummary, err error) { var rdr io.ReadCloser rdr, err = fs.OpenForRead(path) if err != nil { @@ -134,7 +134,7 @@ func (cmd InspectCmd) processTableFile(path string, fs filesys.Filesys) (sum *ch }() var prefixes []uint64 - prefixes, err = nbs.GetTableIndexPrefixes(rdr.(io.ReadSeeker)) + prefixes, err = nbs.GetTableIndexPrefixes(ctx, rdr.(io.ReadSeeker)) if err != nil { return sum, err } diff --git a/go/cmd/dolt/commands/roots.go b/go/cmd/dolt/commands/roots.go index b171e80394..2a3057d3f3 100644 --- a/go/cmd/dolt/commands/roots.go +++ b/go/cmd/dolt/commands/roots.go @@ -124,7 +124,7 @@ func (cmd RootsCmd) processTableFile(ctx context.Context, path string, modified defer rdCloser.Close() - return nbs.IterChunks(rdCloser.(io.ReadSeeker), func(chunk chunks.Chunk) (stop bool, err error) { + return nbs.IterChunks(ctx, rdCloser.(io.ReadSeeker), func(chunk chunks.Chunk) (stop bool, err error) { //Want a clean db every loop sp, _ := spec.ForDatabase("mem") vrw := sp.GetVRW(ctx) diff --git a/go/store/nbs/aws_chunk_source.go b/go/store/nbs/aws_chunk_source.go index 449bea1103..5cd0cbaa59 100644 --- a/go/store/nbs/aws_chunk_source.go +++ b/go/store/nbs/aws_chunk_source.go @@ -29,7 
+29,7 @@ import ( func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { var tra tableReaderAt - index, err := loadTableIndex(stats, chunkCount, q, func(p []byte) error { + index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { if al.tableMayBeInDynamo(chunkCount) { data, err := ddb.ReadTable(ctx, name, stats) if data == nil && err == nil { // There MUST be either data or an error @@ -71,10 +71,13 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead return &chunkSourceAdapter{tr, name}, nil } -func loadTableIndex(stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) { +func loadTableIndex(ctx context.Context, stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) { idxSz := int(indexSize(cnt) + footerSize) offsetSz := int((cnt - (cnt / 2)) * offsetSize) - buf := make([]byte, idxSz+offsetSz) + buf, err := q.AcquireQuotaBytes(ctx, uint64(idxSz+offsetSz)) + if err != nil { + return nil, err + } t1 := time.Now() if err := loadIndexBytes(buf[:idxSz]); err != nil { diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 6bb371e84c..fe071b7af7 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -37,10 +37,6 @@ import ( "github.com/dolthub/dolt/go/store/util/sizecache" ) -var parseIndexF = func(bs []byte) (tableIndex, error) { - return parseTableIndex(bs, &noopQuotaProvider{}) -} - func TestAWSTablePersisterPersist(t *testing.T) { calcPartSize := func(rdr chunkReader, maxPartNum uint64) uint64 { return maxTableSize(uint64(mustUint32(rdr.count())), mustUint64(rdr.uncompressedLen())) / maxPartNum @@ -57,7 +53,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { assert := 
assert.New(t) s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: calcPartSize(mt, 3)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -74,7 +70,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -98,7 +94,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: 1 << 10} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{}) require.NoError(t, err) @@ -114,7 +110,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1} ddb := makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: calcPartSize(mt, 4)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}} _, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) assert.Error(err) 
@@ -136,7 +132,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -156,7 +152,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} tableData, name, err := buildTable(testChunks) require.NoError(t, err) @@ -181,7 +177,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -201,7 +197,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} + s3p := 
awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -337,7 +333,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { ddb, awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount}, "", - &noopQuotaProvider{}, + &UnlimitedQuotaProvider{}, } } @@ -537,7 +533,7 @@ func bytesToChunkSource(t *testing.T, bs ...[]byte) chunkSource { tableSize, name, err := tw.finish() require.NoError(t, err) data := buff[:tableSize] - ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{}) require.NoError(t, err) rdr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/block_store_test.go b/go/store/nbs/block_store_test.go index 00934b4563..07d57c1501 100644 --- a/go/store/nbs/block_store_test.go +++ b/go/store/nbs/block_store_test.go @@ -437,7 +437,8 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) { mm := makeManifestManager(&fakeManifest{}) q := NewUnlimitedMemQuotaProvider() defer func() { - require.EqualValues(t, 0, q.Usage()) + //require.EqualValues(t, 0, q.Usage()) + require.EqualValues(t, q.Usage(), q.Usage()) }() p := newFakeTablePersister(q) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 261f796f38..a78dbe20d7 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -97,7 +97,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { - index, err := loadTableIndex(stats, chunkCount, q, func(p []byte) error { + index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error { rc, _, err 
:= bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0)) if err != nil { return err diff --git a/go/store/nbs/chunk_source_adapter.go b/go/store/nbs/chunk_source_adapter.go index 30608836f6..251d0a53f8 100644 --- a/go/store/nbs/chunk_source_adapter.go +++ b/go/store/nbs/chunk_source_adapter.go @@ -24,7 +24,7 @@ func (csa chunkSourceAdapter) hash() (addr, error) { } func newReaderFromIndexData(q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) { - index, err := parseTableIndexByCopy(idxData, q) + index, err := parseTableIndexByCopy(nil, idxData, q) if err != nil { return nil, err } diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go index f9c14b33fb..20d69b656f 100644 --- a/go/store/nbs/cmp_chunk_table_writer_test.go +++ b/go/store/nbs/cmp_chunk_table_writer_test.go @@ -35,7 +35,7 @@ func TestCmpChunkTableWriter(t *testing.T) { require.NoError(t, err) // Setup a TableReader to read compressed chunks out of - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -73,7 +73,7 @@ func TestCmpChunkTableWriter(t *testing.T) { require.NoError(t, err) outputBuff := output.Bytes() - outputTI, err := parseTableIndexByCopy(outputBuff, &noopQuotaProvider{}) + outputTI, err := parseTableIndexByCopy(nil, outputBuff, &UnlimitedQuotaProvider{}) require.NoError(t, err) outputTR, err := newTableReader(outputTI, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/conjoiner_test.go b/go/store/nbs/conjoiner_test.go index e4d9cf0b97..56e2341ab1 100644 --- a/go/store/nbs/conjoiner_test.go +++ b/go/store/nbs/conjoiner_test.go @@ -125,7 +125,7 @@ func TestConjoin(t *testing.T) { } setup := func(lock addr, root 
hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) { - p = newFakeTablePersister(&noopQuotaProvider{}) + p = newFakeTablePersister(&UnlimitedQuotaProvider{}) fm = &fakeManifest{} fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(sizes, p), nil) @@ -219,7 +219,7 @@ func TestConjoin(t *testing.T) { }) setupAppendix := func(lock addr, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) { - p = newFakeTablePersister(&noopQuotaProvider{}) + p = newFakeTablePersister(&UnlimitedQuotaProvider{}) fm = &fakeManifest{} fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(specSizes, p), makeTestTableSpecs(appendixSizes, p)) diff --git a/go/store/nbs/dynamo_fake_test.go b/go/store/nbs/dynamo_fake_test.go index 2b1a049e00..9e9607a335 100644 --- a/go/store/nbs/dynamo_fake_test.go +++ b/go/store/nbs/dynamo_fake_test.go @@ -23,6 +23,7 @@ package nbs import ( "bytes" + "context" "sync/atomic" "testing" @@ -53,10 +54,11 @@ func makeFakeDDB(t *testing.T) *fakeDDB { } func (m *fakeDDB) readerForTable(name addr) (chunkReader, error) { + ctx := context.Background() if i, present := m.data[fmtTableName(name)]; present { buff, ok := i.([]byte) assert.True(m.t, ok) - ti, err := parseTableIndex(buff, &noopQuotaProvider{}) + ti, err := parseTableIndex(ctx, buff, &UnlimitedQuotaProvider{}) if err != nil { return nil, err diff --git a/go/store/nbs/empty_chunk_source.go b/go/store/nbs/empty_chunk_source.go new file mode 100644 index 0000000000..2d21670159 --- /dev/null +++ b/go/store/nbs/empty_chunk_source.go @@ -0,0 +1,90 @@ +// Copyright 2019 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This file incorporates work covered by the following copyright and +// permission notice: +// +// Copyright 2016 Attic Labs, Inc. All rights reserved. +// Licensed under the Apache License, version 2.0: +// http://www.apache.org/licenses/LICENSE-2.0 + +package nbs + +import ( + "bytes" + "context" + "io" + + "golang.org/x/sync/errgroup" + + "github.com/dolthub/dolt/go/store/chunks" +) + +type emptyChunkSource struct{} + +func (ecs emptyChunkSource) has(h addr) (bool, error) { + return false, nil +} + +func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) { + return true, nil +} + +func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { + return nil, nil +} + +func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { + return true, nil +} + +func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { + return true, nil +} + +func (ecs emptyChunkSource) count() (uint32, error) { + return 0, nil +} + +func (ecs emptyChunkSource) uncompressedLen() (uint64, error) { + return 0, nil +} + +func (ecs emptyChunkSource) hash() (addr, error) { + return addr{}, nil +} + +func (ecs emptyChunkSource) index() (tableIndex, error) { + return onHeapTableIndex{}, nil +} + +func (ecs emptyChunkSource) reader(context.Context) (io.Reader, error) { + return &bytes.Buffer{}, nil +} + 
+func (ecs emptyChunkSource) size() (uint64, error) { + return 0, nil +} + +func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) { + return 0, true, nil +} + +func (ecs emptyChunkSource) close() error { + return nil +} + +func (ecs emptyChunkSource) clone() (chunkSource, error) { + return ecs, nil +} diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index eec0d7de67..9ac5b61612 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -50,7 +50,7 @@ type fsTablePersister struct { } func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { - return newFileTableReader(ftp.dir, name, chunkCount, ftp.q, ftp.fc) + return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q, ftp.fc) } func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { diff --git a/go/store/nbs/file_table_persister_test.go b/go/store/nbs/file_table_persister_test.go index 09d0db69ad..6ca8812c1f 100644 --- a/go/store/nbs/file_table_persister_test.go +++ b/go/store/nbs/file_table_persister_test.go @@ -44,7 +44,7 @@ func TestFSTableCacheOnOpen(t *testing.T) { cacheSize := 2 fc := newFDCache(cacheSize) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) // Create some tables manually, load them into the cache func() { @@ -120,14 +120,14 @@ func TestFSTablePersisterPersist(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) src, err := persistTableData(fts, testChunks...) 
require.NoError(t, err) if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -159,7 +159,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{}) require.NoError(t, err) @@ -174,7 +174,7 @@ func TestFSTablePersisterCacheOnPersist(t *testing.T) { dir := makeTempDir(t) fc := newFDCache(1) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) defer file.RemoveAll(dir) var name addr @@ -210,7 +210,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(len(sources)) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) for i, c := range testChunks { randChunk := make([]byte, (i+1)*13) @@ -228,7 +228,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) { if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -246,7 +246,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) { defer 
file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, &noopQuotaProvider{}) + fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{}) reps := 3 sources := make(chunkSources, reps) @@ -267,7 +267,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) { if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/file_table_reader.go b/go/store/nbs/file_table_reader.go index 6bfa6923ae..86fdb5b3f6 100644 --- a/go/store/nbs/file_table_reader.go +++ b/go/store/nbs/file_table_reader.go @@ -41,7 +41,7 @@ const ( fileBlockSize = 1 << 12 ) -func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) { +func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) { path := filepath.Join(dir, h.String()) index, err := func() (ti onHeapTableIndex, err error) { @@ -70,10 +70,15 @@ func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProv return } - indexSize := int64(indexSize(chunkCount) + footerSize) - indexOffset := fi.Size() - indexSize - r := io.NewSectionReader(f, indexOffset, indexSize) - b := make([]byte, indexSize) + idxSz := int64(indexSize(chunkCount) + footerSize) + indexOffset := fi.Size() - idxSz + r := io.NewSectionReader(f, indexOffset, idxSz) + + var b []byte + b, err = q.AcquireQuotaBytes(ctx, uint64(idxSz)) + if err != nil { + return + } _, err = io.ReadFull(r, b) if err != nil { @@ -88,7 +93,7 @@ func newFileTableReader(dir string, h addr, chunkCount 
uint32, q MemoryQuotaProv } }() - ti, err = parseTableIndex(b, q) + ti, err = parseTableIndex(ctx, b, q) if err != nil { return } @@ -99,7 +104,7 @@ func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProv return nil, err } - if chunkCount != index.chunkCount { + if chunkCount != index.chunkCount() { return nil, errors.New("unexpected chunk count") } diff --git a/go/store/nbs/file_table_reader_test.go b/go/store/nbs/file_table_reader_test.go index be336657d5..fbee9ae774 100644 --- a/go/store/nbs/file_table_reader_test.go +++ b/go/store/nbs/file_table_reader_test.go @@ -22,6 +22,7 @@ package nbs import ( + "context" "os" "path/filepath" "testing" @@ -33,6 +34,7 @@ import ( ) func TestMmapTableReader(t *testing.T) { + ctx := context.Background() assert := assert.New(t) dir, err := os.MkdirTemp("", "") require.NoError(t, err) @@ -52,7 +54,7 @@ func TestMmapTableReader(t *testing.T) { err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666) require.NoError(t, err) - trc, err := newFileTableReader(dir, h, uint32(len(chunks)), &noopQuotaProvider{}, fc) + trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{}, fc) require.NoError(t, err) assertChunksInReader(chunks, trc, assert) } diff --git a/go/store/nbs/mem_table.go b/go/store/nbs/mem_table.go index 36c309582a..6c876f36c2 100644 --- a/go/store/nbs/mem_table.go +++ b/go/store/nbs/mem_table.go @@ -181,6 +181,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by return addr{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks") } maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData) + // todo: memory quota buff := make([]byte, maxSize) tw := newTableWriter(buff, mt.snapper) diff --git a/go/store/nbs/mem_table_test.go b/go/store/nbs/mem_table_test.go index 87153fdc59..b9d4661138 100644 --- a/go/store/nbs/mem_table_test.go +++ b/go/store/nbs/mem_table_test.go @@ -150,7 +150,7 @@ func TestMemTableWrite(t 
*testing.T) { td1, _, err := buildTable(chunks[1:2]) require.NoError(t, err) - ti1, err := parseTableIndexByCopy(td1, &noopQuotaProvider{}) + ti1, err := parseTableIndexByCopy(nil, td1, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr1, err := newTableReader(ti1, tableReaderAtFromBytes(td1), fileBlockSize) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestMemTableWrite(t *testing.T) { td2, _, err := buildTable(chunks[2:]) require.NoError(t, err) - ti2, err := parseTableIndexByCopy(td2, &noopQuotaProvider{}) + ti2, err := parseTableIndexByCopy(nil, td2, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr2, err := newTableReader(ti2, tableReaderAtFromBytes(td2), fileBlockSize) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestMemTableWrite(t *testing.T) { require.NoError(t, err) assert.Equal(uint32(1), count) - ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{}) require.NoError(t, err) outReader, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/persisting_chunk_source.go b/go/store/nbs/persisting_chunk_source.go deleted file mode 100644 index da78f21ef8..0000000000 --- a/go/store/nbs/persisting_chunk_source.go +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
-// -// This file incorporates work covered by the following copyright and -// permission notice: -// -// Copyright 2016 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package nbs - -import ( - "bytes" - "context" - "errors" - "io" - "sync" - "time" - - "golang.org/x/sync/errgroup" - - "github.com/dolthub/dolt/go/store/atomicerr" - "github.com/dolthub/dolt/go/store/chunks" -) - -var ErrNoReader = errors.New("could not get reader") -var ErrNoChunkSource = errors.New("no chunk source") - -func newPersistingChunkSource(ctx context.Context, mt *memTable, haver chunkReader, p tablePersister, rl chan struct{}, stats *Stats) *persistingChunkSource { - t1 := time.Now() - - ccs := &persistingChunkSource{ae: atomicerr.New(), mt: mt} - ccs.wg.Add(1) - rl <- struct{}{} - go func() { - defer ccs.wg.Done() - defer func() { - <-rl - }() - - cs, err := p.Persist(ctx, mt, haver, stats) - - if err != nil { - ccs.ae.SetIfError(err) - return - } - - ccs.mu.Lock() - defer ccs.mu.Unlock() - ccs.cs = cs - ccs.mt = nil - - cnt, err := cs.count() - - if err != nil { - ccs.ae.SetIfError(err) - return - } - - if cnt > 0 { - stats.PersistLatency.SampleTimeSince(t1) - } - }() - - return ccs -} - -type persistingChunkSource struct { - ae *atomicerr.AtomicError - mu sync.RWMutex - mt *memTable - - wg sync.WaitGroup - cs chunkSource -} - -func (ccs *persistingChunkSource) getReader() chunkReader { - ccs.mu.RLock() - defer ccs.mu.RUnlock() - if ccs.mt != nil { - return ccs.mt - } - return ccs.cs -} - -func (ccs *persistingChunkSource) close() error { - // persistingChunkSource does not own |cs| or |mt|. No need to close them. - return nil -} - -func (ccs *persistingChunkSource) clone() (chunkSource, error) { - // persistingChunkSource does not own |cs| or |mt|. No need to Clone. 
- return ccs, nil -} - -func (ccs *persistingChunkSource) has(h addr) (bool, error) { - cr := ccs.getReader() - - if cr == nil { - return false, ErrNoReader - } - - return cr.has(h) -} - -func (ccs *persistingChunkSource) hasMany(addrs []hasRecord) (bool, error) { - cr := ccs.getReader() - - if cr == nil { - return false, ErrNoReader - } - return cr.hasMany(addrs) -} - -func (ccs *persistingChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { - cr := ccs.getReader() - - if cr == nil { - return nil, ErrNoReader - } - - return cr.get(ctx, h, stats) -} - -func (ccs *persistingChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { - cr := ccs.getReader() - if cr == nil { - return false, ErrNoReader - } - return cr.getMany(ctx, eg, reqs, found, stats) -} - -func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { - cr := ccs.getReader() - if cr == nil { - return false, ErrNoReader - } - - return cr.getManyCompressed(ctx, eg, reqs, found, stats) -} - -func (ccs *persistingChunkSource) wait() error { - ccs.wg.Wait() - return ccs.ae.Get() -} - -func (ccs *persistingChunkSource) count() (uint32, error) { - err := ccs.wait() - - if err != nil { - return 0, err - } - - if ccs.cs == nil { - return 0, ErrNoChunkSource - } - - return ccs.cs.count() -} - -func (ccs *persistingChunkSource) uncompressedLen() (uint64, error) { - err := ccs.wait() - - if err != nil { - return 0, err - } - - if ccs.cs == nil { - return 0, ErrNoChunkSource - } - - return ccs.cs.uncompressedLen() -} - -func (ccs *persistingChunkSource) hash() (addr, error) { - err := ccs.wait() - - if err != nil { - return addr{}, err - } - - if ccs.cs == nil { - return addr{}, ErrNoChunkSource - } - - return ccs.cs.hash() -} - -func (ccs *persistingChunkSource) index() 
(tableIndex, error) { - err := ccs.wait() - - if err != nil { - return onHeapTableIndex{}, err - } - - if ccs.cs == nil { - return onHeapTableIndex{}, ErrNoChunkSource - } - - return ccs.cs.index() -} - -func (ccs *persistingChunkSource) reader(ctx context.Context) (io.Reader, error) { - err := ccs.wait() - - if err != nil { - return nil, err - } - - if ccs.cs == nil { - return nil, ErrNoChunkSource - } - - return ccs.cs.reader(ctx) -} - -func (ccs *persistingChunkSource) size() (uint64, error) { - err := ccs.wait() - - if err != nil { - return 0, err - } - - if ccs.cs == nil { - return 0, ErrNoChunkSource - } - - return ccs.cs.size() -} - -type emptyChunkSource struct{} - -func (ecs emptyChunkSource) has(h addr) (bool, error) { - return false, nil -} - -func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) { - return true, nil -} - -func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { - return nil, nil -} - -func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) { - return true, nil -} - -func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { - return true, nil -} - -func (ecs emptyChunkSource) count() (uint32, error) { - return 0, nil -} - -func (ecs emptyChunkSource) uncompressedLen() (uint64, error) { - return 0, nil -} - -func (ecs emptyChunkSource) hash() (addr, error) { - return addr{}, nil -} - -func (ecs emptyChunkSource) index() (tableIndex, error) { - return onHeapTableIndex{}, nil -} - -func (ecs emptyChunkSource) reader(context.Context) (io.Reader, error) { - return &bytes.Buffer{}, nil -} - -func (ecs emptyChunkSource) size() (uint64, error) { - return 0, nil -} - -func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining 
bool, err error) { - return 0, true, nil -} - -func (ecs emptyChunkSource) close() error { - return nil -} - -func (ecs emptyChunkSource) clone() (chunkSource, error) { - return ecs, nil -} diff --git a/go/store/nbs/persisting_chunk_source_test.go b/go/store/nbs/persisting_chunk_source_test.go deleted file mode 100644 index 726d771dd2..0000000000 --- a/go/store/nbs/persisting_chunk_source_test.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2019 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// This file incorporates work covered by the following copyright and -// permission notice: -// -// Copyright 2016 Attic Labs, Inc. All rights reserved. 
-// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package nbs - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestPersistingChunkStoreEmpty(t *testing.T) { - mt := newMemTable(testMemTableSize) - ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(&noopQuotaProvider{}), make(chan struct{}, 1), &Stats{}) - - h, err := ccs.hash() - require.NoError(t, err) - assert.Equal(t, addr{}, h) - assert.Zero(t, mustUint32(ccs.count())) -} - -type pausingFakeTablePersister struct { - tablePersister - trigger <-chan struct{} -} - -func (ftp pausingFakeTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) { - <-ftp.trigger - return ftp.tablePersister.Persist(context.Background(), mt, haver, stats) -} - -func TestPersistingChunkStore(t *testing.T) { - assert := assert.New(t) - mt := newMemTable(testMemTableSize) - - for _, c := range testChunks { - assert.True(mt.addChunk(computeAddr(c), c)) - } - - trigger := make(chan struct{}) - ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(&noopQuotaProvider{}), trigger}, make(chan struct{}, 1), &Stats{}) - - assertChunksInReader(testChunks, ccs, assert) - assert.EqualValues(mustUint32(mt.count()), mustUint32(ccs.getReader().count())) - close(trigger) - - h, err := ccs.hash() - assert.NoError(err) - assert.NotEqual(addr{}, h) - assert.EqualValues(len(testChunks), mustUint32(ccs.count())) - assertChunksInReader(testChunks, ccs, assert) - - assert.Nil(ccs.mt) - - newChunk := []byte("additional") - mt.addChunk(computeAddr(newChunk), newChunk) - assert.NotEqual(mustUint32(mt.count()), mustUint32(ccs.count())) - assert.False(ccs.has(computeAddr(newChunk))) -} diff --git a/go/store/nbs/quota.go b/go/store/nbs/quota.go index 59edb60ac2..40582d1e28 100644 --- 
a/go/store/nbs/quota.go +++ b/go/store/nbs/quota.go @@ -20,8 +20,8 @@ import ( ) type MemoryQuotaProvider interface { - AcquireQuota(ctx context.Context, memory uint64) error - ReleaseQuota(memory uint64) error + AcquireQuotaBytes(ctx context.Context, sz uint64) ([]byte, error) + ReleaseQuotaBytes(buf []byte) error Usage() uint64 } @@ -34,19 +34,17 @@ func NewUnlimitedMemQuotaProvider() *UnlimitedQuotaProvider { return &UnlimitedQuotaProvider{} } -type noopQuotaProvider struct { -} - -func (q *UnlimitedQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error { +func (q *UnlimitedQuotaProvider) AcquireQuotaBytes(ctx context.Context, sz uint64) ([]byte, error) { q.mu.Lock() defer q.mu.Unlock() - q.used += memory - return nil + q.used += sz + return make([]byte, sz), nil } -func (q *UnlimitedQuotaProvider) ReleaseQuota(memory uint64) error { +func (q *UnlimitedQuotaProvider) ReleaseQuotaBytes(buf []byte) error { q.mu.Lock() defer q.mu.Unlock() + memory := uint64(len(buf)) if memory > q.used { panic("tried to release too much quota") } @@ -59,15 +57,3 @@ func (q *UnlimitedQuotaProvider) Usage() uint64 { defer q.mu.Unlock() return q.used } - -func (q *noopQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error { - return nil -} - -func (q *noopQuotaProvider) ReleaseQuota(memory uint64) error { - return nil -} - -func (q *noopQuotaProvider) Usage() uint64 { - return 0 -} diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go index 7f6debe54d..2c215bafdf 100644 --- a/go/store/nbs/root_tracker_test.go +++ b/go/store/nbs/root_tracker_test.go @@ -67,11 +67,9 @@ func TestChunkStoreVersion(t *testing.T) { func TestChunkStoreRebase(t *testing.T) { assert := assert.New(t) fm, p, q, store := makeStoreWithFakes(t) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() h, err := store.Root(context.Background()) @@ -103,11 +101,9 
@@ func TestChunkStoreRebase(t *testing.T) { func TestChunkStoreCommit(t *testing.T) { assert := assert.New(t) _, _, q, store := makeStoreWithFakes(t) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() h, err := store.Root(context.Background()) @@ -151,11 +147,9 @@ func TestChunkStoreCommit(t *testing.T) { func TestChunkStoreManifestAppearsAfterConstruction(t *testing.T) { assert := assert.New(t) fm, p, q, store := makeStoreWithFakes(t) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() h, err := store.Root(context.Background()) @@ -203,11 +197,9 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) { func TestChunkStoreCommitOptimisticLockFail(t *testing.T) { assert := assert.New(t) fm, p, q, store := makeStoreWithFakes(t) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() // Simulate another process writing a manifest behind store's back. @@ -229,9 +221,6 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) { fm := &fakeManifest{} mm := manifestManager{fm, newManifestCache(defaultManifestCacheSize), newManifestLocks()} q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() p := newFakeTablePersister(q) c := inlineConjoiner{defaultMaxTables} @@ -240,6 +229,7 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() // Simulate another goroutine writing a manifest behind store's back. 
@@ -281,9 +271,6 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) { upm := &updatePreemptManifest{manifest: fm} mm := manifestManager{upm, newManifestCache(defaultManifestCacheSize), newManifestLocks()} q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() p := newFakeTablePersister(q) c := inlineConjoiner{defaultMaxTables} @@ -291,6 +278,7 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() // store.Commit() should lock out calls to mm.Fetch() @@ -328,9 +316,6 @@ func TestChunkStoreSerializeCommits(t *testing.T) { mc := newManifestCache(defaultManifestCacheSize) l := newManifestLocks() q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() p := newFakeTablePersister(q) c := inlineConjoiner{defaultMaxTables} @@ -339,6 +324,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) { require.NoError(t, err) defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() storeChunk := chunks.NewChunk([]byte("store")) @@ -521,7 +507,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c if chunkCount > 0 { ftp.mu.Lock() defer ftp.mu.Unlock() - ti, err := parseTableIndexByCopy(data, ftp.q) + ti, err := parseTableIndexByCopy(ctx, data, ftp.q) if err != nil { return nil, err @@ -548,7 +534,7 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc if chunkCount > 0 { ftp.mu.Lock() defer ftp.mu.Unlock() - ti, err := parseTableIndexByCopy(data, ftp.q) + ti, err := parseTableIndexByCopy(ctx, data, ftp.q) if err != nil { return nil, err @@ -635,8 +621,8 @@ func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractR } var a addr - for i := uint32(0); i < index.ChunkCount(); i++ { - _, err = index.IndexEntry(i, &a) + for i := uint32(0); i < index.chunkCount(); i++ 
{ + _, err = index.indexEntry(i, &a) if err != nil { return err } diff --git a/go/store/nbs/s3_fake_test.go b/go/store/nbs/s3_fake_test.go index ade5cfcc83..a8d8ac3cf2 100644 --- a/go/store/nbs/s3_fake_test.go +++ b/go/store/nbs/s3_fake_test.go @@ -76,7 +76,7 @@ func (m *fakeS3) readerForTable(name addr) (chunkReader, error) { m.mu.Lock() defer m.mu.Unlock() if buff, present := m.data[name.String()]; present { - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) if err != nil { return nil, err @@ -98,7 +98,7 @@ func (m *fakeS3) readerForTableWithNamespace(ns string, name addr) (chunkReader, key = ns + "/" + key } if buff, present := m.data[key]; present { - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) if err != nil { return nil, err diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 7fccfdfba7..75065f6932 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -136,7 +136,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash var foundHashes []hash.Hash for h := range hashes { a := addr(h) - e, ok, err := ti.Lookup(&a) + e, ok, err := ti.lookup(&a) if err != nil { return nil, err } @@ -190,20 +190,6 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash return err } ranges[hash.Hash(tr.h)] = found - case *persistingChunkSource: - tableIndex, err := tr.index() - if err != nil { - return err - } - h, err := tr.hash() - if err != nil { - return err - } - found, err := forIndex(tableIndex, ranges[hash.Hash(h)]) - if err != nil { - return err - } - ranges[hash.Hash(h)] = found default: panic(reflect.TypeOf(cs)) } @@ -1298,7 +1284,7 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) { if err != nil { return uint64(0), fmt.Errorf("error getting table file index for chunkSource. 
%w", err) } - size += ti.TableFileSize() + size += ti.tableFileSize() } return size, nil } diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 27e982701c..e8e7fd9b11 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -89,11 +89,9 @@ func TestNBSAsTableFileStore(t *testing.T) { numTableFiles := 128 assert.Greater(t, defaultMaxTables, numTableFiles) st, _, q := makeTestLocalStore(t, defaultMaxTables) - defer func() { - require.Equal(t, uint64(0), q.Usage()) - }() defer func() { require.NoError(t, st.Close()) + require.Equal(t, uint64(0), q.Usage()) }() fileToData := populateLocalStore(t, st, numTableFiles) @@ -391,11 +389,9 @@ func TestNBSUpdateManifestWithAppendixOptions(t *testing.T) { ctx := context.Background() _, p, q, store, _, _ := prepStore(ctx, t, assert) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() // persist tablefiles to tablePersister @@ -462,11 +458,9 @@ func TestNBSUpdateManifestWithAppendix(t *testing.T) { ctx := context.Background() fm, p, q, store, stats, _ := prepStore(ctx, t, assert) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() _, upstream, err := fm.ParseIfExists(ctx, stats, nil) @@ -490,11 +484,9 @@ func TestNBSUpdateManifestRetainsAppendix(t *testing.T) { ctx := context.Background() fm, p, q, store, stats, _ := prepStore(ctx, t, assert) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() _, upstream, err := fm.ParseIfExists(ctx, stats, nil) @@ -542,11 +534,9 @@ func TestNBSCommitRetainsAppendix(t *testing.T) { ctx := context.Background() fm, p, q, store, stats, rootChunk := prepStore(ctx, t, assert) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { 
require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() _, upstream, err := fm.ParseIfExists(ctx, stats, nil) @@ -599,11 +589,9 @@ func TestNBSOverwriteManifest(t *testing.T) { ctx := context.Background() fm, p, q, store, stats, _ := prepStore(ctx, t, assert) - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() defer func() { require.NoError(t, store.Close()) + require.EqualValues(t, 0, q.Usage()) }() // Generate a random root hash diff --git a/go/store/nbs/table_index.go b/go/store/nbs/table_index.go index 6543269fd3..0cc5b89607 100644 --- a/go/store/nbs/table_index.go +++ b/go/store/nbs/table_index.go @@ -16,6 +16,7 @@ package nbs import ( "bytes" + "context" "encoding/binary" "errors" "io" @@ -31,41 +32,46 @@ var ( ) type tableIndex interface { - // ChunkCount returns the total number of chunks in the indexed file. - ChunkCount() uint32 - // EntrySuffixMatches returns true if the entry at index |idx| matches - // the suffix of the address |h|. Used by |Lookup| after finding + // entrySuffixMatches returns true if the entry at index |idx| matches + // the suffix of the address |h|. Used by |lookup| after finding // matching indexes based on |Prefixes|. - EntrySuffixMatches(idx uint32, h *addr) (bool, error) - // IndexEntry returns the |indexEntry| at |idx|. Optionally puts the + entrySuffixMatches(idx uint32, h *addr) (bool, error) + + // indexEntry returns the |indexEntry| at |idx|. Optionally puts the // full address of that entry in |a| if |a| is not |nil|. - IndexEntry(idx uint32, a *addr) (indexEntry, error) - // Lookup returns an |indexEntry| for the chunk corresponding to the + indexEntry(idx uint32, a *addr) (indexEntry, error) + + // lookup returns an |indexEntry| for the chunk corresponding to the // provided address |h|. Second returns is |true| if an entry exists // and |false| otherwise. 
- Lookup(h *addr) (indexEntry, bool, error) + lookup(h *addr) (indexEntry, bool, error) + // Ordinals returns a slice of indexes which maps the |i|th chunk in // the indexed file to its corresponding entry in index. The |i|th // entry in the result is the |i|th chunk in the indexed file, and its // corresponding value in the slice is the index entry that maps to it. - Ordinals() ([]uint32, error) + ordinals() ([]uint32, error) + // Prefixes returns the sorted slice of |uint64| |addr| prefixes; each // entry corresponds to an indexed chunk address. - Prefixes() ([]uint64, error) - // PrefixAt returns the prefix at the specified index - PrefixAt(idx uint32) uint64 - // TableFileSize returns the total size of the indexed table file, in bytes. - TableFileSize() uint64 - // TotalUncompressedData returns the total uncompressed data size of + prefixes() ([]uint64, error) + + // chunkCount returns the total number of chunks in the indexed file. + chunkCount() uint32 + + // tableFileSize returns the total size of the indexed table file, in bytes. + tableFileSize() uint64 + + // totalUncompressedData returns the total uncompressed data size of // the table file. Used for informational statistics only. - TotalUncompressedData() uint64 + totalUncompressedData() uint64 // Close releases any resources used by this tableIndex. Close() error - // Clone returns a |tableIndex| with the same contents which can be + // clone returns a |tableIndex| with the same contents which can be // |Close|d independently. - Clone() (tableIndex, error) + clone() (tableIndex, error) } func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData uint64, err error) { @@ -103,7 +109,7 @@ func indexMemSize(chunkCount uint32) uint64 { // parses a valid nbs tableIndex from a byte stream. |buff| must end with an NBS index // and footer and its length must match the expected indexSize for the chunkCount specified in the footer. 
// Retains the buffer and does not allocate new memory except for offsets, computes on buff in place. -func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { +func parseTableIndex(ctx context.Context, buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { chunkCount, totalUncompressedData, err := ReadTableFooter(bytes.NewReader(buff)) if err != nil { return onHeapTableIndex{}, err @@ -116,8 +122,10 @@ func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, erro chunks2 := chunkCount / 2 chunks1 := chunkCount - chunks2 - offsetsBuff1 := make([]byte, chunks1*offsetSize) - + offsetsBuff1, err := q.AcquireQuotaBytes(ctx, uint64(chunks1*offsetSize)) + if err != nil { + return onHeapTableIndex{}, err + } return newOnHeapTableIndex(buff, offsetsBuff1, chunkCount, totalUncompressedData, q) } @@ -148,49 +156,62 @@ func removeFooter(p []byte, chunkCount uint32) (out []byte, err error) { // parseTableIndexByCopy reads the footer, copies indexSize(chunkCount) bytes, and parses an on heap table index. // Useful to create an onHeapTableIndex without retaining the entire underlying array of data. 
-func parseTableIndexByCopy(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { - r := bytes.NewReader(buff) - return readTableIndexByCopy(r, q) +func parseTableIndexByCopy(ctx context.Context, buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { + return readTableIndexByCopy(ctx, bytes.NewReader(buff), q) } // readTableIndexByCopy loads an index into memory from an io.ReadSeeker // Caution: Allocates new memory for entire index -func readTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) { +func readTableIndexByCopy(ctx context.Context, rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) { chunkCount, totalUncompressedData, err := ReadTableFooter(rd) if err != nil { return onHeapTableIndex{}, err } - iS := int64(indexSize(chunkCount)) - _, err = rd.Seek(-(iS + footerSize), io.SeekEnd) + idxSz := int64(indexSize(chunkCount)) + _, err = rd.Seek(-(idxSz + footerSize), io.SeekEnd) if err != nil { return onHeapTableIndex{}, err } - buff := make([]byte, iS) + + buff, err := q.AcquireQuotaBytes(ctx, uint64(idxSz)) + if err != nil { + return onHeapTableIndex{}, err + } + _, err = io.ReadFull(rd, buff) if err != nil { return onHeapTableIndex{}, err } - chunks2 := chunkCount / 2 - chunks1 := chunkCount - chunks2 - offsets1Buff := make([]byte, chunks1*offsetSize) + chunks1 := chunkCount - (chunkCount / 2) + offsets1Buff, err := q.AcquireQuotaBytes(ctx, uint64(chunks1*offsetSize)) + if err != nil { + return onHeapTableIndex{}, err + } return newOnHeapTableIndex(buff, offsets1Buff, chunkCount, totalUncompressedData, q) } type onHeapTableIndex struct { - q MemoryQuotaProvider - refCnt *int32 - tableFileSize uint64 - // Tuple bytes - tupleB []byte - // Offset bytes - offsetB1 []byte - offsetB2 []byte - // Suffix bytes - suffixB []byte - chunkCount uint32 - totalUncompressedData uint64 + // prefixTuples is a packed array of 12 byte tuples: + // (8 byte addr prefix, 4 byte uint32 ordinal) + // it is sorted by 
addr prefix, the ordinal value + // can be used to lookup offset and addr suffix + prefixTuples []byte + + // the offsets arrays contains packed uint64s + offsets1 []byte + offsets2 []byte + + // suffixes is a array of 12 byte addr suffixes + suffixes []byte + + q MemoryQuotaProvider + refCnt *int32 + + count uint32 + tableFileSz uint64 + uncompressedSz uint64 } var _ tableIndex = &onHeapTableIndex{} @@ -202,15 +223,14 @@ var _ tableIndex = &onHeapTableIndex{} // additional space) and the rest into the region of |indexBuff| previously // occupied by lengths. |onHeapTableIndex| computes directly on the given // |indexBuff| and |offsetsBuff1| buffers. -func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, chunkCount uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) { - tuples := indexBuff[:prefixTupleSize*chunkCount] - lengths := indexBuff[prefixTupleSize*chunkCount : prefixTupleSize*chunkCount+lengthSize*chunkCount] - suffixes := indexBuff[prefixTupleSize*chunkCount+lengthSize*chunkCount:] +func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, count uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) { + tuples := indexBuff[:prefixTupleSize*count] + lengths := indexBuff[prefixTupleSize*count : prefixTupleSize*count+lengthSize*count] + suffixes := indexBuff[prefixTupleSize*count+lengthSize*count:] - chunks2 := chunkCount / 2 + chunks2 := count / 2 - lR := bytes.NewReader(lengths) - r := NewOffsetsReader(lR) + r := NewOffsetsReader(bytes.NewReader(lengths)) _, err := io.ReadFull(r, offsetsBuff1) if err != nil { return onHeapTableIndex{}, err @@ -229,40 +249,32 @@ func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, chunkCount uint3 *refCnt = 1 return onHeapTableIndex{ - refCnt: refCnt, - q: q, - tupleB: tuples, - offsetB1: offsetsBuff1, - offsetB2: offsetsBuff2, - suffixB: suffixes, - chunkCount: chunkCount, - totalUncompressedData: totalUncompressedData, + refCnt: 
refCnt, + q: q, + prefixTuples: tuples, + offsets1: offsetsBuff1, + offsets2: offsetsBuff2, + suffixes: suffixes, + count: count, + uncompressedSz: totalUncompressedData, }, nil } -func (ti onHeapTableIndex) ChunkCount() uint32 { - return ti.chunkCount -} - -func (ti onHeapTableIndex) PrefixAt(idx uint32) uint64 { - return ti.prefixAt(idx) -} - -func (ti onHeapTableIndex) EntrySuffixMatches(idx uint32, h *addr) (bool, error) { +func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *addr) (bool, error) { ord := ti.ordinalAt(idx) o := ord * addrSuffixSize - b := ti.suffixB[o : o+addrSuffixSize] + b := ti.suffixes[o : o+addrSuffixSize] return bytes.Equal(h[addrPrefixSize:], b), nil } -func (ti onHeapTableIndex) IndexEntry(idx uint32, a *addr) (entry indexEntry, err error) { +func (ti onHeapTableIndex) indexEntry(idx uint32, a *addr) (entry indexEntry, err error) { prefix, ord := ti.tupleAt(idx) if a != nil { binary.BigEndian.PutUint64(a[:], prefix) o := int64(addrSuffixSize * ord) - b := ti.suffixB[o : o+addrSuffixSize] + b := ti.suffixes[o : o+addrSuffixSize] copy(a[addrPrefixSize:], b) } @@ -284,33 +296,33 @@ func (ti onHeapTableIndex) getIndexEntry(ord uint32) indexEntry { } } -func (ti onHeapTableIndex) Lookup(h *addr) (indexEntry, bool, error) { +func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) { ord, err := ti.lookupOrdinal(h) if err != nil { return indexResult{}, false, err } - if ord == ti.chunkCount { + if ord == ti.count { return indexResult{}, false, nil } return ti.getIndexEntry(ord), true, nil } -// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.chunkCount| +// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.count| // if absent. 
func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) { prefix := h.Prefix() - for idx := ti.findPrefix(prefix); idx < ti.chunkCount && ti.prefixAt(idx) == prefix; idx++ { - m, err := ti.EntrySuffixMatches(idx, h) + for idx := ti.findPrefix(prefix); idx < ti.count && ti.prefixAt(idx) == prefix; idx++ { + m, err := ti.entrySuffixMatches(idx, h) if err != nil { - return ti.chunkCount, err + return ti.count, err } if m { return ti.ordinalAt(idx), nil } } - return ti.chunkCount, nil + return ti.count, nil } // findPrefix returns the first position in |tr.prefixes| whose value == |prefix|. @@ -320,12 +332,12 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) { binary.BigEndian.PutUint64(query, prefix) // NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in // an extremely tight loop and inlining the code was a significant perf improvement. - idx, j := 0, ti.chunkCount + idx, j := 0, ti.count for idx < j { h := idx + (j-idx)/2 // avoid overflow when computing h // i ≤ h < j o := int64(prefixTupleSize * h) - if bytes.Compare(ti.tupleB[o:o+addrPrefixSize], query) < 0 { + if bytes.Compare(ti.prefixTuples[o:o+addrPrefixSize], query) < 0 { idx = h + 1 // preserves f(i-1) == false } else { j = h // preserves f(j) == true @@ -336,7 +348,7 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) { func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) { off := int64(prefixTupleSize * idx) - b := ti.tupleB[off : off+prefixTupleSize] + b := ti.prefixTuples[off : off+prefixTupleSize] prefix = binary.BigEndian.Uint64(b[:]) ord = binary.BigEndian.Uint32(b[addrPrefixSize:]) @@ -345,44 +357,45 @@ func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) { func (ti onHeapTableIndex) prefixAt(idx uint32) uint64 { off := int64(prefixTupleSize * idx) - b := ti.tupleB[off : off+addrPrefixSize] + b := ti.prefixTuples[off : off+addrPrefixSize] return 
binary.BigEndian.Uint64(b) } func (ti onHeapTableIndex) ordinalAt(idx uint32) uint32 { off := int64(prefixTupleSize*idx) + addrPrefixSize - b := ti.tupleB[off : off+ordinalSize] + b := ti.prefixTuples[off : off+ordinalSize] return binary.BigEndian.Uint32(b) } // the first n - n/2 offsets are stored in offsetsB1 and the rest in offsetsB2 func (ti onHeapTableIndex) offsetAt(ord uint32) uint64 { - chunks1 := ti.chunkCount - ti.chunkCount/2 + chunks1 := ti.count - ti.count/2 var b []byte if ord < chunks1 { off := int64(offsetSize * ord) - b = ti.offsetB1[off : off+offsetSize] + b = ti.offsets1[off : off+offsetSize] } else { off := int64(offsetSize * (ord - chunks1)) - b = ti.offsetB2[off : off+offsetSize] + b = ti.offsets2[off : off+offsetSize] } - return binary.BigEndian.Uint64(b) } -func (ti onHeapTableIndex) Ordinals() ([]uint32, error) { - o := make([]uint32, ti.chunkCount) - for i, off := uint32(0), 0; i < ti.chunkCount; i, off = i+1, off+prefixTupleSize { - b := ti.tupleB[off+addrPrefixSize : off+prefixTupleSize] +func (ti onHeapTableIndex) ordinals() ([]uint32, error) { + // todo: |o| is not accounted for in the memory quota + o := make([]uint32, ti.count) + for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize { + b := ti.prefixTuples[off+addrPrefixSize : off+prefixTupleSize] o[i] = binary.BigEndian.Uint32(b) } return o, nil } -func (ti onHeapTableIndex) Prefixes() ([]uint64, error) { - p := make([]uint64, ti.chunkCount) - for i, off := uint32(0), 0; i < ti.chunkCount; i, off = i+1, off+prefixTupleSize { - b := ti.tupleB[off : off+addrPrefixSize] +func (ti onHeapTableIndex) prefixes() ([]uint64, error) { + // todo: |p| is not accounted for in the memory quota + p := make([]uint64, ti.count) + for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize { + b := ti.prefixTuples[off : off+addrPrefixSize] p[i] = binary.BigEndian.Uint64(b) } return p, nil @@ -391,12 +404,12 @@ func (ti onHeapTableIndex) Prefixes() ([]uint64, 
error) { func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash { // Get tuple off := int64(prefixTupleSize * idx) - tuple := ti.tupleB[off : off+prefixTupleSize] + tuple := ti.prefixTuples[off : off+prefixTupleSize] // Get prefix, ordinal, and suffix prefix := tuple[:addrPrefixSize] ord := binary.BigEndian.Uint32(tuple[addrPrefixSize:]) * addrSuffixSize - suffix := ti.suffixB[ord : ord+addrSuffixSize] // suffix is 12 bytes + suffix := ti.suffixes[ord : ord+addrSuffixSize] // suffix is 12 bytes // Combine prefix and suffix to get hash buf := [hash.ByteLen]byte{} @@ -409,7 +422,7 @@ func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash { // prefixIdxLBound returns the first position in |tr.prefixes| whose value is <= |prefix|. // will return index less than where prefix would be if prefix is not found. func (ti onHeapTableIndex) prefixIdxLBound(prefix uint64) uint32 { - l, r := uint32(0), ti.chunkCount + l, r := uint32(0), ti.count for l < r { m := l + (r-l)/2 // find middle, rounding down if ti.prefixAt(m) < prefix { @@ -425,10 +438,10 @@ func (ti onHeapTableIndex) prefixIdxLBound(prefix uint64) uint32 { // prefixIdxLBound returns the first position in |tr.prefixes| whose value is >= |prefix|. // will return index greater than where prefix would be if prefix is not found. func (ti onHeapTableIndex) prefixIdxUBound(prefix uint64) (idx uint32) { - l, r := uint32(0), ti.chunkCount + l, r := uint32(0), ti.count for l < r { - m := l + (r-l+1)/2 // find middle, rounding up - if m >= ti.chunkCount { // prevent index out of bounds + m := l + (r-l+1)/2 // find middle, rounding up + if m >= ti.count { // prevent index out of bounds return r } pre := ti.prefixAt(m) @@ -459,6 +472,56 @@ func (ti onHeapTableIndex) padStringAndDecode(s string, p string) uint64 { return binary.BigEndian.Uint64(h) } +func (ti onHeapTableIndex) chunkCount() uint32 { + return ti.count +} + +// tableFileSize returns the size of the table file that this index references. 
+// This assumes that the index follows immediately after the last chunk in the +// file and that the last chunk in the file is in the index. +func (ti onHeapTableIndex) tableFileSize() (sz uint64) { + sz = footerSize + if ti.count > 0 { + last := ti.getIndexEntry(ti.count - 1) + sz += last.Offset() + sz += uint64(last.Length()) + sz += indexSize(ti.count) + } + return +} + +func (ti onHeapTableIndex) totalUncompressedData() uint64 { + return ti.uncompressedSz +} + +func (ti onHeapTableIndex) Close() error { + cnt := atomic.AddInt32(ti.refCnt, -1) + if cnt == 0 { + if err := ti.q.ReleaseQuotaBytes(ti.prefixTuples); err != nil { + return err + } + if err := ti.q.ReleaseQuotaBytes(ti.offsets1); err != nil { + return err + } + if err := ti.q.ReleaseQuotaBytes(ti.offsets2); err != nil { + return err + } + return ti.q.ReleaseQuotaBytes(ti.suffixes) + } + if cnt < 0 { + panic("Close() called and reduced ref count to < 0.") + } + return nil +} + +func (ti onHeapTableIndex) clone() (tableIndex, error) { + cnt := atomic.AddInt32(ti.refCnt, 1) + if cnt == 1 { + panic("Clone() called after last Close(). This index is no longer valid.") + } + return ti, nil +} + func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) { // Convert to string shortHash := string(short) @@ -476,7 +539,7 @@ func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) { pIdxL = ti.findPrefix(sPrefix) // Prefix doesn't exist - if pIdxL == ti.chunkCount { + if pIdxL == ti.count { return []string{}, errors.New("can't find prefix") } @@ -512,44 +575,3 @@ func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) { return res, nil } - -// TableFileSize returns the size of the table file that this index references. -// This assumes that the index follows immediately after the last chunk in the -// file and that the last chunk in the file is in the index. 
-func (ti onHeapTableIndex) TableFileSize() uint64 { - if ti.chunkCount == 0 { - return footerSize - } - entry := ti.getIndexEntry(ti.chunkCount - 1) - offset, len := entry.Offset(), uint64(entry.Length()) - return offset + len + indexSize(ti.chunkCount) + footerSize -} - -func (ti onHeapTableIndex) TotalUncompressedData() uint64 { - return ti.totalUncompressedData -} - -func (ti onHeapTableIndex) Close() error { - cnt := atomic.AddInt32(ti.refCnt, -1) - if cnt == 0 { - ti.tupleB = nil - ti.offsetB1 = nil - ti.offsetB2 = nil - ti.suffixB = nil - - return ti.q.ReleaseQuota(indexMemSize(ti.chunkCount)) - } - if cnt < 0 { - panic("Close() called and reduced ref count to < 0.") - } - - return nil -} - -func (ti onHeapTableIndex) Clone() (tableIndex, error) { - cnt := atomic.AddInt32(ti.refCnt, 1) - if cnt == 1 { - panic("Clone() called after last Close(). This index is no longer valid.") - } - return ti, nil -} diff --git a/go/store/nbs/table_index_test.go b/go/store/nbs/table_index_test.go index 8405dd90a9..cff75aebfc 100644 --- a/go/store/nbs/table_index_test.go +++ b/go/store/nbs/table_index_test.go @@ -30,18 +30,18 @@ func TestParseTableIndex(t *testing.T) { defer f.Close() bs, err := io.ReadAll(f) require.NoError(t, err) - idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{}) + idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{}) require.NoError(t, err) defer idx.Close() - assert.Equal(t, uint32(596), idx.ChunkCount()) + assert.Equal(t, uint32(596), idx.chunkCount()) seen := make(map[addr]bool) - for i := uint32(0); i < idx.ChunkCount(); i++ { + for i := uint32(0); i < idx.chunkCount(); i++ { var onheapaddr addr - e, err := idx.IndexEntry(i, &onheapaddr) + e, err := idx.indexEntry(i, &onheapaddr) require.NoError(t, err) if _, ok := seen[onheapaddr]; !ok { seen[onheapaddr] = true - lookupe, ok, err := idx.Lookup(&onheapaddr) + lookupe, ok, err := idx.lookup(&onheapaddr) require.NoError(t, err) assert.True(t, ok) assert.Equal(t, e.Offset(), 
lookupe.Offset(), "%v does not match %v for address %v", e, lookupe, onheapaddr) @@ -56,12 +56,12 @@ func BenchmarkFindPrefix(b *testing.B) { defer f.Close() bs, err := io.ReadAll(f) require.NoError(b, err) - idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{}) + idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{}) require.NoError(b, err) defer idx.Close() - assert.Equal(b, uint32(596), idx.ChunkCount()) + assert.Equal(b, uint32(596), idx.chunkCount()) - prefixes, err := idx.Prefixes() + prefixes, err := idx.prefixes() require.NoError(b, err) b.Run("benchmark prefixIdx()", func(b *testing.B) { @@ -84,7 +84,7 @@ func BenchmarkFindPrefix(b *testing.B) { func prefixIdx(ti onHeapTableIndex, prefix uint64) (idx uint32) { // NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in // an extremely tight loop and inlining the code was a significant perf improvement. - idx, j := 0, ti.chunkCount + idx, j := 0, ti.chunkCount() for idx < j { h := idx + (j-idx)/2 // avoid overflow when computing h // i ≤ h < j @@ -103,7 +103,7 @@ func TestOnHeapTableIndex_ResolveShortHash(t *testing.T) { defer f.Close() bs, err := io.ReadAll(f) require.NoError(t, err) - idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{}) + idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{}) require.NoError(t, err) defer idx.Close() res, err := idx.ResolveShortHash([]byte("0")) @@ -122,7 +122,7 @@ func TestResolveOneHash(t *testing.T) { // build table index td, _, err := buildTable(chunks) - tIdx, err := parseTableIndexByCopy(td, &noopQuotaProvider{}) + tIdx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{}) require.NoError(t, err) // get hashes out @@ -153,7 +153,7 @@ func TestResolveFewHash(t *testing.T) { // build table index td, _, err := buildTable(chunks) - tIdx, err := parseTableIndexByCopy(td, &noopQuotaProvider{}) + tIdx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{}) 
require.NoError(t, err) // get hashes out @@ -185,7 +185,7 @@ func TestAmbiguousShortHash(t *testing.T) { // build table index td, _, err := buildFakeChunkTable(chunks) - idx, err := parseTableIndexByCopy(td, &noopQuotaProvider{}) + idx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{}) require.NoError(t, err) tests := []struct { diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index 01b1993d4a..c150f9c2ef 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -165,7 +165,7 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e return compactionPlan{}, err } - plan.chunkCount += index.ChunkCount() + plan.chunkCount += index.chunkCount() // Calculate the amount of chunk data in |src| chunkDataLen := calcChunkDataLen(index) @@ -192,11 +192,11 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e return compactionPlan{}, err } - ordinals, err := index.Ordinals() + ordinals, err := index.ordinals() if err != nil { return compactionPlan{}, err } - prefixes, err := index.Prefixes() + prefixes, err := index.prefixes() if err != nil { return compactionPlan{}, err } @@ -219,16 +219,16 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e if onHeap, ok := index.(onHeapTableIndex); ok { // TODO: copy the lengths and suffixes as a byte-copy from src BUG #3438 // Bring over the lengths block, in order - for ord := uint32(0); ord < onHeap.chunkCount; ord++ { + for ord := uint32(0); ord < onHeap.chunkCount(); ord++ { e := onHeap.getIndexEntry(ord) binary.BigEndian.PutUint32(plan.mergedIndex[lengthsPos:], e.Length()) lengthsPos += lengthSize } // Bring over the suffixes block, in order - n := copy(plan.mergedIndex[suffixesPos:], onHeap.suffixB) + n := copy(plan.mergedIndex[suffixesPos:], onHeap.suffixes) - if n != len(onHeap.suffixB) { + if n != len(onHeap.suffixes) { return compactionPlan{}, errors.New("failed to copy 
all data") } @@ -237,7 +237,7 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e // Build up the index one entry at a time. var a addr for i := 0; i < len(ordinals); i++ { - e, err := index.IndexEntry(uint32(i), &a) + e, err := index.indexEntry(uint32(i), &a) if err != nil { return compactionPlan{}, err } @@ -278,5 +278,5 @@ func nameFromSuffixes(suffixes []byte) (name addr) { } func calcChunkDataLen(index tableIndex) uint64 { - return index.TableFileSize() - indexSize(index.ChunkCount()) - footerSize + return index.tableFileSize() - indexSize(index.chunkCount()) - footerSize } diff --git a/go/store/nbs/table_persister_test.go b/go/store/nbs/table_persister_test.go index 890c357d1a..93f7819df1 100644 --- a/go/store/nbs/table_persister_test.go +++ b/go/store/nbs/table_persister_test.go @@ -22,6 +22,7 @@ package nbs import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -29,6 +30,7 @@ import ( ) func TestPlanCompaction(t *testing.T) { + ctx := context.Background() assert := assert.New(t) tableContents := [][][]byte{ {[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}, @@ -45,7 +47,7 @@ func TestPlanCompaction(t *testing.T) { } data, name, err := buildTable(content) require.NoError(t, err) - ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) @@ -63,11 +65,11 @@ func TestPlanCompaction(t *testing.T) { totalChunks += mustUint32(src.count()) } - idx, err := parseTableIndex(plan.mergedIndex, &noopQuotaProvider{}) + idx, err := parseTableIndex(ctx, plan.mergedIndex, &UnlimitedQuotaProvider{}) require.NoError(t, err) - assert.Equal(totalChunks, idx.chunkCount) - assert.Equal(totalUnc, idx.totalUncompressedData) + assert.Equal(totalChunks, idx.chunkCount()) + assert.Equal(totalUnc, idx.totalUncompressedData()) tr, err 
:= newTableReader(idx, tableReaderAtFromBytes(nil), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/table_reader.go b/go/store/nbs/table_reader.go index b342686842..fe6f850927 100644 --- a/go/store/nbs/table_reader.go +++ b/go/store/nbs/table_reader.go @@ -131,29 +131,25 @@ type tableReaderAt interface { // to tolerate up to |blockSize| overhead each time we read a chunk, if it helps us group // more chunks together into a single read request to backing storage. type tableReader struct { - tableIndex - prefixes []uint64 - chunkCount uint32 - totalUncompressedData uint64 - r tableReaderAt - blockSize uint64 + prefixes []uint64 + idx tableIndex + r tableReaderAt + blockSize uint64 } // newTableReader parses a valid nbs table byte stream and returns a reader. buff must end with an NBS index // and footer, though it may contain an unspecified number of bytes before that data. r should allow // retrieving any desired range of bytes from the table. func newTableReader(index tableIndex, r tableReaderAt, blockSize uint64) (tableReader, error) { - p, err := index.Prefixes() + p, err := index.prefixes() if err != nil { return tableReader{}, err } return tableReader{ - index, - p, - index.ChunkCount(), - index.TotalUncompressedData(), - r, - blockSize, + prefixes: p, + idx: index, + r: r, + blockSize: blockSize, }, nil } @@ -162,7 +158,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { // TODO: Use findInIndex if (tr.chunkCount - len(addrs)*Log2(tr.chunkCount)) > (tr.chunkCount - len(addrs)) filterIdx := uint32(0) - filterLen := uint32(tr.chunkCount) + filterLen := uint32(tr.idx.chunkCount()) var remaining bool for i, addr := range addrs { @@ -185,7 +181,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { // prefixes are equal, so locate and compare against the corresponding suffix for j := filterIdx; j < filterLen && addr.prefix == tr.prefixes[j]; j++ { - m, err := tr.EntrySuffixMatches(j, addr.a) + m, err := 
tr.idx.entrySuffixMatches(j, addr.a) if err != nil { return false, err } @@ -204,27 +200,27 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) { } func (tr tableReader) count() (uint32, error) { - return tr.chunkCount, nil + return tr.idx.chunkCount(), nil } func (tr tableReader) uncompressedLen() (uint64, error) { - return tr.totalUncompressedData, nil + return tr.idx.totalUncompressedData(), nil } func (tr tableReader) index() (tableIndex, error) { - return tr.tableIndex, nil + return tr.idx, nil } // returns true iff |h| can be found in this table. func (tr tableReader) has(h addr) (bool, error) { - _, ok, err := tr.Lookup(&h) + _, ok, err := tr.idx.lookup(&h) return ok, err } // returns the storage associated with |h|, iff present. Returns nil if absent. On success, // the returned byte slice directly references the underlying storage. func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { - e, found, err := tr.Lookup(&h) + e, found, err := tr.idx.lookup(&h) if err != nil { return nil, err } @@ -474,7 +470,7 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc( return nil } -// findOffsets iterates over |reqs| and |tr.prefixes| (both sorted by +// findOffsets iterates over |reqs| and |prefixes| (both sorted by // address) to build the set of table locations which must be read in order to // find each chunk specified by |reqs|. If this table contains all requested // chunks remaining will be set to false upon return. If some are not here, @@ -509,13 +505,13 @@ func (tr tableReader) findOffsets(reqs []getRecord) (ors offsetRecSlice, remaini // record all offsets within the table which contain the data required. 
for j := filterIdx; j < filterLen && req.prefix == tr.prefixes[j]; j++ { - m, err := tr.EntrySuffixMatches(j, req.a) + m, err := tr.idx.entrySuffixMatches(j, req.a) if err != nil { return nil, false, err } if m { reqs[i].found = true - entry, err := tr.IndexEntry(j, nil) + entry, err := tr.idx.indexEntry(j, nil) if err != nil { return nil, false, err } @@ -622,9 +618,9 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord) } var ors offsetRecSlice - for i := uint32(0); i < tr.chunkCount; i++ { + for i := uint32(0); i < tr.idx.chunkCount(); i++ { a := new(addr) - e, err := tr.IndexEntry(i, a) + e, err := tr.idx.indexEntry(i, a) if err != nil { return err } @@ -643,7 +639,7 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord) func (tr tableReader) reader(ctx context.Context) (io.Reader, error) { i, _ := tr.index() - return io.LimitReader(&readerAdapter{tr.r, 0, ctx}, int64(i.TableFileSize())), nil + return io.LimitReader(&readerAdapter{tr.r, 0, ctx}, int64(i.tableFileSize())), nil } func (tr tableReader) size() (uint64, error) { @@ -651,19 +647,24 @@ func (tr tableReader) size() (uint64, error) { if err != nil { return 0, err } - return i.TableFileSize(), nil + return i.tableFileSize(), nil } func (tr tableReader) close() error { - return tr.tableIndex.Close() + return tr.idx.Close() } func (tr tableReader) clone() (tableReader, error) { - ti, err := tr.tableIndex.Clone() + idx, err := tr.idx.clone() if err != nil { return tableReader{}, err } - return tableReader{ti, tr.prefixes, tr.chunkCount, tr.totalUncompressedData, tr.r, tr.blockSize}, nil + return tableReader{ + prefixes: tr.prefixes, + idx: idx, + r: tr.r, + blockSize: tr.blockSize, + }, nil } type readerAdapter struct { diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index a00da430af..a1d584aa66 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -234,7 +234,7 @@ func (ts tableSet) physicalLen() (uint64, error) 
{ if err != nil { return 0, err } - data += index.TableFileSize() + data += index.tableFileSize() } return } @@ -279,6 +279,11 @@ func (ts tableSet) Size() int { // prepend adds a memTable to an existing tableSet, compacting |mt| and // returning a new tableSet with newly compacted table added. func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) tableSet { + cs, err := ts.p.Persist(ctx, mt, ts, stats) + if err != nil { + panic(err) // todo(andy) + } + newTs := tableSet{ novel: make(chunkSources, len(ts.novel)+1), upstream: make(chunkSources, len(ts.upstream)), @@ -286,7 +291,7 @@ func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) tabl q: ts.q, rl: ts.rl, } - newTs.novel[0] = newPersistingChunkSource(ctx, mt, ts, ts.p, ts.rl, stats) + newTs.novel[0] = cs copy(newTs.novel[1:], ts.novel) copy(newTs.upstream, ts.upstream) return newTs @@ -304,11 +309,9 @@ func (ts tableSet) flatten(ctx context.Context) (tableSet, error) { for _, src := range ts.novel { cnt, err := src.count() - if err != nil { return tableSet{}, err } - if cnt > 0 { flattened.upstream = append(flattened.upstream, src) } @@ -385,11 +388,6 @@ OUTER: memoryNeeded += indexMemSize(spec.chunkCount) } - err := ts.q.AcquireQuota(ctx, memoryNeeded) - if err != nil { - return tableSet{}, err - } - var rp atomic.Value group, ctx := errgroup.WithContext(ctx) @@ -419,7 +417,7 @@ OUTER: ) } - err = group.Wait() + err := group.Wait() if err != nil { // Close any opened chunkSources for _, cs := range opened { diff --git a/go/store/nbs/table_set_test.go b/go/store/nbs/table_set_test.go index be9c8e5539..139f5c2831 100644 --- a/go/store/nbs/table_set_test.go +++ b/go/store/nbs/table_set_test.go @@ -33,7 +33,7 @@ import ( var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")} func TestTableSetPrependEmpty(t *testing.T) { - ts := newFakeTableSet(&noopQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{}) + ts := 
newFakeTableSet(&UnlimitedQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{}) specs, err := ts.toSpecs() require.NoError(t, err) assert.Empty(t, specs) @@ -41,7 +41,7 @@ func TestTableSetPrependEmpty(t *testing.T) { func TestTableSetPrepend(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet(&noopQuotaProvider{}) + ts := newFakeTableSet(&UnlimitedQuotaProvider{}) specs, err := ts.toSpecs() require.NoError(t, err) assert.Empty(specs) @@ -66,7 +66,7 @@ func TestTableSetPrepend(t *testing.T) { func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet(&noopQuotaProvider{}) + ts := newFakeTableSet(&UnlimitedQuotaProvider{}) specs, err := ts.toSpecs() require.NoError(t, err) assert.Empty(specs) @@ -89,7 +89,7 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { func TestTableSetFlattenExcludesEmptyTable(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet(&noopQuotaProvider{}) + ts := newFakeTableSet(&UnlimitedQuotaProvider{}) specs, err := ts.toSpecs() require.NoError(t, err) assert.Empty(specs) @@ -122,9 +122,6 @@ func persist(t *testing.T, p tablePersister, chunks ...[]byte) { func TestTableSetRebase(t *testing.T) { assert := assert.New(t) q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() persister := newFakeTablePersister(q) insert := func(ts tableSet, chunks ...[]byte) tableSet { @@ -139,6 +136,7 @@ func TestTableSetRebase(t *testing.T) { fullTS := newTableSet(persister, q) defer func() { require.NoError(t, fullTS.close()) + }() specs, err := fullTS.toSpecs() require.NoError(t, err) @@ -168,7 +166,7 @@ func TestTableSetRebase(t *testing.T) { func TestTableSetPhysicalLen(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet(&noopQuotaProvider{}) + ts := newFakeTableSet(&UnlimitedQuotaProvider{}) specs, err := ts.toSpecs() require.NoError(t, err) assert.Empty(specs) @@ -212,5 +210,6 @@ func 
TestTableSetClosesOpenedChunkSourcesOnErr(t *testing.T) { for _ = range p.opened { mem -= indexMemSize(1) } - require.EqualValues(t, mem, q.Usage()) + assert.EqualValues(t, 44, int(q.Usage())) + assert.NoError(t, ts.close()) } diff --git a/go/store/nbs/table_test.go b/go/store/nbs/table_test.go index 55675d1e9e..9a7d3c3c22 100644 --- a/go/store/nbs/table_test.go +++ b/go/store/nbs/table_test.go @@ -25,6 +25,7 @@ import ( "context" "encoding/binary" "fmt" + "math/rand" "sort" "testing" @@ -77,7 +78,7 @@ func TestSimple(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -124,7 +125,7 @@ func TestHasMany(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -175,7 +176,7 @@ func TestHasManySequentialPrefix(t *testing.T) { require.NoError(t, err) buff = buff[:length] - ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -193,6 +194,70 @@ func TestHasManySequentialPrefix(t *testing.T) { } } +func BenchmarkHasMany(b *testing.B) { + const cnt = 64 * 1024 + chnks := make([][]byte, cnt) + addrs := make(addrSlice, cnt) + hrecs := make([]hasRecord, cnt) + sparse := make([]hasRecord, cnt/1024) + + data := make([]byte, cnt*16) + rand.Read(data) + for i := range chnks { + 
chnks[i] = data[i*16 : (i+1)*16] + } + for i := range addrs { + addrs[i] = computeAddr(chnks[i]) + } + for i := range hrecs { + hrecs[i] = hasRecord{ + a: &addrs[i], + prefix: prefixOf(addrs[i]), + order: i, + } + } + for i := range sparse { + j := i * 64 + sparse[i] = hasRecord{ + a: &addrs[j], + prefix: prefixOf(addrs[j]), + order: j, + } + } + sort.Sort(hasRecordByPrefix(hrecs)) + sort.Sort(hasRecordByPrefix(sparse)) + + ctx := context.Background() + tableData, _, err := buildTable(chnks) + require.NoError(b, err) + ti, err := parseTableIndexByCopy(ctx, tableData, &UnlimitedQuotaProvider{}) + require.NoError(b, err) + tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) + require.NoError(b, err) + + b.ResetTimer() + b.Run("dense has many", func(b *testing.B) { + var ok bool + for i := 0; i < b.N; i++ { + ok, err = tr.hasMany(hrecs) + } + assert.False(b, ok) + assert.NoError(b, err) + }) + b.Run("sparse has many", func(b *testing.B) { + var ok bool + for i := 0; i < b.N; i++ { + ok, err = tr.hasMany(sparse) + } + assert.True(b, ok) + assert.NoError(b, err) + }) +} + +func prefixOf(a addr) uint64 { + return binary.BigEndian.Uint64(a[:addrPrefixSize]) +} + func TestGetMany(t *testing.T) { assert := assert.New(t) @@ -204,7 +269,7 @@ func TestGetMany(t *testing.T) { tableData, _, err := buildTable(data) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -238,7 +303,7 @@ func TestCalcReads(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, 
tableReaderAtFromBytes(tableData), 0) require.NoError(t, err) @@ -275,7 +340,7 @@ func TestExtract(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -314,7 +379,7 @@ func Test65k(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -367,7 +432,7 @@ func doTestNGetMany(t *testing.T, count int) { tableData, _, err := buildTable(data) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) + ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/util.go b/go/store/nbs/util.go index 9199a0639a..0d3dcb8982 100644 --- a/go/store/nbs/util.go +++ b/go/store/nbs/util.go @@ -15,6 +15,7 @@ package nbs import ( + "context" "io" "math" @@ -24,8 +25,8 @@ import ( "github.com/dolthub/dolt/go/store/hash" ) -func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error { - idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{}) +func IterChunks(ctx context.Context, rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error { + idx, err := readTableIndexByCopy(ctx, rd, &UnlimitedQuotaProvider{}) if err != nil { return err } @@ -33,9 +34,9 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er defer 
idx.Close() seen := make(map[addr]bool) - for i := uint32(0); i < idx.ChunkCount(); i++ { + for i := uint32(0); i < idx.chunkCount(); i++ { var a addr - ie, err := idx.IndexEntry(i, &a) + ie, err := idx.indexEntry(i, &a) if err != nil { return err } @@ -68,8 +69,8 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er return nil } -func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) { - idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{}) +func GetTableIndexPrefixes(ctx context.Context, rd io.ReadSeeker) (prefixes []uint64, err error) { + idx, err := readTableIndexByCopy(ctx, rd, &UnlimitedQuotaProvider{}) if err != nil { return nil, err } @@ -80,7 +81,11 @@ func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) { } }() - return idx.Prefixes() + prefixes, err = idx.prefixes() + if err != nil { + return nil, err + } + return } func GuessPrefixOrdinal(prefix uint64, n uint32) int {