Ammend mmap implementation

This commit is contained in:
Nick Tobey
2025-07-24 15:14:56 -07:00
committed by nick
parent cbdec3ecec
commit 2d1eae8753
4 changed files with 132 additions and 60 deletions
+2 -2
View File
@@ -234,13 +234,13 @@ func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgrou
}
func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), stats *Stats) error {
addrCount := uint64(len(acs.aRdr.prefixes))
addrCount := uint64(acs.aRdr.footer.chunkCount)
for i := uint64(0); i < addrCount; i++ {
var h hash.Hash
suffix := acs.aRdr.getSuffixByID(i)
// Reconstruct the hash from the prefix and suffix.
binary.BigEndian.PutUint64(h[:uint64Size], acs.aRdr.prefixes[i])
binary.BigEndian.PutUint64(h[:uint64Size], acs.aRdr.indexReader.getPrefix(uint32(i)))
copy(h[uint64Size:], suffix[:])
if ctx.Err() != nil {
+121 -52
View File
@@ -21,11 +21,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math/bits"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/gozstd"
lru "github.com/hashicorp/golang-lru/v2"
"io"
"math/bits"
"os"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
@@ -34,13 +35,10 @@ import (
// archiveReader is a reader for the archive format. We use primitive type slices where possible. These are read directly
// from disk into memory for speed. The downside is complexity on the read path, but it's all constant time.
type archiveReader struct {
reader tableReaderAt
prefixes []uint64
spanIndex []uint64
chunkRefs []uint32 // Pairs of uint32s. First is the dict id, second is the data id.
suffixes []byte
footer archiveFooter
dictCache *lru.TwoQueueCache[uint32, *DecompBundle]
reader tableReaderAt
indexReader archiveIndexReader // Memory-mapped or fallback index reader
footer archiveFooter
dictCache *lru.TwoQueueCache[uint32, *DecompBundle]
}
type suffix [hash.SuffixLen]byte
@@ -204,13 +202,46 @@ func newArchiveReader(ctx context.Context, reader tableReaderAt, fileSize uint64
}
func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiveFooter, stats *Stats) (archiveReader, error) {
dictCache, err := lru.New2Q[uint32, *DecompBundle](256)
if err != nil {
return archiveReader{}, err
}
var indexRdr archiveIndexReader
// Try to use memory mapping if the reader is a file
if fileReader, ok := reader.(*fileReaderAt); ok && fileReader.mmapIndexes {
indexRdr, err = newMmapIndexReader(fileReader.f, footer)
if err != nil {
return archiveReader{}, err
}
} else {
if _, isSet := os.LookupEnv(dconfig.EnvAssertNoInMemoryArchiveIndex); isSet {
return archiveReader{}, fmt.Errorf("attempted to load archive index into memory but %s was set", dconfig.EnvAssertNoInMemoryArchiveIndex)
}
indexRdr, err = newInMemoryArchiveIndexReader(ctx, reader, footer, stats)
if err != nil {
return archiveReader{}, err
}
}
return archiveReader{
reader: reader,
indexReader: indexRdr,
footer: footer,
dictCache: dictCache,
}, nil
}
// newInMemoryArchiveIndexReader implements the original index loading logic for non-file readers
func newInMemoryArchiveIndexReader(ctx context.Context, reader tableReaderAt, footer archiveFooter, stats *Stats) (archiveIndexReader, error) {
byteOffSpan := footer.indexByteOffsetSpan()
secRdr := newSectionReader(ctx, reader, int64(byteOffSpan.offset), int64(byteOffSpan.length), stats)
byteSpans := make([]uint64, footer.byteSpanCount+1)
byteSpans[0] = 0 // Null byteSpan to simplify logic.
err := binary.Read(secRdr, binary.BigEndian, byteSpans[1:])
if err != nil {
return archiveReader{}, fmt.Errorf("Failed to read byte spans: %w", err)
return nil, fmt.Errorf("Failed to read byte spans: %w", err)
}
prefixSpan := footer.indexPrefixSpan()
@@ -218,7 +249,7 @@ func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiv
prefixes := make([]uint64, footer.chunkCount)
err = binary.Read(prefixRdr, binary.BigEndian, prefixes[:])
if err != nil {
return archiveReader{}, fmt.Errorf("Failed to read prefixes: %w", err)
return nil, fmt.Errorf("Failed to read prefixes: %w", err)
}
chunkRefSpan := footer.indexChunkRefSpan()
@@ -226,7 +257,7 @@ func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiv
chnks := make([]uint32, uint64(footer.chunkCount)*2)
err = binary.Read(chunkRdr, binary.BigEndian, chnks[:])
if err != nil {
return archiveReader{}, fmt.Errorf("Failed to read chunk references: %w", err)
return nil, fmt.Errorf("Failed to read chunk references: %w", err)
}
suffixSpan := footer.indexSuffixSpan()
@@ -234,25 +265,66 @@ func buildArchiveReader(ctx context.Context, reader tableReaderAt, footer archiv
suffixes := make([]byte, suffixSpan.length)
_, err = io.ReadFull(sufRdr, suffixes)
if err != nil {
return archiveReader{}, fmt.Errorf("Failed to read suffixes: %w", err)
return nil, err
}
dictCache, err := lru.New2Q[uint32, *DecompBundle](256)
if err != nil {
return archiveReader{}, err
}
return archiveReader{
reader: reader,
return &inMemoryArchiveIndexReader{
prefixes: prefixes,
spanIndex: byteSpans,
chunkRefs: chnks,
suffixes: suffixes,
footer: footer,
dictCache: dictCache,
}, nil
}
// inMemoryArchiveIndexReader provides the original in-memory index implementation as a fallback
type inMemoryArchiveIndexReader struct {
prefixes []uint64
spanIndex []uint64
chunkRefs []uint32
suffixes []byte
}
func (f *inMemoryArchiveIndexReader) getNumChunks() int {
return len(f.prefixes)
}
func (f *inMemoryArchiveIndexReader) getSpanIndex(idx uint32) uint64 {
if idx >= uint32(len(f.spanIndex)) {
return 0
}
return f.spanIndex[idx]
}
func (f *inMemoryArchiveIndexReader) getPrefix(idx uint32) uint64 {
if idx >= uint32(len(f.prefixes)) {
return 0
}
return f.prefixes[idx]
}
func (f *inMemoryArchiveIndexReader) getChunkRef(idx int) (dict, data uint32) {
if idx < 0 || idx*2+1 >= len(f.chunkRefs) {
return 0, 0
}
return f.chunkRefs[idx*2], f.chunkRefs[idx*2+1]
}
func (f *inMemoryArchiveIndexReader) getSuffix(idx uint64) suffix {
if idx >= uint64(len(f.suffixes)/hash.SuffixLen) {
return suffix{}
}
start := idx * hash.SuffixLen
return suffix(f.suffixes[start : start+hash.SuffixLen])
}
func (f *inMemoryArchiveIndexReader) searchPrefixes(target uint64) int {
return prollyBinSearch(f, target)
}
func (f *inMemoryArchiveIndexReader) Close() error {
return nil
}
// clone returns a new archiveReader with a new (provided) reader. All other fields are immutable or thread safe,
// so they are copied.
func (ar archiveReader) clone() (archiveReader, error) {
@@ -261,13 +333,10 @@ func (ar archiveReader) clone() (archiveReader, error) {
return archiveReader{}, err
}
return archiveReader{
reader: reader,
prefixes: ar.prefixes,
spanIndex: ar.spanIndex,
chunkRefs: ar.chunkRefs,
suffixes: ar.suffixes,
footer: ar.footer,
dictCache: ar.dictCache, // cache is thread safe.
reader: reader,
indexReader: ar.indexReader,
footer: ar.footer,
dictCache: ar.dictCache, // cache is thread safe.
}, nil
}
@@ -349,15 +418,15 @@ func buildFooter(fileSize uint64, buf []byte) (f archiveFooter, err error) {
// search returns the index of the hash in the archive. If the hash is not found, -1 is returned.
func (ar archiveReader) search(hash hash.Hash) int {
prefix := hash.Prefix()
possibleMatch := prollyBinSearch(ar.prefixes, prefix)
possibleMatch := ar.indexReader.searchPrefixes(prefix)
targetSfx := hash.Suffix()
if possibleMatch < 0 || possibleMatch >= len(ar.prefixes) {
if possibleMatch < 0 || possibleMatch >= int(ar.footer.chunkCount) {
return -1
}
for idx := possibleMatch; idx < len(ar.prefixes) && ar.prefixes[idx] == prefix; idx++ {
if ar.getSuffixByID(uint64(idx)) == suffix(targetSfx) {
for idx := possibleMatch; idx < int(ar.footer.chunkCount) && ar.indexReader.getPrefix(uint32(idx)) == prefix; idx++ {
if ar.indexReader.getSuffix(uint64(idx)) == suffix(targetSfx) {
return idx
}
}
@@ -430,6 +499,10 @@ func (ar archiveReader) count() uint32 {
}
func (ar archiveReader) close() error {
err := ar.indexReader.Close()
if err != nil {
return err
}
return ar.reader.Close()
}
@@ -486,9 +559,7 @@ func (ar archiveReader) getRaw(ctx context.Context, hash hash.Hash, stats *Stats
// getChunkRef returns the dictionary and data references for the chunk at the given index. Assumes good input!
func (ar archiveReader) getChunkRef(idx int) (dict, data uint32) {
// Chunk refs are stored as pairs of uint32s, so we need to double the index.
idx *= 2
return ar.chunkRefs[idx], ar.chunkRefs[idx+1]
return ar.indexReader.getChunkRef(idx)
}
// getByteSpanByID returns the byte span for the chunk at the given index. Assumes good input!
@@ -496,16 +567,15 @@ func (ar archiveReader) getByteSpanByID(id uint32) byteSpan {
if id == 0 {
return byteSpan{}
}
// This works because byteOffSpan[0] == 0. See initialization.
offset := ar.spanIndex[id-1]
length := ar.spanIndex[id] - offset
// This works because spanIndex[0] == 0. See initialization.
offset := ar.indexReader.getSpanIndex(id - 1)
length := ar.indexReader.getSpanIndex(id) - offset
return byteSpan{offset: offset, length: length}
}
// getSuffixByID returns the suffix for the chunk at the given index. Assumes good input!
func (ar archiveReader) getSuffixByID(id uint64) suffix {
start := id * hash.SuffixLen
return suffix(ar.suffixes[start : start+hash.SuffixLen])
return ar.indexReader.getSuffix(id)
}
func (ar archiveReader) getMetadata(ctx context.Context, stats *Stats) ([]byte, error) {
@@ -532,8 +602,8 @@ func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error
for i := uint32(0); i < ar.footer.chunkCount; i++ {
var hasBytes [hash.ByteLen]byte
binary.BigEndian.PutUint64(hasBytes[:uint64Size], ar.prefixes[i])
suf := ar.getSuffixByID(uint64(i))
binary.BigEndian.PutUint64(hasBytes[:uint64Size], ar.indexReader.getPrefix(i))
suf := ar.indexReader.getSuffix(uint64(i))
copy(hasBytes[hash.ByteLen-hash.SuffixLen:], suf[:])
h := hash.New(hasBytes[:])
@@ -573,13 +643,14 @@ func verifyCheckSum(ctx context.Context, reader tableReaderAt, span byteSpan, ch
//
// For our purposes where we are just trying to get the index, we must compare the resulting index to our target to
// determine if it is a match.
func prollyBinSearch(slice []uint64, target uint64) int {
items := len(slice)
func prollyBinSearch(m archiveIndexReader, target uint64) int {
items := m.getNumChunks()
if items == 0 {
return 0
}
lft, rht := 0, items
lo, hi := slice[lft], slice[rht-1]
lo, hi := m.getPrefix(0), m.getPrefix(uint32(rht-1))
if target > hi {
return rht
}
@@ -593,19 +664,17 @@ func prollyBinSearch(slice []uint64, target uint64) int {
mhi, mlo := bits.Mul64(shiftedTgt, idxRangeSz)
dU64, _ := bits.Div64(mhi, mlo, valRangeSz)
idx := int(dU64) + lft
if slice[idx] < target {
if m.getPrefix(uint32(idx)) < target {
lft = idx + 1
// No need to update lo if i == items, since this loop will be ending.
if lft < items {
lo = slice[lft]
// Interpolation doesn't like lo >= target, so if we're already there, just return |i|.
lo = m.getPrefix(uint32(lft))
if lo >= target {
return lft
}
}
} else {
rht = idx
hi = slice[rht]
hi = m.getPrefix(uint32(rht))
}
}
return lft
+6 -3
View File
@@ -89,7 +89,7 @@ func TestArchiveSingleZStdChunk(t *testing.T) {
aIdx, err := newArchiveReader(context.Background(), tra, fileSize, &Stats{})
assert.NoError(t, err)
assert.Equal(t, []uint64{23}, aIdx.prefixes)
assert.Equal(t, uint64(23), aIdx.indexReader.getPrefix(0))
assert.True(t, aIdx.has(oneHash))
dict, data, err := aIdx.getRaw(context.Background(), oneHash, &Stats{})
@@ -141,7 +141,7 @@ func TestArchiveSingleSnappyChunk(t *testing.T) {
aIdx, err := newArchiveReader(context.Background(), tra, fileSize, &Stats{})
assert.NoError(t, err)
assert.Equal(t, []uint64{23}, aIdx.prefixes)
assert.Equal(t, uint64(23), aIdx.indexReader.getPrefix(0))
assert.True(t, aIdx.has(oneHash))
dict, data, err := aIdx.getRaw(context.Background(), oneHash, &Stats{})
@@ -208,7 +208,10 @@ func TestArchiverMultipleChunksMultipleDictionaries(t *testing.T) {
tra := tableReaderAtAdapter{readerAt}
aIdx, err := newArchiveReader(context.Background(), tra, fileSize, &Stats{})
assert.NoError(t, err)
assert.Equal(t, []uint64{21, 42, 42, 42, 42, 81, 88}, aIdx.prefixes)
expectedPrefixes := []uint64{21, 42, 42, 42, 42, 81, 88}
for i, expected := range expectedPrefixes {
assert.Equal(t, expected, aIdx.indexReader.getPrefix(uint32(i)))
}
assert.True(t, aIdx.has(h1))
assert.True(t, aIdx.has(h2))
+3 -3
View File
@@ -20,7 +20,6 @@ import (
"os"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/hash"
)
@@ -30,7 +29,7 @@ type archiveIndexReader interface {
getChunkRef(idx int) (dict, data uint32)
getSuffix(idx uint64) suffix
searchPrefixes(target uint64) int
close() error
io.Closer
}
// mmapIndexReader lazily loads archive index data from a memory-mapped file.
@@ -181,7 +180,8 @@ func (m *mmapIndexReader) prollyBinSearch(target uint64, items int) int {
}
// close unmaps the memory region
func (m *mmapIndexReader) close() error {
func (m *mmapIndexReader) Close() error {
// Currently we never unmap mmapped indexes in order to prevent a data race with the AutoIncrementTracker.
/*if m.data != nil {
data := m.data
m.data = nil