Push ReleaseQuota down into onHeapTableIndex Close()

Dhruv Sringari
2022-03-28 16:06:33 -07:00
parent 19e042fe48
commit 69c8021920
30 changed files with 252 additions and 209 deletions
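For orientation before the per-file hunks: the MemoryQuotaProvider threaded through the persisters and chunk sources in this commit is not itself shown in the diff. A minimal sketch of its method set, inferred only from the call sites below (both UnlimitedQuotaProvider and noopQuotaProvider implement these three methods later in the diff); the real definition may carry more:

	// Sketch only; inferred from usage in this commit, not the actual definition.
	type MemoryQuotaProvider interface {
		// AcquireQuota reserves |memory| bytes of quota for the caller.
		AcquireQuota(ctx context.Context, memory uint64) error
		// ReleaseQuota returns |memory| bytes of quota to the provider.
		ReleaseQuota(memory uint64) error
		// Usage reports how many bytes are currently held.
		Usage() uint64
	}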

View File

@@ -28,9 +28,7 @@ import (
"time"
)
type indexParserF func([]byte) (tableIndex, error)
func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, indexCache *indexCache, stats *Stats, parseIndex indexParserF) (cs chunkSource, err error) {
func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, indexCache *indexCache, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
if indexCache != nil {
indexCache.lockEntry(name)
defer func() {
@@ -41,7 +39,8 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
}
}()
if index, found := indexCache.get(name); found {
index, err := indexCache.get(name)
if err == nil {
tra := &awsTableReaderAt{al: al, ddb: ddb, s3: s3, name: name, chunkCount: chunkCount}
tr, err := newTableReader(index, tra, s3BlockSize)
if err != nil {
@@ -49,59 +48,55 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
}
return &chunkSourceAdapter{tr, name}, nil
}
if err != errCacheMiss {
return &chunkSourceAdapter{}, err
}
}
t1 := time.Now()
index, tra, err := func() (tableIndex, tableReaderAt, error) {
index, tra, err := func() (onHeapTableIndex, tableReaderAt, error) {
if al.tableMayBeInDynamo(chunkCount) {
t1 := time.Now()
data, err := ddb.ReadTable(ctx, name, stats)
if data == nil && err == nil { // There MUST be either data or an error
return onHeapTableIndex{}, &dynamoTableReaderAt{}, errors.New("no data available")
}
if data != nil {
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(data)))
ind, err := parseTableIndexByCopy(data)
ind, err := parseTableIndexByCopy(data, q)
if err != nil {
return onHeapTableIndex{}, nil, err
return onHeapTableIndex{}, &dynamoTableReaderAt{}, err
}
return ind, &dynamoTableReaderAt{ddb: ddb, h: name}, nil
}
if _, ok := err.(tableNotInDynamoErr); !ok {
return onHeapTableIndex{}, &dynamoTableReaderAt{}, err
}
}
size := indexSize(chunkCount) + footerSize
buff := make([]byte, size)
n, _, err := s3.ReadFromEnd(ctx, name, buff, stats)
if err != nil {
return onHeapTableIndex{}, &dynamoTableReaderAt{}, err
}
if size != uint64(n) {
return onHeapTableIndex{}, &dynamoTableReaderAt{}, errors.New("failed to read all data")
}
stats.IndexBytesPerRead.Sample(uint64(len(buff)))
ind, err := parseTableIndex(buff)
if err != nil {
return onHeapTableIndex{}, &dynamoTableReaderAt{}, err
}
return ind, &s3TableReaderAt{s3: s3, h: name}, nil
}()
index, err := loadTableIndex(stats, chunkCount, q, func(bytesFromEnd int64) ([]byte, error) {
buff := make([]byte, bytesFromEnd)
n, _, err := s3.ReadFromEnd(ctx, name, buff, stats)
if err != nil {
return nil, err
}
if bytesFromEnd != int64(n) {
return nil, errors.New("failed to read all data")
}
return buff, nil
})
if err != nil {
return onHeapTableIndex{}, &dynamoTableReaderAt{}, err
}
return index, &s3TableReaderAt{h: name, s3: s3}, nil
}()
if err != nil {
return &chunkSourceAdapter{}, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
if err != nil {
return emptyChunkSource{}, err
}
if ohi, ok := index.(onHeapTableIndex); indexCache != nil && ok {
indexCache.put(name, ohi)
if indexCache != nil {
indexCache.put(name, index)
}
tr, err := newTableReader(index, tra, s3BlockSize)
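The recurring pattern in this and the later chunk-source constructors is the new indexCache.get contract: it now returns (onHeapTableIndex, error) with errCacheMiss as the miss sentinel, instead of (onHeapTableIndex, bool). The canonical consumption shape, with makeSource standing in as a hypothetical helper returning (chunkSource, error):

	index, err := indexCache.get(name)
	if err == nil {
		// Hit: get returned a Clone, so this caller owns a reference
		// and the reader built on it must eventually be Closed.
		return makeSource(index)
	}
	if err != errCacheMiss {
		return nil, err // a real failure, not just a miss
	}
	// Miss: fall through and load the index from storage.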

View File

@@ -53,10 +53,8 @@ func TestAWSChunkSource(t *testing.T) {
h,
uint32(len(chunks)),
ic,
NewUnlimitedMemQuotaProvider(),
&Stats{},
func(bs []byte) (tableIndex, error) {
return parseTableIndex(bs)
},
)
require.NoError(t, err)
@@ -74,7 +72,7 @@ func TestAWSChunkSource(t *testing.T) {
t.Run("WithIndexCache", func(t *testing.T) {
assert := assert.New(t)
index, err := parseTableIndexByCopy(tableData)
index, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
cache := newIndexCache(1024)
cache.put(h, index)
@@ -98,7 +96,7 @@ func TestAWSChunkSource(t *testing.T) {
t.Run("WithIndexCache", func(t *testing.T) {
assert := assert.New(t)
index, err := parseTableIndexByCopy(tableData)
index, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
cache := newIndexCache(1024)
cache.put(h, index)

View File

@@ -61,7 +61,7 @@ type awsTablePersister struct {
limits awsLimits
indexCache *indexCache
ns string
parseIndex indexParserF
q MemoryQuotaProvider
}
type awsLimits struct {
@@ -90,8 +90,8 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
name,
chunkCount,
s3p.indexCache,
s3p.q,
stats,
s3p.parseIndex,
)
}
@@ -125,7 +125,7 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
return nil, err
}
return newReaderFromIndexData(s3p.indexCache, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize)
return newReaderFromIndexData(s3p.indexCache, s3p.q, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize)
}
err = s3p.multipartUpload(ctx, data, name.String())
@@ -135,7 +135,7 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
}
tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(s3p.indexCache, data, name, tra, s3BlockSize)
return newReaderFromIndexData(s3p.indexCache, s3p.q, data, name, tra, s3BlockSize)
}
func (s3p awsTablePersister) multipartUpload(ctx context.Context, data []byte, key string) error {
@@ -310,7 +310,7 @@ func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
verbose.Logger(ctx).Sugar().Debugf("Compacted table of %d Kb in %s", plan.totalCompressedData/1024, time.Since(t1))
tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(s3p.indexCache, plan.mergedIndex, name, tra, s3BlockSize)
return newReaderFromIndexData(s3p.indexCache, s3p.q, plan.mergedIndex, name, tra, s3BlockSize)
}
func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan compactionPlan, key string) error {

View File

@@ -38,7 +38,7 @@ import (
)
var parseIndexF = func(bs []byte) (tableIndex, error) {
return parseTableIndex(bs)
return parseTableIndex(bs, &noopQuotaProvider{})
}
func TestAWSTablePersisterPersist(t *testing.T) {
@@ -58,7 +58,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
ic := newIndexCache(1024)
limits := awsLimits{partTarget: calcPartSize(mt, 3)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, indexCache: ic, ns: ns, parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, indexCache: ic, ns: ns, q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -76,7 +76,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -100,7 +100,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: 1 << 10}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -116,7 +116,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1}
ddb := makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: calcPartSize(mt, 4)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
_, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
assert.Error(err)
@@ -138,7 +138,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -158,7 +158,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
tableData, name, err := buildTable(testChunks)
require.NoError(t, err)
@@ -183,7 +183,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -203,7 +203,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", parseIndex: parseIndexF}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -341,7 +341,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount},
ic,
"",
parseIndexF,
&noopQuotaProvider{},
}
}
@@ -546,7 +546,7 @@ func bytesToChunkSource(t *testing.T, bs ...[]byte) chunkSource {
tableSize, name, err := tw.finish()
require.NoError(t, err)
data := buff[:tableSize]
ti, err := parseTableIndexByCopy(data)
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
require.NoError(t, err)
rdr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)

View File

@@ -442,11 +442,12 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
t.Run("NoConjoin", func(t *testing.T) {
mm := makeManifestManager(&fakeManifest{})
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := &fakeConjoiner{}
smallTableStore, err := newNomsBlockStore(context.Background(), constants.FormatDefaultString, mm, p, q, c, testMemTableSize)
@@ -481,11 +482,9 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
t.Run("ConjoinSuccess", func(t *testing.T) {
fm := &fakeManifest{}
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
srcs := makeTestSrcs(t, []uint32{1, 1, 3, 7}, p)
upstream, err := toSpecs(srcs)
require.NoError(t, err)
@@ -517,11 +516,9 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
t.Run("ConjoinRetry", func(t *testing.T) {
fm := &fakeManifest{}
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
srcs := makeTestSrcs(t, []uint32{1, 1, 3, 7, 13}, p)
upstream, err := toSpecs(srcs)
require.NoError(t, err)

View File

@@ -27,6 +27,7 @@ type blobstorePersister struct {
bs blobstore.Blobstore
blockSize uint64
indexCache *indexCache
q MemoryQuotaProvider
}
// Persist makes the contents of mt durable. Chunks already present in
@@ -49,7 +50,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
}
bsTRA := &bsTableReaderAt{name.String(), bsp.bs}
return newReaderFromIndexData(bsp.indexCache, data, name, bsTRA, bsp.blockSize)
return newReaderFromIndexData(bsp.indexCache, bsp.q, data, name, bsTRA, bsp.blockSize)
}
// ConjoinAll (Not currently implemented) conjoins all chunks in |sources| into a single,
@@ -60,7 +61,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
// Open a table named |name|, containing |chunkCount| chunks.
func (bsp *blobstorePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.blockSize, bsp.indexCache, stats)
return newBSChunkSource(ctx, bsp.bs, name, chunkCount, bsp.blockSize, bsp.indexCache, bsp.q, stats)
}
type bsTableReaderAt struct {
@@ -96,7 +97,20 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
return totalRead, nil
}
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, indexCache *indexCache, stats *Stats) (cs chunkSource, err error) {
func loadTableIndex(stats *Stats, chunkCount uint32, q MemoryQuotaProvider, getBytesFromEnd func(n int64) ([]byte, error)) (onHeapTableIndex, error) {
size := indexSize(chunkCount) + footerSize
t1 := time.Now()
bytes, err := getBytesFromEnd(int64(size))
if err != nil {
return onHeapTableIndex{}, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(bytes)))
return parseTableIndex(bytes, q)
}
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, indexCache *indexCache, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
if indexCache != nil {
indexCache.lockEntry(name)
defer func() {
@@ -107,7 +121,8 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
}
}()
if index, found := indexCache.get(name); found {
index, err := indexCache.get(name)
if err == nil {
bsTRA := &bsTableReaderAt{name.String(), bs}
tr, err := newTableReader(index, bsTRA, blockSize)
if err != nil {
@@ -115,36 +130,26 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
}
return &chunkSourceAdapter{tr, name}, nil
}
if err != errCacheMiss {
return &chunkSourceAdapter{}, err
}
}
t1 := time.Now()
indexBytes, tra, err := func() ([]byte, tableReaderAt, error) {
size := int64(indexSize(chunkCount) + footerSize)
key := name.String()
rc, _, err := bs.Get(ctx, key, blobstore.NewBlobRange(-size, 0))
index, err := loadTableIndex(stats, chunkCount, q, func(size int64) ([]byte, error) {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-size, 0))
if err != nil {
return nil, nil, err
return nil, err
}
defer rc.Close()
buff := make([]byte, size)
_, err = io.ReadFull(rc, buff)
if err != nil {
return nil, nil, err
return nil, err
}
return buff, &bsTableReaderAt{key, bs}, nil
}()
if err != nil {
return nil, err
}
stats.IndexBytesPerRead.Sample(uint64(len(indexBytes)))
stats.IndexReadLatency.SampleTimeSince(t1)
index, err := parseTableIndex(indexBytes)
return buff, nil
})
if err != nil {
return nil, err
}
@@ -153,7 +158,7 @@ func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, ch
indexCache.put(name, index)
}
tr, err := newTableReader(index, tra, s3BlockSize)
tr, err := newTableReader(index, &bsTableReaderAt{name.String(), bs}, s3BlockSize)
if err != nil {
return nil, err
}
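The new loadTableIndex helper centralizes the read-the-final-bytes-and-parse sequence that was previously duplicated across the AWS and blobstore sources, including the latency and bytes-per-read stats samples. A sketch of calling it against a table already in memory (in-package code; tableData, stats, chunkCount, and q are assumed to be in scope):

	index, err := loadTableIndex(stats, chunkCount, q, func(n int64) ([]byte, error) {
		// Return the final |n| bytes of the table: the index plus footer.
		if int64(len(tableData)) < n {
			return nil, errors.New("table too small")
		}
		return tableData[int64(len(tableData))-n:], nil
	})
	if err != nil {
		return err
	}
	defer index.Close() // returns the quota acquired when the index was parsed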

View File

@@ -23,8 +23,8 @@ func (csa chunkSourceAdapter) hash() (addr, error) {
return csa.h, nil
}
func newReaderFromIndexData(indexCache *indexCache, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(idxData)
func newReaderFromIndexData(indexCache *indexCache, q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(idxData, q)
if err != nil {
return nil, err

View File

@@ -35,7 +35,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
require.NoError(t, err)
// Setup a TableReader to read compressed chunks out of
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -73,7 +73,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
require.NoError(t, err)
outputBuff := output.Bytes()
outputTI, err := parseTableIndexByCopy(outputBuff)
outputTI, err := parseTableIndexByCopy(outputBuff, &noopQuotaProvider{})
require.NoError(t, err)
outputTR, err := newTableReader(outputTI, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)

View File

@@ -119,7 +119,7 @@ func TestConjoin(t *testing.T) {
}
setup := func(lock addr, root hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = newFakeTablePersister()
p = newFakeTablePersister(&noopQuotaProvider{})
fm = &fakeManifest{}
fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(sizes, p), nil)
@@ -213,7 +213,7 @@ func TestConjoin(t *testing.T) {
})
setupAppendix := func(lock addr, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = newFakeTablePersister()
p = newFakeTablePersister(&noopQuotaProvider{})
fm = &fakeManifest{}
fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(specSizes, p), makeTestTableSpecs(appendixSizes, p))

View File

@@ -56,7 +56,7 @@ func (m *fakeDDB) readerForTable(name addr) (chunkReader, error) {
if i, present := m.data[fmtTableName(name)]; present {
buff, ok := i.([]byte)
assert.True(m.t, ok)
ti, err := parseTableIndex(buff)
ti, err := parseTableIndex(buff, &noopQuotaProvider{})
if err != nil {
return nil, err

View File

@@ -38,19 +38,20 @@ import (
const tempTablePrefix = "nbs_table_"
func newFSTablePersister(dir string, fc *fdCache, indexCache *indexCache) tablePersister {
func newFSTablePersister(dir string, fc *fdCache, indexCache *indexCache, q MemoryQuotaProvider) tablePersister {
d.PanicIfTrue(fc == nil)
return &fsTablePersister{dir, fc, indexCache}
return &fsTablePersister{dir, fc, indexCache, q}
}
type fsTablePersister struct {
dir string
fc *fdCache
indexCache *indexCache
q MemoryQuotaProvider
}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newMmapTableReader(ftp.dir, name, chunkCount, ftp.indexCache, ftp.fc)
return newMmapTableReader(ftp.dir, name, chunkCount, ftp.indexCache, ftp.q, ftp.fc)
}
func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
@@ -90,7 +91,7 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
return "", ferr
}
index, ferr := parseTableIndexByCopy(data)
index, ferr := parseTableIndexByCopy(data, ftp.q)
if ferr != nil {
return "", ferr
@@ -185,7 +186,7 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
}
var index onHeapTableIndex
index, ferr = parseTableIndex(plan.mergedIndex)
index, ferr = parseTableIndex(plan.mergedIndex, ftp.q)
if ferr != nil {
return "", ferr

View File

@@ -44,7 +44,7 @@ func TestFSTableCacheOnOpen(t *testing.T) {
cacheSize := 2
fc := newFDCache(cacheSize)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
// Create some tables manually, load them into the cache
func() {
@@ -120,14 +120,14 @@ func TestFSTablePersisterPersist(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -159,7 +159,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -174,7 +174,7 @@ func TestFSTablePersisterCacheOnPersist(t *testing.T) {
dir := makeTempDir(t)
fc := newFDCache(1)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
defer file.RemoveAll(dir)
var name addr
@@ -210,7 +210,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(len(sources))
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
for i, c := range testChunks {
randChunk := make([]byte, (i+1)*13)
@@ -228,7 +228,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -246,7 +246,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
fts := newFSTablePersister(dir, fc, nil, &noopQuotaProvider{})
reps := 3
sources := make(chunkSources, reps)
@@ -267,7 +267,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)

View File

@@ -139,8 +139,8 @@ func putChunks(t *testing.T, ctx context.Context, chunks []chunks.Chunk, cs chun
func TestGenerationalCS(t *testing.T) {
ctx := context.Background()
oldGen, _ := makeTestLocalStore(t, 64)
newGen, _ := makeTestLocalStore(t, 64)
oldGen, _, _ := makeTestLocalStore(t, 64)
newGen, _, _ := makeTestLocalStore(t, 64)
inOld := make(map[int]bool)
inNew := make(map[int]bool)
chnks := genChunks(t, 100, 1000)

View File

@@ -150,7 +150,7 @@ func TestMemTableWrite(t *testing.T) {
td1, _, err := buildTable(chunks[1:2])
require.NoError(t, err)
ti1, err := parseTableIndexByCopy(td1)
ti1, err := parseTableIndexByCopy(td1, &noopQuotaProvider{})
require.NoError(t, err)
tr1, err := newTableReader(ti1, tableReaderAtFromBytes(td1), fileBlockSize)
require.NoError(t, err)
@@ -158,7 +158,7 @@ func TestMemTableWrite(t *testing.T) {
td2, _, err := buildTable(chunks[2:])
require.NoError(t, err)
ti2, err := parseTableIndexByCopy(td2)
ti2, err := parseTableIndexByCopy(td2, &noopQuotaProvider{})
require.NoError(t, err)
tr2, err := newTableReader(ti2, tableReaderAtFromBytes(td2), fileBlockSize)
require.NoError(t, err)
@@ -168,7 +168,7 @@ func TestMemTableWrite(t *testing.T) {
require.NoError(t, err)
assert.Equal(uint32(1), count)
ti, err := parseTableIndexByCopy(data)
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
require.NoError(t, err)
outReader, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)

View File

@@ -55,7 +55,7 @@ func init() {
}
}
func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *indexCache, fc *fdCache) (cs chunkSource, err error) {
func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *indexCache, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
var index onHeapTableIndex
@@ -69,7 +69,12 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index
err = unlockErr
}
}()
index, found = indexCache.get(h)
index, err = indexCache.get(h)
if err == nil {
found = true
} else if err != errCacheMiss {
return nil, err
}
}
if !found {
@@ -113,6 +118,7 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index
}
buff := make([]byte, indexSize(chunkCount)+footerSize)
// TODO: Don't use mmap here.
func() {
var mm mmap.MMap
mm, err = mmap.MapRegion(f, length, mmap.RDONLY, 0, aligned)
@@ -133,7 +139,7 @@ func newMmapTableReader(dir string, h addr, chunkCount uint32, indexCache *index
return onHeapTableIndex{}, err
}
ti, err = parseTableIndex(buff)
ti, err = parseTableIndex(buff, q)
if err != nil {
return

View File

@@ -52,7 +52,7 @@ func TestMmapTableReader(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666)
require.NoError(t, err)
trc, err := newMmapTableReader(dir, h, uint32(len(chunks)), nil, fc)
trc, err := newMmapTableReader(dir, h, uint32(len(chunks)), nil, &noopQuotaProvider{}, fc)
require.NoError(t, err)
assertChunksInReader(chunks, trc, assert)
}

View File

@@ -31,7 +31,7 @@ import (
func TestPersistingChunkStoreEmpty(t *testing.T) {
mt := newMemTable(testMemTableSize)
ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(), make(chan struct{}, 1), &Stats{})
ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(&noopQuotaProvider{}), make(chan struct{}, 1), &Stats{})
h, err := ccs.hash()
require.NoError(t, err)
@@ -58,7 +58,7 @@ func TestPersistingChunkStore(t *testing.T) {
}
trigger := make(chan struct{})
ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(), trigger}, make(chan struct{}, 1), &Stats{})
ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(&noopQuotaProvider{}), trigger}, make(chan struct{}, 1), &Stats{})
assertChunksInReader(testChunks, ccs, assert)
assert.EqualValues(mustUint32(mt.count()), mustUint32(ccs.getReader().count()))

View File

@@ -34,6 +34,9 @@ func NewUnlimitedMemQuotaProvider() *UnlimitedQuotaProvider {
return &UnlimitedQuotaProvider{}
}
type noopQuotaProvider struct {
}
func (q *UnlimitedQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error {
q.mu.Lock()
defer q.mu.Unlock()
@@ -56,3 +59,15 @@ func (q *UnlimitedQuotaProvider) Usage() uint64 {
defer q.mu.Unlock()
return q.used
}
func (q *noopQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error {
return nil
}
func (q *noopQuotaProvider) ReleaseQuota(memory uint64) error {
return nil
}
func (q *noopQuotaProvider) Usage() uint64 {
return 0
}
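Two providers serve the tests in this commit: UnlimitedQuotaProvider never refuses a request but does track used, which is what lets tests assert that every acquisition was balanced by a release, while noopQuotaProvider tracks nothing and is used where leak accounting is not under test. The leak-check idiom, as it appears throughout the test hunks:

	q := NewUnlimitedMemQuotaProvider()
	defer func() {
		// If every onHeapTableIndex was Closed, all quota came back.
		require.EqualValues(t, 0, q.Usage())
	}()
	p := newFakeTablePersister(q)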

View File

@@ -176,11 +176,11 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) {
assert := assert.New(t)
fm := &fakeManifest{}
mm := manifestManager{fm, newManifestCache(0), newManifestLocks()}
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
// Simulate another process writing a manifest behind store's back.
newRoot, chunks, err := interloperWrite(fm, p, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
@@ -227,11 +227,12 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) {
assert := assert.New(t)
fm := &fakeManifest{}
mm := manifestManager{fm, newManifestCache(defaultManifestCacheSize), newManifestLocks()}
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, c, defaultMemTableSize)
@@ -278,11 +279,11 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
fm := &fakeManifest{name: "foo"}
upm := &updatePreemptManifest{manifest: fm}
mm := manifestManager{upm, newManifestCache(defaultManifestCacheSize), newManifestLocks()}
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, c, defaultMemTableSize)
@@ -325,11 +326,12 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
upm := &updatePreemptManifest{manifest: fm}
mc := newManifestCache(defaultManifestCacheSize)
l := newManifestLocks()
p := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
store, err := newNomsBlockStore(context.Background(), constants.Format718String, manifestManager{upm, mc, l}, p, q, c, defaultMemTableSize)
@@ -391,8 +393,8 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
func makeStoreWithFakes(t *testing.T) (fm *fakeManifest, p tablePersister, q MemoryQuotaProvider, store *NomsBlockStore) {
fm = &fakeManifest{}
mm := manifestManager{fm, newManifestCache(0), newManifestLocks()}
p = newFakeTablePersister()
q = NewUnlimitedMemQuotaProvider()
p = newFakeTablePersister(q)
store, err := newNomsBlockStore(context.Background(), constants.Format718String, mm, p, q, inlineConjoiner{defaultMaxTables}, 0)
require.NoError(t, err)
return
@@ -489,15 +491,16 @@ func (fm *fakeManifest) set(version string, lock addr, root hash.Hash, specs, ap
}
}
func newFakeTableSet() tableSet {
return tableSet{p: newFakeTablePersister(), q: NewUnlimitedMemQuotaProvider(), rl: make(chan struct{}, 1)}
func newFakeTableSet(q MemoryQuotaProvider) tableSet {
return tableSet{p: newFakeTablePersister(q), q: NewUnlimitedMemQuotaProvider(), rl: make(chan struct{}, 1)}
}
func newFakeTablePersister() tablePersister {
return fakeTablePersister{map[addr]tableReader{}, &sync.RWMutex{}}
func newFakeTablePersister(q MemoryQuotaProvider) tablePersister {
return fakeTablePersister{q, map[addr]tableReader{}, &sync.RWMutex{}}
}
type fakeTablePersister struct {
q MemoryQuotaProvider
sources map[addr]tableReader
mu *sync.RWMutex
}
@@ -515,7 +518,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
if chunkCount > 0 {
ftp.mu.Lock()
defer ftp.mu.Unlock()
ti, err := parseTableIndexByCopy(data)
ti, err := parseTableIndexByCopy(data, ftp.q)
if err != nil {
return nil, err
@@ -542,7 +545,7 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc
if chunkCount > 0 {
ftp.mu.Lock()
defer ftp.mu.Unlock()
ti, err := parseTableIndexByCopy(data)
ti, err := parseTableIndexByCopy(data, ftp.q)
if err != nil {
return nil, err

View File

@@ -76,7 +76,7 @@ func (m *fakeS3) readerForTable(name addr) (chunkReader, error) {
m.mu.Lock()
defer m.mu.Unlock()
if buff, present := m.data[name.String()]; present {
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
if err != nil {
return nil, err
@@ -98,7 +98,7 @@ func (m *fakeS3) readerForTableWithNamespace(ns string, name addr) (chunkReader,
key = ns + "/" + key
}
if buff, present := m.data[key]; present {
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
if err != nil {
return nil, err

View File

@@ -455,13 +455,7 @@ func NewAWSStoreWithMMapIndex(ctx context.Context, nbfVerStr string, table, ns,
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks},
globalIndexCache,
ns,
func(bs []byte) (tableIndex, error) {
ohi, err := parseTableIndex(bs)
if err != nil {
return nil, err
}
return newMmapTableIndex(ohi, nil)
},
q,
}
mm := makeManifestManager(newDynamoManifest(table, ns, ddb))
return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize)
@@ -478,9 +472,7 @@ func NewAWSStore(ctx context.Context, nbfVerStr string, table, ns, bucket string
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks},
globalIndexCache,
ns,
func(bs []byte) (tableIndex, error) {
return parseTableIndex(bs)
},
q,
}
mm := makeManifestManager(newDynamoManifest(table, ns, ddb))
return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize)
@@ -500,7 +492,7 @@ func NewBSStore(ctx context.Context, nbfVerStr string, bs blobstore.Blobstore, m
mm := makeManifestManager(blobstoreManifest{"manifest", bs})
p := &blobstorePersister{bs, s3BlockSize, globalIndexCache}
p := &blobstorePersister{bs, s3BlockSize, globalIndexCache, q}
return newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{defaultMaxTables}, memTableSize)
}
@@ -523,7 +515,7 @@ func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
}
mm := makeManifestManager(m)
p := newFSTablePersister(dir, globalFDCache, globalIndexCache)
p := newFSTablePersister(dir, globalFDCache, globalIndexCache, q)
nbs, err := newNomsBlockStore(ctx, nbfVerStr, mm, p, q, inlineConjoiner{maxTables}, memTableSize)
if err != nil {

View File

@@ -38,7 +38,7 @@ import (
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string) {
func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, nomsDir string, q MemoryQuotaProvider) {
ctx := context.Background()
nomsDir = filepath.Join(tempfiles.MovableTempFileProvider.GetTempDir(), "noms_"+uuid.New().String()[:8])
err := os.MkdirAll(nomsDir, os.ModePerm)
@@ -48,9 +48,10 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no
_, err = fileManifest{nomsDir}.Update(ctx, addr{}, manifestContents{}, &Stats{}, nil)
require.NoError(t, err)
st, err = newLocalStore(ctx, types.Format_Default.VersionString(), nomsDir, defaultMemTableSize, maxTableFiles, NewUnlimitedMemQuotaProvider())
q = NewUnlimitedMemQuotaProvider()
st, err = newLocalStore(ctx, types.Format_Default.VersionString(), nomsDir, defaultMemTableSize, maxTableFiles, q)
require.NoError(t, err)
return st, nomsDir
return st, nomsDir, q
}
type fileToData map[string][]byte
@@ -83,7 +84,13 @@ func TestNBSAsTableFileStore(t *testing.T) {
numTableFiles := 128
assert.Greater(t, defaultMaxTables, numTableFiles)
st, _ := makeTestLocalStore(t, defaultMaxTables)
st, _, q := makeTestLocalStore(t, defaultMaxTables)
defer func() {
require.Equal(t, uint64(0), q.Usage())
}()
defer func() {
require.NoError(t, st.Close())
}()
fileToData := populateLocalStore(t, st, numTableFiles)
_, sources, _, err := st.Sources(ctx)
@@ -144,7 +151,7 @@ func TestNBSPruneTableFiles(t *testing.T) {
// over populate table files
numTableFiles := 64
maxTableFiles := 16
st, nomsDir := makeTestLocalStore(t, maxTableFiles)
st, nomsDir, _ := makeTestLocalStore(t, maxTableFiles)
fileToData := populateLocalStore(t, st, numTableFiles)
// add a chunk and flush to trigger a conjoin
@@ -226,7 +233,7 @@ func makeChunkSet(N, size int) (s map[hash.Hash]chunks.Chunk) {
func TestNBSCopyGC(t *testing.T) {
ctx := context.Background()
st, _ := makeTestLocalStore(t, 8)
st, _, _ := makeTestLocalStore(t, 8)
keepers := makeChunkSet(64, 64)
tossers := makeChunkSet(64, 64)

View File

@@ -96,7 +96,7 @@ func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData
// parses a valid nbs tableIndex from a byte stream. |buff| must end with an NBS index
// and footer and its length and capacity must match the expected indexSize for the chunkCount specified in the footer.
// Retains the buffer and does not allocate new memory except for offsets, computes on buff in place.
func parseTableIndex(buff []byte) (onHeapTableIndex, error) {
func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
chunkCount, totalUncompressedData, err := ReadTableFooter(bytes.NewReader(buff))
if err != nil {
return onHeapTableIndex{}, err
@@ -106,19 +106,19 @@ func parseTableIndex(buff []byte) (onHeapTableIndex, error) {
return onHeapTableIndex{}, ErrWrongBufferSize
}
buff = buff[:len(buff)-footerSize]
return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData)
return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData, q)
}
// parseTableIndexByCopy reads the footer, copies indexSize(chunkCount) bytes, and parses an on heap table index.
// Useful to create an onHeapTableIndex without retaining the entire underlying array of data.
func parseTableIndexByCopy(buff []byte) (onHeapTableIndex, error) {
func parseTableIndexByCopy(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
r := bytes.NewReader(buff)
return ReadTableIndexByCopy(r)
return ReadTableIndexByCopy(r, q)
}
// ReadTableIndexByCopy loads an index into memory from an io.ReadSeeker
// Caution: Allocates new memory for entire index
func ReadTableIndexByCopy(rd io.ReadSeeker) (onHeapTableIndex, error) {
func ReadTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) {
chunkCount, totalUncompressedData, err := ReadTableFooter(rd)
if err != nil {
return onHeapTableIndex{}, err
@@ -134,10 +134,12 @@ func ReadTableIndexByCopy(rd io.ReadSeeker) (onHeapTableIndex, error) {
return onHeapTableIndex{}, err
}
return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData)
return NewOnHeapTableIndex(buff, chunkCount, totalUncompressedData, q)
}
type onHeapTableIndex struct {
q MemoryQuotaProvider
refCnt *int32
tableFileSize uint64
// Tuple bytes
tupleB []byte
@@ -152,7 +154,7 @@ type onHeapTableIndex struct {
var _ tableIndex = &onHeapTableIndex{}
// NewOnHeapTableIndex creates a table index given a buffer of just the table index (no footer)
func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint64) (onHeapTableIndex, error) {
func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) {
tuples := b[:prefixTupleSize*chunkCount]
lengths := b[prefixTupleSize*chunkCount : prefixTupleSize*chunkCount+lengthSize*chunkCount]
suffixes := b[prefixTupleSize*chunkCount+lengthSize*chunkCount:]
@@ -169,7 +171,12 @@ func NewOnHeapTableIndex(b []byte, chunkCount uint32, totalUncompressedData uint
store half the offsets and then allocate an additional len(lengths) to store the rest.
*/
refCnt := new(int32)
*refCnt = 1
return onHeapTableIndex{
refCnt: refCnt,
q: q,
tupleB: tuples,
offsetB: offsets,
suffixB: suffixes,
@@ -328,10 +335,20 @@ func (ti onHeapTableIndex) TotalUncompressedData() uint64 {
}
func (ti onHeapTableIndex) Close() error {
cnt := atomic.AddInt32(ti.refCnt, -1)
if cnt == 0 {
return ti.q.ReleaseQuota(memSize(ti.chunkCount))
}
if cnt < 0 {
panic("Close() called and reduced ref count to < 0.")
}
return nil
}
func (ti onHeapTableIndex) Clone() (tableIndex, error) {
_ = atomic.AddInt32(ti.refCnt, 1)
// We allow Closed onHeapTableIndex's to be Cloned because we may pull a
// closed index from the singleton indexCache.
return ti, nil
}
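Clone and Close together give the index shared-ownership semantics: every Clone bumps the shared refCnt, every Close decrements it, and only the final Close releases quota. A sketch of the lifecycle (in-package code; tableData is assumed, and the type assertion mirrors the one indexCache.get uses):

	ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
	if err != nil {
		return err
	}
	cl, _ := ti.Clone()          // refCnt: 1 -> 2 (same shared *int32)
	ti2 := cl.(onHeapTableIndex) // Clone returns the tableIndex interface
	_ = ti2.Close()              // refCnt: 2 -> 1; quota still held
	_ = ti.Close()               // refCnt: 1 -> 0; q.ReleaseQuota(memSize(ti.chunkCount))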

View File

@@ -29,7 +29,7 @@ func TestParseTableIndex(t *testing.T) {
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
assert.Equal(t, uint32(596), idx.ChunkCount())
@@ -55,7 +55,7 @@ func TestMMapIndex(t *testing.T) {
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
mmidx, err := newMmapTableIndex(idx, nil)

View File

@@ -34,6 +34,8 @@ import (
"github.com/dolthub/dolt/go/store/util/sizecache"
)
var errCacheMiss = errors.New("index cache miss")
// tablePersister allows interaction with persistent storage. It provides
// primitives for pushing the contents of a memTable to persistent storage,
// opening persistent tables for reading, and conjoining a number of existing
@@ -101,11 +103,18 @@ func (sic *indexCache) unlockEntry(name addr) error {
return nil
}
func (sic *indexCache) get(name addr) (onHeapTableIndex, bool) {
// get returns errCacheMiss if the index with |name| was not found in the cache. It always returns a Clone of the index.
func (sic *indexCache) get(name addr) (onHeapTableIndex, error) {
if idx, found := sic.cache.Get(name); found {
return idx.(onHeapTableIndex), true
index := idx.(onHeapTableIndex)
index2, err := index.Clone()
if err != nil {
return onHeapTableIndex{}, err
}
return index2.(onHeapTableIndex), nil
}
return onHeapTableIndex{}, false
return onHeapTableIndex{}, errCacheMiss
}
func (sic *indexCache) put(name addr, idx onHeapTableIndex) {
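Because get clones, each consumer's Close drops only its own reference; put, by contrast, appears to share the caller's reference rather than cloning (the new TestIndexCacheClonesIndicesOnGet later in the diff observes refCnt going from 1 to 2 across a put plus a get). In sketch form:

	index, _ := parseTableIndexByCopy(data, q) // refCnt == 1, held by the caller
	cache.put(h, index)                        // put shares the same refCnt
	got, err := cache.get(h)                   // Clone: refCnt == 2
	if err == nil {
		defer got.Close() // drops only this consumer's reference
	}

This is also why Clone permits already-Closed indices: the cache's copy may outlive every consumer.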

View File

@@ -45,7 +45,7 @@ func TestPlanCompaction(t *testing.T) {
}
data, name, err := buildTable(content)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(data)
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)
@@ -63,7 +63,7 @@ func TestPlanCompaction(t *testing.T) {
totalChunks += mustUint32(src.count())
}
idx, err := parseTableIndex(plan.mergedIndex)
idx, err := parseTableIndex(plan.mergedIndex, &noopQuotaProvider{})
require.NoError(t, err)
assert.Equal(totalChunks, idx.chunkCount)
@@ -75,3 +75,22 @@ func TestPlanCompaction(t *testing.T) {
assertChunksInReader(content, tr, assert)
}
}
func TestIndexCacheClonesIndicesOnGet(t *testing.T) {
chunks := [][]byte{
[]byte("hello2"),
[]byte("goodbye2"),
[]byte("badbye2"),
}
indexBytes, h, _ := buildTable(chunks)
index, err := parseTableIndexByCopy(indexBytes, &noopQuotaProvider{})
require.NoError(t, err)
require.Equal(t, int32(1), *index.refCnt)
indexCache := newIndexCache(1024)
indexCache.put(h, index)
index2, err := indexCache.get(h)
require.NoError(t, err)
require.Equal(t, int32(2), *index2.refCnt)
}

View File

@@ -336,12 +336,7 @@ func (ts tableSet) Close() error {
setErr(err)
}
for _, t := range ts.upstream {
memSize, err := getCSMemSize(t)
err = ts.q.ReleaseQuota(memSize)
if err != nil {
return err
}
err = t.Close()
err := t.Close()
setErr(err)
}
return firstErr
@@ -416,12 +411,6 @@ func (ts tableSet) Flatten(ctx context.Context) (tableSet, error) {
return tableSet{}, err
}
// TODO: acquire quota when we persist a memory table instead
err = ts.q.AcquireQuota(ctx, memSize(cnt))
if err != nil {
return tableSet{}, err
}
if cnt > 0 {
flattened.upstream = append(flattened.upstream, src)
}
@@ -491,16 +480,6 @@ func (ts tableSet) Rebase(ctx context.Context, specs []tableSpec, stats *Stats)
return
}
if spec.name == h {
memSize, err := getCSMemSize(existing)
if err != nil {
ae.SetIfError(err)
return
}
err = ts.q.AcquireQuota(ctx, memSize)
if err != nil {
ae.SetIfError(err)
return
}
c, err := existing.Clone()
if err != nil {
ae.SetIfError(err)
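With release pushed down into the index, the tableSet hunks above become pure deletions: Flatten no longer pre-acquires, and Close and Rebase no longer compute per-source sizes via getCSMemSize. The remaining ownership chain, presumably (a sketch, not the literal call graph):

	for _, t := range ts.upstream {
		// chunkSource.Close() ends up closing its onHeapTableIndex;
		// when the refcount reaches zero the index itself calls
		// q.ReleaseQuota(memSize(chunkCount)).
		err := t.Close()
		setErr(err)
	}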

View File

@@ -32,7 +32,7 @@ import (
var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
func TestTableSetPrependEmpty(t *testing.T) {
ts := newFakeTableSet().Prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
ts := newFakeTableSet(&noopQuotaProvider{}).Prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(t, specs)
@@ -40,7 +40,7 @@ func TestTableSetPrependEmpty(t *testing.T) {
func TestTableSetPrepend(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet()
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -65,7 +65,7 @@ func TestTableSetPrepend(t *testing.T) {
func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet()
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -88,7 +88,7 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet()
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -111,7 +111,7 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
func TestTableSetExtract(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet()
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -146,7 +146,11 @@ func TestTableSetExtract(t *testing.T) {
func TestTableSetRebase(t *testing.T) {
assert := assert.New(t)
persister := newFakeTablePersister()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
persister := newFakeTablePersister(q)
insert := func(ts tableSet, chunks ...[]byte) tableSet {
for _, c := range chunks {
@@ -156,10 +160,6 @@ func TestTableSetRebase(t *testing.T) {
}
return ts
}
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
fullTS := newTableSet(persister, q)
defer func() {
@@ -193,7 +193,7 @@ func TestTableSetRebase(t *testing.T) {
func TestTableSetPhysicalLen(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet()
ts := newFakeTableSet(&noopQuotaProvider{})
specs, err := ts.ToSpecs()
require.NoError(t, err)
assert.Empty(specs)

View File

@@ -77,7 +77,7 @@ func TestSimple(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -124,7 +124,7 @@ func TestHasMany(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -175,7 +175,7 @@ func TestHasManySequentialPrefix(t *testing.T) {
require.NoError(t, err)
buff = buff[:length]
ti, err := parseTableIndexByCopy(buff)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -204,7 +204,7 @@ func TestGetMany(t *testing.T) {
tableData, _, err := buildTable(data)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -238,7 +238,7 @@ func TestCalcReads(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), 0)
require.NoError(t, err)
@@ -275,7 +275,7 @@ func TestExtract(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -314,7 +314,7 @@ func Test65k(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -367,7 +367,7 @@ func doTestNGetMany(t *testing.T, count int) {
tableData, _, err := buildTable(data)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)

View File

@@ -25,7 +25,7 @@ import (
)
func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error {
idx, err := ReadTableIndexByCopy(rd)
idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{})
if err != nil {
return err
}
@@ -69,7 +69,7 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er
}
func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) {
idx, err := ReadTableIndexByCopy(rd)
idx, err := ReadTableIndexByCopy(rd, &noopQuotaProvider{})
if err != nil {
return nil, err
}