From 69c80219201b1deb8ccd97eecc5d54c2c871a682 Mon Sep 17 00:00:00 2001 From: Dhruv Sringari Date: Mon, 28 Mar 2022 16:06:33 -0700 Subject: [PATCH] Push ReleaseQuota down into onHeapTableIndex Close() --- go/store/nbs/aws_chunk_source.go | 65 +++++++++---------- go/store/nbs/aws_chunk_source_test.go | 8 +-- go/store/nbs/aws_table_persister.go | 10 +-- go/store/nbs/aws_table_persister_test.go | 22 +++---- go/store/nbs/block_store_test.go | 15 ++--- go/store/nbs/bs_persister.go | 53 ++++++++------- go/store/nbs/chunk_source_adapter.go | 4 +- go/store/nbs/cmp_chunk_table_writer_test.go | 4 +- go/store/nbs/conjoiner_test.go | 4 +- go/store/nbs/dynamo_fake_test.go | 2 +- go/store/nbs/file_table_persister.go | 11 ++-- go/store/nbs/file_table_persister_test.go | 18 ++--- go/store/nbs/generational_chunk_store_test.go | 4 +- go/store/nbs/mem_table_test.go | 6 +- go/store/nbs/mmap_table_reader.go | 12 +++- go/store/nbs/mmap_table_reader_test.go | 2 +- go/store/nbs/persisting_chunk_source_test.go | 4 +- go/store/nbs/quota.go | 15 +++++ go/store/nbs/root_tracker_test.go | 25 +++---- go/store/nbs/s3_fake_test.go | 4 +- go/store/nbs/store.go | 16 ++--- go/store/nbs/store_test.go | 19 ++++-- go/store/nbs/table_index.go | 31 +++++++-- go/store/nbs/table_index_test.go | 4 +- go/store/nbs/table_persister.go | 15 ++++- go/store/nbs/table_persister_test.go | 23 ++++++- go/store/nbs/table_set.go | 23 +------ go/store/nbs/table_set_test.go | 22 +++---- go/store/nbs/table_test.go | 16 ++--- go/store/nbs/util.go | 4 +- 30 files changed, 252 insertions(+), 209 deletions(-) diff --git a/go/store/nbs/aws_chunk_source.go b/go/store/nbs/aws_chunk_source.go index 71fe7c0e3a..f3bb68ed61 100644 --- a/go/store/nbs/aws_chunk_source.go +++ b/go/store/nbs/aws_chunk_source.go @@ -28,9 +28,7 @@ import ( "time" ) -type indexParserF func([]byte) (tableIndex, error) - -func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, indexCache 
*indexCache, stats *Stats, parseIndex indexParserF) (cs chunkSource, err error) { +func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, indexCache *indexCache, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { if indexCache != nil { indexCache.lockEntry(name) defer func() { @@ -41,7 +39,8 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead } }() - if index, found := indexCache.get(name); found { + index, err := indexCache.get(name) + if err == nil { tra := &awsTableReaderAt{al: al, ddb: ddb, s3: s3, name: name, chunkCount: chunkCount} tr, err := newTableReader(index, tra, s3BlockSize) if err != nil { @@ -49,59 +48,55 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead } return &chunkSourceAdapter{tr, name}, nil } + if err != errCacheMiss { + return &chunkSourceAdapter{}, err + } } - t1 := time.Now() - index, tra, err := func() (tableIndex, tableReaderAt, error) { + index, tra, err := func() (onHeapTableIndex, tableReaderAt, error) { if al.tableMayBeInDynamo(chunkCount) { + t1 := time.Now() data, err := ddb.ReadTable(ctx, name, stats) - if data == nil && err == nil { // There MUST be either data or an error return onHeapTableIndex{}, &dynamoTableReaderAt{}, errors.New("no data available") } - if data != nil { + stats.IndexReadLatency.SampleTimeSince(t1) stats.IndexBytesPerRead.Sample(uint64(len(data))) - ind, err := parseTableIndexByCopy(data) + ind, err := parseTableIndexByCopy(data, q) if err != nil { - return onHeapTableIndex{}, nil, err + return onHeapTableIndex{}, &dynamoTableReaderAt{}, err } return ind, &dynamoTableReaderAt{ddb: ddb, h: name}, nil } - if _, ok := err.(tableNotInDynamoErr); !ok { return onHeapTableIndex{}, &dynamoTableReaderAt{}, err } } - size := indexSize(chunkCount) + footerSize - buff := make([]byte, size) - n, _, err := s3.ReadFromEnd(ctx, name, buff, stats) - if err != nil { - return 
onHeapTableIndex{}, &dynamoTableReaderAt{}, err - } - if size != uint64(n) { - return onHeapTableIndex{}, &dynamoTableReaderAt{}, errors.New("failed to read all data") - } - stats.IndexBytesPerRead.Sample(uint64(len(buff))) - ind, err := parseTableIndex(buff) - if err != nil { - return onHeapTableIndex{}, &dynamoTableReaderAt{}, err - } - return ind, &s3TableReaderAt{s3: s3, h: name}, nil - }() + index, err := loadTableIndex(stats, chunkCount, q, func(bytesFromEnd int64) ([]byte, error) { + buff := make([]byte, bytesFromEnd) + n, _, err := s3.ReadFromEnd(ctx, name, buff, stats) + if err != nil { + return nil, err + } + if bytesFromEnd != int64(n) { + return nil, errors.New("failed to read all data") + } + return buff, nil + }) + if err != nil { + return onHeapTableIndex{}, &dynamoTableReaderAt{}, err + } + + return index, &s3TableReaderAt{h: name, s3: s3}, nil + }() if err != nil { return &chunkSourceAdapter{}, err } - stats.IndexReadLatency.SampleTimeSince(t1) - - if err != nil { - return emptyChunkSource{}, err - } - - if ohi, ok := index.(onHeapTableIndex); indexCache != nil && ok { - indexCache.put(name, ohi) + if indexCache != nil { + indexCache.put(name, index) } tr, err := newTableReader(index, tra, s3BlockSize) diff --git a/go/store/nbs/aws_chunk_source_test.go b/go/store/nbs/aws_chunk_source_test.go index 0e015267c3..e077e5876d 100644 --- a/go/store/nbs/aws_chunk_source_test.go +++ b/go/store/nbs/aws_chunk_source_test.go @@ -53,10 +53,8 @@ func TestAWSChunkSource(t *testing.T) { h, uint32(len(chunks)), ic, + NewUnlimitedMemQuotaProvider(), &Stats{}, - func(bs []byte) (tableIndex, error) { - return parseTableIndex(bs) - }, ) require.NoError(t, err) @@ -74,7 +72,7 @@ func TestAWSChunkSource(t *testing.T) { t.Run("WithIndexCache", func(t *testing.T) { assert := assert.New(t) - index, err := parseTableIndexByCopy(tableData) + index, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) cache := newIndexCache(1024) cache.put(h, 
index) @@ -98,7 +96,7 @@ func TestAWSChunkSource(t *testing.T) { t.Run("WithIndexCache", func(t *testing.T) { assert := assert.New(t) - index, err := parseTableIndexByCopy(tableData) + index, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) cache := newIndexCache(1024) cache.put(h, index) diff --git a/go/store/nbs/aws_table_persister.go b/go/store/nbs/aws_table_persister.go index d24b328aeb..c3382114fd 100644 --- a/go/store/nbs/aws_table_persister.go +++ b/go/store/nbs/aws_table_persister.go @@ -61,7 +61,7 @@ type awsTablePersister struct { limits awsLimits indexCache *indexCache ns string - parseIndex indexParserF + q MemoryQuotaProvider } type awsLimits struct { @@ -90,8 +90,8 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin name, chunkCount, s3p.indexCache, + s3p.q, stats, - s3p.parseIndex, ) } @@ -125,7 +125,7 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch return nil, err } - return newReaderFromIndexData(s3p.indexCache, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize) + return newReaderFromIndexData(s3p.indexCache, s3p.q, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize) } err = s3p.multipartUpload(ctx, data, name.String()) @@ -135,7 +135,7 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch } tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name} - return newReaderFromIndexData(s3p.indexCache, data, name, tra, s3BlockSize) + return newReaderFromIndexData(s3p.indexCache, s3p.q, data, name, tra, s3BlockSize) } func (s3p awsTablePersister) multipartUpload(ctx context.Context, data []byte, key string) error { @@ -310,7 +310,7 @@ func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource verbose.Logger(ctx).Sugar().Debugf("Compacted table of %d Kb in %s", plan.totalCompressedData/1024, time.Since(t1)) tra := 
&s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name} - return newReaderFromIndexData(s3p.indexCache, plan.mergedIndex, name, tra, s3BlockSize) + return newReaderFromIndexData(s3p.indexCache, s3p.q, plan.mergedIndex, name, tra, s3BlockSize) } func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan compactionPlan, key string) error { diff --git a/go/store/nbs/aws_table_persister_test.go b/go/store/nbs/aws_table_persister_test.go index 658f515189..2a93e20f8d 100644 --- a/go/store/nbs/aws_table_persister_test.go +++ b/go/store/nbs/aws_table_persister_test.go @@ -38,7 +38,7 @@ import ( ) var parseIndexF = func(bs []byte) (tableIndex, error) { - return parseTableIndex(bs) + return parseTableIndex(bs, &noopQuotaProvider{}) } func TestAWSTablePersisterPersist(t *testing.T) { @@ -58,7 +58,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) ic := newIndexCache(1024) limits := awsLimits{partTarget: calcPartSize(mt, 3)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, indexCache: ic, ns: ns, parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, indexCache: ic, ns: ns, q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -76,7 +76,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -100,7 +100,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, ddb := 
makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: 1 << 10} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{}) require.NoError(t, err) @@ -116,7 +116,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1} ddb := makeFakeDTS(makeFakeDDB(t), nil) limits := awsLimits{partTarget: calcPartSize(mt, 4)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}} _, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) assert.Error(err) @@ -138,7 +138,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} tableData, name, err := buildTable(testChunks) require.NoError(t, 
err) @@ -183,7 +183,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -203,7 +203,7 @@ func TestAWSTablePersisterPersist(t *testing.T) { ddb := makeFakeDDB(t) s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil) limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)} - s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF} + s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}} src, err := s3p.Persist(context.Background(), mt, nil, &Stats{}) require.NoError(t, err) @@ -341,7 +341,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) { awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount}, ic, "", - parseIndexF, + &noopQuotaProvider{}, } } @@ -546,7 +546,7 @@ func bytesToChunkSource(t *testing.T, bs ...[]byte) chunkSource { tableSize, name, err := tw.finish() require.NoError(t, err) data := buff[:tableSize] - ti, err := parseTableIndexByCopy(data) + ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) require.NoError(t, err) rdr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/block_store_test.go b/go/store/nbs/block_store_test.go index fe2e76b5b3..697ce0e0c5 100644 --- a/go/store/nbs/block_store_test.go +++ b/go/store/nbs/block_store_test.go @@ -442,11 +442,12 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) { 
t.Run("NoConjoin", func(t *testing.T) { mm := makeManifestManager(&fakeManifest{}) - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() defer func() { require.EqualValues(t, 0, q.Usage()) }() + p := newFakeTablePersister(q) + c := &fakeConjoiner{} smallTableStore, err := newNomsBlockStore(context.Background(), constants.FormatDefaultString, mm, p, q, c, testMemTableSize) @@ -481,11 +482,9 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) { t.Run("ConjoinSuccess", func(t *testing.T) { fm := &fakeManifest{} - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() + p := newFakeTablePersister(q) + srcs := makeTestSrcs(t, []uint32{1, 1, 3, 7}, p) upstream, err := toSpecs(srcs) require.NoError(t, err) @@ -517,11 +516,9 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) { t.Run("ConjoinRetry", func(t *testing.T) { fm := &fakeManifest{} - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() + p := newFakeTablePersister(q) + srcs := makeTestSrcs(t, []uint32{1, 1, 3, 7, 13}, p) upstream, err := toSpecs(srcs) require.NoError(t, err) diff --git a/go/store/nbs/bs_persister.go b/go/store/nbs/bs_persister.go index 6d9481a679..726d19cd4e 100644 --- a/go/store/nbs/bs_persister.go +++ b/go/store/nbs/bs_persister.go @@ -27,6 +27,7 @@ type blobstorePersister struct { bs blobstore.Blobstore blockSize uint64 indexCache *indexCache + q MemoryQuotaProvider } // Persist makes the contents of mt durable. 
Chunks already present in @@ -49,7 +50,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver } bsTRA := &bsTableReaderAt{name.String(), bsp.bs} - return newReaderFromIndexData(bsp.indexCache, data, name, bsTRA, bsp.blockSize) + return newReaderFromIndexData(bsp.indexCache, bsp.q, data, name, bsTRA, bsp.blockSize) } // ConjoinAll (Not currently implemented) conjoins all chunks in |sources| into a single, @@ -60,7 +61,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour // Open a table named |name|, containing |chunkCount| chunks. func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { - return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.blockSize, bsp.indexCache, stats) + return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.blockSize, bsp.indexCache, bsp.q, stats) } type bsTableReaderAt struct { @@ -96,7 +97,20 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off return totalRead, nil } -func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, indexCache *indexCache, stats *Stats) (cs chunkSource, err error) { +func loadTableIndex(stats *Stats, chunkCount uint32, q MemoryQuotaProvider, getBytesFromEnd func(n int64) ([]byte, error)) (onHeapTableIndex, error) { + size := indexSize(chunkCount) + footerSize + t1 := time.Now() + bytes, err := getBytesFromEnd(int64(size)) + if err != nil { + return onHeapTableIndex{}, err + } + stats.IndexReadLatency.SampleTimeSince(t1) + stats.IndexBytesPerRead.Sample(uint64(len(bytes))) + + return parseTableIndex(bytes, q) +} + +func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, indexCache *indexCache, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) { if indexCache != nil { indexCache.lockEntry(name) defer func() { @@ -107,7 
+121,8 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch } }() - if index, found := indexCache.get(name); found { + index, err := indexCache.get(name) + if err == nil { bsTRA := &bsTableReaderAt{name.String(), bs} tr, err := newTableReader(index, bsTRA, blockSize) if err != nil { @@ -115,36 +130,26 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch } return &chunkSourceAdapter{tr, name}, nil } + if err != errCacheMiss { + return &chunkSourceAdapter{}, err + } } - t1 := time.Now() - indexBytes, tra, err := func() ([]byte, tableReaderAt, error) { - size := int64(indexSize(chunkCount) + footerSize) - key := name.String() - rc, _, err := bs.Get(ctx, key, blobstore.NewBlobRange(-size, 0)) + index, err := loadTableIndex(stats, chunkCount, q, func(size int64) ([]byte, error) { + rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-size, 0)) if err != nil { - return nil, nil, err + return nil, err } defer rc.Close() buff := make([]byte, size) _, err = io.ReadFull(rc, buff) if err != nil { - return nil, nil, err + return nil, err } - return buff, &bsTableReaderAt{key, bs}, nil - }() - - if err != nil { - return nil, err - } - - stats.IndexBytesPerRead.Sample(uint64(len(indexBytes))) - stats.IndexReadLatency.SampleTimeSince(t1) - - index, err := parseTableIndex(indexBytes) - + return buff, nil + }) if err != nil { return nil, err } @@ -153,7 +158,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch indexCache.put(name, index) } - tr, err := newTableReader(index, tra, s3BlockSize) + tr, err := newTableReader(index, &bsTableReaderAt{name.String(), bs}, s3BlockSize) if err != nil { return nil, err } diff --git a/go/store/nbs/chunk_source_adapter.go b/go/store/nbs/chunk_source_adapter.go index 8a669d75c6..565fab6f26 100644 --- a/go/store/nbs/chunk_source_adapter.go +++ b/go/store/nbs/chunk_source_adapter.go @@ -23,8 +23,8 @@ func (csa chunkSourceAdapter) hash() (addr, 
error) { return csa.h, nil } -func newReaderFromIndexData(indexCache *indexCache, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) { - index, err := parseTableIndexByCopy(idxData) +func newReaderFromIndexData(indexCache *indexCache, q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) { + index, err := parseTableIndexByCopy(idxData, q) if err != nil { return nil, err diff --git a/go/store/nbs/cmp_chunk_table_writer_test.go b/go/store/nbs/cmp_chunk_table_writer_test.go index 7aa0021a48..f9c14b33fb 100644 --- a/go/store/nbs/cmp_chunk_table_writer_test.go +++ b/go/store/nbs/cmp_chunk_table_writer_test.go @@ -35,7 +35,7 @@ func TestCmpChunkTableWriter(t *testing.T) { require.NoError(t, err) // Setup a TableReader to read compressed chunks out of - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -73,7 +73,7 @@ func TestCmpChunkTableWriter(t *testing.T) { require.NoError(t, err) outputBuff := output.Bytes() - outputTI, err := parseTableIndexByCopy(outputBuff) + outputTI, err := parseTableIndexByCopy(outputBuff, &noopQuotaProvider{}) require.NoError(t, err) outputTR, err := newTableReader(outputTI, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/conjoiner_test.go b/go/store/nbs/conjoiner_test.go index 19101836d1..09bcad595c 100644 --- a/go/store/nbs/conjoiner_test.go +++ b/go/store/nbs/conjoiner_test.go @@ -119,7 +119,7 @@ func TestConjoin(t *testing.T) { } setup := func(lock addr, root hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) { - p = newFakeTablePersister() + p = newFakeTablePersister(&noopQuotaProvider{}) fm = &fakeManifest{} fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(sizes, 
p), nil) @@ -213,7 +213,7 @@ func TestConjoin(t *testing.T) { }) setupAppendix := func(lock addr, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) { - p = newFakeTablePersister() + p = newFakeTablePersister(&noopQuotaProvider{}) fm = &fakeManifest{} fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(specSizes, p), makeTestTableSpecs(appendixSizes, p)) diff --git a/go/store/nbs/dynamo_fake_test.go b/go/store/nbs/dynamo_fake_test.go index 1897cd3939..2b1a049e00 100644 --- a/go/store/nbs/dynamo_fake_test.go +++ b/go/store/nbs/dynamo_fake_test.go @@ -56,7 +56,7 @@ func (m *fakeDDB) readerForTable(name addr) (chunkReader, error) { if i, present := m.data[fmtTableName(name)]; present { buff, ok := i.([]byte) assert.True(m.t, ok) - ti, err := parseTableIndex(buff) + ti, err := parseTableIndex(buff, &noopQuotaProvider{}) if err != nil { return nil, err diff --git a/go/store/nbs/file_table_persister.go b/go/store/nbs/file_table_persister.go index d77b471490..b8f92d9d1d 100644 --- a/go/store/nbs/file_table_persister.go +++ b/go/store/nbs/file_table_persister.go @@ -38,19 +38,20 @@ import ( const tempTablePrefix = "nbs_table_" -func newFSTablePersister(dir string, fc *fdCache, indexCache *indexCache) tablePersister { +func newFSTablePersister(dir string, fc *fdCache, indexCache *indexCache, q MemoryQuotaProvider) tablePersister { d.PanicIfTrue(fc == nil) - return &fsTablePersister{dir, fc, indexCache} + return &fsTablePersister{dir, fc, indexCache, q} } type fsTablePersister struct { dir string fc *fdCache indexCache *indexCache + q MemoryQuotaProvider } func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) { - return newMmapTableReader(ftp.dir, name, chunkCount, ftp.indexCache, ftp.fc) + return newMmapTableReader(ftp.dir, name, chunkCount, ftp.indexCache, ftp.q, ftp.fc) } func (ftp *fsTablePersister) Persist(ctx context.Context, mt 
*memTable, haver chunkReader, stats *Stats) (chunkSource, error) { @@ -90,7 +91,7 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [ return "", ferr } - index, ferr := parseTableIndexByCopy(data) + index, ferr := parseTableIndexByCopy(data, ftp.q) if ferr != nil { return "", ferr @@ -185,7 +186,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource } var index onHeapTableIndex - index, ferr = parseTableIndex(plan.mergedIndex) + index, ferr = parseTableIndex(plan.mergedIndex, ftp.q) if ferr != nil { return "", ferr diff --git a/go/store/nbs/file_table_persister_test.go b/go/store/nbs/file_table_persister_test.go index f0904ca0d9..042f62d62b 100644 --- a/go/store/nbs/file_table_persister_test.go +++ b/go/store/nbs/file_table_persister_test.go @@ -44,7 +44,7 @@ func TestFSTableCacheOnOpen(t *testing.T) { cacheSize := 2 fc := newFDCache(cacheSize) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) + fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) // Create some tables manually, load them into the cache func() { @@ -120,14 +120,14 @@ func TestFSTablePersisterPersist(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) + fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) src, err := persistTableData(fts, testChunks...) 
require.NoError(t, err) if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -159,7 +159,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) + fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{}) require.NoError(t, err) @@ -174,7 +174,7 @@ func TestFSTablePersisterCacheOnPersist(t *testing.T) { dir := makeTempDir(t) fc := newFDCache(1) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) + fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) defer file.RemoveAll(dir) var name addr @@ -210,7 +210,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(len(sources)) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) + fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) for i, c := range testChunks { randChunk := make([]byte, (i+1)*13) @@ -228,7 +228,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) { if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -246,7 +246,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) { defer file.RemoveAll(dir) fc := newFDCache(defaultMaxTables) defer fc.Drop() - fts := newFSTablePersister(dir, fc, nil) 
+ fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{}) reps := 3 sources := make(chunkSources, reps) @@ -267,7 +267,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) { if assert.True(mustUint32(src.count()) > 0) { buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String())) require.NoError(t, err) - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/generational_chunk_store_test.go b/go/store/nbs/generational_chunk_store_test.go index 5698b16477..563962ed97 100644 --- a/go/store/nbs/generational_chunk_store_test.go +++ b/go/store/nbs/generational_chunk_store_test.go @@ -139,8 +139,8 @@ func putChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, cs chun func TestGenerationalCS(t *testing.T) { ctx := context.Background() - oldGen, _ := makeTestLocalStore(t, 64) - newGen, _ := makeTestLocalStore(t, 64) + oldGen, _, _ := makeTestLocalStore(t, 64) + newGen, _, _ := makeTestLocalStore(t, 64) inOld := make(map[int]bool) inNew := make(map[int]bool) chnks := genChunks(t, 100, 1000) diff --git a/go/store/nbs/mem_table_test.go b/go/store/nbs/mem_table_test.go index 8cd48a557c..4c81f2d070 100644 --- a/go/store/nbs/mem_table_test.go +++ b/go/store/nbs/mem_table_test.go @@ -150,7 +150,7 @@ func TestMemTableWrite(t *testing.T) { td1, _, err := buildTable(chunks[1:2]) require.NoError(t, err) - ti1, err := parseTableIndexByCopy(td1) + ti1, err := parseTableIndexByCopy(td1, &noopQuotaProvider{}) require.NoError(t, err) tr1, err := newTableReader(ti1, tableReaderAtFromBytes(td1), fileBlockSize) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestMemTableWrite(t *testing.T) { td2, _, err := buildTable(chunks[2:]) require.NoError(t, err) - ti2, err := parseTableIndexByCopy(td2) + ti2, err := parseTableIndexByCopy(td2, &noopQuotaProvider{}) 
require.NoError(t, err) tr2, err := newTableReader(ti2, tableReaderAtFromBytes(td2), fileBlockSize) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestMemTableWrite(t *testing.T) { require.NoError(t, err) assert.Equal(uint32(1), count) - ti, err := parseTableIndexByCopy(data) + ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) require.NoError(t, err) outReader, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/mmap_table_reader.go b/go/store/nbs/mmap_table_reader.go index 2fb5ab7c93..91792b7638 100644 --- a/go/store/nbs/mmap_table_reader.go +++ b/go/store/nbs/mmap_table_reader.go @@ -55,7 +55,7 @@ func init() { } } -func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *indexCache, fc *fdCache) (cs chunkSource, err error) { +func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *indexCache, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) { path := filepath.Join(dir, h.String()) var index onHeapTableIndex @@ -69,7 +69,12 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index err = unlockErr } }() - index, found = indexCache.get(h) + index, err = indexCache.get(h) + if err == nil { + found = true + } else if err != errCacheMiss { + return nil, err + } } if !found { @@ -113,6 +118,7 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index } buff := make([]byte, indexSize(chunkCount)+footerSize) + // TODO: Don't use mmap here. 
func() { var mm mmap.MMap mm, err = mmap.MapRegion(f, length, mmap.RDONLY, 0, aligned) @@ -133,7 +139,7 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index return onHeapTableIndex{}, err } - ti, err = parseTableIndex(buff) + ti, err = parseTableIndex(buff, q) if err != nil { return diff --git a/go/store/nbs/mmap_table_reader_test.go b/go/store/nbs/mmap_table_reader_test.go index 4b7de78ada..97bc142d3d 100644 --- a/go/store/nbs/mmap_table_reader_test.go +++ b/go/store/nbs/mmap_table_reader_test.go @@ -52,7 +52,7 @@ func TestMmapTableReader(t *testing.T) { err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666) require.NoError(t, err) - trc, err := newMmapTableReader(dir, h, uint32(len(chunks)), nil, fc) + trc, err := newMmapTableReader(dir, h, uint32(len(chunks)), nil, &noopQuotaProvider{}, fc) require.NoError(t, err) assertChunksInReader(chunks, trc, assert) } diff --git a/go/store/nbs/persisting_chunk_source_test.go b/go/store/nbs/persisting_chunk_source_test.go index 326a43ca3a..726d771dd2 100644 --- a/go/store/nbs/persisting_chunk_source_test.go +++ b/go/store/nbs/persisting_chunk_source_test.go @@ -31,7 +31,7 @@ import ( func TestPersistingChunkStoreEmpty(t *testing.T) { mt := newMemTable(testMemTableSize) - ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(), make(chan struct{}, 1), &Stats{}) + ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(&noopQuotaProvider{}), make(chan struct{}, 1), &Stats{}) h, err := ccs.hash() require.NoError(t, err) @@ -58,7 +58,7 @@ func TestPersistingChunkStore(t *testing.T) { } trigger := make(chan struct{}) - ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(), trigger}, make(chan struct{}, 1), &Stats{}) + ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(&noopQuotaProvider{}), trigger}, make(chan 
struct{}, 1), &Stats{}) assertChunksInReader(testChunks, ccs, assert) assert.EqualValues(mustUint32(mt.count()), mustUint32(ccs.getReader().count())) diff --git a/go/store/nbs/quota.go b/go/store/nbs/quota.go index f2bd0695bf..59edb60ac2 100644 --- a/go/store/nbs/quota.go +++ b/go/store/nbs/quota.go @@ -34,6 +34,9 @@ func NewUnlimitedMemQuotaProvider() *UnlimitedQuotaProvider { return &UnlimitedQuotaProvider{} } +type noopQuotaProvider struct { +} + func (q *UnlimitedQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error { q.mu.Lock() defer q.mu.Unlock() @@ -56,3 +59,15 @@ func (q *UnlimitedQuotaProvider) Usage() uint64 { defer q.mu.Unlock() return q.used } + +func (q *noopQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error { + return nil +} + +func (q *noopQuotaProvider) ReleaseQuota(memory uint64) error { + return nil +} + +func (q *noopQuotaProvider) Usage() uint64 { + return 0 +} diff --git a/go/store/nbs/root_tracker_test.go b/go/store/nbs/root_tracker_test.go index 1acbb741e5..0138c0dac7 100644 --- a/go/store/nbs/root_tracker_test.go +++ b/go/store/nbs/root_tracker_test.go @@ -176,11 +176,11 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) { assert := assert.New(t) fm := &fakeManifest{} mm := manifestManager{fm, newManifestCache(0), newManifestLocks()} - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() defer func() { require.EqualValues(t, 0, q.Usage()) }() + p := newFakeTablePersister(q) // Simulate another process writing a manifest behind store's back. 
newRoot, chunks, err := interloperWrite(fm, p, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2")) @@ -227,11 +227,12 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) { assert := assert.New(t) fm := &fakeManifest{} mm := manifestManager{fm, newManifestCache(defaultManifestCacheSize), newManifestLocks()} - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() defer func() { require.EqualValues(t, 0, q.Usage()) }() + p := newFakeTablePersister(q) + c := inlineConjoiner{defaultMaxTables} store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, c, defaultMemTableSize) @@ -278,11 +279,11 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) { fm := &fakeManifest{name: "foo"} upm := &updatePreemptManifest{manifest: fm} mm := manifestManager{upm, newManifestCache(defaultManifestCacheSize), newManifestLocks()} - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() defer func() { require.EqualValues(t, 0, q.Usage()) }() + p := newFakeTablePersister(q) c := inlineConjoiner{defaultMaxTables} store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, c, defaultMemTableSize) @@ -325,11 +326,12 @@ func TestChunkStoreSerializeCommits(t *testing.T) { upm := &updatePreemptManifest{manifest: fm} mc := newManifestCache(defaultManifestCacheSize) l := newManifestLocks() - p := newFakeTablePersister() q := NewUnlimitedMemQuotaProvider() defer func() { require.EqualValues(t, 0, q.Usage()) }() + p := newFakeTablePersister(q) + c := inlineConjoiner{defaultMaxTables} store, err := newNomsBlockStore(context.Background(), constants.Format718String, manifestManager{upm, mc, l}, p, q, c, defaultMemTableSize) @@ -391,8 +393,8 @@ func TestChunkStoreSerializeCommits(t *testing.T) { func makeStoreWithFakes(t *testing.T) (fm *fakeManifest, p tablePersister, q MemoryQuotaProvider, store *NomsBlockStore) { fm = &fakeManifest{} mm := manifestManager{fm, 
newManifestCache(0), newManifestLocks()} - p = newFakeTablePersister() q = NewUnlimitedMemQuotaProvider() + p = newFakeTablePersister(q) store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, inlineConjoiner{defaultMaxTables}, 0) require.NoError(t, err) return @@ -489,15 +491,16 @@ func (fm *fakeManifest) set(version string, lock addr, root hash.Hash, specs, ap } } -func newFakeTableSet() tableSet { - return tableSet{p: newFakeTablePersister(), q: NewUnlimitedMemQuotaProvider(), rl: make(chan struct{}, 1)} +func newFakeTableSet(q MemoryQuotaProvider) tableSet { + return tableSet{p: newFakeTablePersister(q), q: NewUnlimitedMemQuotaProvider(), rl: make(chan struct{}, 1)} } -func newFakeTablePersister() tablePersister { - return fakeTablePersister{map[addr]tableReader{}, &sync.RWMutex{}} +func newFakeTablePersister(q MemoryQuotaProvider) tablePersister { + return fakeTablePersister{q, map[addr]tableReader{}, &sync.RWMutex{}} } type fakeTablePersister struct { + q MemoryQuotaProvider sources map[addr]tableReader mu *sync.RWMutex } @@ -515,7 +518,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c if chunkCount > 0 { ftp.mu.Lock() defer ftp.mu.Unlock() - ti, err := parseTableIndexByCopy(data) + ti, err := parseTableIndexByCopy(data, ftp.q) if err != nil { return nil, err @@ -542,7 +545,7 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc if chunkCount > 0 { ftp.mu.Lock() defer ftp.mu.Unlock() - ti, err := parseTableIndexByCopy(data) + ti, err := parseTableIndexByCopy(data, ftp.q) if err != nil { return nil, err diff --git a/go/store/nbs/s3_fake_test.go b/go/store/nbs/s3_fake_test.go index 2dda8b5958..ade5cfcc83 100644 --- a/go/store/nbs/s3_fake_test.go +++ b/go/store/nbs/s3_fake_test.go @@ -76,7 +76,7 @@ func (m *fakeS3) readerForTable(name addr) (chunkReader, error) { m.mu.Lock() defer m.mu.Unlock() if buff, present := m.data[name.String()]; present { - ti, err := 
parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) if err != nil { return nil, err @@ -98,7 +98,7 @@ func (m *fakeS3) readerForTableWithNamespace(ns string, name addr) (chunkReader, key = ns + "/" + key } if buff, present := m.data[key]; present { - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) if err != nil { return nil, err diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index cd8c23d49d..5663ad780d 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -455,13 +455,7 @@ func NewAWSStoreWithMMapIndex(ctx context.Context, nbfVerStr string, table, ns, awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks}, globalIndexCache, ns, - func(bs []byte) (tableIndex, error) { - ohi, err := parseTableIndex(bs) - if err != nil { - return nil, err - } - return newMmapTableIndex(ohi, nil) - }, + q, } mm := makeManifestManager(newDynamoManifest(table, ns, ddb)) return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize) @@ -478,9 +472,7 @@ func NewAWSStore(ctx context.Context, nbfVerStr string, table, ns, bucket string awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks}, globalIndexCache, ns, - func(bs []byte) (tableIndex, error) { - return parseTableIndex(bs) - }, + q, } mm := makeManifestManager(newDynamoManifest(table, ns, ddb)) return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize) @@ -500,7 +492,7 @@ func NewBSStore(ctx context.Context, nbfVerStr string, bs blobstore.Blobstore, m mm := makeManifestManager(blobstoreManifest{"manifest", bs}) - p := &blobstorePersister{bs, s3BlockSize, globalIndexCache} + p := &blobstorePersister{bs, s3BlockSize, globalIndexCache, q} return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize) } @@ -523,7 +515,7 @@ func 
newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi } mm := makeManifestManager(m) - p := newFSTablePersister(dir, globalFDCache, globalIndexCache) + p := newFSTablePersister(dir, globalFDCache, globalIndexCache, q) nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize) if err != nil { diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index 9b622ef6d9..39080276a6 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -38,7 +38,7 @@ import ( "github.com/dolthub/dolt/go/store/util/tempfiles" ) -func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string) { +func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string, q MemoryQuotaProvider) { ctx := context.Background() nomsDir = filepath.Join(tempfiles.MovableTempFileProvider.GetTempDir(), "noms_"+uuid.New().String()[:8]) err := os.MkdirAll(nomsDir, os.ModePerm) @@ -48,9 +48,10 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no _, err = fileManifest{nomsDir}.Update(ctx, addr{}, manifestContents{}, &Stats{}, nil) require.NoError(t, err) - st, err = newLocalStore(ctx, types.Format_Default.VersionString(), nomsDir, defaultMemTableSize, maxTableFiles, NewUnlimitedMemQuotaProvider()) + q = NewUnlimitedMemQuotaProvider() + st, err = newLocalStore(ctx, types.Format_Default.VersionString(), nomsDir, defaultMemTableSize, maxTableFiles, q) require.NoError(t, err) - return st, nomsDir + return st, nomsDir, q } type fileToData map[string][]byte @@ -83,7 +84,13 @@ func TestNBSAsTableFileStore(t *testing.T) { numTableFiles := 128 assert.Greater(t, defaultMaxTables, numTableFiles) - st, _ := makeTestLocalStore(t, defaultMaxTables) + st, _, q := makeTestLocalStore(t, defaultMaxTables) + defer func() { + require.Equal(t, uint64(0), q.Usage()) + }() + defer func() { + require.NoError(t, st.Close()) + }() fileToData := 
populateLocalStore(t, st, numTableFiles) _, sources, _, err := st.Sources(ctx) @@ -144,7 +151,7 @@ func TestNBSPruneTableFiles(t *testing.T) { // over populate table files numTableFiles := 64 maxTableFiles := 16 - st, nomsDir := makeTestLocalStore(t, maxTableFiles) + st, nomsDir, _ := makeTestLocalStore(t, maxTableFiles) fileToData := populateLocalStore(t, st, numTableFiles) // add a chunk and flush to trigger a conjoin @@ -226,7 +233,7 @@ func makeChunkSet(N, size int) (s map[hash.Hash]chunks.Chunk) { func TestNBSCopyGC(t *testing.T) { ctx := context.Background() - st, _ := makeTestLocalStore(t, 8) + st, _, _ := makeTestLocalStore(t, 8) keepers := makeChunkSet(64, 64) tossers := makeChunkSet(64, 64) diff --git a/go/store/nbs/table_index.go b/go/store/nbs/table_index.go index a2cac4b34c..e84cc03949 100644 --- a/go/store/nbs/table_index.go +++ b/go/store/nbs/table_index.go @@ -96,7 +96,7 @@ func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData // parses a valid nbs tableIndex from a byte stream. |buff| must end with an NBS index // and footer and its length and capacity must match the expected indexSize for the chunkCount specified in the footer. // Retains the buffer and does not allocate new memory except for offsets, computes on buff in place. -func parseTableIndex(buff []byte) (onHeapTableIndex, error) { +func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { chunkCount, totalUncompressedData, err := ReadTableFooter(bytes.NewReader(buff)) if err != nil { return onHeapTableIndex{}, err @@ -106,19 +106,19 @@ func parseTableIndex(buff []byte) (onHeapTableIndex, error) { return onHeapTableIndex{}, ErrWrongBufferSize } buff = buff[:len(buff)-footerSize] - return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData) + return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData, q) } // parseTableIndexByCopy reads the footer, copies indexSize(chunkCount) bytes, and parses an on heap table index. 
// Useful to create an onHeapTableIndex without retaining the entire underlying array of data. -func parseTableIndexByCopy(buff []byte) (onHeapTableIndex, error) { +func parseTableIndexByCopy(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) { r := bytes.NewReader(buff) - return ReadTableIndexByCopy(r) + return ReadTableIndexByCopy(r, q) } // ReadTableIndexByCopy loads an index into memory from an io.ReadSeeker // Caution: Allocates new memory for entire index -func ReadTableIndexByCopy(rd io.ReadSeeker) (onHeapTableIndex, error) { +func ReadTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) { chunkCount, totalUncompressedData, err := ReadTableFooter(rd) if err != nil { return onHeapTableIndex{}, err @@ -134,10 +134,12 @@ func ReadTableIndexByCopy(rd io.ReadSeeker) (onHeapTableIndex, error) { return onHeapTableIndex{}, err } - return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData) + return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData, q) } type onHeapTableIndex struct { + q MemoryQuotaProvider + refCnt *int32 tableFileSize uint64 // Tuple bytes tupleB []byte @@ -152,7 +154,7 @@ type onHeapTableIndex struct { var _ tableIndex = &onHeapTableIndex{} // NewOnHeapTableIndex creates a table index given a buffer of just the table index (no footer) -func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint64) (onHeapTableIndex, error) { +func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) { tuples := b[:prefixTupleSize*chunkCount] lengths := b[prefixTupleSize*chunkCount : prefixTupleSize*chunkCount+lengthSize*chunkCount] suffixes := b[prefixTupleSize*chunkCount+lengthSize*chunkCount:] @@ -169,7 +171,12 @@ func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint store half the offsets and then allocate an additional len(lengths) to store the rest. 
*/ + refCnt := new(int32) + *refCnt = 1 + return onHeapTableIndex{ + refCnt: refCnt, + q: q, tupleB: tuples, offsetB: offsets, suffixB: suffixes, @@ -328,10 +335,20 @@ func (ti onHeapTableIndex) TotalUncompressedData() uint64 { } func (ti onHeapTableIndex) Close() error { + cnt := atomic.AddInt32(ti.refCnt, -1) + if cnt == 0 { + return ti.q.ReleaseQuota(memSize(ti.chunkCount)) + } + if cnt < 0 { + panic("Close() called and reduced ref count to < 0.") + } return nil } func (ti onHeapTableIndex) Clone() (tableIndex, error) { + _ = atomic.AddInt32(ti.refCnt, 1) + // We allow Closed onHeapTableIndex's to be Cloned because we may pull a + // closed index from the singleton indexCache. return ti, nil } diff --git a/go/store/nbs/table_index_test.go b/go/store/nbs/table_index_test.go index ffff73d488..0824b267e5 100644 --- a/go/store/nbs/table_index_test.go +++ b/go/store/nbs/table_index_test.go @@ -29,7 +29,7 @@ func TestParseTableIndex(t *testing.T) { defer f.Close() bs, err := io.ReadAll(f) require.NoError(t, err) - idx, err := parseTableIndexByCopy(bs) + idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{}) require.NoError(t, err) defer idx.Close() assert.Equal(t, uint32(596), idx.ChunkCount()) @@ -55,7 +55,7 @@ func TestMMapIndex(t *testing.T) { defer f.Close() bs, err := io.ReadAll(f) require.NoError(t, err) - idx, err := parseTableIndexByCopy(bs) + idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{}) require.NoError(t, err) defer idx.Close() mmidx, err := newMmapTableIndex(idx, nil) diff --git a/go/store/nbs/table_persister.go b/go/store/nbs/table_persister.go index a0fd054994..93a7e160c7 100644 --- a/go/store/nbs/table_persister.go +++ b/go/store/nbs/table_persister.go @@ -34,6 +34,8 @@ import ( "github.com/dolthub/dolt/go/store/util/sizecache" ) +var errCacheMiss = errors.New("index cache miss") + // tablePersister allows interaction with persistent storage. 
It provides // primitives for pushing the contents of a memTable to persistent storage, // opening persistent tables for reading, and conjoining a number of existing @@ -101,11 +103,18 @@ func (sic *indexCache) unlockEntry(name addr) error { return nil } -func (sic *indexCache) get(name addr) (onHeapTableIndex, bool) { +// get returns errCacheMiss if the index with |name| was not found in the cache. It always returns a Clone of the index. +func (sic *indexCache) get(name addr) (onHeapTableIndex, error) { if idx, found := sic.cache.Get(name); found { - return idx.(onHeapTableIndex), true + index := idx.(onHeapTableIndex) + index2, err := index.Clone() + if err != nil { + return onHeapTableIndex{}, err + } + return index2.(onHeapTableIndex), nil } - return onHeapTableIndex{}, false + + return onHeapTableIndex{}, errCacheMiss } func (sic *indexCache) put(name addr, idx onHeapTableIndex) { diff --git a/go/store/nbs/table_persister_test.go b/go/store/nbs/table_persister_test.go index 19184a0d4c..5a202eae29 100644 --- a/go/store/nbs/table_persister_test.go +++ b/go/store/nbs/table_persister_test.go @@ -45,7 +45,7 @@ func TestPlanCompaction(t *testing.T) { } data, name, err := buildTable(content) require.NoError(t, err) - ti, err := parseTableIndexByCopy(data) + ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize) require.NoError(t, err) @@ -63,7 +63,7 @@ func TestPlanCompaction(t *testing.T) { totalChunks += mustUint32(src.count()) } - idx, err := parseTableIndex(plan.mergedIndex) + idx, err := parseTableIndex(plan.mergedIndex, &noopQuotaProvider{}) require.NoError(t, err) assert.Equal(totalChunks, idx.chunkCount) @@ -75,3 +75,22 @@ func TestPlanCompaction(t *testing.T) { assertChunksInReader(content, tr, assert) } } + +func TestIndexCacheClonesIndicesOnGet(t *testing.T) { + chunks := [][]byte{ + []byte("hello2"), + []byte("goodbye2"), + []byte("badbye2"), + } + 
indexBytes, h, _ := buildTable(chunks) + index, err := parseTableIndexByCopy(indexBytes, &noopQuotaProvider{}) + require.NoError(t, err) + require.Equal(t, int32(1), *index.refCnt) + + indexCache := newIndexCache(1024) + indexCache.put(h, index) + + index2, err := indexCache.get(h) + require.NoError(t, err) + require.Equal(t, int32(2), *index2.refCnt) +} diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index 0057f1aa59..04aa0e8478 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -336,12 +336,7 @@ func (ts tableSet) Close() error { setErr(err) } for _, t := range ts.upstream { - memSize, err := getCSMemSize(t) - err = ts.q.ReleaseQuota(memSize) - if err != nil { - return err - } - err = t.Close() + err := t.Close() setErr(err) } return firstErr @@ -416,12 +411,6 @@ func (ts tableSet) Flatten(ctx context.Context) (tableSet, error) { return tableSet{}, err } - // TODO: acquire quota when we persist a memory table instead - err = ts.q.AcquireQuota(ctx, memSize(cnt)) - if err != nil { - return tableSet{}, err - } - if cnt > 0 { flattened.upstream = append(flattened.upstream, src) } @@ -491,16 +480,6 @@ func (ts tableSet) Rebase(ctx context.Context, specs []tableSpec, stats *Stats) return } if spec.name == h { - memSize, err := getCSMemSize(existing) - if err != nil { - ae.SetIfError(err) - return - } - err = ts.q.AcquireQuota(ctx, memSize) - if err != nil { - ae.SetIfError(err) - return - } c, err := existing.Clone() if err != nil { ae.SetIfError(err) diff --git a/go/store/nbs/table_set_test.go b/go/store/nbs/table_set_test.go index 76acee5949..e7c648ef51 100644 --- a/go/store/nbs/table_set_test.go +++ b/go/store/nbs/table_set_test.go @@ -32,7 +32,7 @@ import ( var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")} func TestTableSetPrependEmpty(t *testing.T) { - ts := newFakeTableSet().Prepend(context.Background(), newMemTable(testMemTableSize), &Stats{}) + ts := 
newFakeTableSet(&noopQuotaProvider{}).Prepend(context.Background(), newMemTable(testMemTableSize), &Stats{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(t, specs) @@ -40,7 +40,7 @@ func TestTableSetPrependEmpty(t *testing.T) { func TestTableSetPrepend(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet() + ts := newFakeTableSet(&noopQuotaProvider{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(specs) @@ -65,7 +65,7 @@ func TestTableSetPrepend(t *testing.T) { func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet() + ts := newFakeTableSet(&noopQuotaProvider{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(specs) @@ -88,7 +88,7 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { func TestTableSetFlattenExcludesEmptyTable(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet() + ts := newFakeTableSet(&noopQuotaProvider{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(specs) @@ -111,7 +111,7 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) { func TestTableSetExtract(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet() + ts := newFakeTableSet(&noopQuotaProvider{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(specs) @@ -146,7 +146,11 @@ func TestTableSetExtract(t *testing.T) { func TestTableSetRebase(t *testing.T) { assert := assert.New(t) - persister := newFakeTablePersister() + q := NewUnlimitedMemQuotaProvider() + defer func() { + require.EqualValues(t, 0, q.Usage()) + }() + persister := newFakeTablePersister(q) insert := func(ts tableSet, chunks ...[]byte) tableSet { for _, c := range chunks { @@ -156,10 +160,6 @@ func TestTableSetRebase(t *testing.T) { } return ts } - q := NewUnlimitedMemQuotaProvider() - defer func() { - require.EqualValues(t, 0, q.Usage()) - }() fullTS := newTableSet(persister, q) defer func() { @@ -193,7 +193,7 @@ func TestTableSetRebase(t 
*testing.T) { func TestTableSetPhysicalLen(t *testing.T) { assert := assert.New(t) - ts := newFakeTableSet() + ts := newFakeTableSet(&noopQuotaProvider{}) specs, err := ts.ToSpecs() require.NoError(t, err) assert.Empty(specs) diff --git a/go/store/nbs/table_test.go b/go/store/nbs/table_test.go index 32ca6c0d1f..55675d1e9e 100644 --- a/go/store/nbs/table_test.go +++ b/go/store/nbs/table_test.go @@ -77,7 +77,7 @@ func TestSimple(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -124,7 +124,7 @@ func TestHasMany(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -175,7 +175,7 @@ func TestHasManySequentialPrefix(t *testing.T) { require.NoError(t, err) buff = buff[:length] - ti, err := parseTableIndexByCopy(buff) + ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize) require.NoError(t, err) @@ -204,7 +204,7 @@ func TestGetMany(t *testing.T) { tableData, _, err := buildTable(data) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -238,7 +238,7 @@ func TestCalcReads(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := 
parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), 0) require.NoError(t, err) @@ -275,7 +275,7 @@ func TestExtract(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -314,7 +314,7 @@ func Test65k(t *testing.T) { tableData, _, err := buildTable(chunks) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) @@ -367,7 +367,7 @@ func doTestNGetMany(t *testing.T, count int) { tableData, _, err := buildTable(data) require.NoError(t, err) - ti, err := parseTableIndexByCopy(tableData) + ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{}) require.NoError(t, err) tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize) require.NoError(t, err) diff --git a/go/store/nbs/util.go b/go/store/nbs/util.go index d41f4c26a0..d1331cd30b 100644 --- a/go/store/nbs/util.go +++ b/go/store/nbs/util.go @@ -25,7 +25,7 @@ import ( ) func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error { - idx, err := ReadTableIndexByCopy(rd) + idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{}) if err != nil { return err } @@ -69,7 +69,7 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er } func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) { - idx, err := ReadTableIndexByCopy(rd) + idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{}) if err != nil { return nil, err }