go/store/nbs: file_table_persister: Add a mechanism for a table persister to clean up newly unused sources after a successful conjoin.

Author: Aaron Son
Date: 2023-02-27 13:40:50 -08:00
parent 42cd3fe28a
commit 6ecd22d7fc
10 changed files with 85 additions and 53 deletions
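At a glance, the commit threads a new cleanupFunc hook through every tablePersister.ConjoinAll implementation so that a persister can release the conjoined-away sources once the result has safely landed. The sketch below assumes the nbs package context; cleanupFunc and the ConjoinAll signature come from the diff, while conjoinAndSwap is an illustrative caller, not a function in the repository.

```go
// cleanupFunc is the new hook returned by ConjoinAll (added in this commit).
type cleanupFunc func()

// conjoinAndSwap is an illustrative caller showing one plausible pattern:
// conjoin, reference the result durably, then let the persister clean up.
func conjoinAndSwap(ctx context.Context, p tablePersister, sources chunkSources, stats *Stats) error {
	cs, cleanup, err := p.ConjoinAll(ctx, sources, stats)
	if err != nil {
		return err
	}
	defer cs.close()

	// ... update the manifest so it references the conjoined table ...

	// Only after the conjoined table is durably referenced is it safe to let
	// the persister release the now-unused sources (e.g. delete their files).
	cleanup()
	return nil
}
```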

View File

@@ -335,27 +335,28 @@ func (s partsByPartNum) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
plan, err := planRangeCopyConjoin(sources, stats)
if err != nil {
return nil, err
return nil, nil, err
}
if plan.chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, nil, nil
}
t1 := time.Now()
name := nameFromSuffixes(plan.suffixes())
err = s3p.executeCompactionPlan(ctx, plan, name.String())
if err != nil {
return nil, err
return nil, nil, err
}
verbose.Logger(ctx).Sugar().Debugf("Compacted table of %d Kb in %s", plan.totalCompressedData/1024, time.Since(t1))
tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(ctx, s3p.q, plan.mergedIndex, name, tra, s3BlockSize)
cs, err := newReaderFromIndexData(ctx, s3p.q, plan.mergedIndex, name, tra, s3BlockSize)
return cs, func() {}, err
}
func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan compactionPlan, key string) error {

View File

@@ -381,7 +381,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
chunks := smallChunks[:len(smallChunks)-1]
sources := makeSources(s3p, chunks)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -402,7 +402,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
s3p := newPersister(s3svc, ddb)
sources := makeSources(s3p, smallChunks)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -443,7 +443,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
}
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -484,7 +484,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
require.NoError(t, err)
sources := chunkSources{cs1, cs2}
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -539,7 +539,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
require.NoError(t, err)
sources = append(sources, cs)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {

View File

@@ -78,7 +78,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
}
// ConjoinAll implements tablePersister.
func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
var sized []sourceWithSize
for _, src := range sources {
sized = append(sized, sourceWithSize{src, src.currentSize()})
@@ -86,7 +86,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
plan, err := planConjoin(sized, stats)
if err != nil {
return nil, err
return nil, nil, err
}
address := nameFromSuffixes(plan.suffixes())
name := address.String()
@@ -101,24 +101,25 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
for _, src := range plan.sources.sws {
sub, err := bsp.getRecordsSubObject(ctx, src.source)
if err != nil {
return nil, err
return nil, nil, err
}
conjoinees = append(conjoinees, sub)
}
// first concatenate all the sub-objects to create a composite sub-object
if _, err = bsp.bs.Concatenate(ctx, name+tableRecordsExt, conjoinees); err != nil {
return nil, err
return nil, nil, err
}
if _, err = blobstore.PutBytes(ctx, bsp.bs, name+tableTailExt, plan.mergedIndex); err != nil {
return nil, err
return nil, nil, err
}
// then concatenate into a final blob
if _, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}); err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, nil, err
}
return newBSChunkSource(ctx, bsp.bs, address, plan.chunkCount, bsp.q, stats)
cs, err := newBSChunkSource(ctx, bsp.bs, address, plan.chunkCount, bsp.q, stats)
return cs, func() {}, err
}
func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunkSource) (name string, err error) {

View File

@@ -94,6 +94,7 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, error) {
var conjoined tableSpec
var conjoinees, keepers, appendixSpecs []tableSpec
var cleanup cleanupFunc
for {
if conjoinees == nil {
@@ -110,7 +111,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
return manifestContents{}, err
}
conjoined, err = conjoinTables(ctx, conjoinees, p, stats)
conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
if err != nil {
return manifestContents{}, err
}
@@ -140,11 +141,18 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
if newContents.lock == upstream.lock {
cleanup()
return upstream, nil
}
// Optimistic lock failure. Someone else moved to the root, the set of tables, or both out from under us.
// If we can re-use the conjoin we already performed, we want to try again. Currently, we will only do so if ALL conjoinees are still present upstream. If we can't re-use...then someone else almost certainly landed a conjoin upstream. In this case, bail and let clients ask again if they think they still can't proceed.
// Optimistic lock failure. Someone else moved to the root, the
// set of tables, or both out from under us. If we can re-use
// the conjoin we already performed, we want to try again.
// Currently, we will only do so if ALL conjoinees are still
// present upstream. If we can't re-use...then someone else
// almost certainly landed a conjoin upstream. In this case,
// bail and let clients ask again if they think they still
// can't proceed.
// If the appendix has changed we simply bail
// and let the client retry
@@ -186,7 +194,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
}
func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, err error) {
func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, cleanup cleanupFunc, err error) {
eg, ectx := errgroup.WithContext(ctx)
toConjoin := make(chunkSources, len(conjoinees))
@@ -205,14 +213,14 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
}
}()
if err = eg.Wait(); err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
t1 := time.Now()
conjoinedSrc, err := p.ConjoinAll(ctx, toConjoin, stats)
conjoinedSrc, cleanup, err := p.ConjoinAll(ctx, toConjoin, stats)
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
defer conjoinedSrc.close()
@@ -221,7 +229,7 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
cnt, err := conjoinedSrc.count()
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
stats.ChunksPerConjoin.Sample(uint64(cnt))
@@ -229,9 +237,9 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
h := conjoinedSrc.hash()
cnt, err = conjoinedSrc.count()
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
return tableSpec{h, cnt}, nil
return tableSpec{h, cnt}, cleanup, nil
}
func toSpecs(srcs chunkSources) ([]tableSpec, error) {
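To summarize the caller-side flow changed above: conjoinTables now forwards whatever cleanupFunc the persister returned, and conjoin invokes it only once the optimistic manifest update lands, so a lost lock race never releases tables that may still be referenced. The sketch below is a condensed paraphrase under that reading, not the literal function bodies from the diff.

```go
// Condensed paraphrase of the conjoin flow in the diff above; error handling
// and the retry loop are omitted, and updateSucceeded stands in for the lock
// comparison performed in conjoin().
func conjoinSketch(ctx context.Context, p tablePersister, conjoinees []tableSpec, stats *Stats) {
	_, cleanup, err := conjoinTables(ctx, conjoinees, p, stats)
	if err != nil {
		return
	}

	// ... attempt the optimistic manifest update ...
	updateSucceeded := true // placeholder for the lock comparison in conjoin()

	if updateSucceeded {
		// The conjoined table is now upstream; the conjoinees are unused.
		cleanup()
		return
	}
	// On a lock race, conjoin either retries with the same conjoin (keeping
	// cleanup around for later) or bails without calling it, since the
	// conjoinees may still be referenced upstream.
}
```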

View File

@@ -169,15 +169,14 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
return ftp.Open(ctx, name, chunkCount, stats)
}
func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
plan, err := planRangeCopyConjoin(sources, stats)
if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, nil, err
}
if plan.chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, nil, nil
}
name := nameFromSuffixes(plan.suffixes())
@@ -224,18 +223,24 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
return temp.Name(), nil
}()
if err != nil {
return nil, err
return nil, nil, err
}
err = file.Rename(tempName, filepath.Join(ftp.dir, name.String()))
if err != nil {
return nil, err
return nil, nil, err
}
return ftp.Open(ctx, name, plan.chunkCount, stats)
cs, err := ftp.Open(ctx, name, plan.chunkCount, stats)
if err != nil {
return nil, nil, err
}
return cs, func() {
for _, s := range sources {
file.Remove(filepath.Join(ftp.dir, s.hash().String()))
}
}, nil
}
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
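The filesystem persister is the one backend in this commit whose cleanup does real work: it best-effort removes the conjoined-away table files, discarding any errors from file.Remove. If an implementation also wanted the hook to be safe to call more than once, a hypothetical variant could wrap the deletion in sync.Once; makeIdempotentCleanup below is an invented helper for illustration, not part of this commit.

```go
// Hypothetical illustration only; the commit itself simply ignores Remove
// errors and does not guard against repeated invocation.
func makeIdempotentCleanup(dir string, names []string) cleanupFunc {
	var once sync.Once
	return func() {
		once.Do(func() {
			for _, n := range names {
				// Best effort: a missing file just means someone already
				// removed it (e.g. an earlier cleanup or a GC pass).
				_ = os.Remove(filepath.Join(dir, n))
			}
		})
	}
}
```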

View File

@@ -237,7 +237,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
}
}()
src, err := fts.ConjoinAll(ctx, sources, &Stats{})
src, _, err := fts.ConjoinAll(ctx, sources, &Stats{})
require.NoError(t, err)
defer src.close()
@@ -284,7 +284,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
}
}()
src, err := fts.ConjoinAll(ctx, sources, &Stats{})
src, _, err := fts.ConjoinAll(ctx, sources, &Stats{})
require.NoError(t, err)
defer src.close()

View File

@@ -197,7 +197,7 @@ func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea
}
// ConjoinAll implements tablePersister.
func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
return j.persister.ConjoinAll(ctx, sources, stats)
}

View File

@@ -528,12 +528,12 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
return chunkSourceAdapter{cs, name}, nil
}
func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
name, data, chunkCount, err := compactSourcesToBuffer(sources)
if err != nil {
return nil, err
return nil, nil, err
} else if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, func() {}, nil
}
ftp.mu.Lock()
@@ -542,14 +542,14 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc
ti, err := parseTableIndexByCopy(ctx, data, ftp.q)
if err != nil {
return nil, err
return nil, nil, err
}
cs, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
if err != nil {
return nil, err
return nil, nil, err
}
return chunkSourceAdapter{cs, name}, nil
return chunkSourceAdapter{cs, name}, func() {}, nil
}
func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunkCount uint32, err error) {

View File

@@ -58,14 +58,14 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no
type fileToData map[string][]byte
func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fileToData {
func writeLocalTableFiles(t *testing.T, st *NomsBlockStore, numTableFiles, seed int) (map[string]int, fileToData) {
ctx := context.Background()
fileToData := make(fileToData, numTableFiles)
fileIDToNumChunks := make(map[string]int)
fileIDToNumChunks := make(map[string]int, numTableFiles)
for i := 0; i < numTableFiles; i++ {
var chunkData [][]byte
for j := 0; j < i+1; j++ {
chunkData = append(chunkData, []byte(fmt.Sprintf("%d:%d", i, j)))
chunkData = append(chunkData, []byte(fmt.Sprintf("%d:%d:%d", i, j, seed)))
}
data, addr, err := buildTable(chunkData)
require.NoError(t, err)
@@ -77,9 +77,14 @@ func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fil
})
require.NoError(t, err)
}
return fileIDToNumChunks, fileToData
}
func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fileToData {
ctx := context.Background()
fileIDToNumChunks, fileToData := writeLocalTableFiles(t, st, numTableFiles, 0)
err := st.AddTableFilesToManifest(ctx, fileIDToNumChunks)
require.NoError(t, err)
return fileToData
}
@@ -190,8 +195,10 @@ func TestNBSPruneTableFiles(t *testing.T) {
numTableFiles := 64
maxTableFiles := 16
st, nomsDir, _ := makeTestLocalStore(t, maxTableFiles)
fileToData := populateLocalStore(t, st, numTableFiles)
defer st.Close()
fileToData := populateLocalStore(t, st, numTableFiles)
_, toDeleteToData := writeLocalTableFiles(t, st, numTableFiles, 32)
// add a chunk and flush to trigger a conjoin
c := chunks.NewChunk([]byte("it's a boy!"))
@@ -212,6 +219,9 @@ func TestNBSPruneTableFiles(t *testing.T) {
// assert some input table files were conjoined
assert.NotEmpty(t, absent)
toDelete := tfSet.findAbsent(toDeleteToData)
assert.Len(t, toDelete, len(toDeleteToData))
currTableFiles := func(dirName string) *set.StrSet {
infos, err := os.ReadDir(dirName)
require.NoError(t, err)
@@ -229,6 +239,9 @@ func TestNBSPruneTableFiles(t *testing.T) {
assert.True(t, preGC.Contains(tf.FileID()))
}
for _, fileName := range absent {
assert.False(t, preGC.Contains(fileName))
}
for _, fileName := range toDelete {
assert.True(t, preGC.Contains(fileName))
}
@@ -239,7 +252,7 @@ func TestNBSPruneTableFiles(t *testing.T) {
for _, tf := range sources {
assert.True(t, preGC.Contains(tf.FileID()))
}
for _, fileName := range absent {
for _, fileName := range toDelete {
assert.False(t, postGC.Contains(fileName))
}
infos, err := os.ReadDir(nomsDir)

View File

@@ -34,6 +34,8 @@ import (
var errCacheMiss = errors.New("index cache miss")
type cleanupFunc func()
// tablePersister allows interaction with persistent storage. It provides
// primitives for pushing the contents of a memTable to persistent storage,
// opening persistent tables for reading, and conjoining a number of existing
@@ -45,8 +47,10 @@ type tablePersister interface {
Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error)
// ConjoinAll conjoins all chunks in |sources| into a single, new
// chunkSource.
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error)
// chunkSource. It returns a |cleanupFunc| which can be called to
// potentially release resources associated with the |sources| once
// they are no longer needed.
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error)
// Open a table named |name|, containing |chunkCount| chunks.
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
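Taken together, an implementer of tablePersister now has two options: return func() {} when there is nothing to reclaim (as the AWS, blobstore, journal, and fake persisters above do), or capture the sources in a closure when the backend can delete them (as the filesystem persister does). A minimal skeleton of the first option might look like the following; noopPersister is an illustrative name, only the ConjoinAll method is shown, and the other interface methods are elided.

```go
// noopPersister is an illustrative skeleton, not a type in the repository.
type noopPersister struct{}

func (noopPersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
	// A backend with nothing to reclaim returns a no-op cleanup so callers
	// can invoke it unconditionally after a successful conjoin.
	return emptyChunkSource{}, func() {}, nil
}
```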