swap chunkSources for chunkSourceSet in tableSet

This commit is contained in:
Andy Arthur
2022-11-23 14:01:50 -08:00
parent 1f5287fe4b
commit 45acdc0c6a
4 changed files with 66 additions and 61 deletions

View File

@@ -152,7 +152,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash
}
ranges := make(map[hash.Hash]map[hash.Hash]Range)
f := func(css chunkSources) error {
f := func(css chunkSourceSet) error {
for _, cs := range css {
switch tr := cs.(type) {
case *fileTableReader:
@@ -386,7 +386,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
contents, upstreamAppendixSpecs := upstream.removeAppendixSpecs()
switch option {
case ManifestAppendixOption_Append:
// prepend all appendix specs to contents.specs
// place all appendix specs (new first, then upstream) ahead of contents.specs
specs := append([]tableSpec{}, appendixSpecs...)
specs = append(specs, upstreamAppendixSpecs...)
contents.specs = append(specs, contents.specs...)
@@ -402,7 +402,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp
return contents, nil
}
// prepend new appendix specs to contents.specs
// place new appendix specs ahead of contents.specs
// dropping all upstream appendix specs
specs := append([]tableSpec{}, appendixSpecs...)
contents.specs = append(specs, contents.specs...)
@@ -643,7 +643,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, h addr, data []byte) (b
nbs.mt = newMemTable(nbs.mtSize)
}
if !nbs.mt.addChunk(h, data) {
ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
if err != nil {
return false, err
}
@@ -984,7 +984,7 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash)
}
if cnt > preflushChunkCount {
ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
if err != nil {
return err
}
@@ -1070,7 +1070,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has
}
if cnt > 0 {
ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats)
ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats)
if err != nil {
return err
}

View File

@@ -262,6 +262,16 @@ type chunkSource interface {
type chunkSources []chunkSource
type chunkSourceSet map[addr]chunkSource
// copyChunkSourceSet returns a new chunkSourceSet holding the same
// address-to-chunkSource entries as |s|. The result is always a freshly
// allocated, writable map, even when |s| is nil or empty. The chunkSource
// values are shared with |s|; only the map itself is duplicated.
func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) {
	dup := make(chunkSourceSet, len(s))
	for name := range s {
		dup[name] = s[name]
	}
	return dup
}
// TableFile is an interface for working with an existing table file
type TableFile interface {
// FileID gets the id of the file

View File

@@ -42,14 +42,14 @@ func newTableSet(p tablePersister, q MemoryQuotaProvider) tableSet {
// tableSet is an immutable set of persistable chunkSources.
type tableSet struct {
novel, upstream chunkSources
novel, upstream chunkSourceSet
p tablePersister
q MemoryQuotaProvider
rl chan struct{}
}
func (ts tableSet) has(h addr) (bool, error) {
f := func(css chunkSources) (bool, error) {
f := func(css chunkSourceSet) (bool, error) {
for _, haver := range css {
has, err := haver.has(h)
@@ -78,7 +78,7 @@ func (ts tableSet) has(h addr) (bool, error) {
}
func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
f := func(css chunkSources) (bool, error) {
f := func(css chunkSourceSet) (bool, error) {
for _, haver := range css {
has, err := haver.hasMany(addrs)
@@ -106,7 +106,7 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) {
}
func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) {
f := func(css chunkSources) ([]byte, error) {
f := func(css chunkSourceSet) ([]byte, error) {
for _, haver := range css {
data, err := haver.get(ctx, h, stats)
@@ -136,7 +136,7 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error
}
func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
f := func(css chunkSourceSet) bool {
for _, haver := range css {
remaining, err = haver.getMany(ctx, eg, reqs, found, stats)
if err != nil {
@@ -153,7 +153,7 @@ func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRe
}
func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) {
f := func(css chunkSources) bool {
f := func(css chunkSourceSet) bool {
for _, haver := range css {
remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats)
if err != nil {
@@ -171,7 +171,7 @@ func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, re
}
func (ts tableSet) count() (uint32, error) {
f := func(css chunkSources) (count uint32, err error) {
f := func(css chunkSourceSet) (count uint32, err error) {
for _, haver := range css {
thisCount, err := haver.count()
@@ -200,7 +200,7 @@ func (ts tableSet) count() (uint32, error) {
}
func (ts tableSet) uncompressedLen() (uint64, error) {
f := func(css chunkSources) (data uint64, err error) {
f := func(css chunkSourceSet) (data uint64, err error) {
for _, haver := range css {
uncmpLen, err := haver.uncompressedLen()
@@ -229,7 +229,7 @@ func (ts tableSet) uncompressedLen() (uint64, error) {
}
func (ts tableSet) physicalLen() (uint64, error) {
f := func(css chunkSources) (data uint64, err error) {
f := func(css chunkSourceSet) (data uint64, err error) {
for _, haver := range css {
index, err := haver.index()
if err != nil {
@@ -277,24 +277,22 @@ func (ts tableSet) Size() int {
return len(ts.novel) + len(ts.upstream)
}
// prepend adds a memTable to an existing tableSet, compacting |mt| and
// append adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) {
func (ts tableSet) append(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) {
cs, err := ts.p.Persist(ctx, mt, ts, stats)
if err != nil {
return tableSet{}, err
}
newTs := tableSet{
novel: make(chunkSources, len(ts.novel)+1),
upstream: make(chunkSources, len(ts.upstream)),
novel: copyChunkSourceSet(ts.novel),
upstream: copyChunkSourceSet(ts.upstream),
p: ts.p,
q: ts.q,
rl: ts.rl,
}
newTs.novel[0] = cs
copy(newTs.novel[1:], ts.novel)
copy(newTs.upstream, ts.upstream)
newTs.novel[cs.hash()] = cs
return newTs, nil
}
@@ -302,7 +300,7 @@ func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tab
// and ts.upstream.
func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
flattened := tableSet{
upstream: make(chunkSources, 0, ts.Size()),
upstream: copyChunkSourceSet(ts.upstream),
p: ts.p,
q: ts.q,
rl: ts.rl,
@@ -312,13 +310,10 @@ func (ts tableSet) flatten(ctx context.Context) (tableSet, error) {
cnt, err := src.count()
if err != nil {
return tableSet{}, err
}
if cnt > 0 {
flattened.upstream = append(flattened.upstream, src)
} else if cnt > 0 {
flattened.upstream[src.hash()] = src
}
}
flattened.upstream = append(flattened.upstream, ts.upstream...)
return flattened, nil
}
@@ -354,7 +349,7 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
// copy |ts.novel|, skipping empty chunkSources
// (usually due to de-duping during table compaction)
novel := make(chunkSources, 0, len(ts.novel))
novel := make(chunkSourceSet, len(ts.novel))
for _, t := range ts.novel {
cnt, err := t.count()
if err != nil {
@@ -366,49 +361,46 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats)
if err != nil {
return tableSet{}, err
}
novel = append(novel, t2)
}
existing := make(map[addr]chunkSource, len(ts.upstream))
for _, cs := range ts.upstream {
existing[cs.hash()] = cs
novel[t2.hash()] = t2
}
// newly opened tables are unowned, we must
// close them if the rebase operation fails
opened := new(sync.Map)
opened := make(chunkSourceSet, len(specs))
eg, ctx := errgroup.WithContext(ctx)
upstream := make([]chunkSource, len(specs))
for i, s := range specs {
mu := new(sync.Mutex)
upstream := make(chunkSourceSet, len(specs))
for _, s := range specs {
// clone tables that we have already opened
if cs, ok := existing[s.name]; ok {
c, err := cs.clone()
if cs, ok := ts.upstream[s.name]; ok {
cl, err := cs.clone()
if err != nil {
return tableSet{}, err
}
upstream[i] = c
upstream[cl.hash()] = cl
continue
}
// open missing tables in parallel
idx, spec := i, s
spec := s
eg.Go(func() error {
cs, err := ts.p.Open(ctx, spec.name, spec.chunkCount, stats)
if err != nil {
return err
}
upstream[idx] = cs
opened.Store(spec.name, cs)
mu.Lock()
defer mu.Unlock()
upstream[cs.hash()] = cs
opened[cs.hash()] = cs
return nil
})
}
if err := eg.Wait(); err != nil {
opened.Range(func(_, v any) bool {
for _, cs := range opened {
// close any opened chunkSources
_ = v.(chunkSource).close()
return true
})
_ = cs.close()
}
return tableSet{}, err
}
@@ -456,7 +448,10 @@ func (ts tableSet) toSpecs() ([]tableSpec, error) {
}
func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) {
all := append(ts.novel, ts.upstream...)
all := copyChunkSourceSet(ts.upstream)
for a, cs := range ts.novel {
all[a] = cs
}
for _, tbl := range all {
rdr, ok := tbl.(*fileTableReader)
if !ok {

View File

@@ -32,7 +32,7 @@ import (
var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
func TestTableSetPrependEmpty(t *testing.T) {
ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{})
ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), &Stats{})
require.NoError(t, err)
specs, err := ts.toSpecs()
require.NoError(t, err)
@@ -47,7 +47,7 @@ func TestTableSetPrepend(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
firstSpecs, err := ts.toSpecs()
@@ -57,7 +57,7 @@ func TestTableSetPrepend(t *testing.T) {
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
secondSpecs, err := ts.toSpecs()
@@ -74,17 +74,17 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
mt = newMemTable(testMemTableSize)
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
specs, err = ts.toSpecs()
@@ -100,17 +100,17 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
mt = newMemTable(testMemTableSize)
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
ts, err = ts.flatten(context.Background())
@@ -138,7 +138,7 @@ func TestTableSetRebase(t *testing.T) {
for _, c := range chunks {
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(c), c)
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
}
return ts
@@ -182,13 +182,13 @@ func TestTableSetPhysicalLen(t *testing.T) {
assert.Empty(specs)
mt := newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[0]), testChunks[0])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
mt = newMemTable(testMemTableSize)
mt.addChunk(computeAddr(testChunks[1]), testChunks[1])
mt.addChunk(computeAddr(testChunks[2]), testChunks[2])
ts, err = ts.prepend(context.Background(), mt, &Stats{})
ts, err = ts.append(context.Background(), mt, &Stats{})
require.NoError(t, err)
assert.True(mustUint64(ts.physicalLen()) > indexSize(mustUint32(ts.count())))