go/store/nbs: remove mmapTableIndex

This commit is contained in:
Andy Arthur
2022-11-01 11:33:38 -07:00
parent 5e0d7e0de5
commit 71a4cf5d2f
3 changed files with 8 additions and 261 deletions
+8 -63
View File
@@ -24,7 +24,6 @@ package nbs
import (
"context"
"errors"
"sync"
"time"
)
@@ -72,71 +71,17 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
return &chunkSourceAdapter{tr, name}, nil
}
func loadTableIndex(stats *Stats, chunkCount uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
ti, err := newMmapTableIndex(chunkCount)
if err != nil {
return nil, err
}
func loadTableIndex(stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
idxSz := int(indexSize(cnt) + footerSize)
offsetSz := int((cnt - (cnt / 2)) * offsetSize)
buf := make([]byte, idxSz+offsetSz)
t1 := time.Now()
err = loadIndexBytes(ti.indexDataBuff)
if err != nil {
_ = ti.mmapped.Unmap()
return onHeapTableIndex{}, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(ti.indexDataBuff)))
err = ti.parseIndexBuffer(q)
if err != nil {
_ = ti.mmapped.Unmap()
if err := loadIndexBytes(buf[:idxSz]); err != nil {
return nil, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(buf)))
return ti, nil
}
type awsTableReaderAt struct {
once sync.Once
getTRErr error
tra tableReaderAt
al awsLimits
ddb *ddbTableStore
s3 *s3ObjectReader
name addr
chunkCount uint32
}
func (atra *awsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error) {
atra.once.Do(func() {
atra.tra, atra.getTRErr = atra.getTableReaderAt(ctx, stats)
})
if atra.getTRErr != nil {
return 0, atra.getTRErr
}
return atra.tra.ReadAtWithStats(ctx, p, off, stats)
}
func (atra *awsTableReaderAt) getTableReaderAt(ctx context.Context, stats *Stats) (tableReaderAt, error) {
if atra.al.tableMayBeInDynamo(atra.chunkCount) {
data, err := atra.ddb.ReadTable(ctx, atra.name, stats)
if data == nil && err == nil { // There MUST be either data or an error
return &dynamoTableReaderAt{}, errors.New("no data available")
}
if data != nil {
return &dynamoTableReaderAt{ddb: atra.ddb, h: atra.name}, nil
}
if _, ok := err.(tableNotInDynamoErr); !ok {
return &dynamoTableReaderAt{}, err
}
}
return &s3TableReaderAt{s3: atra.s3, h: atra.name}, nil
return parseTableIndexWithOffsetBuff(buf[:idxSz], buf[idxSz:], q)
}
-140
View File
@@ -19,12 +19,8 @@ import (
"encoding/binary"
"errors"
"io"
"os"
"sync"
"sync/atomic"
"github.com/dolthub/mmap-go"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -557,139 +553,3 @@ func (ti onHeapTableIndex) Clone() (tableIndex, error) {
}
return ti, nil
}
// mmapTableIndex is an onHeapTableIndex but creates all of its slice buffers
// from mmap. It overrides Clone and Close of mmapTableIndex so that it can
// count references and release mmapped regions appropriately.
type mmapTableIndex struct {
onHeapTableIndex
refCnt *int32
q MemoryQuotaProvider
mmapped mmapWStat
indexDataBuff []byte
offset1DataBuff []byte
}
// newMmapTableIndex mmaps a region of memory large enough to store a fully
// parsed onHeapTableIndex. After creating the mmapTableIndex, index data should
// be loaded into |indexDataBuff| and then parsed with parseIndexBuffer.
func newMmapTableIndex(chunkCount uint32) (*mmapTableIndex, error) {
indexSize := int(indexSize(chunkCount) + footerSize)
chunks2 := chunkCount / 2
chunks1 := chunkCount - chunks2
offsets1Size := int(chunks1 * offsetSize)
mmapped, err := mmapWithStats(nil, indexSize+offsets1Size, mmap.RDWR, mmap.ANON, 0)
if err != nil {
return nil, err
}
indexBytesBuff := mmapped.m[:indexSize]
offsets1Buff := mmapped.m[indexSize : indexSize+offsets1Size]
refCnt := new(int32)
*refCnt = 1
return &mmapTableIndex{
refCnt: refCnt,
mmapped: mmapped,
indexDataBuff: indexBytesBuff,
offset1DataBuff: offsets1Buff}, nil
}
func (ti *mmapTableIndex) Clone() (tableIndex, error) {
cnt := atomic.AddInt32(ti.refCnt, 1)
if cnt == 1 {
panic("Clone() called after last Close(). This index is no longer valid.")
}
return ti, nil
}
// Close closes the underlying onHeapTableIndex and then unmaps the memory
// region.
func (ti *mmapTableIndex) Close() error {
cnt := atomic.AddInt32(ti.refCnt, -1)
if cnt == 0 {
chunkCount := ti.chunkCount
// mmapTableIndex sets the quota provider for onHeapTableIndex to a
// noopQuotaProvider, so that we can release quota after the memory region
// is unmapped.
err := ti.onHeapTableIndex.Close()
if err != nil {
return err
}
ti.indexDataBuff = nil
ti.offset1DataBuff = nil
err = ti.mmapped.Unmap()
if err != nil {
return err
}
err = ti.q.ReleaseQuota(indexMemSize(chunkCount))
if err != nil {
return err
}
}
if cnt < 0 {
panic("Close() called and reduced ref count to < 0.")
}
return nil
}
func (ti *mmapTableIndex) parseIndexBuffer(q MemoryQuotaProvider) (err error) {
ti.onHeapTableIndex, err = parseTableIndexWithOffsetBuff(ti.indexDataBuff, ti.offset1DataBuff, &noopQuotaProvider{})
ti.q = q
return err
}
type notifyFunc func(n uint64, total uint64)
var noOpNotify = func(uint64, uint64) {}
type mmapStats struct {
mu sync.Mutex
totalUsed uint64
WillMmap notifyFunc
Mmapped notifyFunc
UnMapped notifyFunc
}
var GlobalMmapStats = &mmapStats{
sync.Mutex{},
0,
noOpNotify,
noOpNotify,
noOpNotify,
}
type mmapWStat struct {
m mmap.MMap
used uint64
}
func mmapWithStats(f *os.File, length int, prot, flags int, offset int64) (mmapWStat, error) {
GlobalMmapStats.mu.Lock()
defer GlobalMmapStats.mu.Unlock()
GlobalMmapStats.WillMmap(uint64(length), GlobalMmapStats.totalUsed)
mmap, err := mmap.MapRegion(f, length, prot, flags, offset)
if err != nil {
return mmapWStat{}, err
}
GlobalMmapStats.totalUsed += uint64(length)
GlobalMmapStats.Mmapped(uint64(length), GlobalMmapStats.totalUsed)
return mmapWStat{mmap, uint64(length)}, nil
}
func (m mmapWStat) Unmap() error {
GlobalMmapStats.mu.Lock()
defer GlobalMmapStats.mu.Unlock()
err := m.m.Unmap()
if err != nil {
return err
}
GlobalMmapStats.totalUsed -= m.used
GlobalMmapStats.UnMapped(m.used, GlobalMmapStats.totalUsed)
return nil
}
-58
View File
@@ -97,64 +97,6 @@ func prefixIdx(ti onHeapTableIndex, prefix uint64) (idx uint32) {
return
}
func TestMMapIndex(t *testing.T) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(t, err)
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
mmidx, err := newMmapTableIndex(idx.chunkCount)
require.NoError(t, err)
copy(mmidx.indexDataBuff, bs)
err = mmidx.parseIndexBuffer(&noopQuotaProvider{})
require.NoError(t, err)
defer mmidx.Close()
assert.Equal(t, idx.ChunkCount(), mmidx.ChunkCount())
seen := make(map[addr]bool)
for i := uint32(0); i < idx.ChunkCount(); i++ {
var onheapaddr addr
onheapentry, err := idx.IndexEntry(i, &onheapaddr)
require.NoError(t, err)
var mmaddr addr
mmentry, err := mmidx.IndexEntry(i, &mmaddr)
require.NoError(t, err)
assert.Equal(t, onheapaddr, mmaddr)
assert.Equal(t, onheapentry.Offset(), mmentry.Offset())
assert.Equal(t, onheapentry.Length(), mmentry.Length())
if _, ok := seen[onheapaddr]; !ok {
seen[onheapaddr] = true
mmentry, found, err := mmidx.Lookup(&onheapaddr)
require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, onheapentry.Offset(), mmentry.Offset(), "%v does not match %v for address %v", onheapentry, mmentry, onheapaddr)
assert.Equal(t, onheapentry.Length(), mmentry.Length())
}
wrongaddr := onheapaddr
if wrongaddr[19] != 0 {
wrongaddr[19] = 0
_, found, err := mmidx.Lookup(&wrongaddr)
require.NoError(t, err)
assert.False(t, found)
}
}
o1, err := idx.Ordinals()
require.NoError(t, err)
o2, err := mmidx.Ordinals()
require.NoError(t, err)
assert.Equal(t, o1, o2)
p1, err := idx.Prefixes()
require.NoError(t, err)
p2, err := mmidx.Prefixes()
require.NoError(t, err)
assert.Equal(t, p1, p2)
assert.Equal(t, idx.TableFileSize(), mmidx.TableFileSize())
assert.Equal(t, idx.TotalUncompressedData(), mmidx.TotalUncompressedData())
}
func TestOnHeapTableIndex_ResolveShortHash(t *testing.T) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(t, err)