Merge pull request #4679 from dolthub/andy/remove-nbs-mmap

go/store/nbs: remove mmapTableIndex
This commit is contained in:
AndyA
2022-11-01 13:00:19 -07:00
committed by GitHub
6 changed files with 8 additions and 297 deletions
-32
View File
@@ -2613,38 +2613,6 @@ SOFTWARE.
= LICENSE a33ad37999b0aa5d38b8bc56a9c6b2d6287a7e2478ee822af7fa7a11 =
================================================================================
================================================================================
= github.com/dolthub/mmap-go licensed under: =
Copyright (c) 2011, Evan Shaw <edsrzf@gmail.com>
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
= LICENSE 086af8ff5be785cbd4da914acec46f45197c2b0fd3b370cd140cedd3 =
================================================================================
================================================================================
= github.com/dolthub/vitess licensed under: =
-1
View File
@@ -14,7 +14,6 @@ require (
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
github.com/dolthub/fslock v0.0.3
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81
github.com/dolthub/vitess v0.0.0-20221031111135-9aad77e7b39f
github.com/dustin/go-humanize v1.0.0
-3
View File
@@ -184,8 +184,6 @@ github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnO
github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474/go.mod h1:kMz7uXOXq4qRriCEyZ/LUeTqraLJCjf0WVZcUi6TxUY=
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 h1:WRPDbpJWEnPxPmiuOTndT+lUWUeGjx6eoNOK9O4tQQQ=
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66/go.mod h1:N5ZIbMGuDUpTpOFQ7HcsN6WSIpTGQjHP+Mz27AfmAgk=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 h1:7/v8q9XGFa6q5Ap4Z/OhNkAMBaK5YeuEzwJt+NZdhiE=
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81/go.mod h1:siLfyv2c92W1eN/R4QqG/+RjjX5W2+gCTRjZxBjI3TY=
github.com/dolthub/vitess v0.0.0-20221031111135-9aad77e7b39f h1:2sNrQiE4pcdgCNp09RTOsmNeepgN5rL+ep8NF8Faw9U=
@@ -924,7 +922,6 @@ golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+8 -63
View File
@@ -24,7 +24,6 @@ package nbs
import (
"context"
"errors"
"sync"
"time"
)
@@ -72,71 +71,17 @@ func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectRead
return &chunkSourceAdapter{tr, name}, nil
}
func loadTableIndex(stats *Stats, chunkCount uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
ti, err := newMmapTableIndex(chunkCount)
if err != nil {
return nil, err
}
func loadTableIndex(stats *Stats, cnt uint32, q MemoryQuotaProvider, loadIndexBytes func(p []byte) error) (tableIndex, error) {
idxSz := int(indexSize(cnt) + footerSize)
offsetSz := int((cnt - (cnt / 2)) * offsetSize)
buf := make([]byte, idxSz+offsetSz)
t1 := time.Now()
err = loadIndexBytes(ti.indexDataBuff)
if err != nil {
_ = ti.mmapped.Unmap()
return onHeapTableIndex{}, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(ti.indexDataBuff)))
err = ti.parseIndexBuffer(q)
if err != nil {
_ = ti.mmapped.Unmap()
if err := loadIndexBytes(buf[:idxSz]); err != nil {
return nil, err
}
stats.IndexReadLatency.SampleTimeSince(t1)
stats.IndexBytesPerRead.Sample(uint64(len(buf)))
return ti, nil
}
type awsTableReaderAt struct {
once sync.Once
getTRErr error
tra tableReaderAt
al awsLimits
ddb *ddbTableStore
s3 *s3ObjectReader
name addr
chunkCount uint32
}
func (atra *awsTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (int, error) {
atra.once.Do(func() {
atra.tra, atra.getTRErr = atra.getTableReaderAt(ctx, stats)
})
if atra.getTRErr != nil {
return 0, atra.getTRErr
}
return atra.tra.ReadAtWithStats(ctx, p, off, stats)
}
func (atra *awsTableReaderAt) getTableReaderAt(ctx context.Context, stats *Stats) (tableReaderAt, error) {
if atra.al.tableMayBeInDynamo(atra.chunkCount) {
data, err := atra.ddb.ReadTable(ctx, atra.name, stats)
if data == nil && err == nil { // There MUST be either data or an error
return &dynamoTableReaderAt{}, errors.New("no data available")
}
if data != nil {
return &dynamoTableReaderAt{ddb: atra.ddb, h: atra.name}, nil
}
if _, ok := err.(tableNotInDynamoErr); !ok {
return &dynamoTableReaderAt{}, err
}
}
return &s3TableReaderAt{s3: atra.s3, h: atra.name}, nil
return parseTableIndexWithOffsetBuff(buf[:idxSz], buf[idxSz:], q)
}
-140
View File
@@ -19,12 +19,8 @@ import (
"encoding/binary"
"errors"
"io"
"os"
"sync"
"sync/atomic"
"github.com/dolthub/mmap-go"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -557,139 +553,3 @@ func (ti onHeapTableIndex) Clone() (tableIndex, error) {
}
return ti, nil
}
// mmapTableIndex is an onHeapTableIndex but creates all of its slice buffers
// from mmap. It overrides Clone and Close of mmapTableIndex so that it can
// count references and release mmapped regions appropriately.
type mmapTableIndex struct {
onHeapTableIndex
refCnt *int32
q MemoryQuotaProvider
mmapped mmapWStat
indexDataBuff []byte
offset1DataBuff []byte
}
// newMmapTableIndex mmaps a region of memory large enough to store a fully
// parsed onHeapTableIndex. After creating the mmapTableIndex, index data should
// be loaded into |indexDataBuff| and then parsed with parseIndexBuffer.
func newMmapTableIndex(chunkCount uint32) (*mmapTableIndex, error) {
indexSize := int(indexSize(chunkCount) + footerSize)
chunks2 := chunkCount / 2
chunks1 := chunkCount - chunks2
offsets1Size := int(chunks1 * offsetSize)
mmapped, err := mmapWithStats(nil, indexSize+offsets1Size, mmap.RDWR, mmap.ANON, 0)
if err != nil {
return nil, err
}
indexBytesBuff := mmapped.m[:indexSize]
offsets1Buff := mmapped.m[indexSize : indexSize+offsets1Size]
refCnt := new(int32)
*refCnt = 1
return &mmapTableIndex{
refCnt: refCnt,
mmapped: mmapped,
indexDataBuff: indexBytesBuff,
offset1DataBuff: offsets1Buff}, nil
}
func (ti *mmapTableIndex) Clone() (tableIndex, error) {
cnt := atomic.AddInt32(ti.refCnt, 1)
if cnt == 1 {
panic("Clone() called after last Close(). This index is no longer valid.")
}
return ti, nil
}
// Close closes the underlying onHeapTableIndex and then unmaps the memory
// region.
func (ti *mmapTableIndex) Close() error {
cnt := atomic.AddInt32(ti.refCnt, -1)
if cnt == 0 {
chunkCount := ti.chunkCount
// mmapTableIndex sets the quota provider for onHeapTableIndex to a
// noopQuotaProvider, so that we can release quota after the memory region
// is unmapped.
err := ti.onHeapTableIndex.Close()
if err != nil {
return err
}
ti.indexDataBuff = nil
ti.offset1DataBuff = nil
err = ti.mmapped.Unmap()
if err != nil {
return err
}
err = ti.q.ReleaseQuota(indexMemSize(chunkCount))
if err != nil {
return err
}
}
if cnt < 0 {
panic("Close() called and reduced ref count to < 0.")
}
return nil
}
func (ti *mmapTableIndex) parseIndexBuffer(q MemoryQuotaProvider) (err error) {
ti.onHeapTableIndex, err = parseTableIndexWithOffsetBuff(ti.indexDataBuff, ti.offset1DataBuff, &noopQuotaProvider{})
ti.q = q
return err
}
type notifyFunc func(n uint64, total uint64)
var noOpNotify = func(uint64, uint64) {}
type mmapStats struct {
mu sync.Mutex
totalUsed uint64
WillMmap notifyFunc
Mmapped notifyFunc
UnMapped notifyFunc
}
var GlobalMmapStats = &mmapStats{
sync.Mutex{},
0,
noOpNotify,
noOpNotify,
noOpNotify,
}
type mmapWStat struct {
m mmap.MMap
used uint64
}
func mmapWithStats(f *os.File, length int, prot, flags int, offset int64) (mmapWStat, error) {
GlobalMmapStats.mu.Lock()
defer GlobalMmapStats.mu.Unlock()
GlobalMmapStats.WillMmap(uint64(length), GlobalMmapStats.totalUsed)
mmap, err := mmap.MapRegion(f, length, prot, flags, offset)
if err != nil {
return mmapWStat{}, err
}
GlobalMmapStats.totalUsed += uint64(length)
GlobalMmapStats.Mmapped(uint64(length), GlobalMmapStats.totalUsed)
return mmapWStat{mmap, uint64(length)}, nil
}
func (m mmapWStat) Unmap() error {
GlobalMmapStats.mu.Lock()
defer GlobalMmapStats.mu.Unlock()
err := m.m.Unmap()
if err != nil {
return err
}
GlobalMmapStats.totalUsed -= m.used
GlobalMmapStats.UnMapped(m.used, GlobalMmapStats.totalUsed)
return nil
}
-58
View File
@@ -97,64 +97,6 @@ func prefixIdx(ti onHeapTableIndex, prefix uint64) (idx uint32) {
return
}
func TestMMapIndex(t *testing.T) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(t, err)
defer f.Close()
bs, err := io.ReadAll(f)
require.NoError(t, err)
idx, err := parseTableIndexByCopy(bs, &noopQuotaProvider{})
require.NoError(t, err)
defer idx.Close()
mmidx, err := newMmapTableIndex(idx.chunkCount)
require.NoError(t, err)
copy(mmidx.indexDataBuff, bs)
err = mmidx.parseIndexBuffer(&noopQuotaProvider{})
require.NoError(t, err)
defer mmidx.Close()
assert.Equal(t, idx.ChunkCount(), mmidx.ChunkCount())
seen := make(map[addr]bool)
for i := uint32(0); i < idx.ChunkCount(); i++ {
var onheapaddr addr
onheapentry, err := idx.IndexEntry(i, &onheapaddr)
require.NoError(t, err)
var mmaddr addr
mmentry, err := mmidx.IndexEntry(i, &mmaddr)
require.NoError(t, err)
assert.Equal(t, onheapaddr, mmaddr)
assert.Equal(t, onheapentry.Offset(), mmentry.Offset())
assert.Equal(t, onheapentry.Length(), mmentry.Length())
if _, ok := seen[onheapaddr]; !ok {
seen[onheapaddr] = true
mmentry, found, err := mmidx.Lookup(&onheapaddr)
require.NoError(t, err)
assert.True(t, found)
assert.Equal(t, onheapentry.Offset(), mmentry.Offset(), "%v does not match %v for address %v", onheapentry, mmentry, onheapaddr)
assert.Equal(t, onheapentry.Length(), mmentry.Length())
}
wrongaddr := onheapaddr
if wrongaddr[19] != 0 {
wrongaddr[19] = 0
_, found, err := mmidx.Lookup(&wrongaddr)
require.NoError(t, err)
assert.False(t, found)
}
}
o1, err := idx.Ordinals()
require.NoError(t, err)
o2, err := mmidx.Ordinals()
require.NoError(t, err)
assert.Equal(t, o1, o2)
p1, err := idx.Prefixes()
require.NoError(t, err)
p2, err := mmidx.Prefixes()
require.NoError(t, err)
assert.Equal(t, p1, p2)
assert.Equal(t, idx.TableFileSize(), mmidx.TableFileSize())
assert.Equal(t, idx.TotalUncompressedData(), mmidx.TotalUncompressedData())
}
func TestOnHeapTableIndex_ResolveShortHash(t *testing.T) {
f, err := os.Open("testdata/0oa7mch34jg1rvghrnhr4shrp2fm4ftd.idx")
require.NoError(t, err)