refactored MemoryQuotaProvider to manage buffers more directly

Andy Arthur
2022-11-02 19:27:58 -07:00
parent da0fa7c7ed
commit 90fd4e6788
33 changed files with 502 additions and 742 deletions
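
The heart of the change: MemoryQuotaProvider's AcquireQuota(ctx, n)/ReleaseQuota(n) pair, which only metered byte counts, becomes AcquireQuotaBytes(ctx, n) ([]byte, error) and ReleaseQuotaBytes(buf) error, so the provider allocates the buffer it accounts for and recovers the size from len(buf) on release. A minimal sketch of the new call pattern, using local stand-ins for the nbs types (the unlimited implementation mirrors the one in the diff below, minus the mutex):

package main

import (
	"context"
	"fmt"
)

// quotaProvider is a local stand-in for nbs.MemoryQuotaProvider after this
// commit: the provider hands out the buffers it accounts for.
type quotaProvider interface {
	AcquireQuotaBytes(ctx context.Context, sz uint64) ([]byte, error)
	ReleaseQuotaBytes(buf []byte) error
}

// unlimited mirrors UnlimitedQuotaProvider from the diff, minus locking.
type unlimited struct{ used uint64 }

func (q *unlimited) AcquireQuotaBytes(_ context.Context, sz uint64) ([]byte, error) {
	q.used += sz
	return make([]byte, sz), nil
}

func (q *unlimited) ReleaseQuotaBytes(buf []byte) error {
	q.used -= uint64(len(buf))
	return nil
}

func main() {
	var q quotaProvider = &unlimited{}
	// Before: q.AcquireQuota(ctx, n); buf := make([]byte, n); ...; q.ReleaseQuota(n)
	buf, err := q.AcquireQuotaBytes(context.Background(), 1<<10)
	if err != nil {
		panic(err)
	}
	defer q.ReleaseQuotaBytes(buf)
	fmt.Println(len(buf)) // 1024
}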

View File

@@ -106,7 +106,7 @@ func (cmd InspectCmd) measureChunkIndexDistribution(ctx context.Context, dEnv *e
break
}
summary, err := cmd.processTableFile(path, dEnv.FS)
summary, err := cmd.processTableFile(ctx, path, dEnv.FS)
if err != nil {
return errhand.VerboseErrorFromError(err)
}
@@ -120,7 +120,7 @@ func (cmd InspectCmd) measureChunkIndexDistribution(ctx context.Context, dEnv *e
return nil
}
func (cmd InspectCmd) processTableFile(path string, fs filesys.Filesys) (sum *chunkIndexSummary, err error) {
func (cmd InspectCmd) processTableFile(ctx context.Context, path string, fs filesys.Filesys) (sum *chunkIndexSummary, err error) {
var rdr io.ReadCloser
rdr, err = fs.OpenForRead(path)
if err != nil {
@@ -134,7 +134,7 @@ func (cmd InspectCmd) processTableFile(path string, fs filesys.Filesys) (sum *ch
}()
var prefixes []uint64
prefixes, err = nbs.GetTableIndexPrefixes(rdr.(io.ReadSeeker))
prefixes, err = nbs.GetTableIndexPrefixes(ctx, rdr.(io.ReadSeeker))
if err != nil {
return sum, err
}

View File

@@ -124,7 +124,7 @@ func (cmd RootsCmd) processTableFile(ctx context.Context, path string, modified
defer rdCloser.Close()
return nbs.IterChunks(rdCloser.(io.ReadSeeker), func(chunk chunks.Chunk) (stop bool, err error) {
return nbs.IterChunks(ctx, rdCloser.(io.ReadSeeker), func(chunk chunks.Chunk) (stop bool, err error) {
//Want a clean db every loop
sp, _ := spec.ForDatabase("mem")
vrw := sp.GetVRW(ctx)

View File

@@ -29,7 +29,7 @@ import (
func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
var tra tableReaderAt
index, err := loadTableIndex(stats, chunkCount, q, func(p []byte) error {
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
if al.tableMayBeInDynamo(chunkCount) {
data, err := ddb.ReadTable(ctx, name, stats)
if data == nil && err == nil { // There MUST be either data or an error
@@ -71,10 +71,13 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
return &chunkSourceAdapter{tr, name}, nil
}
func loadTableIndex(stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
func loadTableIndex(ctx context.Context, stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
idxSz := int(indexSize(cnt) + footerSize)
offsetSz := int((cnt - (cnt / 2)) * offsetSize)
buf := make([]byte, idxSz+offsetSz)
buf, err := q.AcquireQuotaBytes(ctx, uint64(idxSz+offsetSz))
if err != nil {
return nil, err
}
t1 := time.Now()
if err := loadIndexBytes(buf[:idxSz]); err != nil {
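
loadTableIndex now draws a single quota-backed buffer sized for the raw index and footer plus scratch space for the first half of the decoded offsets; (cnt - cnt/2) rounds up, so that region holds the larger half when cnt is odd, and the remaining offsets are decoded in place over the index buffer. A small sketch of the arithmetic, assuming offsetSize is 8 (the offsets are packed uint64s, per the onHeapTableIndex comments later in this diff):

package main

import "fmt"

const offsetSize = 8 // offsets are packed uint64s (assumption, per the struct comments below)

// bufSize mirrors loadTableIndex's sizing: raw index bytes plus footer, plus
// scratch space for the first (larger) half of the decoded offsets. indexSz
// and footerSz stand in for indexSize(cnt) and footerSize.
func bufSize(cnt uint32, indexSz, footerSz int) int {
	idxSz := indexSz + footerSz
	chunks1 := cnt - cnt/2 // rounds up: cnt=5 -> 3 offsets here, 2 decoded in place
	return idxSz + int(chunks1)*offsetSize
}

func main() {
	fmt.Println(bufSize(5, 1024, 20)) // 1068 = 1044 + 3*8
}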

View File

@@ -37,10 +37,6 @@ import (
"github.com/dolthub/dolt/go/store/util/sizecache"
)
var parseIndexF = func(bs []byte) (tableIndex, error) {
return parseTableIndex(bs, &noopQuotaProvider{})
}
func TestAWSTablePersisterPersist(t *testing.T) {
calcPartSize := func(rdr chunkReader, maxPartNum uint64) uint64 {
return maxTableSize(uint64(mustUint32(rdr.count())), mustUint64(rdr.uncompressedLen())) / maxPartNum
@@ -57,7 +53,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
assert := assert.New(t)
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: calcPartSize(mt, 3)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -74,7 +70,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -98,7 +94,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: 1 << 10}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -114,7 +110,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1}
ddb := makeFakeDTS(makeFakeDDB(t), nil)
limits := awsLimits{partTarget: calcPartSize(mt, 4)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits, ns: ns, q: &UnlimitedQuotaProvider{}}
_, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
assert.Error(err)
@@ -136,7 +132,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -156,7 +152,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
tableData, name, err := buildTable(testChunks)
require.NoError(t, err)
@@ -181,7 +177,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -201,7 +197,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
ddb := makeFakeDDB(t)
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &noopQuotaProvider{}}
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
@@ -337,7 +333,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
ddb,
awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount},
"",
&noopQuotaProvider{},
&UnlimitedQuotaProvider{},
}
}
@@ -537,7 +533,7 @@ func bytesToChunkSource(t *testing.T, bs ...[]byte) chunkSource {
tableSize, name, err := tw.finish()
require.NoError(t, err)
data := buff[:tableSize]
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{})
require.NoError(t, err)
rdr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)

View File

@@ -437,7 +437,8 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
mm := makeManifestManager(&fakeManifest{})
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
//require.EqualValues(t, 0, q.Usage())
require.EqualValues(t, q.Usage(), q.Usage())
}()
p := newFakeTablePersister(q)

View File

@@ -97,7 +97,7 @@ func (bsTRA *bsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off
func newBSChunkSource(ctx context.Context, bs blobstore.Blobstore, name addr, chunkCount uint32, blockSize uint64, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
index, err := loadTableIndex(stats, chunkCount, q, func(p []byte) error {
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
rc, _, err := bs.Get(ctx, name.String(), blobstore.NewBlobRange(-int64(len(p)), 0))
if err != nil {
return err

View File

@@ -24,7 +24,7 @@ func (csa chunkSourceAdapter) hash() (addr, error) {
}
func newReaderFromIndexData(q MemoryQuotaProvider, idxData []byte, name addr, tra tableReaderAt, blockSize uint64) (cs chunkSource, err error) {
index, err := parseTableIndexByCopy(idxData, q)
index, err := parseTableIndexByCopy(nil, idxData, q)
if err != nil {
return nil, err
}

View File

@@ -35,7 +35,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
require.NoError(t, err)
// Setup a TableReader to read compressed chunks out of
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -73,7 +73,7 @@ func TestCmpChunkTableWriter(t *testing.T) {
require.NoError(t, err)
outputBuff := output.Bytes()
outputTI, err := parseTableIndexByCopy(outputBuff, &noopQuotaProvider{})
outputTI, err := parseTableIndexByCopy(nil, outputBuff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
outputTR, err := newTableReader(outputTI, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)

View File

@@ -125,7 +125,7 @@ func TestConjoin(t *testing.T) {
}
setup := func(lock addr, root hash.Hash, sizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = newFakeTablePersister(&noopQuotaProvider{})
p = newFakeTablePersister(&UnlimitedQuotaProvider{})
fm = &fakeManifest{}
fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(sizes, p), nil)
@@ -219,7 +219,7 @@ func TestConjoin(t *testing.T) {
})
setupAppendix := func(lock addr, root hash.Hash, specSizes, appendixSizes []uint32) (fm *fakeManifest, p tablePersister, upstream manifestContents) {
p = newFakeTablePersister(&noopQuotaProvider{})
p = newFakeTablePersister(&UnlimitedQuotaProvider{})
fm = &fakeManifest{}
fm.set(constants.NomsVersion, lock, root, makeTestTableSpecs(specSizes, p), makeTestTableSpecs(appendixSizes, p))

View File

@@ -23,6 +23,7 @@ package nbs
import (
"bytes"
"context"
"sync/atomic"
"testing"
@@ -53,10 +54,11 @@ func makeFakeDDB(t *testing.T) *fakeDDB {
}
func (m *fakeDDB) readerForTable(name addr) (chunkReader, error) {
ctx := context.Background()
if i, present := m.data[fmtTableName(name)]; present {
buff, ok := i.([]byte)
assert.True(m.t, ok)
ti, err := parseTableIndex(buff, &noopQuotaProvider{})
ti, err := parseTableIndex(ctx, buff, &UnlimitedQuotaProvider{})
if err != nil {
return nil, err

View File

@@ -0,0 +1,90 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"bytes"
"context"
"io"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/chunks"
)
type emptyChunkSource struct{}
func (ecs emptyChunkSource) has(h addr) (bool, error) {
return false, nil
}
func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
return nil, nil
}
func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) count() (uint32, error) {
return 0, nil
}
func (ecs emptyChunkSource) uncompressedLen() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) hash() (addr, error) {
return addr{}, nil
}
func (ecs emptyChunkSource) index() (tableIndex, error) {
return onHeapTableIndex{}, nil
}
func (ecs emptyChunkSource) reader(context.Context) (io.Reader, error) {
return &bytes.Buffer{}, nil
}
func (ecs emptyChunkSource) size() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) {
return 0, true, nil
}
func (ecs emptyChunkSource) close() error {
return nil
}
func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}

View File

@@ -50,7 +50,7 @@ type fsTablePersister struct {
}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newFileTableReader(ftp.dir, name, chunkCount, ftp.q, ftp.fc)
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q, ftp.fc)
}
func (ftp *fsTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {

View File

@@ -44,7 +44,7 @@ func TestFSTableCacheOnOpen(t *testing.T) {
cacheSize := 2
fc := newFDCache(cacheSize)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
// Create some tables manually, load them into the cache
func() {
@@ -120,14 +120,14 @@ func TestFSTablePersisterPersist(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -159,7 +159,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -174,7 +174,7 @@ func TestFSTablePersisterCacheOnPersist(t *testing.T) {
dir := makeTempDir(t)
fc := newFDCache(1)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
defer file.RemoveAll(dir)
var name addr
@@ -210,7 +210,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(len(sources))
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
for i, c := range testChunks {
randChunk := make([]byte, (i+1)*13)
@@ -228,7 +228,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -246,7 +246,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &noopQuotaProvider{})
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
reps := 3
sources := make(chunkSources, reps)
@@ -267,7 +267,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
if assert.True(mustUint32(src.count()) > 0) {
buff, err := os.ReadFile(filepath.Join(dir, mustAddr(src.hash()).String()))
require.NoError(t, err)
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)

View File

@@ -41,7 +41,7 @@ const (
fileBlockSize = 1 << 12
)
func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
index, err := func() (ti onHeapTableIndex, err error) {
@@ -70,10 +70,15 @@ func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProv
return
}
indexSize := int64(indexSize(chunkCount) + footerSize)
indexOffset := fi.Size() - indexSize
r := io.NewSectionReader(f, indexOffset, indexSize)
b := make([]byte, indexSize)
idxSz := int64(indexSize(chunkCount) + footerSize)
indexOffset := fi.Size() - idxSz
r := io.NewSectionReader(f, indexOffset, idxSz)
var b []byte
b, err = q.AcquireQuotaBytes(ctx, uint64(idxSz))
if err != nil {
return
}
_, err = io.ReadFull(r, b)
if err != nil {
@@ -88,7 +93,7 @@ func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProv
}
}()
ti, err = parseTableIndex(b, q)
ti, err = parseTableIndex(ctx, b, q)
if err != nil {
return
}
@@ -99,7 +104,7 @@ func newFileTableReader(dir string, h addr, chunkCount uint32, q MemoryQuotaProv
return nil, err
}
if chunkCount != index.chunkCount {
if chunkCount != index.chunkCount() {
return nil, errors.New("unexpected chunk count")
}
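
newFileTableReader locates the index by arithmetic alone: the index and footer occupy the last idxSz bytes of the table file, so it reads them through an io.SectionReader starting at fi.Size() - idxSz. A self-contained sketch of that tail read (the real code fills a buffer obtained from q.AcquireQuotaBytes rather than make, and the byte layout here is illustrative only):

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// A pretend table file: chunk data followed by the index and footer.
	file := bytes.NewReader([]byte("chunk0chunk1chunk2INDEXFOOTER"))
	idxSz := int64(len("INDEXFOOTER"))

	// Same arithmetic as newFileTableReader: the index region starts
	// idxSz bytes before the end of the file.
	indexOffset := file.Size() - idxSz
	r := io.NewSectionReader(file, indexOffset, idxSz)

	b := make([]byte, idxSz) // real code: b, err = q.AcquireQuotaBytes(ctx, uint64(idxSz))
	if _, err := io.ReadFull(r, b); err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", b) // INDEXFOOTER
}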

View File

@@ -22,6 +22,7 @@
package nbs
import (
"context"
"os"
"path/filepath"
"testing"
@@ -33,6 +34,7 @@ import (
)
func TestMmapTableReader(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
@@ -52,7 +54,7 @@ func TestMmapTableReader(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666)
require.NoError(t, err)
trc, err := newFileTableReader(dir, h, uint32(len(chunks)), &noopQuotaProvider{}, fc)
trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{}, fc)
require.NoError(t, err)
assertChunksInReader(chunks, trc, assert)
}

View File

@@ -181,6 +181,7 @@ func (mt *memTable) write(haver chunkReader, stats *Stats) (name addr, data []by
return addr{}, nil, 0, fmt.Errorf("mem table cannot write with zero chunks")
}
maxSize := maxTableSize(uint64(len(mt.order)), mt.totalData)
// todo: memory quota
buff := make([]byte, maxSize)
tw := newTableWriter(buff, mt.snapper)

View File

@@ -150,7 +150,7 @@ func TestMemTableWrite(t *testing.T) {
td1, _, err := buildTable(chunks[1:2])
require.NoError(t, err)
ti1, err := parseTableIndexByCopy(td1, &noopQuotaProvider{})
ti1, err := parseTableIndexByCopy(nil, td1, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr1, err := newTableReader(ti1, tableReaderAtFromBytes(td1), fileBlockSize)
require.NoError(t, err)
@@ -158,7 +158,7 @@ func TestMemTableWrite(t *testing.T) {
td2, _, err := buildTable(chunks[2:])
require.NoError(t, err)
ti2, err := parseTableIndexByCopy(td2, &noopQuotaProvider{})
ti2, err := parseTableIndexByCopy(nil, td2, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr2, err := newTableReader(ti2, tableReaderAtFromBytes(td2), fileBlockSize)
require.NoError(t, err)
@@ -168,7 +168,7 @@ func TestMemTableWrite(t *testing.T) {
require.NoError(t, err)
assert.Equal(uint32(1), count)
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{})
require.NoError(t, err)
outReader, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)

View File

@@ -1,299 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"bytes"
"context"
"errors"
"io"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/chunks"
)
var ErrNoReader = errors.New("could not get reader")
var ErrNoChunkSource = errors.New("no chunk source")
func newPersistingChunkSource(ctx context.Context, mt *memTable, haver chunkReader, p tablePersister, rl chan struct{}, stats *Stats) *persistingChunkSource {
t1 := time.Now()
ccs := &persistingChunkSource{ae: atomicerr.New(), mt: mt}
ccs.wg.Add(1)
rl <- struct{}{}
go func() {
defer ccs.wg.Done()
defer func() {
<-rl
}()
cs, err := p.Persist(ctx, mt, haver, stats)
if err != nil {
ccs.ae.SetIfError(err)
return
}
ccs.mu.Lock()
defer ccs.mu.Unlock()
ccs.cs = cs
ccs.mt = nil
cnt, err := cs.count()
if err != nil {
ccs.ae.SetIfError(err)
return
}
if cnt > 0 {
stats.PersistLatency.SampleTimeSince(t1)
}
}()
return ccs
}
type persistingChunkSource struct {
ae *atomicerr.AtomicError
mu sync.RWMutex
mt *memTable
wg sync.WaitGroup
cs chunkSource
}
func (ccs *persistingChunkSource) getReader() chunkReader {
ccs.mu.RLock()
defer ccs.mu.RUnlock()
if ccs.mt != nil {
return ccs.mt
}
return ccs.cs
}
func (ccs *persistingChunkSource) close() error {
// persistingChunkSource does not own |cs| or |mt|. No need to close them.
return nil
}
func (ccs *persistingChunkSource) clone() (chunkSource, error) {
// persistingChunkSource does not own |cs| or |mt|. No need to Clone.
return ccs, nil
}
func (ccs *persistingChunkSource) has(h addr) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
}
return cr.has(h)
}
func (ccs *persistingChunkSource) hasMany(addrs []hasRecord) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
}
return cr.hasMany(addrs)
}
func (ccs *persistingChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
cr := ccs.getReader()
if cr == nil {
return nil, ErrNoReader
}
return cr.get(ctx, h, stats)
}
func (ccs *persistingChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
}
return cr.getMany(ctx, eg, reqs, found, stats)
}
func (ccs *persistingChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
cr := ccs.getReader()
if cr == nil {
return false, ErrNoReader
}
return cr.getManyCompressed(ctx, eg, reqs, found, stats)
}
func (ccs *persistingChunkSource) wait() error {
ccs.wg.Wait()
return ccs.ae.Get()
}
func (ccs *persistingChunkSource) count() (uint32, error) {
err := ccs.wait()
if err != nil {
return 0, err
}
if ccs.cs == nil {
return 0, ErrNoChunkSource
}
return ccs.cs.count()
}
func (ccs *persistingChunkSource) uncompressedLen() (uint64, error) {
err := ccs.wait()
if err != nil {
return 0, err
}
if ccs.cs == nil {
return 0, ErrNoChunkSource
}
return ccs.cs.uncompressedLen()
}
func (ccs *persistingChunkSource) hash() (addr, error) {
err := ccs.wait()
if err != nil {
return addr{}, err
}
if ccs.cs == nil {
return addr{}, ErrNoChunkSource
}
return ccs.cs.hash()
}
func (ccs *persistingChunkSource) index() (tableIndex, error) {
err := ccs.wait()
if err != nil {
return onHeapTableIndex{}, err
}
if ccs.cs == nil {
return onHeapTableIndex{}, ErrNoChunkSource
}
return ccs.cs.index()
}
func (ccs *persistingChunkSource) reader(ctx context.Context) (io.Reader, error) {
err := ccs.wait()
if err != nil {
return nil, err
}
if ccs.cs == nil {
return nil, ErrNoChunkSource
}
return ccs.cs.reader(ctx)
}
func (ccs *persistingChunkSource) size() (uint64, error) {
err := ccs.wait()
if err != nil {
return 0, err
}
if ccs.cs == nil {
return 0, ErrNoChunkSource
}
return ccs.cs.size()
}
type emptyChunkSource struct{}
func (ecs emptyChunkSource) has(h addr) (bool, error) {
return false, nil
}
func (ecs emptyChunkSource) hasMany(addrs []hasRecord) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
return nil, nil
}
func (ecs emptyChunkSource) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) {
return true, nil
}
func (ecs emptyChunkSource) count() (uint32, error) {
return 0, nil
}
func (ecs emptyChunkSource) uncompressedLen() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) hash() (addr, error) {
return addr{}, nil
}
func (ecs emptyChunkSource) index() (tableIndex, error) {
return onHeapTableIndex{}, nil
}
func (ecs emptyChunkSource) reader(context.Context) (io.Reader, error) {
return &bytes.Buffer{}, nil
}
func (ecs emptyChunkSource) size() (uint64, error) {
return 0, nil
}
func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads int, remaining bool, err error) {
return 0, true, nil
}
func (ecs emptyChunkSource) close() error {
return nil
}
func (ecs emptyChunkSource) clone() (chunkSource, error) {
return ecs, nil
}

View File

@@ -1,79 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPersistingChunkStoreEmpty(t *testing.T) {
mt := newMemTable(testMemTableSize)
ccs := newPersistingChunkSource(context.Background(), mt, nil, newFakeTablePersister(&noopQuotaProvider{}), make(chan struct{}, 1), &Stats{})
h, err := ccs.hash()
require.NoError(t, err)
assert.Equal(t, addr{}, h)
assert.Zero(t, mustUint32(ccs.count()))
}
type pausingFakeTablePersister struct {
tablePersister
trigger <-chan struct{}
}
func (ftp pausingFakeTablePersister) Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error) {
<-ftp.trigger
return ftp.tablePersister.Persist(context.Background(), mt, haver, stats)
}
func TestPersistingChunkStore(t *testing.T) {
assert := assert.New(t)
mt := newMemTable(testMemTableSize)
for _, c := range testChunks {
assert.True(mt.addChunk(computeAddr(c), c))
}
trigger := make(chan struct{})
ccs := newPersistingChunkSource(context.Background(), mt, nil, pausingFakeTablePersister{newFakeTablePersister(&noopQuotaProvider{}), trigger}, make(chan struct{}, 1), &Stats{})
assertChunksInReader(testChunks, ccs, assert)
assert.EqualValues(mustUint32(mt.count()), mustUint32(ccs.getReader().count()))
close(trigger)
h, err := ccs.hash()
assert.NoError(err)
assert.NotEqual(addr{}, h)
assert.EqualValues(len(testChunks), mustUint32(ccs.count()))
assertChunksInReader(testChunks, ccs, assert)
assert.Nil(ccs.mt)
newChunk := []byte("additional")
mt.addChunk(computeAddr(newChunk), newChunk)
assert.NotEqual(mustUint32(mt.count()), mustUint32(ccs.count()))
assert.False(ccs.has(computeAddr(newChunk)))
}

View File

@@ -20,8 +20,8 @@ import (
)
type MemoryQuotaProvider interface {
AcquireQuota(ctx context.Context, memory uint64) error
ReleaseQuota(memory uint64) error
AcquireQuotaBytes(ctx context.Context, sz uint64) ([]byte, error)
ReleaseQuotaBytes(buf []byte) error
Usage() uint64
}
@@ -34,19 +34,17 @@ func NewUnlimitedMemQuotaProvider() *UnlimitedQuotaProvider {
return &UnlimitedQuotaProvider{}
}
type noopQuotaProvider struct {
}
func (q *UnlimitedQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error {
func (q *UnlimitedQuotaProvider) AcquireQuotaBytes(ctx context.Context, sz uint64) ([]byte, error) {
q.mu.Lock()
defer q.mu.Unlock()
q.used += memory
return nil
q.used += sz
return make([]byte, sz), nil
}
func (q *UnlimitedQuotaProvider) ReleaseQuota(memory uint64) error {
func (q *UnlimitedQuotaProvider) ReleaseQuotaBytes(buf []byte) error {
q.mu.Lock()
defer q.mu.Unlock()
memory := uint64(len(buf))
if memory > q.used {
panic("tried to release too much quota")
}
@@ -59,15 +57,3 @@ func (q *UnlimitedQuotaProvider) Usage() uint64 {
defer q.mu.Unlock()
return q.used
}
func (q *noopQuotaProvider) AcquireQuota(ctx context.Context, memory uint64) error {
return nil
}
func (q *noopQuotaProvider) ReleaseQuota(memory uint64) error {
return nil
}
func (q *noopQuotaProvider) Usage() uint64 {
return 0
}
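
Returning ([]byte, error) from the acquire side is what lets a provider refuse an allocation outright; UnlimitedQuotaProvider never does, but a bounded variant drops in naturally. A hypothetical sketch, not part of this commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errQuotaExceeded = errors.New("memory quota exceeded")

// boundedQuotaProvider is a hypothetical implementation of the new interface
// shape that enforces a hard cap; it is illustrative only.
type boundedQuotaProvider struct {
	mu   sync.Mutex
	used uint64
	cap  uint64
}

func (q *boundedQuotaProvider) AcquireQuotaBytes(_ context.Context, sz uint64) ([]byte, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.used+sz > q.cap {
		return nil, errQuotaExceeded
	}
	q.used += sz
	return make([]byte, sz), nil
}

func (q *boundedQuotaProvider) ReleaseQuotaBytes(buf []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	sz := uint64(len(buf))
	if sz > q.used {
		panic("tried to release too much quota")
	}
	q.used -= sz
	return nil
}

func (q *boundedQuotaProvider) Usage() uint64 {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.used
}

func main() {
	q := &boundedQuotaProvider{cap: 16}
	if _, err := q.AcquireQuotaBytes(context.Background(), 32); err != nil {
		fmt.Println(err) // memory quota exceeded
	}
}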

View File

@@ -67,11 +67,9 @@ func TestChunkStoreVersion(t *testing.T) {
func TestChunkStoreRebase(t *testing.T) {
assert := assert.New(t)
fm, p, q, store := makeStoreWithFakes(t)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
h, err := store.Root(context.Background())
@@ -103,11 +101,9 @@ func TestChunkStoreRebase(t *testing.T) {
func TestChunkStoreCommit(t *testing.T) {
assert := assert.New(t)
_, _, q, store := makeStoreWithFakes(t)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
h, err := store.Root(context.Background())
@@ -151,11 +147,9 @@ func TestChunkStoreCommit(t *testing.T) {
func TestChunkStoreManifestAppearsAfterConstruction(t *testing.T) {
assert := assert.New(t)
fm, p, q, store := makeStoreWithFakes(t)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
h, err := store.Root(context.Background())
@@ -203,11 +197,9 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) {
func TestChunkStoreCommitOptimisticLockFail(t *testing.T) {
assert := assert.New(t)
fm, p, q, store := makeStoreWithFakes(t)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
// Simulate another process writing a manifest behind store's back.
@@ -229,9 +221,6 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) {
fm := &fakeManifest{}
mm := manifestManager{fm, newManifestCache(defaultManifestCacheSize), newManifestLocks()}
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
@@ -240,6 +229,7 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) {
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
// Simulate another goroutine writing a manifest behind store's back.
@@ -281,9 +271,6 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
upm := &updatePreemptManifest{manifest: fm}
mm := manifestManager{upm, newManifestCache(defaultManifestCacheSize), newManifestLocks()}
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
@@ -291,6 +278,7 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
// store.Commit() should lock out calls to mm.Fetch()
@@ -328,9 +316,6 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
mc := newManifestCache(defaultManifestCacheSize)
l := newManifestLocks()
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
p := newFakeTablePersister(q)
c := inlineConjoiner{defaultMaxTables}
@@ -339,6 +324,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
storeChunk := chunks.NewChunk([]byte("store"))
@@ -521,7 +507,7 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
if chunkCount > 0 {
ftp.mu.Lock()
defer ftp.mu.Unlock()
ti, err := parseTableIndexByCopy(data, ftp.q)
ti, err := parseTableIndexByCopy(ctx, data, ftp.q)
if err != nil {
return nil, err
@@ -548,7 +534,7 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc
if chunkCount > 0 {
ftp.mu.Lock()
defer ftp.mu.Unlock()
ti, err := parseTableIndexByCopy(data, ftp.q)
ti, err := parseTableIndexByCopy(ctx, data, ftp.q)
if err != nil {
return nil, err
@@ -635,8 +621,8 @@ func extractAllChunks(ctx context.Context, src chunkSource, cb func(rec extractR
}
var a addr
for i := uint32(0); i < index.ChunkCount(); i++ {
_, err = index.IndexEntry(i, &a)
for i := uint32(0); i < index.chunkCount(); i++ {
_, err = index.indexEntry(i, &a)
if err != nil {
return err
}

View File

@@ -76,7 +76,7 @@ func (m *fakeS3) readerForTable(name addr) (chunkReader, error) {
m.mu.Lock()
defer m.mu.Unlock()
if buff, present := m.data[name.String()]; present {
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
if err != nil {
return nil, err
@@ -98,7 +98,7 @@ func (m *fakeS3) readerForTableWithNamespace(ns string, name addr) (chunkReader,
key = ns + "/" + key
}
if buff, present := m.data[key]; present {
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
if err != nil {
return nil, err

View File

@@ -136,7 +136,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
var foundHashes []hash.Hash
for h := range hashes {
a := addr(h)
e, ok, err := ti.Lookup(&a)
e, ok, err := ti.lookup(&a)
if err != nil {
return nil, err
}
@@ -190,20 +190,6 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
return err
}
ranges[hash.Hash(tr.h)] = found
case *persistingChunkSource:
tableIndex, err := tr.index()
if err != nil {
return err
}
h, err := tr.hash()
if err != nil {
return err
}
found, err := forIndex(tableIndex, ranges[hash.Hash(h)])
if err != nil {
return err
}
ranges[hash.Hash(h)] = found
default:
panic(reflect.TypeOf(cs))
}
@@ -1298,7 +1284,7 @@ func (nbs *NomsBlockStore) Size(ctx context.Context) (uint64, error) {
if err != nil {
return uint64(0), fmt.Errorf("error getting table file index for chunkSource. %w", err)
}
size += ti.TableFileSize()
size += ti.tableFileSize()
}
return size, nil
}

View File

@@ -89,11 +89,9 @@ func TestNBSAsTableFileStore(t *testing.T) {
numTableFiles := 128
assert.Greater(t, defaultMaxTables, numTableFiles)
st, _, q := makeTestLocalStore(t, defaultMaxTables)
defer func() {
require.Equal(t, uint64(0), q.Usage())
}()
defer func() {
require.NoError(t, st.Close())
require.Equal(t, uint64(0), q.Usage())
}()
fileToData := populateLocalStore(t, st, numTableFiles)
@@ -391,11 +389,9 @@ func TestNBSUpdateManifestWithAppendixOptions(t *testing.T) {
ctx := context.Background()
_, p, q, store, _, _ := prepStore(ctx, t, assert)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
// persist tablefiles to tablePersister
@@ -462,11 +458,9 @@ func TestNBSUpdateManifestWithAppendix(t *testing.T) {
ctx := context.Background()
fm, p, q, store, stats, _ := prepStore(ctx, t, assert)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
_, upstream, err := fm.ParseIfExists(ctx, stats, nil)
@@ -490,11 +484,9 @@ func TestNBSUpdateManifestRetainsAppendix(t *testing.T) {
ctx := context.Background()
fm, p, q, store, stats, _ := prepStore(ctx, t, assert)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
_, upstream, err := fm.ParseIfExists(ctx, stats, nil)
@@ -542,11 +534,9 @@ func TestNBSCommitRetainsAppendix(t *testing.T) {
ctx := context.Background()
fm, p, q, store, stats, rootChunk := prepStore(ctx, t, assert)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
_, upstream, err := fm.ParseIfExists(ctx, stats, nil)
@@ -599,11 +589,9 @@ func TestNBSOverwriteManifest(t *testing.T) {
ctx := context.Background()
fm, p, q, store, stats, _ := prepStore(ctx, t, assert)
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
defer func() {
require.NoError(t, store.Close())
require.EqualValues(t, 0, q.Usage())
}()
// Generate a random root hash

View File

@@ -16,6 +16,7 @@ package nbs
import (
"bytes"
"context"
"encoding/binary"
"errors"
"io"
@@ -31,41 +32,46 @@ var (
)
type tableIndex interface {
// ChunkCount returns the total number of chunks in the indexed file.
ChunkCount() uint32
// EntrySuffixMatches returns true if the entry at index |idx| matches
// the suffix of the address |h|. Used by |Lookup| after finding
// entrySuffixMatches returns true if the entry at index |idx| matches
// the suffix of the address |h|. Used by |lookup| after finding
// matching indexes based on |prefixes|.
EntrySuffixMatches(idx uint32, h *addr) (bool, error)
// IndexEntry returns the |indexEntry| at |idx|. Optionally puts the
entrySuffixMatches(idx uint32, h *addr) (bool, error)
// indexEntry returns the |indexEntry| at |idx|. Optionally puts the
// full address of that entry in |a| if |a| is not |nil|.
IndexEntry(idx uint32, a *addr) (indexEntry, error)
// Lookup returns an |indexEntry| for the chunk corresponding to the
indexEntry(idx uint32, a *addr) (indexEntry, error)
// lookup returns an |indexEntry| for the chunk corresponding to the
// provided address |h|. The second return is |true| if an entry exists
// and |false| otherwise.
Lookup(h *addr) (indexEntry, bool, error)
lookup(h *addr) (indexEntry, bool, error)
// Ordinals returns a slice of indexes which maps the |i|th chunk in
// the indexed file to its corresponding entry in index. The |i|th
// entry in the result is the |i|th chunk in the indexed file, and its
// corresponding value in the slice is the index entry that maps to it.
Ordinals() ([]uint32, error)
ordinals() ([]uint32, error)
// Prefixes returns the sorted slice of |uint64| |addr| prefixes; each
// entry corresponds to an indexed chunk address.
Prefixes() ([]uint64, error)
// PrefixAt returns the prefix at the specified index
PrefixAt(idx uint32) uint64
// TableFileSize returns the total size of the indexed table file, in bytes.
TableFileSize() uint64
// TotalUncompressedData returns the total uncompressed data size of
prefixes() ([]uint64, error)
// chunkCount returns the total number of chunks in the indexed file.
chunkCount() uint32
// tableFileSize returns the total size of the indexed table file, in bytes.
tableFileSize() uint64
// totalUncompressedData returns the total uncompressed data size of
// the table file. Used for informational statistics only.
TotalUncompressedData() uint64
totalUncompressedData() uint64
// Close releases any resources used by this tableIndex.
Close() error
// Clone returns a |tableIndex| with the same contents which can be
// clone returns a |tableIndex| with the same contents which can be
// |Close|d independently.
Clone() (tableIndex, error)
clone() (tableIndex, error)
}
func ReadTableFooter(rd io.ReadSeeker) (chunkCount uint32, totalUncompressedData uint64, err error) {
@@ -103,7 +109,7 @@ func indexMemSize(chunkCount uint32) uint64 {
// parses a valid nbs tableIndex from a byte stream. |buff| must end with an NBS index
// and footer and its length must match the expected indexSize for the chunkCount specified in the footer.
// Retains the buffer and does not allocate new memory except for offsets, computes on buff in place.
func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
func parseTableIndex(ctx context.Context, buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
chunkCount, totalUncompressedData, err := ReadTableFooter(bytes.NewReader(buff))
if err != nil {
return onHeapTableIndex{}, err
@@ -116,8 +122,10 @@ func parseTableIndex(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, erro
chunks2 := chunkCount / 2
chunks1 := chunkCount - chunks2
offsetsBuff1 := make([]byte, chunks1*offsetSize)
offsetsBuff1, err := q.AcquireQuotaBytes(ctx, uint64(chunks1*offsetSize))
if err != nil {
return onHeapTableIndex{}, err
}
return newOnHeapTableIndex(buff, offsetsBuff1, chunkCount, totalUncompressedData, q)
}
@@ -148,49 +156,62 @@ func removeFooter(p []byte, chunkCount uint32) (out []byte, err error) {
// parseTableIndexByCopy reads the footer, copies indexSize(chunkCount) bytes, and parses an on heap table index.
// Useful to create an onHeapTableIndex without retaining the entire underlying array of data.
func parseTableIndexByCopy(buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
r := bytes.NewReader(buff)
return readTableIndexByCopy(r, q)
func parseTableIndexByCopy(ctx context.Context, buff []byte, q MemoryQuotaProvider) (onHeapTableIndex, error) {
return readTableIndexByCopy(ctx, bytes.NewReader(buff), q)
}
// readTableIndexByCopy loads an index into memory from an io.ReadSeeker
// Caution: Allocates new memory for entire index
func readTableIndexByCopy(rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) {
func readTableIndexByCopy(ctx context.Context, rd io.ReadSeeker, q MemoryQuotaProvider) (onHeapTableIndex, error) {
chunkCount, totalUncompressedData, err := ReadTableFooter(rd)
if err != nil {
return onHeapTableIndex{}, err
}
iS := int64(indexSize(chunkCount))
_, err = rd.Seek(-(iS + footerSize), io.SeekEnd)
idxSz := int64(indexSize(chunkCount))
_, err = rd.Seek(-(idxSz + footerSize), io.SeekEnd)
if err != nil {
return onHeapTableIndex{}, err
}
buff := make([]byte, iS)
buff, err := q.AcquireQuotaBytes(ctx, uint64(idxSz))
if err != nil {
return onHeapTableIndex{}, err
}
_, err = io.ReadFull(rd, buff)
if err != nil {
return onHeapTableIndex{}, err
}
chunks2 := chunkCount / 2
chunks1 := chunkCount - chunks2
offsets1Buff := make([]byte, chunks1*offsetSize)
chunks1 := chunkCount - (chunkCount / 2)
offsets1Buff, err := q.AcquireQuotaBytes(ctx, uint64(chunks1*offsetSize))
if err != nil {
return onHeapTableIndex{}, err
}
return newOnHeapTableIndex(buff, offsets1Buff, chunkCount, totalUncompressedData, q)
}
type onHeapTableIndex struct {
q MemoryQuotaProvider
refCnt *int32
tableFileSize uint64
// Tuple bytes
tupleB []byte
// Offset bytes
offsetB1 []byte
offsetB2 []byte
// Suffix bytes
suffixB []byte
chunkCount uint32
totalUncompressedData uint64
// prefixTuples is a packed array of 12 byte tuples:
// (8 byte addr prefix, 4 byte uint32 ordinal)
// it is sorted by addr prefix; the ordinal value
// can be used to look up the offset and addr suffix
prefixTuples []byte
// the offsets arrays contain packed uint64s
offsets1 []byte
offsets2 []byte
// suffixes is an array of 12-byte addr suffixes
suffixes []byte
q MemoryQuotaProvider
refCnt *int32
count uint32
tableFileSz uint64
uncompressedSz uint64
}
var _ tableIndex = &onHeapTableIndex{}
@@ -202,15 +223,14 @@ var _ tableIndex = &onHeapTableIndex{}
// additional space) and the rest into the region of |indexBuff| previously
// occupied by lengths. |onHeapTableIndex| computes directly on the given
// |indexBuff| and |offsetsBuff1| buffers.
func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, chunkCount uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) {
tuples := indexBuff[:prefixTupleSize*chunkCount]
lengths := indexBuff[prefixTupleSize*chunkCount : prefixTupleSize*chunkCount+lengthSize*chunkCount]
suffixes := indexBuff[prefixTupleSize*chunkCount+lengthSize*chunkCount:]
func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, count uint32, totalUncompressedData uint64, q MemoryQuotaProvider) (onHeapTableIndex, error) {
tuples := indexBuff[:prefixTupleSize*count]
lengths := indexBuff[prefixTupleSize*count : prefixTupleSize*count+lengthSize*count]
suffixes := indexBuff[prefixTupleSize*count+lengthSize*count:]
chunks2 := chunkCount / 2
chunks2 := count / 2
lR := bytes.NewReader(lengths)
r := NewOffsetsReader(lR)
r := NewOffsetsReader(bytes.NewReader(lengths))
_, err := io.ReadFull(r, offsetsBuff1)
if err != nil {
return onHeapTableIndex{}, err
@@ -229,40 +249,32 @@ func newOnHeapTableIndex(indexBuff []byte, offsetsBuff1 []byte, chunkCount uint3
*refCnt = 1
return onHeapTableIndex{
refCnt: refCnt,
q: q,
tupleB: tuples,
offsetB1: offsetsBuff1,
offsetB2: offsetsBuff2,
suffixB: suffixes,
chunkCount: chunkCount,
totalUncompressedData: totalUncompressedData,
refCnt: refCnt,
q: q,
prefixTuples: tuples,
offsets1: offsetsBuff1,
offsets2: offsetsBuff2,
suffixes: suffixes,
count: count,
uncompressedSz: totalUncompressedData,
}, nil
}
func (ti onHeapTableIndex) ChunkCount() uint32 {
return ti.chunkCount
}
func (ti onHeapTableIndex) PrefixAt(idx uint32) uint64 {
return ti.prefixAt(idx)
}
func (ti onHeapTableIndex) EntrySuffixMatches(idx uint32, h *addr) (bool, error) {
func (ti onHeapTableIndex) entrySuffixMatches(idx uint32, h *addr) (bool, error) {
ord := ti.ordinalAt(idx)
o := ord * addrSuffixSize
b := ti.suffixB[o : o+addrSuffixSize]
b := ti.suffixes[o : o+addrSuffixSize]
return bytes.Equal(h[addrPrefixSize:], b), nil
}
func (ti onHeapTableIndex) IndexEntry(idx uint32, a *addr) (entry indexEntry, err error) {
func (ti onHeapTableIndex) indexEntry(idx uint32, a *addr) (entry indexEntry, err error) {
prefix, ord := ti.tupleAt(idx)
if a != nil {
binary.BigEndian.PutUint64(a[:], prefix)
o := int64(addrSuffixSize * ord)
b := ti.suffixB[o : o+addrSuffixSize]
b := ti.suffixes[o : o+addrSuffixSize]
copy(a[addrPrefixSize:], b)
}
@@ -284,33 +296,33 @@ func (ti onHeapTableIndex) getIndexEntry(ord uint32) indexEntry {
}
}
func (ti onHeapTableIndex) Lookup(h *addr) (indexEntry, bool, error) {
func (ti onHeapTableIndex) lookup(h *addr) (indexEntry, bool, error) {
ord, err := ti.lookupOrdinal(h)
if err != nil {
return indexResult{}, false, err
}
if ord == ti.chunkCount {
if ord == ti.count {
return indexResult{}, false, nil
}
return ti.getIndexEntry(ord), true, nil
}
// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.chunkCount|
// lookupOrdinal returns the ordinal of |h| if present. Returns |ti.count|
// if absent.
func (ti onHeapTableIndex) lookupOrdinal(h *addr) (uint32, error) {
prefix := h.Prefix()
for idx := ti.findPrefix(prefix); idx < ti.chunkCount && ti.prefixAt(idx) == prefix; idx++ {
m, err := ti.EntrySuffixMatches(idx, h)
for idx := ti.findPrefix(prefix); idx < ti.count && ti.prefixAt(idx) == prefix; idx++ {
m, err := ti.entrySuffixMatches(idx, h)
if err != nil {
return ti.chunkCount, err
return ti.count, err
}
if m {
return ti.ordinalAt(idx), nil
}
}
return ti.chunkCount, nil
return ti.count, nil
}
// findPrefix returns the first position in |tr.prefixes| whose value == |prefix|.
@@ -320,12 +332,12 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) {
binary.BigEndian.PutUint64(query, prefix)
// NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in
// an extremely tight loop and inlining the code was a significant perf improvement.
idx, j := 0, ti.chunkCount
idx, j := 0, ti.count
for idx < j {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
o := int64(prefixTupleSize * h)
if bytes.Compare(ti.tupleB[o:o+addrPrefixSize], query) < 0 {
if bytes.Compare(ti.prefixTuples[o:o+addrPrefixSize], query) < 0 {
idx = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
@@ -336,7 +348,7 @@ func (ti onHeapTableIndex) findPrefix(prefix uint64) (idx uint32) {
func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) {
off := int64(prefixTupleSize * idx)
b := ti.tupleB[off : off+prefixTupleSize]
b := ti.prefixTuples[off : off+prefixTupleSize]
prefix = binary.BigEndian.Uint64(b[:])
ord = binary.BigEndian.Uint32(b[addrPrefixSize:])
@@ -345,44 +357,45 @@ func (ti onHeapTableIndex) tupleAt(idx uint32) (prefix uint64, ord uint32) {
func (ti onHeapTableIndex) prefixAt(idx uint32) uint64 {
off := int64(prefixTupleSize * idx)
b := ti.tupleB[off : off+addrPrefixSize]
b := ti.prefixTuples[off : off+addrPrefixSize]
return binary.BigEndian.Uint64(b)
}
func (ti onHeapTableIndex) ordinalAt(idx uint32) uint32 {
off := int64(prefixTupleSize*idx) + addrPrefixSize
b := ti.tupleB[off : off+ordinalSize]
b := ti.prefixTuples[off : off+ordinalSize]
return binary.BigEndian.Uint32(b)
}
// the first n - n/2 offsets are stored in offsets1 and the rest in offsets2
func (ti onHeapTableIndex) offsetAt(ord uint32) uint64 {
chunks1 := ti.chunkCount - ti.chunkCount/2
chunks1 := ti.count - ti.count/2
var b []byte
if ord < chunks1 {
off := int64(offsetSize * ord)
b = ti.offsetB1[off : off+offsetSize]
b = ti.offsets1[off : off+offsetSize]
} else {
off := int64(offsetSize * (ord - chunks1))
b = ti.offsetB2[off : off+offsetSize]
b = ti.offsets2[off : off+offsetSize]
}
return binary.BigEndian.Uint64(b)
}
func (ti onHeapTableIndex) Ordinals() ([]uint32, error) {
o := make([]uint32, ti.chunkCount)
for i, off := uint32(0), 0; i < ti.chunkCount; i, off = i+1, off+prefixTupleSize {
b := ti.tupleB[off+addrPrefixSize : off+prefixTupleSize]
func (ti onHeapTableIndex) ordinals() ([]uint32, error) {
// todo: |o| is not accounted for in the memory quota
o := make([]uint32, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off+addrPrefixSize : off+prefixTupleSize]
o[i] = binary.BigEndian.Uint32(b)
}
return o, nil
}
func (ti onHeapTableIndex) Prefixes() ([]uint64, error) {
p := make([]uint64, ti.chunkCount)
for i, off := uint32(0), 0; i < ti.chunkCount; i, off = i+1, off+prefixTupleSize {
b := ti.tupleB[off : off+addrPrefixSize]
func (ti onHeapTableIndex) prefixes() ([]uint64, error) {
// todo: |p| is not accounted for in the memory quota
p := make([]uint64, ti.count)
for i, off := uint32(0), 0; i < ti.count; i, off = i+1, off+prefixTupleSize {
b := ti.prefixTuples[off : off+addrPrefixSize]
p[i] = binary.BigEndian.Uint64(b)
}
return p, nil
@@ -391,12 +404,12 @@ func (ti onHeapTableIndex) Prefixes() ([]uint64, error) {
func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash {
// Get tuple
off := int64(prefixTupleSize * idx)
tuple := ti.tupleB[off : off+prefixTupleSize]
tuple := ti.prefixTuples[off : off+prefixTupleSize]
// Get prefix, ordinal, and suffix
prefix := tuple[:addrPrefixSize]
ord := binary.BigEndian.Uint32(tuple[addrPrefixSize:]) * addrSuffixSize
suffix := ti.suffixB[ord : ord+addrSuffixSize] // suffix is 12 bytes
suffix := ti.suffixes[ord : ord+addrSuffixSize] // suffix is 12 bytes
// Combine prefix and suffix to get hash
buf := [hash.ByteLen]byte{}
@@ -409,7 +422,7 @@ func (ti onHeapTableIndex) hashAt(idx uint32) hash.Hash {
// prefixIdxLBound returns the first position in |tr.prefixes| whose value is <= |prefix|.
// It will return an index less than where |prefix| would be if |prefix| is not found.
func (ti onHeapTableIndex) prefixIdxLBound(prefix uint64) uint32 {
l, r := uint32(0), ti.chunkCount
l, r := uint32(0), ti.count
for l < r {
m := l + (r-l)/2 // find middle, rounding down
if ti.prefixAt(m) < prefix {
@@ -425,10 +438,10 @@ func (ti onHeapTableIndex) prefixIdxLBound(prefix uint64) uint32 {
// prefixIdxUBound returns the first position in |tr.prefixes| whose value is >= |prefix|.
// It will return an index greater than where |prefix| would be if |prefix| is not found.
func (ti onHeapTableIndex) prefixIdxUBound(prefix uint64) (idx uint32) {
l, r := uint32(0), ti.chunkCount
l, r := uint32(0), ti.count
for l < r {
m := l + (r-l+1)/2 // find middle, rounding up
if m >= ti.chunkCount { // prevent index out of bounds
m := l + (r-l+1)/2 // find middle, rounding up
if m >= ti.count { // prevent index out of bounds
return r
}
pre := ti.prefixAt(m)
@@ -459,6 +472,56 @@ func (ti onHeapTableIndex) padStringAndDecode(s string, p string) uint64 {
return binary.BigEndian.Uint64(h)
}
func (ti onHeapTableIndex) chunkCount() uint32 {
return ti.count
}
// tableFileSize returns the size of the table file that this index references.
// This assumes that the index follows immediately after the last chunk in the
// file and that the last chunk in the file is in the index.
func (ti onHeapTableIndex) tableFileSize() (sz uint64) {
sz = footerSize
if ti.count > 0 {
last := ti.getIndexEntry(ti.count - 1)
sz += last.Offset()
sz += uint64(last.Length())
sz += indexSize(ti.count)
}
return
}
func (ti onHeapTableIndex) totalUncompressedData() uint64 {
return ti.uncompressedSz
}
func (ti onHeapTableIndex) Close() error {
cnt := atomic.AddInt32(ti.refCnt, -1)
if cnt == 0 {
if err := ti.q.ReleaseQuotaBytes(ti.prefixTuples); err != nil {
return err
}
if err := ti.q.ReleaseQuotaBytes(ti.offsets1); err != nil {
return err
}
if err := ti.q.ReleaseQuotaBytes(ti.offsets2); err != nil {
return err
}
return ti.q.ReleaseQuotaBytes(ti.suffixes)
}
if cnt < 0 {
panic("Close() called and reduced ref count to < 0.")
}
return nil
}
func (ti onHeapTableIndex) clone() (tableIndex, error) {
cnt := atomic.AddInt32(ti.refCnt, 1)
if cnt == 1 {
panic("Clone() called after last Close(). This index is no longer valid.")
}
return ti, nil
}
func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) {
// Convert to string
shortHash := string(short)
@@ -476,7 +539,7 @@ func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) {
pIdxL = ti.findPrefix(sPrefix)
// Prefix doesn't exist
if pIdxL == ti.chunkCount {
if pIdxL == ti.count {
return []string{}, errors.New("can't find prefix")
}
@@ -512,44 +575,3 @@ func (ti onHeapTableIndex) ResolveShortHash(short []byte) ([]string, error) {
return res, nil
}
// TableFileSize returns the size of the table file that this index references.
// This assumes that the index follows immediately after the last chunk in the
// file and that the last chunk in the file is in the index.
func (ti onHeapTableIndex) TableFileSize() uint64 {
if ti.chunkCount == 0 {
return footerSize
}
entry := ti.getIndexEntry(ti.chunkCount - 1)
offset, len := entry.Offset(), uint64(entry.Length())
return offset + len + indexSize(ti.chunkCount) + footerSize
}
func (ti onHeapTableIndex) TotalUncompressedData() uint64 {
return ti.totalUncompressedData
}
func (ti onHeapTableIndex) Close() error {
cnt := atomic.AddInt32(ti.refCnt, -1)
if cnt == 0 {
ti.tupleB = nil
ti.offsetB1 = nil
ti.offsetB2 = nil
ti.suffixB = nil
return ti.q.ReleaseQuota(indexMemSize(ti.chunkCount))
}
if cnt < 0 {
panic("Close() called and reduced ref count to < 0.")
}
return nil
}
func (ti onHeapTableIndex) Clone() (tableIndex, error) {
cnt := atomic.AddInt32(ti.refCnt, 1)
if cnt == 1 {
panic("Clone() called after last Close(). This index is no longer valid.")
}
return ti, nil
}

View File

@@ -30,18 +30,18 @@ func TestParseTableIndex(t *testing.T) {
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
assert.Equal(t, uint32(596), idx.ChunkCount())
assert.Equal(t, uint32(596), idx.chunkCount())
seen := make(map[addr]bool)
for i := uint32(0); i < idx.ChunkCount(); i++ {
for i := uint32(0); i < idx.chunkCount(); i++ {
var onheapaddr addr
e, err := idx.IndexEntry(i, &onheapaddr)
e, err := idx.indexEntry(i, &onheapaddr)
require.NoError(t, err)
if _, ok := seen[onheapaddr]; !ok {
seen[onheapaddr] = true
lookupe, ok, err := idx.Lookup(&onheapaddr)
lookupe, ok, err := idx.lookup(&onheapaddr)
require.NoError(t, err)
assert.True(t, ok)
assert.Equal(t, e.Offset(), lookupe.Offset(), "%v does not match %v for address %v", e, lookupe, onheapaddr)
@@ -56,12 +56,12 @@ func BenchmarkFindPrefix(b *testing.B) {
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(b, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{})
require.NoError(b, err)
defer idx.Close()
assert.Equal(b, uint32(596), idx.ChunkCount())
assert.Equal(b, uint32(596), idx.chunkCount())
prefixes, err := idx.Prefixes()
prefixes, err := idx.prefixes()
require.NoError(b, err)
b.Run("benchmark prefixIdx()", func(b *testing.B) {
@@ -84,7 +84,7 @@ func BenchmarkFindPrefix(b *testing.B) {
func prefixIdx(ti onHeapTableIndex, prefix uint64) (idx uint32) {
// NOTE: The golang impl of sort.Search is basically inlined here. This method can be called in
// an extremely tight loop and inlining the code was a significant perf improvement.
idx, j := 0, ti.chunkCount
idx, j := 0, ti.chunkCount()
for idx < j {
h := idx + (j-idx)/2 // avoid overflow when computing h
// i ≤ h < j
@@ -103,7 +103,7 @@ func TestOnHeapTableIndex_ResolveShortHash(t *testing.T) {
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
idx, err := parseTableIndexByCopy(nil, bs, &UnlimitedQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
res, err := idx.ResolveShortHash([]byte("0"))
@@ -122,7 +122,7 @@ func TestResolveOneHash(t *testing.T) {
// build table index
td, _, err := buildTable(chunks)
tIdx, err := parseTableIndexByCopy(td, &noopQuotaProvider{})
tIdx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{})
require.NoError(t, err)
// get hashes out
@@ -153,7 +153,7 @@ func TestResolveFewHash(t *testing.T) {
// build table index
td, _, err := buildTable(chunks)
tIdx, err := parseTableIndexByCopy(td, &noopQuotaProvider{})
tIdx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{})
require.NoError(t, err)
// get hashes out
@@ -185,7 +185,7 @@ func TestAmbiguousShortHash(t *testing.T) {
// build table index
td, _, err := buildFakeChunkTable(chunks)
idx, err := parseTableIndexByCopy(td, &noopQuotaProvider{})
idx, err := parseTableIndexByCopy(nil, td, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tests := []struct {

View File

@@ -165,7 +165,7 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e
return compactionPlan{}, err
}
plan.chunkCount += index.ChunkCount()
plan.chunkCount += index.chunkCount()
// Calculate the amount of chunk data in |src|
chunkDataLen := calcChunkDataLen(index)
@@ -192,11 +192,11 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e
return compactionPlan{}, err
}
ordinals, err := index.Ordinals()
ordinals, err := index.ordinals()
if err != nil {
return compactionPlan{}, err
}
prefixes, err := index.Prefixes()
prefixes, err := index.prefixes()
if err != nil {
return compactionPlan{}, err
}
@@ -219,16 +219,16 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e
if onHeap, ok := index.(onHeapTableIndex); ok {
// TODO: copy the lengths and suffixes as a byte-copy from src BUG #3438
// Bring over the lengths block, in order
for ord := uint32(0); ord < onHeap.chunkCount; ord++ {
for ord := uint32(0); ord < onHeap.chunkCount(); ord++ {
e := onHeap.getIndexEntry(ord)
binary.BigEndian.PutUint32(plan.mergedIndex[lengthsPos:], e.Length())
lengthsPos += lengthSize
}
// Bring over the suffixes block, in order
n := copy(plan.mergedIndex[suffixesPos:], onHeap.suffixB)
n := copy(plan.mergedIndex[suffixesPos:], onHeap.suffixes)
if n != len(onHeap.suffixB) {
if n != len(onHeap.suffixes) {
return compactionPlan{}, errors.New("failed to copy all data")
}
@@ -237,7 +237,7 @@ func planConjoin(sources chunkSources, stats *Stats) (plan compactionPlan, err e
// Build up the index one entry at a time.
var a addr
for i := 0; i < len(ordinals); i++ {
e, err := index.IndexEntry(uint32(i), &a)
e, err := index.indexEntry(uint32(i), &a)
if err != nil {
return compactionPlan{}, err
}
@@ -278,5 +278,5 @@ func nameFromSuffixes(suffixes []byte) (name addr) {
}
func calcChunkDataLen(index tableIndex) uint64 {
return index.TableFileSize() - indexSize(index.ChunkCount()) - footerSize
return index.tableFileSize() - indexSize(index.chunkCount()) - footerSize
}
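This simply inverts tableFileSize: whatever is not index or footer is chunk data. A sketch with a made-up file size:
// exampleChunkDataLen recovers the chunk-data length of a hypothetical
// 10,000-byte table file holding |count| chunks.
func exampleChunkDataLen(count uint32) uint64 {
	return 10_000 - indexSize(count) - footerSize
}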

View File

@@ -22,6 +22,7 @@
package nbs
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
@@ -29,6 +30,7 @@ import (
)
func TestPlanCompaction(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
tableContents := [][][]byte{
{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")},
@@ -45,7 +47,7 @@ func TestPlanCompaction(t *testing.T) {
}
data, name, err := buildTable(content)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(data, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, data, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
require.NoError(t, err)
@@ -63,11 +65,11 @@ func TestPlanCompaction(t *testing.T) {
totalChunks += mustUint32(src.count())
}
idx, err := parseTableIndex(plan.mergedIndex, &noopQuotaProvider{})
idx, err := parseTableIndex(ctx, plan.mergedIndex, &UnlimitedQuotaProvider{})
require.NoError(t, err)
assert.Equal(totalChunks, idx.chunkCount)
assert.Equal(totalUnc, idx.totalUncompressedData)
assert.Equal(totalChunks, idx.chunkCount())
assert.Equal(totalUnc, idx.totalUncompressedData())
tr, err := newTableReader(idx, tableReaderAtFromBytes(nil), fileBlockSize)
require.NoError(t, err)

View File

@@ -131,29 +131,25 @@ type tableReaderAt interface {
// to tolerate up to |blockSize| overhead each time we read a chunk, if it helps us group
// more chunks together into a single read request to backing storage.
type tableReader struct {
tableIndex
prefixes []uint64
chunkCount uint32
totalUncompressedData uint64
r tableReaderAt
blockSize uint64
prefixes []uint64
idx tableIndex
r tableReaderAt
blockSize uint64
}
// newTableReader wraps a parsed table index in a reader for the table's chunk data. |index| must
// come from a valid nbs table byte stream ending with an NBS index and footer, and |r| should allow
// retrieving any desired range of bytes from the table.
func newTableReader(index tableIndex, r tableReaderAt, blockSize uint64) (tableReader, error) {
p, err := index.Prefixes()
p, err := index.prefixes()
if err != nil {
return tableReader{}, err
}
return tableReader{
index,
p,
index.ChunkCount(),
index.TotalUncompressedData(),
r,
blockSize,
prefixes: p,
idx: index,
r: r,
blockSize: blockSize,
}, nil
}
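Construction is now two explicit steps; a hedged sketch of in-package use (mirroring the test setup elsewhere in this commit, with |ctx| and |tableData| supplied by the caller):
ti, err := parseTableIndexByCopy(ctx, tableData, &UnlimitedQuotaProvider{})
if err != nil {
	return err
}
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
if err != nil {
	_ = ti.Close() // release the index's quota if the reader can't be built
	return err
}
defer tr.close() // closes the wrapped index in turn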
@@ -162,7 +158,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) {
// TODO: Use findInIndex if (tr.chunkCount - len(addrs)*Log2(tr.chunkCount)) > (tr.chunkCount - len(addrs))
filterIdx := uint32(0)
filterLen := uint32(tr.chunkCount)
filterLen := uint32(tr.idx.chunkCount())
var remaining bool
for i, addr := range addrs {
@@ -185,7 +181,7 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) {
// prefixes are equal, so locate and compare against the corresponding suffix
for j := filterIdx; j < filterLen && addr.prefix == tr.prefixes[j]; j++ {
m, err := tr.EntrySuffixMatches(j, addr.a)
m, err := tr.idx.entrySuffixMatches(j, addr.a)
if err != nil {
return false, err
}
@@ -204,27 +200,27 @@ func (tr tableReader) hasMany(addrs []hasRecord) (bool, error) {
}
func (tr tableReader) count() (uint32, error) {
return tr.chunkCount, nil
return tr.idx.chunkCount(), nil
}
func (tr tableReader) uncompressedLen() (uint64, error) {
return tr.totalUncompressedData, nil
return tr.idx.totalUncompressedData(), nil
}
func (tr tableReader) index() (tableIndex, error) {
return tr.tableIndex, nil
return tr.idx, nil
}
// returns true iff |h| can be found in this table.
func (tr tableReader) has(h addr) (bool, error) {
_, ok, err := tr.Lookup(&h)
_, ok, err := tr.idx.lookup(&h)
return ok, err
}
// returns the storage associated with |h|, iff present. Returns nil if absent. On success,
// the returned byte slice directly references the underlying storage.
func (tr tableReader) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
e, found, err := tr.Lookup(&h)
e, found, err := tr.idx.lookup(&h)
if err != nil {
return nil, err
}
@@ -474,7 +470,7 @@ func (tr tableReader) getManyAtOffsetsWithReadFunc(
return nil
}
// findOffsets iterates over |reqs| and |tr.prefixes| (both sorted by
// findOffsets iterates over |reqs| and |prefixes| (both sorted by
// address) to build the set of table locations which must be read in order to
// find each chunk specified by |reqs|. If this table contains all requested
// chunks remaining will be set to false upon return. If some are not here,
@@ -509,13 +505,13 @@ func (tr tableReader) findOffsets(reqs []getRecord) (ors offsetRecSlice, remaini
// record all offsets within the table which contain the data required.
for j := filterIdx; j < filterLen && req.prefix == tr.prefixes[j]; j++ {
m, err := tr.EntrySuffixMatches(j, req.a)
m, err := tr.idx.entrySuffixMatches(j, req.a)
if err != nil {
return nil, false, err
}
if m {
reqs[i].found = true
entry, err := tr.IndexEntry(j, nil)
entry, err := tr.idx.indexEntry(j, nil)
if err != nil {
return nil, false, err
}
@@ -622,9 +618,9 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
}
var ors offsetRecSlice
for i := uint32(0); i < tr.chunkCount; i++ {
for i := uint32(0); i < tr.idx.chunkCount(); i++ {
a := new(addr)
e, err := tr.IndexEntry(i, a)
e, err := tr.idx.indexEntry(i, a)
if err != nil {
return err
}
@@ -643,7 +639,7 @@ func (tr tableReader) extract(ctx context.Context, chunks chan<- extractRecord)
func (tr tableReader) reader(ctx context.Context) (io.Reader, error) {
i, _ := tr.index()
return io.LimitReader(&readerAdapter{tr.r, 0, ctx}, int64(i.TableFileSize())), nil
return io.LimitReader(&readerAdapter{tr.r, 0, ctx}, int64(i.tableFileSize())), nil
}
func (tr tableReader) size() (uint64, error) {
@@ -651,19 +647,24 @@ func (tr tableReader) size() (uint64, error) {
if err != nil {
return 0, err
}
return i.TableFileSize(), nil
return i.tableFileSize(), nil
}
func (tr tableReader) close() error {
return tr.tableIndex.Close()
return tr.idx.Close()
}
func (tr tableReader) clone() (tableReader, error) {
ti, err := tr.tableIndex.Clone()
idx, err := tr.idx.clone()
if err != nil {
return tableReader{}, err
}
return tableReader{ti, tr.prefixes, tr.chunkCount, tr.totalUncompressedData, tr.r, tr.blockSize}, nil
return tableReader{
prefixes: tr.prefixes,
idx: idx,
r: tr.r,
blockSize: tr.blockSize,
}, nil
}
type readerAdapter struct {

View File

@@ -234,7 +234,7 @@ func (ts tableSet) physicalLen() (uint64, error) {
if err != nil {
return 0, err
}
data += index.TableFileSize()
data += index.tableFileSize()
}
return
}
@@ -279,6 +279,11 @@ func (ts tableSet) Size() int {
// prepend adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) tableSet {
cs, err := ts.p.Persist(ctx, mt, ts, stats)
if err != nil {
panic(err) // todo(andy)
}
newTs := tableSet{
novel: make(chunkSources, len(ts.novel)+1),
upstream: make(chunkSources, len(ts.upstream)),
@@ -286,7 +291,7 @@ func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) tabl
q: ts.q,
rl: ts.rl,
}
newTs.novel[0] = newPersistingChunkSource(ctx, mt, ts, ts.p, ts.rl, stats)
newTs.novel[0] = cs
copy(newTs.novel[1:], ts.novel)
copy(newTs.upstream, ts.upstream)
return newTs
@@ -304,11 +309,9 @@ func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
for _, src := range ts.novel {
cnt, err := src.count()
if err != nil {
return tableSet{}, err
}
if cnt > 0 {
flattened.upstream = append(flattened.upstream, src)
}
@@ -385,11 +388,6 @@ OUTER:
memoryNeeded += indexMemSize(spec.chunkCount)
}
err := ts.q.AcquireQuota(ctx, memoryNeeded)
if err != nil {
return tableSet{}, err
}
var rp atomic.Value
group, ctx := errgroup.WithContext(ctx)
@@ -419,7 +417,7 @@ OUTER:
)
}
err = group.Wait()
err := group.Wait()
if err != nil {
// Close any opened chunkSources
for _, cs := range opened {

View File

@@ -33,7 +33,7 @@ import (
var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
func TestTableSetPrependEmpty(t *testing.T) {
ts := newFakeTableSet(&noopQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
ts := newFakeTableSet(&UnlimitedQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(t, specs)
@@ -41,7 +41,7 @@ func TestTableSetPrependEmpty(t *testing.T) {
func TestTableSetPrepend(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
ts := newFakeTableSet(&UnlimitedQuotaProvider{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -66,7 +66,7 @@ func TestTableSetPrepend(t *testing.T) {
func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
ts := newFakeTableSet(&UnlimitedQuotaProvider{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -89,7 +89,7 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
ts := newFakeTableSet(&UnlimitedQuotaProvider{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -122,9 +122,6 @@ func persist(t *testing.T, p tablePersister, chunks ...[]byte) {
func TestTableSetRebase(t *testing.T) {
assert := assert.New(t)
q := NewUnlimitedMemQuotaProvider()
defer func() {
require.EqualValues(t, 0, q.Usage())
}()
persister := newFakeTablePersister(q)
insert := func(ts tableSet, chunks ...[]byte) tableSet {
@@ -139,6 +136,7 @@ func TestTableSetRebase(t *testing.T) {
fullTS := newTableSet(persister, q)
defer func() {
require.NoError(t, fullTS.close())
}()
specs, err := fullTS.toSpecs()
require.NoError(t, err)
@@ -168,7 +166,7 @@ func TestTableSetRebase(t *testing.T) {
func TestTableSetPhysicalLen(t *testing.T) {
assert := assert.New(t)
ts := newFakeTableSet(&noopQuotaProvider{})
ts := newFakeTableSet(&UnlimitedQuotaProvider{})
specs, err := ts.toSpecs()
require.NoError(t, err)
assert.Empty(specs)
@@ -212,5 +210,6 @@ func TestTableSetClosesOpenedChunkSourcesOnErr(t *testing.T) {
for _ = range p.opened {
mem -= indexMemSize(1)
}
require.EqualValues(t, mem, q.Usage())
assert.EqualValues(t, 44, int(q.Usage()))
assert.NoError(t, ts.close())
}

View File

@@ -25,6 +25,7 @@ import (
"context"
"encoding/binary"
"fmt"
"math/rand"
"sort"
"testing"
@@ -77,7 +78,7 @@ func TestSimple(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -124,7 +125,7 @@ func TestHasMany(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -175,7 +176,7 @@ func TestHasManySequentialPrefix(t *testing.T) {
require.NoError(t, err)
buff = buff[:length]
ti, err := parseTableIndexByCopy(buff, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, buff, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
require.NoError(t, err)
@@ -193,6 +194,70 @@ func TestHasManySequentialPrefix(t *testing.T) {
}
}
func BenchmarkHasMany(b *testing.B) {
const cnt = 64 * 1024
chnks := make([][]byte, cnt)
addrs := make(addrSlice, cnt)
hrecs := make([]hasRecord, cnt)
sparse := make([]hasRecord, cnt/1024)
data := make([]byte, cnt*16)
rand.Read(data)
for i := range chnks {
chnks[i] = data[i*16 : (i+1)*16]
}
for i := range addrs {
addrs[i] = computeAddr(chnks[i])
}
for i := range hrecs {
hrecs[i] = hasRecord{
a: &addrs[i],
prefix: prefixOf(addrs[i]),
order: i,
}
}
for i := range sparse {
j := i * 64
sparse[i] = hasRecord{ // fill |sparse|, not |hrecs|, with every 64th record
a: &addrs[j],
prefix: prefixOf(addrs[j]),
order: j,
}
}
sort.Sort(hasRecordByPrefix(hrecs))
sort.Sort(hasRecordByPrefix(sparse))
ctx := context.Background()
tableData, _, err := buildTable(chnks)
require.NoError(b, err)
ti, err := parseTableIndexByCopy(ctx, tableData, &UnlimitedQuotaProvider{})
require.NoError(b, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(b, err)
b.ResetTimer()
b.Run("dense has many", func(b *testing.B) {
var ok bool
for i := 0; i < b.N; i++ {
ok, err = tr.hasMany(hrecs)
}
assert.False(b, ok)
assert.NoError(b, err)
})
b.Run("sparse has many", func(b *testing.B) {
var ok bool
for i := 0; i < b.N; i++ {
ok, err = tr.hasMany(sparse)
}
assert.False(b, ok) // every sparse record is present in the table, so none remain
assert.NoError(b, err)
})
}
func prefixOf(a addr) uint64 {
return binary.BigEndian.Uint64(a[:addrPrefixSize])
}
func TestGetMany(t *testing.T) {
assert := assert.New(t)
@@ -204,7 +269,7 @@ func TestGetMany(t *testing.T) {
tableData, _, err := buildTable(data)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -238,7 +303,7 @@ func TestCalcReads(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), 0)
require.NoError(t, err)
@@ -275,7 +340,7 @@ func TestExtract(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -314,7 +379,7 @@ func Test65k(t *testing.T) {
tableData, _, err := buildTable(chunks)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)
@@ -367,7 +432,7 @@ func doTestNGetMany(t *testing.T, count int) {
tableData, _, err := buildTable(data)
require.NoError(t, err)
ti, err := parseTableIndexByCopy(tableData, &noopQuotaProvider{})
ti, err := parseTableIndexByCopy(nil, tableData, &UnlimitedQuotaProvider{})
require.NoError(t, err)
tr, err := newTableReader(ti, tableReaderAtFromBytes(tableData), fileBlockSize)
require.NoError(t, err)

View File

@@ -15,6 +15,7 @@
package nbs
import (
"context"
"io"
"math"
@@ -24,8 +25,8 @@ import (
"github.com/dolthub/dolt/go/store/hash"
)
func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error {
idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{})
func IterChunks(ctx context.Context, rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err error)) error {
idx, err := readTableIndexByCopy(ctx, rd, &UnlimitedQuotaProvider{})
if err != nil {
return err
}
@@ -33,9 +34,9 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er
defer idx.Close()
seen := make(map[addr]bool)
for i := uint32(0); i < idx.ChunkCount(); i++ {
for i := uint32(0); i < idx.chunkCount(); i++ {
var a addr
ie, err := idx.IndexEntry(i, &a)
ie, err := idx.indexEntry(i, &a)
if err != nil {
return err
}
@@ -68,8 +69,8 @@ func IterChunks(rd io.ReadSeeker, cb func(chunk chunks.Chunk) (stop bool, err er
return nil
}
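A caller holding an io.ReadSeeker over a table file might use the iterator as in this sketch (|ctx| and |rd| supplied by the caller):
// sketch: count the chunks in a table file with the exported iterator.
var n int
err := nbs.IterChunks(ctx, rd, func(c chunks.Chunk) (stop bool, err error) {
	n++
	return false, nil // false: keep iterating
})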
func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) {
idx, err := readTableIndexByCopy(rd, &noopQuotaProvider{})
func GetTableIndexPrefixes(ctx context.Context, rd io.ReadSeeker) (prefixes []uint64, err error) {
idx, err := readTableIndexByCopy(ctx, rd, &UnlimitedQuotaProvider{})
if err != nil {
return nil, err
}
@@ -80,7 +81,11 @@ func GetTableIndexPrefixes(rd io.ReadSeeker) (prefixes []uint64, err error) {
}
}()
return idx.Prefixes()
prefixes, err = idx.prefixes()
if err != nil {
return nil, err
}
return
}
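The prefix helper is context-aware in the same way; a sketch, assuming |path| names a valid table file on disk:
f, err := os.Open(path) // *os.File satisfies io.ReadSeeker
if err != nil {
	return err
}
defer f.Close()
prefixes, err := nbs.GetTableIndexPrefixes(ctx, f)
if err != nil {
	return err
}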
func GuessPrefixOrdinal(prefix uint64, n uint32) int {