mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-20 11:29:13 -05:00
go/store/nbs: use 16byte addr prefixes to save space in chunk journal index
This commit is contained in:
@@ -37,8 +37,11 @@ const (
|
||||
|
||||
chunkJournalAddr = "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"
|
||||
|
||||
journalIndexFileName = "journal.idx"
|
||||
journalIndexDefaultMaxNovel = 4096
|
||||
journalIndexFileName = "journal.idx"
|
||||
|
||||
// journalIndexDefaultMaxNovel determines how often we flush
|
||||
// records to the out-of-band journal index file.
|
||||
journalIndexDefaultMaxNovel = 16384
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -154,8 +157,8 @@ type journalWriter struct {
|
||||
|
||||
var _ io.Closer = &journalWriter{}
|
||||
|
||||
// bootstrapJournal reads the journal file collecting a recLookup for each record and
|
||||
// returning the latest committed root hash.
|
||||
// bootstrapJournal reads in records from the journal file and the journal index file, initializing
|
||||
// the state of the journalWriter. It returns the most recent root hash for the journal.
|
||||
func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, err error) {
|
||||
wr.lock.Lock()
|
||||
defer wr.lock.Unlock()
|
||||
@@ -184,6 +187,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash,
|
||||
if info, err = wr.index.Stat(); err != nil {
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
// process the indexed portion of the journal
|
||||
err = processIndexRecords(ctx, wr.index, info.Size(), func(o int64, r indexRec) (err error) {
|
||||
switch r.kind {
|
||||
case tableIndexRecKind:
|
||||
@@ -330,6 +334,8 @@ func (wr *journalWriter) commitRootHash(root hash.Hash) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// flushIndexRecord writes a new record to the out-of-band journal index file. Index records
|
||||
// accelerate journal bootstrapping by reducing the amount of the journal that must be processed.
|
||||
func (wr *journalWriter) flushIndexRecord(root hash.Hash, end int64) (err error) {
|
||||
payload := serializeLookups(wr.ranges.novelLookups())
|
||||
buf := make([]byte, journalIndexRecordSize(payload))
|
||||
@@ -478,22 +484,37 @@ func (wr *journalWriter) Close() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// A rangeIndex maps chunk addresses to read Ranges in the chunk journal file.
|
||||
type rangeIndex struct {
|
||||
novel *swiss.Map[addr, Range]
|
||||
cached *swiss.Map[addr, Range]
|
||||
// novel Ranges represent most recent chunks written to
|
||||
// the journal. These Ranges have not yet been written to
|
||||
// a journal index record.
|
||||
novel *swiss.Map[addr, Range]
|
||||
|
||||
// cached Ranges are bootstrapped from an out-of-band journal
|
||||
// index file. To save memory, these Ranges are keyed by a 16-byte
|
||||
// prefix of their addr, which is assumed to be globally unique.
|
||||
cached *swiss.Map[addr16, Range]
|
||||
}
|
||||
|
||||
// addr16 is a 16-byte array holding the leading bytes of an addr. It is the
// key type of the rangeIndex |cached| map.
type addr16 [16]byte
|
||||
|
||||
// toAddr16 copies the leading bytes of |full| into a 16-byte |prefix| and
// returns it. Bytes of |full| beyond the first 16 are dropped.
func toAddr16(full addr) (prefix addr16) {
|
||||
copy(prefix[:], full[:])
|
||||
return
|
||||
}
|
||||
|
||||
func newRangeIndex() rangeIndex {
|
||||
return rangeIndex{
|
||||
novel: swiss.NewMap[addr, Range](0),
|
||||
cached: swiss.NewMap[addr, Range](0),
|
||||
novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel),
|
||||
cached: swiss.NewMap[addr16, Range](0),
|
||||
}
|
||||
}
|
||||
|
||||
func (idx rangeIndex) get(a addr) (rng Range, ok bool) {
|
||||
rng, ok = idx.novel.Get(a)
|
||||
if !ok {
|
||||
rng, ok = idx.cached.Get(a)
|
||||
rng, ok = idx.cached.Get(toAddr16(a))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -502,11 +523,6 @@ func (idx rangeIndex) put(a addr, rng Range) {
|
||||
idx.novel.Put(a, rng)
|
||||
}
|
||||
|
||||
func (idx rangeIndex) iter(cb func(addr, Range) (stop bool)) {
|
||||
idx.novel.Iter(cb)
|
||||
idx.cached.Iter(cb)
|
||||
}
|
||||
|
||||
// count returns the sum of the entry counts of the |novel| and |cached| maps,
// converted to uint32.
func (idx rangeIndex) count() uint32 {
|
||||
return uint32(idx.novel.Count() + idx.cached.Count())
|
||||
}
|
||||
@@ -525,19 +541,24 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) {
|
||||
}
|
||||
|
||||
func (idx rangeIndex) flatten() rangeIndex {
|
||||
var union *swiss.Map[addr, Range]
|
||||
if idx.cached.Count() == 0 {
|
||||
union = idx.novel
|
||||
|
||||
} else {
|
||||
union = idx.cached
|
||||
idx.novel.Iter(func(a addr, r Range) (stop bool) {
|
||||
union.Put(a, r)
|
||||
return
|
||||
})
|
||||
}
|
||||
// rather than copy |idx.novel| to |idx.cached|, we construct
|
||||
// a new map |union| with only enough capacity for the current
|
||||
// set of Ranges, and copy everything into it. This maximizes
|
||||
// the load factor of |union| and reduces the memory overhead
|
||||
// of |idx|. The tradeoff is that we allocate/copy more often,
|
||||
// but the cost of this work is much less than flushing journal
|
||||
// index records to disk.
|
||||
union := swiss.NewMap[addr16, Range](idx.count())
|
||||
idx.novel.Iter(func(a addr, r Range) (stop bool) {
|
||||
union.Put(toAddr16(a), r)
|
||||
return
|
||||
})
|
||||
idx.cached.Iter(func(a addr16, r Range) (stop bool) {
|
||||
union.Put(a, r)
|
||||
return
|
||||
})
|
||||
return rangeIndex{
|
||||
novel: swiss.NewMap[addr, Range](0),
|
||||
novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel),
|
||||
cached: union,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,10 +199,7 @@ func TestJournalWriterWriteCompressedChunk(t *testing.T) {
|
||||
r, _ := j.ranges.get(a)
|
||||
validateLookup(t, j, r, cc)
|
||||
}
|
||||
j.ranges.iter(func(a addr, r Range) (stop bool) {
|
||||
validateLookup(t, j, r, data[a])
|
||||
return
|
||||
})
|
||||
validateAllLookups(t, j, data)
|
||||
}
|
||||
|
||||
func TestJournalWriterBootstrap(t *testing.T) {
|
||||
@@ -223,10 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) {
|
||||
_, err = j.bootstrapJournal(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
j.ranges.iter(func(a addr, r Range) (stop bool) {
|
||||
validateLookup(t, j, r, data[a])
|
||||
return
|
||||
})
|
||||
validateAllLookups(t, j, data)
|
||||
|
||||
source := journalChunkSource{journal: j}
|
||||
for a, cc := range data {
|
||||
@@ -238,6 +232,27 @@ func TestJournalWriterBootstrap(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// validateAllLookups walks every Range held in |j.ranges| and passes it to
// validateLookup together with the chunk from |data| whose addr has the same
// 16-byte prefix as the Range's key.
func validateAllLookups(t *testing.T, j *journalWriter, data map[addr]CompressedChunk) {
|
||||
// re-key |data| by 16-byte addr prefix so entries can be looked up
// using the addr16 keys produced by iterRangeIndex
|
||||
prefixMap := make(map[addr16]CompressedChunk, len(data))
|
||||
var prefix addr16
|
||||
for a, cc := range data {
|
||||
copy(prefix[:], a[:])
|
||||
prefixMap[prefix] = cc
|
||||
}
|
||||
iterRangeIndex(j.ranges, func(a addr16, r Range) (stop bool) {
|
||||
validateLookup(t, j, r, prefixMap[a])
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
// iterRangeIndex invokes |cb| over the Ranges of |idx|: first the |novel| map,
// with each full addr key truncated via toAddr16, then the |cached| map, whose
// keys are already addr16 values.
func iterRangeIndex(idx rangeIndex, cb func(addr16, Range) (stop bool)) {
|
||||
idx.novel.Iter(func(a addr, r Range) (stop bool) {
|
||||
return cb(toAddr16(a), r)
|
||||
})
|
||||
idx.cached.Iter(cb)
|
||||
}
|
||||
|
||||
func validateLookup(t *testing.T, j *journalWriter, r Range, cc CompressedChunk) {
|
||||
buf := make([]byte, r.Length)
|
||||
_, err := j.readAt(buf, int64(r.Offset))
|
||||
|
||||
Reference in New Issue
Block a user