go/store/nbs: move chunk index hashmap into journal writer

Andy Arthur
2023-02-17 09:45:42 -08:00
parent 4b3c909bb1
commit 2ddf1fd9f5
4 changed files with 73 additions and 98 deletions
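Before this change, each journalChunkSource carried its own chunk index: a lookupMap (an addr to recLookup map guarded by a private mutex) plus a running uncompressed-size counter, and every read on the journalWriter had to be handed that map. This commit moves the index into the journalWriter itself, leaving journalChunkSource as a stateless view that delegates to the writer. Below is a minimal, self-contained sketch of the resulting ownership; the types and the writeChunk helper are simplified stand-ins for the real go/store/nbs declarations (the production WriteChunk takes a CompressedChunk and returns an error), not the code in this diff.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the real go/store/nbs types; addr and recLookup
// carry more detail in the production code.
type addr [20]byte

type recLookup struct {
	journalOff int64
	recordLen  uint32
	payloadOff uint32
}

// journalWriter now owns the chunk index: a lookups map and a running
// uncompressed-size counter, both guarded by the writer's own RWMutex.
type journalWriter struct {
	lock    sync.RWMutex
	lookups map[addr]recLookup
	uncmpSz uint64
}

// writeChunk is a hypothetical simplification of WriteChunk: it records the
// lookup for a chunk and bumps the uncompressed-size counter under the lock.
func (wr *journalWriter) writeChunk(h addr, rec recLookup, uncompressedLen uint64) {
	wr.lock.Lock()
	defer wr.lock.Unlock()
	wr.lookups[h] = rec
	wr.uncmpSz += uncompressedLen
}

func (wr *journalWriter) has(h addr) bool {
	wr.lock.RLock()
	defer wr.lock.RUnlock()
	_, ok := wr.lookups[h]
	return ok
}

func (wr *journalWriter) recordCount() uint32 {
	wr.lock.RLock()
	defer wr.lock.RUnlock()
	return uint32(len(wr.lookups))
}

// journalChunkSource shrinks to a stateless view over the writer; lookups
// and counts simply delegate to the writer's index.
type journalChunkSource struct {
	journal *journalWriter
}

func (s journalChunkSource) has(h addr) (bool, error) { return s.journal.has(h), nil }
func (s journalChunkSource) count() (uint32, error)   { return s.journal.recordCount(), nil }

func main() {
	wr := &journalWriter{lookups: make(map[addr]recLookup)}
	wr.writeChunk(addr{1}, recLookup{journalOff: 0, recordLen: 64, payloadOff: 13}, 51)

	src := journalChunkSource{journal: wr}
	ok, _ := src.has(addr{1})
	n, _ := src.count()
	fmt.Println(ok, n) // true 1
}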
+5 -10
@@ -56,7 +56,6 @@ const (
 // both memTable persists and manifest updates to a single file.
 type chunkJournal struct {
 	wr   *journalWriter
-	src  journalChunkSource
 	path string
 	contents manifestContents
@@ -105,7 +104,7 @@ func (j *chunkJournal) openJournal(ctx context.Context) (err error) {
 		return err
 	}
-	_, j.src, err = j.wr.ProcessJournal(ctx)
+	_, err = j.wr.ProcessJournal(ctx)
 	if err != nil {
 		return err
 	}
@@ -133,8 +132,7 @@ func (j *chunkJournal) openJournal(ctx context.Context) (err error) {
 	}
 	// parse existing journal file
-	var root hash.Hash
-	root, j.src, err = j.wr.ProcessJournal(ctx)
+	root, err := j.wr.ProcessJournal(ctx)
 	if err != nil {
 		return err
 	}
@@ -178,15 +176,12 @@ func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea
 			continue
 		}
 		c := chunks.NewChunkWithHash(hash.Hash(*record.a), mt.chunks[*record.a])
-		cc := ChunkToCompressedChunk(c)
-		lookup, err := j.wr.WriteChunk(cc)
+		err := j.wr.WriteChunk(ChunkToCompressedChunk(c))
 		if err != nil {
 			return nil, err
 		}
-		j.src.lookups.put(*record.a, lookup)
-		j.src.uncompressedSz += uint64(c.Size())
 	}
-	return j.src, nil
+	return journalChunkSource{journal: j.wr}, nil
 }
@@ -200,7 +195,7 @@ func (j *chunkJournal) Open(ctx context.Context, name addr, chunkCount uint32, s
 		if err := j.maybeInit(ctx); err != nil {
 			return nil, err
 		}
-		return j.src, nil
+		return journalChunkSource{journal: j.wr}, nil
 	}
 	return j.persister.Open(ctx, name, chunkCount, stats)
 }
+9 -46
@@ -18,7 +18,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"sync"
 	"golang.org/x/sync/errgroup"
@@ -49,59 +48,23 @@ func rangeFromLookup(l recLookup) Range {
 	}
 }
-// lookupMap is a thread-safe collection of recLookups.
-type lookupMap struct {
-	data map[addr]recLookup
-	lock *sync.RWMutex
-}
-func newLookupMap() lookupMap {
-	return lookupMap{
-		data: make(map[addr]recLookup),
-		lock: new(sync.RWMutex),
-	}
-}
-func (m lookupMap) get(a addr) (l recLookup, ok bool) {
-	m.lock.RLock()
-	defer m.lock.RUnlock()
-	l, ok = m.data[a]
-	return
-}
-func (m lookupMap) put(a addr, l recLookup) {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	m.data[a] = l
-	return
-}
-func (m lookupMap) count() int {
-	m.lock.RLock()
-	defer m.lock.RUnlock()
-	return len(m.data)
-}
 // journalChunkSource is a chunkSource that reads chunks
 // from a chunkJournal. Unlike other NBS chunkSources,
 // it is not immutable and its set of chunks grows as
 // more commits are made to the chunkJournal.
 type journalChunkSource struct {
-	address        addr
-	journal        *journalWriter
-	lookups        lookupMap
-	uncompressedSz uint64
+	journal *journalWriter
 }
 var _ chunkSource = journalChunkSource{}
 func (s journalChunkSource) has(h addr) (bool, error) {
-	return s.journal.has(h, s.lookups), nil
+	return s.journal.has(h), nil
 }
 func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error) {
 	for i := range addrs {
-		ok := s.journal.has(*addrs[i].a, s.lookups)
+		ok := s.journal.has(*addrs[i].a)
 		if ok {
 			addrs[i].has = true
 		} else {
@@ -112,11 +75,11 @@ func (s journalChunkSource) hasMany(addrs []hasRecord) (missing bool, err error)
 }
 func (s journalChunkSource) getCompressed(_ context.Context, h addr, _ *Stats) (CompressedChunk, error) {
-	return s.journal.getCompressed(h, s.lookups)
+	return s.journal.getCompressed(h)
 }
 func (s journalChunkSource) get(_ context.Context, h addr, _ *Stats) ([]byte, error) {
-	cc, err := s.journal.getCompressed(h, s.lookups)
+	cc, err := s.journal.getCompressed(h)
 	if err != nil {
 		return nil, err
 	} else if cc.IsEmpty() {
@@ -163,15 +126,15 @@ func (s journalChunkSource) getManyCompressed(ctx context.Context, _ *errgroup.G
 }
 func (s journalChunkSource) count() (uint32, error) {
-	return uint32(s.lookups.count()), nil
+	return s.journal.recordCount(), nil
 }
 func (s journalChunkSource) uncompressedLen() (uint64, error) {
-	return s.uncompressedSz, nil
+	return s.journal.uncompressedSize(), nil
 }
 func (s journalChunkSource) hash() addr {
-	return s.address
+	return journalAddr
 }
 // reader implements chunkSource.
@@ -186,7 +149,7 @@ func (s journalChunkSource) getRecordRanges(requests []getRecord) (map[hash.Hash
 		if req.found {
 			continue
 		}
-		rng, ok := s.journal.getRange(*req.a, s.lookups)
+		rng, ok := s.journal.getRange(*req.a)
 		if !ok {
 			continue
 		}
+49 -31
@@ -78,9 +78,10 @@ func openJournalWriter(ctx context.Context, path string) (wr *journalWriter, exi
 	}
 	return &journalWriter{
-		buf:  make([]byte, 0, journalWriterBuffSize),
-		file: f,
-		path: path,
+		buf:     make([]byte, 0, journalWriterBuffSize),
+		lookups: make(map[addr]recLookup),
+		file:    f,
+		path:    path,
 	}, true, nil
 }
@@ -117,9 +118,10 @@ func createJournalWriter(ctx context.Context, path string) (wr *journalWriter, e
 	}
 	return &journalWriter{
-		buf:  make([]byte, 0, journalWriterBuffSize),
-		file: f,
-		path: path,
+		buf:     make([]byte, 0, journalWriterBuffSize),
+		lookups: make(map[addr]recLookup),
+		file:    f,
+		path:    path,
 	}, nil
 }
@@ -134,11 +136,13 @@ type snapshotReader interface {
 }
 type journalWriter struct {
-	buf  []byte
-	file *os.File
-	off  int64
-	path string
-	lock sync.RWMutex
+	buf     []byte
+	lookups map[addr]recLookup
+	file    *os.File
+	off     int64
+	uncmpSz uint64
+	path    string
+	lock    sync.RWMutex
 }
 var _ io.WriteCloser = &journalWriter{}
@@ -190,6 +194,7 @@ func (wr *journalWriter) CurrentSize() int64 {
 	return wr.off
 }
+// todo: remove, this method is only used for testing
 func (wr *journalWriter) Write(p []byte) (n int, err error) {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
@@ -210,23 +215,18 @@ func (wr *journalWriter) Write(p []byte) (n int, err error) {
 	return
 }
-func (wr *journalWriter) ProcessJournal(ctx context.Context) (last hash.Hash, cs journalChunkSource, err error) {
+func (wr *journalWriter) ProcessJournal(ctx context.Context) (last hash.Hash, err error) {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
-	src := journalChunkSource{
-		journal: wr,
-		address: journalAddr,
-		lookups: newLookupMap(),
-	}
 	wr.off, err = processJournalRecords(ctx, wr.file, func(o int64, r journalRec) error {
 		switch r.kind {
 		case chunkJournalRecKind:
-			src.lookups.put(r.address, recLookup{
+			wr.lookups[r.address] = recLookup{
 				journalOff: o,
 				recordLen:  r.length,
 				payloadOff: r.payloadOffset(),
-			})
-			src.uncompressedSz += r.uncompressedPayloadSize()
+			}
+			wr.uncmpSz += r.uncompressedPayloadSize()
 		case rootHashJournalRecKind:
 			last = hash.Hash(r.address)
 		default:
@@ -235,13 +235,12 @@ func (wr *journalWriter) ProcessJournal(ctx context.Context) (last hash.Hash, cs
 		return nil
 	})
 	if err != nil {
-		return hash.Hash{}, journalChunkSource{}, err
+		return hash.Hash{}, err
 	}
-	cs = src
 	return
 }
-func (wr *journalWriter) WriteChunk(cc CompressedChunk) (recLookup, error) {
+func (wr *journalWriter) WriteChunk(cc CompressedChunk) error {
 	wr.lock.Lock()
 	defer wr.lock.Unlock()
 	l, o := chunkRecordSize(cc)
@@ -252,10 +251,11 @@ func (wr *journalWriter) WriteChunk(cc CompressedChunk) (recLookup, error) {
 	}
 	buf, err := wr.getBytes(int(rec.recordLen))
 	if err != nil {
-		return recLookup{}, err
+		return err
 	}
 	_ = writeChunkRecord(buf, cc)
-	return rec, nil
+	wr.lookups[addr(cc.H)] = rec
+	return nil
 }
@@ -317,22 +317,28 @@ func (wr *journalWriter) flush() (err error) {
 	return
 }
-func (wr *journalWriter) has(h addr, lookups lookupMap) (ok bool) {
-	_, ok = lookups.get(h)
+func (wr *journalWriter) has(h addr) (ok bool) {
+	wr.lock.RLock()
+	defer wr.lock.RUnlock()
+	_, ok = wr.lookups[h]
 	return
 }
-func (wr *journalWriter) getRange(h addr, lookups lookupMap) (rng Range, ok bool) {
+func (wr *journalWriter) getRange(h addr) (rng Range, ok bool) {
+	wr.lock.RLock()
+	defer wr.lock.RUnlock()
 	var l recLookup
-	l, ok = lookups.get(h)
+	l, ok = wr.lookups[h]
 	if ok {
 		rng = rangeFromLookup(l)
 	}
 	return
 }
-func (wr *journalWriter) getCompressed(h addr, lookups lookupMap) (CompressedChunk, error) {
-	l, ok := lookups.get(h)
+func (wr *journalWriter) getCompressed(h addr) (CompressedChunk, error) {
+	wr.lock.RLock()
+	defer wr.lock.RUnlock()
+	l, ok := wr.lookups[h]
 	if !ok {
 		return CompressedChunk{}, nil
 	}
@@ -352,3 +358,15 @@ func (wr *journalWriter) getCompressed(h addr, lookups lookupMap) (CompressedChu
 	}
 	return NewCompressedChunk(hash.Hash(h), rec.payload)
 }
+func (wr *journalWriter) recordCount() uint32 {
+	wr.lock.RLock()
+	defer wr.lock.RUnlock()
+	return uint32(len(wr.lookups))
+}
+func (wr *journalWriter) uncompressedSize() uint64 {
+	wr.lock.RLock()
+	defer wr.lock.RUnlock()
+	return wr.uncmpSz
+}
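With the index folded into the writer, the lookupMap's private mutex disappears and journalWriter's existing sync.RWMutex becomes the single synchronization point: ProcessJournal and WriteChunk take the write lock, while has, getRange, getCompressed, recordCount, and uncompressedSize take read locks. The sketch below illustrates that locking pattern under concurrent use; writerIndex and its methods are hypothetical simplifications, not the production journalWriter.

package main

import (
	"fmt"
	"sync"
)

type addr [20]byte

// writerIndex is a hypothetical reduction of journalWriter to the part this
// commit touches: one RWMutex guarding both the lookups map and the size sum.
type writerIndex struct {
	lock    sync.RWMutex
	lookups map[addr]int64 // chunk address -> journal offset (simplified)
	uncmpSz uint64
}

func (w *writerIndex) put(a addr, off int64, sz uint64) {
	w.lock.Lock() // writers are exclusive
	defer w.lock.Unlock()
	w.lookups[a] = off
	w.uncmpSz += sz
}

func (w *writerIndex) has(a addr) bool {
	w.lock.RLock() // readers may proceed concurrently
	defer w.lock.RUnlock()
	_, ok := w.lookups[a]
	return ok
}

func main() {
	w := &writerIndex{lookups: make(map[addr]int64)}
	var wg sync.WaitGroup

	// One goroutine appends records while others probe the index;
	// the shared RWMutex keeps the map access race-free.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 256; i++ {
			w.put(addr{byte(i)}, int64(i), 64)
		}
	}()
	for r := 0; r < 4; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 256; i++ {
				w.has(addr{byte(i)})
			}
		}()
	}
	wg.Wait()
	fmt.Println(w.has(addr{42}), w.uncmpSz) // true 16384
}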
+10 -11
@@ -195,15 +195,14 @@ func TestJournalWriterWriteChunk(t *testing.T) {
 	require.NoError(t, err)
 	data := randomCompressedChunks()
-	lookups := make(map[addr]recLookup)
 	for a, cc := range data {
-		l, err := j.WriteChunk(cc)
+		err = j.WriteChunk(cc)
 		require.NoError(t, err)
-		lookups[a] = l
+		l := j.lookups[a]
 		validateLookup(t, j, l, cc)
 	}
-	for a, l := range lookups {
+	for a, l := range j.lookups {
 		validateLookup(t, j, l, data[a])
 	}
 	require.NoError(t, j.Close())
@@ -217,22 +216,22 @@ func TestJournalWriterBootstrap(t *testing.T) {
 	require.NoError(t, err)
 	data := randomCompressedChunks()
-	lookups := make(map[addr]recLookup)
-	for a, cc := range data {
-		l, err := j.WriteChunk(cc)
+	for _, cc := range data {
+		err = j.WriteChunk(cc)
 		require.NoError(t, err)
-		lookups[a] = l
 	}
 	assert.NoError(t, j.Close())
 	j, _, err = openJournalWriter(ctx, path)
 	require.NoError(t, err)
-	_, source, err := j.ProcessJournal(ctx)
+	_, err = j.ProcessJournal(ctx)
 	require.NoError(t, err)
-	for a, l := range lookups {
+	for a, l := range j.lookups {
 		validateLookup(t, j, l, data[a])
 	}
+	source := journalChunkSource{journal: j}
 	for a, cc := range data {
 		buf, err := source.get(ctx, a, nil)
 		require.NoError(t, err)
@@ -259,7 +258,7 @@ func TestJournalWriterSyncClose(t *testing.T) {
 	j, err := createJournalWriter(ctx, newTestFilePath(t))
 	require.NotNil(t, j)
 	require.NoError(t, err)
-	_, _, err = j.ProcessJournal(ctx)
+	_, err = j.ProcessJournal(ctx)
 	require.NoError(t, err)
 	// close triggers flush