diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index 4f8a8e2e10..87ab0288a4 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -37,8 +37,11 @@ const ( chunkJournalAddr = "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" - journalIndexFileName = "journal.idx" - journalIndexDefaultMaxNovel = 4096 + journalIndexFileName = "journal.idx" + + // journalIndexDefaultMaxNovel determines how often we flush + // records qto the out-of-band journal index file. + journalIndexDefaultMaxNovel = 16384 ) var ( @@ -154,8 +157,8 @@ type journalWriter struct { var _ io.Closer = &journalWriter{} -// bootstrapJournal reads the journal file collecting a recLookup for each record and -// returning the latest committed root hash. +// bootstrapJournal reads in records from the journal file and the journal index file, initializing +// the state of the journalWriter. It returns the most recent root hash for the journal. func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, err error) { wr.lock.Lock() defer wr.lock.Unlock() @@ -184,6 +187,7 @@ func (wr *journalWriter) bootstrapJournal(ctx context.Context) (last hash.Hash, if info, err = wr.index.Stat(); err != nil { return hash.Hash{}, err } + // process the indexed portion of the journal err = processIndexRecords(ctx, wr.index, info.Size(), func(o int64, r indexRec) (err error) { switch r.kind { case tableIndexRecKind: @@ -330,6 +334,8 @@ func (wr *journalWriter) commitRootHash(root hash.Hash) error { return err } +// flushIndexRecord writes a new record to the out-of-band journal index file. Index records +// accelerate journal bootstrapping by reducing the amount of the journal that must be processed. func (wr *journalWriter) flushIndexRecord(root hash.Hash, end int64) (err error) { payload := serializeLookups(wr.ranges.novelLookups()) buf := make([]byte, journalIndexRecordSize(payload)) @@ -478,22 +484,37 @@ func (wr *journalWriter) Close() (err error) { return } +// A rangeIndex maps chunk addresses to read Ranges in the chunk journal file. type rangeIndex struct { - novel *swiss.Map[addr, Range] - cached *swiss.Map[addr, Range] + // novel Ranges represent most recent chunks written to + // the journal. These Ranges have not yet been writen to + // a journal index record. + novel *swiss.Map[addr, Range] + + // cached Ranges are bootstrapped from an out-of-band journal + // index file. To save memory, these Ranges are keyed by a 16-byte + // prefix of their addr which is assumed to be globally unique + cached *swiss.Map[addr16, Range] +} + +type addr16 [16]byte + +func toAddr16(full addr) (prefix addr16) { + copy(prefix[:], full[:]) + return } func newRangeIndex() rangeIndex { return rangeIndex{ - novel: swiss.NewMap[addr, Range](0), - cached: swiss.NewMap[addr, Range](0), + novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel), + cached: swiss.NewMap[addr16, Range](0), } } func (idx rangeIndex) get(a addr) (rng Range, ok bool) { rng, ok = idx.novel.Get(a) if !ok { - rng, ok = idx.cached.Get(a) + rng, ok = idx.cached.Get(toAddr16(a)) } return } @@ -502,11 +523,6 @@ func (idx rangeIndex) put(a addr, rng Range) { idx.novel.Put(a, rng) } -func (idx rangeIndex) iter(cb func(addr, Range) (stop bool)) { - idx.novel.Iter(cb) - idx.cached.Iter(cb) -} - func (idx rangeIndex) count() uint32 { return uint32(idx.novel.Count() + idx.cached.Count()) } @@ -525,19 +541,24 @@ func (idx rangeIndex) novelLookups() (lookups []lookup) { } func (idx rangeIndex) flatten() rangeIndex { - var union *swiss.Map[addr, Range] - if idx.cached.Count() == 0 { - union = idx.novel - - } else { - union = idx.cached - idx.novel.Iter(func(a addr, r Range) (stop bool) { - union.Put(a, r) - return - }) - } + // rather than copy |idx.novel| to |idx.cached|, we construct + // a new map |union| with only enough capacity for the current + // set of Ranges, and copy everything into it. This maximizes + // the load factor of |union| and reduces the memory overhead + // of |idx|. The tradeoff is that we allocate/copy more often, + // but the cost of this work is much less than flushing journal + // index records to disk. + union := swiss.NewMap[addr16, Range](idx.count()) + idx.novel.Iter(func(a addr, r Range) (stop bool) { + union.Put(toAddr16(a), r) + return + }) + idx.cached.Iter(func(a addr16, r Range) (stop bool) { + union.Put(a, r) + return + }) return rangeIndex{ - novel: swiss.NewMap[addr, Range](0), + novel: swiss.NewMap[addr, Range](journalIndexDefaultMaxNovel), cached: union, } } diff --git a/go/store/nbs/journal_writer_test.go b/go/store/nbs/journal_writer_test.go index 15f44dc18e..a50d5dec02 100644 --- a/go/store/nbs/journal_writer_test.go +++ b/go/store/nbs/journal_writer_test.go @@ -199,10 +199,7 @@ func TestJournalWriterWriteCompressedChunk(t *testing.T) { r, _ := j.ranges.get(a) validateLookup(t, j, r, cc) } - j.ranges.iter(func(a addr, r Range) (stop bool) { - validateLookup(t, j, r, data[a]) - return - }) + validateAllLookups(t, j, data) } func TestJournalWriterBootstrap(t *testing.T) { @@ -223,10 +220,7 @@ func TestJournalWriterBootstrap(t *testing.T) { _, err = j.bootstrapJournal(ctx) require.NoError(t, err) - j.ranges.iter(func(a addr, r Range) (stop bool) { - validateLookup(t, j, r, data[a]) - return - }) + validateAllLookups(t, j, data) source := journalChunkSource{journal: j} for a, cc := range data { @@ -238,6 +232,27 @@ func TestJournalWriterBootstrap(t *testing.T) { } } +func validateAllLookups(t *testing.T, j *journalWriter, data map[addr]CompressedChunk) { + // move |data| to addr16-keyed map + prefixMap := make(map[addr16]CompressedChunk, len(data)) + var prefix addr16 + for a, cc := range data { + copy(prefix[:], a[:]) + prefixMap[prefix] = cc + } + iterRangeIndex(j.ranges, func(a addr16, r Range) (stop bool) { + validateLookup(t, j, r, prefixMap[a]) + return + }) +} + +func iterRangeIndex(idx rangeIndex, cb func(addr16, Range) (stop bool)) { + idx.novel.Iter(func(a addr, r Range) (stop bool) { + return cb(toAddr16(a), r) + }) + idx.cached.Iter(cb) +} + func validateLookup(t *testing.T, j *journalWriter, r Range, cc CompressedChunk) { buf := make([]byte, r.Length) _, err := j.readAt(buf, int64(r.Offset))