diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 03f1ed3310..4584943135 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -152,7 +152,7 @@ func (nbs *NomsBlockStore) GetChunkLocations(hashes hash.HashSet) (map[hash.Hash } ranges := make(map[hash.Hash]map[hash.Hash]Range) - f := func(css chunkSources) error { + f := func(css chunkSourceSet) error { for _, cs := range css { switch tr := cs.(type) { case *fileTableReader: @@ -386,7 +386,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp contents, upstreamAppendixSpecs := upstream.removeAppendixSpecs() switch option { case ManifestAppendixOption_Append: - // prepend all appendix specs to contents.specs + // prepend all appendix specs to contents.specs specs := append([]tableSpec{}, appendixSpecs...) specs = append(specs, upstreamAppendixSpecs...) contents.specs = append(specs, contents.specs...) @@ -402,7 +402,7 @@ func fromManifestAppendixOptionNewContents(upstream manifestContents, appendixSp return contents, nil } - // prepend new appendix specs to contents.specs + // prepend new appendix specs to contents.specs // dropping all upstream appendix specs specs := append([]tableSpec{}, appendixSpecs...) contents.specs = append(specs, contents.specs...) 
@@ -643,7 +643,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, h addr, data []byte) (b nbs.mt = newMemTable(nbs.mtSize) } if !nbs.mt.addChunk(h, data) { - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats) + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats) if err != nil { return false, err } @@ -984,7 +984,7 @@ func (nbs *NomsBlockStore) Commit(ctx context.Context, current, last hash.Hash) } if cnt > preflushChunkCount { - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats) + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats) if err != nil { return err } @@ -1070,7 +1070,7 @@ func (nbs *NomsBlockStore) updateManifest(ctx context.Context, current, last has } if cnt > 0 { - ts, err := nbs.tables.prepend(ctx, nbs.mt, nbs.stats) + ts, err := nbs.tables.append(ctx, nbs.mt, nbs.stats) if err != nil { return err } diff --git a/go/store/nbs/table.go b/go/store/nbs/table.go index 5f7088069d..0d9f25a7f3 100644 --- a/go/store/nbs/table.go +++ b/go/store/nbs/table.go @@ -262,6 +262,16 @@ type chunkSource interface { type chunkSources []chunkSource +type chunkSourceSet map[addr]chunkSource + +func copyChunkSourceSet(s chunkSourceSet) (cp chunkSourceSet) { + cp = make(chunkSourceSet, len(s)) + for k, v := range s { + cp[k] = v + } + return +} + // TableFile is an interface for working with an existing table file type TableFile interface { // FileID gets the id of the file diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index e20b911496..6af8589088 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -42,14 +42,14 @@ func newTableSet(p tablePersister, q MemoryQuotaProvider) tableSet { // tableSet is an immutable set of persistable chunkSources. 
type tableSet struct { - novel, upstream chunkSources + novel, upstream chunkSourceSet p tablePersister q MemoryQuotaProvider rl chan struct{} } func (ts tableSet) has(h addr) (bool, error) { - f := func(css chunkSources) (bool, error) { + f := func(css chunkSourceSet) (bool, error) { for _, haver := range css { has, err := haver.has(h) @@ -78,7 +78,7 @@ func (ts tableSet) has(h addr) (bool, error) { } func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { - f := func(css chunkSources) (bool, error) { + f := func(css chunkSourceSet) (bool, error) { for _, haver := range css { has, err := haver.hasMany(addrs) @@ -106,7 +106,7 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { } func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error) { - f := func(css chunkSources) ([]byte, error) { + f := func(css chunkSourceSet) ([]byte, error) { for _, haver := range css { data, err := haver.get(ctx, h, stats) @@ -136,7 +136,7 @@ func (ts tableSet) get(ctx context.Context, h addr, stats *Stats) ([]byte, error } func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, *chunks.Chunk), stats *Stats) (remaining bool, err error) { - f := func(css chunkSources) bool { + f := func(css chunkSourceSet) bool { for _, haver := range css { remaining, err = haver.getMany(ctx, eg, reqs, found, stats) if err != nil { @@ -153,7 +153,7 @@ func (ts tableSet) getMany(ctx context.Context, eg *errgroup.Group, reqs []getRe } func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (remaining bool, err error) { - f := func(css chunkSources) bool { + f := func(css chunkSourceSet) bool { for _, haver := range css { remaining, err = haver.getManyCompressed(ctx, eg, reqs, found, stats) if err != nil { @@ -171,7 +171,7 @@ func (ts tableSet) getManyCompressed(ctx context.Context, eg *errgroup.Group, re } func 
(ts tableSet) count() (uint32, error) { - f := func(css chunkSources) (count uint32, err error) { + f := func(css chunkSourceSet) (count uint32, err error) { for _, haver := range css { thisCount, err := haver.count() @@ -200,7 +200,7 @@ func (ts tableSet) count() (uint32, error) { } func (ts tableSet) uncompressedLen() (uint64, error) { - f := func(css chunkSources) (data uint64, err error) { + f := func(css chunkSourceSet) (data uint64, err error) { for _, haver := range css { uncmpLen, err := haver.uncompressedLen() @@ -229,7 +229,7 @@ func (ts tableSet) uncompressedLen() (uint64, error) { } func (ts tableSet) physicalLen() (uint64, error) { - f := func(css chunkSources) (data uint64, err error) { + f := func(css chunkSourceSet) (data uint64, err error) { for _, haver := range css { index, err := haver.index() if err != nil { @@ -277,24 +277,22 @@ func (ts tableSet) Size() int { return len(ts.novel) + len(ts.upstream) } -// prepend adds a memTable to an existing tableSet, compacting |mt| and +// append adds a memTable to an existing tableSet, compacting |mt| and // returning a new tableSet with newly compacted table added. -func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) { +func (ts tableSet) append(ctx context.Context, mt *memTable, stats *Stats) (tableSet, error) { cs, err := ts.p.Persist(ctx, mt, ts, stats) if err != nil { return tableSet{}, err } newTs := tableSet{ - novel: make(chunkSources, len(ts.novel)+1), - upstream: make(chunkSources, len(ts.upstream)), + novel: copyChunkSourceSet(ts.novel), + upstream: copyChunkSourceSet(ts.upstream), p: ts.p, q: ts.q, rl: ts.rl, } - newTs.novel[0] = cs - copy(newTs.novel[1:], ts.novel) - copy(newTs.upstream, ts.upstream) + newTs.novel[cs.hash()] = cs return newTs, nil } @@ -302,7 +300,7 @@ func (ts tableSet) prepend(ctx context.Context, mt *memTable, stats *Stats) (tab // and ts.upstream. 
func (ts tableSet) flatten(ctx context.Context) (tableSet, error) { flattened := tableSet{ - upstream: make(chunkSources, 0, ts.Size()), + upstream: copyChunkSourceSet(ts.upstream), p: ts.p, q: ts.q, rl: ts.rl, @@ -312,13 +310,10 @@ func (ts tableSet) flatten(ctx context.Context) (tableSet, error) { cnt, err := src.count() if err != nil { return tableSet{}, err - } - if cnt > 0 { - flattened.upstream = append(flattened.upstream, src) + } else if cnt > 0 { + flattened.upstream[src.hash()] = src } } - - flattened.upstream = append(flattened.upstream, ts.upstream...) return flattened, nil } @@ -354,7 +349,7 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) // copy |ts.novel|, skipping empty chunkSources // (usually due to de-duping during table compaction) - novel := make(chunkSources, 0, len(ts.novel)) + novel := make(chunkSourceSet, len(ts.novel)) for _, t := range ts.novel { cnt, err := t.count() if err != nil { @@ -366,49 +361,46 @@ func (ts tableSet) rebase(ctx context.Context, specs []tableSpec, stats *Stats) if err != nil { return tableSet{}, err } - novel = append(novel, t2) - } - - existing := make(map[addr]chunkSource, len(ts.upstream)) - for _, cs := range ts.upstream { - existing[cs.hash()] = cs + novel[t2.hash()] = t2 } // newly opened tables are unowned, we must // close them if the rebase operation fails - opened := new(sync.Map) + opened := make(chunkSourceSet, len(specs)) eg, ctx := errgroup.WithContext(ctx) - upstream := make([]chunkSource, len(specs)) - for i, s := range specs { + mu := new(sync.Mutex) + upstream := make(chunkSourceSet, len(specs)) + for _, s := range specs { // clone tables that we have already opened - if cs, ok := existing[s.name]; ok { - c, err := cs.clone() + if cs, ok := ts.upstream[s.name]; ok { + cl, err := cs.clone() if err != nil { return tableSet{}, err } - upstream[i] = c + upstream[cl.hash()] = cl continue } // open missing tables in parallel - idx, spec := i, s + spec := s eg.Go(func() 
error { cs, err := ts.p.Open(ctx, spec.name, spec.chunkCount, stats) if err != nil { return err } - upstream[idx] = cs - opened.Store(spec.name, cs) + mu.Lock() + defer mu.Unlock() + upstream[cs.hash()] = cs + opened[cs.hash()] = cs return nil }) } if err := eg.Wait(); err != nil { - opened.Range(func(_, v any) bool { + for _, cs := range opened { // close any opened chunkSources - _ = v.(chunkSource).close() - return true - }) + _ = cs.close() + } return tableSet{}, err } @@ -456,7 +448,10 @@ func (ts tableSet) toSpecs() ([]tableSpec, error) { } func tableSetCalcReads(ts tableSet, reqs []getRecord, blockSize uint64) (reads int, split, remaining bool, err error) { - all := append(ts.novel, ts.upstream...) + all := copyChunkSourceSet(ts.upstream) + for a, cs := range ts.novel { + all[a] = cs + } for _, tbl := range all { rdr, ok := tbl.(*fileTableReader) if !ok { diff --git a/go/store/nbs/table_set_test.go b/go/store/nbs/table_set_test.go index b6aa4b0826..301dc26161 100644 --- a/go/store/nbs/table_set_test.go +++ b/go/store/nbs/table_set_test.go @@ -32,7 +32,7 @@ import ( var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")} func TestTableSetPrependEmpty(t *testing.T) { - ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).prepend(context.Background(), newMemTable(testMemTableSize), &Stats{}) + ts, err := newFakeTableSet(&UnlimitedQuotaProvider{}).append(context.Background(), newMemTable(testMemTableSize), &Stats{}) require.NoError(t, err) specs, err := ts.toSpecs() require.NoError(t, err) @@ -47,7 +47,7 @@ func TestTableSetPrepend(t *testing.T) { assert.Empty(specs) mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) firstSpecs, err := ts.toSpecs() @@ -57,7 +57,7 @@ func TestTableSetPrepend(t *testing.T) { mt = newMemTable(testMemTableSize) 
mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) secondSpecs, err := ts.toSpecs() @@ -74,17 +74,17 @@ func TestTableSetToSpecsExcludesEmptyTable(t *testing.T) { assert.Empty(specs) mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) specs, err = ts.toSpecs() @@ -100,17 +100,17 @@ func TestTableSetFlattenExcludesEmptyTable(t *testing.T) { assert.Empty(specs) mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) ts, err = ts.flatten(context.Background()) @@ -138,7 +138,7 @@ func TestTableSetRebase(t *testing.T) 
{ for _, c := range chunks { mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(c), c) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) } return ts @@ -182,13 +182,13 @@ func TestTableSetPhysicalLen(t *testing.T) { assert.Empty(specs) mt := newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[0]), testChunks[0]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) mt = newMemTable(testMemTableSize) mt.addChunk(computeAddr(testChunks[1]), testChunks[1]) mt.addChunk(computeAddr(testChunks[2]), testChunks[2]) - ts, err = ts.prepend(context.Background(), mt, &Stats{}) + ts, err = ts.append(context.Background(), mt, &Stats{}) require.NoError(t, err) assert.True(mustUint64(ts.physicalLen()) > indexSize(mustUint32(ts.count())))