Merge pull request #5438 from dolthub/aaron/nbs-fs-table-persister-delete-after-conjoin

go/store/nbs: file_table_persister: Add a mechanism for a table persister to clean up newly unused sources after a successful conjoin.
Aaron Son, committed via GitHub, 2023-02-28 07:29:22 -08:00
24 changed files with 198 additions and 944 deletions
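In outline: tablePersister.ConjoinAll now returns a cleanupFunc alongside the new chunkSource, and conjoin invokes it once its manifest update lands, which lets the file-system persister delete the source table files it just merged. A minimal, self-contained Go sketch of that pattern follows; conjoinSources and the file layout are hypothetical stand-ins, and only the cleanupFunc type and the delete-after-success behavior come from the diff below.

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// cleanupFunc mirrors the type this PR adds in go/store/nbs: a callback
// returned by ConjoinAll that the caller invokes once the conjoined table
// is durably referenced, releasing the now-unused sources.
type cleanupFunc func()

// conjoinSources is a hypothetical stand-in for fsTablePersister.ConjoinAll:
// it concatenates the named source files into dst, then returns a closure
// that deletes the sources. Nothing is deleted until the caller decides the
// conjoin "landed" (in Dolt, after the manifest update succeeds).
func conjoinSources(dir, dst string, sources []string) (cleanupFunc, error) {
	out, err := os.Create(filepath.Join(dir, dst))
	if err != nil {
		return nil, err
	}
	defer out.Close()
	for _, s := range sources {
		b, err := os.ReadFile(filepath.Join(dir, s))
		if err != nil {
			return nil, err
		}
		if _, err = out.Write(b); err != nil {
			return nil, err
		}
	}
	return func() {
		// Best-effort removal, like file.Remove in the real persister.
		for _, s := range sources {
			_ = os.Remove(filepath.Join(dir, s))
		}
	}, nil
}

func main() {
	dir, err := os.MkdirTemp("", "conjoin")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(dir)
	os.WriteFile(filepath.Join(dir, "a"), []byte("aaa"), 0644)
	os.WriteFile(filepath.Join(dir, "b"), []byte("bbb"), 0644)
	cleanup, err := conjoinSources(dir, "ab", []string{"a", "b"})
	if err != nil {
		panic(err)
	}
	cleanup() // safe once the conjoined table is referenced upstream
	entries, _ := os.ReadDir(dir)
	for _, e := range entries {
		fmt.Println(e.Name()) // only "ab" remains
	}
}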
+6 -5
@@ -335,27 +335,28 @@ func (s partsByPartNum) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (s3p awsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
plan, err := planRangeCopyConjoin(sources, stats)
if err != nil {
return nil, err
return nil, nil, err
}
if plan.chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, nil, nil
}
t1 := time.Now()
name := nameFromSuffixes(plan.suffixes())
err = s3p.executeCompactionPlan(ctx, plan, name.String())
if err != nil {
return nil, err
return nil, nil, err
}
verbose.Logger(ctx).Sugar().Debugf("Compacted table of %d Kb in %s", plan.totalCompressedData/1024, time.Since(t1))
tra := &s3TableReaderAt{&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns}, name}
return newReaderFromIndexData(ctx, s3p.q, plan.mergedIndex, name, tra, s3BlockSize)
cs, err := newReaderFromIndexData(ctx, s3p.q, plan.mergedIndex, name, tra, s3BlockSize)
return cs, func() {}, err
}
func (s3p awsTablePersister) executeCompactionPlan(ctx context.Context, plan compactionPlan, key string) error {
+5 -5
@@ -381,7 +381,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
chunks := smallChunks[:len(smallChunks)-1]
sources := makeSources(s3p, chunks)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -402,7 +402,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
s3p := newPersister(s3svc, ddb)
sources := makeSources(s3p, smallChunks)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -443,7 +443,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
sources[i], err = s3p.Persist(context.Background(), mt, nil, &Stats{})
require.NoError(t, err)
}
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -484,7 +484,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
require.NoError(t, err)
sources := chunkSources{cs1, cs2}
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
@@ -539,7 +539,7 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
require.NoError(t, err)
sources = append(sources, cs)
src, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
require.NoError(t, err)
defer src.close()
for _, s := range sources {
+16 -7
@@ -78,7 +78,7 @@ func (bsp *blobstorePersister) Persist(ctx context.Context, mt *memTable, haver
}
// ConjoinAll implements tablePersister.
func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
var sized []sourceWithSize
for _, src := range sources {
sized = append(sized, sourceWithSize{src, src.currentSize()})
@@ -86,7 +86,7 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
plan, err := planConjoin(sized, stats)
if err != nil {
return nil, err
return nil, nil, err
}
address := nameFromSuffixes(plan.suffixes())
name := address.String()
@@ -101,24 +101,25 @@ func (bsp *blobstorePersister) ConjoinAll(ctx context.Context, sources chunkSour
for _, src := range plan.sources.sws {
sub, err := bsp.getRecordsSubObject(ctx, src.source)
if err != nil {
return nil, err
return nil, nil, err
}
conjoinees = append(conjoinees, sub)
}
// first concatenate all the sub-objects to create a composite sub-object
if _, err = bsp.bs.Concatenate(ctx, name+tableRecordsExt, conjoinees); err != nil {
return nil, err
return nil, nil, err
}
if _, err = blobstore.PutBytes(ctx, bsp.bs, name+tableTailExt, plan.mergedIndex); err != nil {
return nil, err
return nil, nil, err
}
// then concatenate into a final blob
if _, err = bsp.bs.Concatenate(ctx, name, []string{name + tableRecordsExt, name + tableTailExt}); err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, nil, err
}
return newBSChunkSource(ctx, bsp.bs, address, plan.chunkCount, bsp.q, stats)
cs, err := newBSChunkSource(ctx, bsp.bs, address, plan.chunkCount, bsp.q, stats)
return cs, func() {}, err
}
func (bsp *blobstorePersister) getRecordsSubObject(ctx context.Context, cs chunkSource) (name string, err error) {
@@ -233,6 +234,14 @@ type bsTableReaderAt struct {
bs blobstore.Blobstore
}
func (bsTRA *bsTableReaderAt) Close() error {
return nil
}
func (bsTRA *bsTableReaderAt) clone() (tableReaderAt, error) {
return bsTRA, nil
}
func (bsTRA *bsTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange)
return rc, err
+18 -10
@@ -94,6 +94,7 @@ func (c noopConjoiner) chooseConjoinees(sources []tableSpec) (conjoinees, keeper
func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents, mm manifestUpdater, p tablePersister, stats *Stats) (manifestContents, error) {
var conjoined tableSpec
var conjoinees, keepers, appendixSpecs []tableSpec
var cleanup cleanupFunc
for {
if conjoinees == nil {
@@ -110,7 +111,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
return manifestContents{}, err
}
conjoined, err = conjoinTables(ctx, conjoinees, p, stats)
conjoined, cleanup, err = conjoinTables(ctx, conjoinees, p, stats)
if err != nil {
return manifestContents{}, err
}
@@ -140,11 +141,18 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
if newContents.lock == upstream.lock {
cleanup()
return upstream, nil
}
// Optimistic lock failure. Someone else moved to the root, the set of tables, or both out from under us.
// If we can re-use the conjoin we already performed, we want to try again. Currently, we will only do so if ALL conjoinees are still present upstream. If we can't re-use...then someone else almost certainly landed a conjoin upstream. In this case, bail and let clients ask again if they think they still can't proceed.
// Optimistic lock failure. Someone else moved to the root, the
// set of tables, or both out from under us. If we can re-use
// the conjoin we already performed, we want to try again.
// Currently, we will only do so if ALL conjoinees are still
// present upstream. If we can't re-use...then someone else
// almost certainly landed a conjoin upstream. In this case,
// bail and let clients ask again if they think they still
// can't proceed.
// If the appendix has changed we simply bail
// and let the client retry
@@ -186,7 +194,7 @@ func conjoin(ctx context.Context, s conjoinStrategy, upstream manifestContents,
}
}
func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, err error) {
func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister, stats *Stats) (conjoined tableSpec, cleanup cleanupFunc, err error) {
eg, ectx := errgroup.WithContext(ctx)
toConjoin := make(chunkSources, len(conjoinees))
@@ -205,14 +213,14 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
}
}()
if err = eg.Wait(); err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
t1 := time.Now()
conjoinedSrc, err := p.ConjoinAll(ctx, toConjoin, stats)
conjoinedSrc, cleanup, err := p.ConjoinAll(ctx, toConjoin, stats)
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
defer conjoinedSrc.close()
@@ -221,7 +229,7 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
cnt, err := conjoinedSrc.count()
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
stats.ChunksPerConjoin.Sample(uint64(cnt))
@@ -229,9 +237,9 @@ func conjoinTables(ctx context.Context, conjoinees []tableSpec, p tablePersister
h := conjoinedSrc.hash()
cnt, err = conjoinedSrc.count()
if err != nil {
return tableSpec{}, err
return tableSpec{}, nil, err
}
return tableSpec{h, cnt}, nil
return tableSpec{h, cnt}, cleanup, nil
}
func toSpecs(srcs chunkSources) ([]tableSpec, error) {
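Tying the hunks above together: the cleanup callback propagates from the persister through conjoinTables back to conjoin, which fires it only when its optimistic manifest update wins. Below is a toy Go sketch of that decision; the types and the update callback are simplified stand-ins, and only the lock comparison plus cleanup-on-success mirror the hunk, with the surrounding semantics paraphrased.

package main

import "fmt"

type manifestContents struct{ lock string }

type cleanupFunc func()

// tryConjoin condenses the success path of conjoin(): run the manifest
// update, and if the resulting lock matches upstream's (our conjoin
// landed), fire the cleanup so the persister can delete the old sources.
func tryConjoin(update func() manifestContents, upstream manifestContents, cleanup cleanupFunc) bool {
	newContents := update()
	if newContents.lock == upstream.lock {
		cleanup() // conjoined sources are no longer referenced by the manifest
		return true
	}
	// Optimistic lock failure: someone else changed the table set. The real
	// code retries if all conjoinees are still upstream, otherwise bails.
	return false
}

func main() {
	upstream := manifestContents{lock: "abc123"}
	landed := tryConjoin(
		func() manifestContents { return upstream },
		upstream,
		func() { fmt.Println("removing conjoined source tables") },
	)
	fmt.Println("landed:", landed)
}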
+8
@@ -54,6 +54,14 @@ func (t tableNotInDynamoErr) Error() string {
return fmt.Sprintf("NBS table %s not present in DynamoDB table %s", t.nbs, t.dynamo)
}
func (dtra *dynamoTableReaderAt) Close() error {
return nil
}
func (dtra *dynamoTableReaderAt) clone() (tableReaderAt, error) {
return dtra, nil
}
func (dtra *dynamoTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
data, err := dtra.ddb.ReadTable(ctx, dtra.h, &Stats{})
if err != nil {
-178
@@ -1,178 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"os"
"sort"
"sync"
)
func newFDCache(targetSize int) *fdCache {
return &fdCache{targetSize: targetSize, cache: map[string]fdCacheEntry{}}
}
// fdCache ref-counts open file descriptors, but doesn't keep a hard cap on
// the number of open files. Once the cache's target size is exceeded, opening
// a new file causes the cache to try to get the cache back to the target size
// by closing fds with zero refs. If there aren't enough such fds, fdCache
// gives up and tries again next time a caller refs a file.
type fdCache struct {
targetSize int
mu sync.Mutex
cache map[string]fdCacheEntry
}
type fdCacheEntry struct {
refCount uint32
f *os.File
}
// RefFile returns an opened *os.File for the file at |path|, or an error
// indicating why the file could not be opened. If the cache already had an
// entry for |path|, RefFile increments its refcount and returns the cached
// pointer. If not, it opens the file and caches the pointer for others to
// use. If RefFile returns an error, it's guaranteed that no refCounts were
// changed, so it's an error to make a subsequent call to UnrefFile().
// This is intended for clients that hold fds for extremely short periods.
func (fc *fdCache) RefFile(path string) (f *os.File, err error) {
refFile := func() *os.File {
if ce, present := fc.cache[path]; present {
ce.refCount++
fc.cache[path] = ce
return ce.f
}
return nil
}
f = func() *os.File {
fc.mu.Lock()
defer fc.mu.Unlock()
return refFile()
}()
if f != nil {
return f, nil
}
// Very much want this to be outside the lock, but the downside is that multiple callers may get here concurrently. That means we need to deal with the raciness below.
f, err = os.Open(path)
if err != nil {
return nil, err
}
fc.mu.Lock()
defer fc.mu.Unlock()
if cached := refFile(); cached != nil {
// Someone beat us to it, so close f and return cached fd
f.Close()
return cached, nil
}
// I won the race!
fc.cache[path] = fdCacheEntry{f: f, refCount: 1}
return f, nil
}
// UnrefFile reduces the refcount of the entry at |path|. If the cache is over
// |fc.targetSize|, UnrefFile makes a best effort to shrink the cache by dumping
// entries with a zero refcount. If there aren't enough zero refcount entries
// to drop to get the cache back to |fc.targetSize|, the cache will remain
// over |fc.targetSize| until the next call to UnrefFile().
func (fc *fdCache) UnrefFile(path string) error {
fc.mu.Lock()
defer fc.mu.Unlock()
if ce, present := fc.cache[path]; present {
ce.refCount--
fc.cache[path] = ce
}
if len(fc.cache) > fc.targetSize {
// Sadly, we can't remove items from a map while iterating, so we'll record the stuff we want to drop and then do it after
needed := len(fc.cache) - fc.targetSize
toDrop := make([]string, 0, needed)
for p, ce := range fc.cache {
if ce.refCount != 0 {
continue
}
toDrop = append(toDrop, p)
err := ce.f.Close()
if err != nil {
return err
}
needed--
if needed == 0 {
break
}
}
for _, p := range toDrop {
delete(fc.cache, p)
}
}
return nil
}
// ShrinkCache forcefully removes all file handles with a refcount of zero.
func (fc *fdCache) ShrinkCache() error {
fc.mu.Lock()
defer fc.mu.Unlock()
toDrop := make([]string, 0, len(fc.cache))
for p, ce := range fc.cache {
if ce.refCount != 0 {
continue
}
toDrop = append(toDrop, p)
err := ce.f.Close()
if err != nil {
return err
}
}
for _, p := range toDrop {
delete(fc.cache, p)
}
return nil
}
// Drop dumps the entire cache and closes all currently open files.
func (fc *fdCache) Drop() {
fc.mu.Lock()
defer fc.mu.Unlock()
for _, ce := range fc.cache {
ce.f.Close()
}
fc.cache = map[string]fdCacheEntry{}
}
// reportEntries is meant for testing.
func (fc *fdCache) reportEntries() sort.StringSlice {
fc.mu.Lock()
defer fc.mu.Unlock()
ret := make(sort.StringSlice, 0, len(fc.cache))
for p := range fc.cache {
ret = append(ret, p)
}
sort.Sort(ret)
return ret
}
-136
@@ -1,136 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFDCache(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
paths := [3]string{}
for i := range paths {
name := fmt.Sprintf("file%d", i)
paths[i] = filepath.Join(dir, name)
err := os.WriteFile(paths[i], []byte(name), 0644)
require.NoError(t, err)
}
refNoError := func(fc *fdCache, p string, assert *assert.Assertions) *os.File {
f, err := fc.RefFile(p)
require.NoError(t, err)
assert.NotNil(f)
return f
}
t.Run("ConcurrentOpen", func(t *testing.T) {
assert := assert.New(t)
concurrency := 3
fc := newFDCache(3)
defer fc.Drop()
trigger := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-trigger
fc.RefFile(paths[0])
}()
}
close(trigger)
wg.Wait()
present := fc.reportEntries()
if assert.Len(present, 1) {
ce := fc.cache[present[0]]
assert.EqualValues(concurrency, ce.refCount)
}
})
t.Run("NoEvictions", func(t *testing.T) {
assert := assert.New(t)
fc := newFDCache(2)
defer fc.Drop()
f := refNoError(fc, paths[0], assert)
f2 := refNoError(fc, paths[1], assert)
assert.NotEqual(f, f2)
dup := refNoError(fc, paths[0], assert)
assert.Equal(f, dup)
})
t.Run("Evictions", func(t *testing.T) {
assert := assert.New(t)
fc := newFDCache(1)
defer fc.Drop()
f0 := refNoError(fc, paths[0], assert)
f1 := refNoError(fc, paths[1], assert)
assert.NotEqual(f0, f1)
// f0 wasn't evicted, because that doesn't happen until UnrefFile()
dup := refNoError(fc, paths[0], assert)
assert.Equal(f0, dup)
expected := sort.StringSlice(paths[:2])
sort.Sort(expected)
assert.EqualValues(expected, fc.reportEntries())
// Unreffing f1 now should evict it
err := fc.UnrefFile(paths[1])
require.NoError(t, err)
assert.EqualValues(paths[:1], fc.reportEntries())
// Bring f1 back so we can test multiple evictions in a row
f1 = refNoError(fc, paths[1], assert)
assert.NotEqual(f0, f1)
// After adding f3, we should be able to evict both f0 and f1
f2 := refNoError(fc, paths[2], assert)
assert.NotEqual(f0, f2)
assert.NotEqual(f1, f2)
err = fc.UnrefFile(paths[0])
require.NoError(t, err)
err = fc.UnrefFile(paths[0])
require.NoError(t, err)
err = fc.UnrefFile(paths[1])
require.NoError(t, err)
assert.EqualValues(paths[2:], fc.reportEntries())
})
}
+17 -26
@@ -33,20 +33,17 @@ import (
"time"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
const tempTablePrefix = "nbs_table_"
func newFSTablePersister(dir string, fc *fdCache, q MemoryQuotaProvider) tablePersister {
d.PanicIfTrue(fc == nil)
return &fsTablePersister{dir, fc, q}
func newFSTablePersister(dir string, q MemoryQuotaProvider) tablePersister {
return &fsTablePersister{dir, q}
}
type fsTablePersister struct {
dir string
fc *fdCache
q MemoryQuotaProvider
}
@@ -54,7 +51,7 @@ var _ tablePersister = &fsTablePersister{}
var _ tableFilePersister = &fsTablePersister{}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q, ftp.fc)
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q)
}
func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
@@ -154,11 +151,6 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
}
newName := filepath.Join(ftp.dir, name.String())
err = ftp.fc.ShrinkCache()
if err != nil {
return nil, err
}
err = file.Rename(tempName, newName)
@@ -169,15 +161,14 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
return ftp.Open(ctx, name, chunkCount, stats)
}
func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
plan, err := planRangeCopyConjoin(sources, stats)
if err != nil {
return emptyChunkSource{}, err
return emptyChunkSource{}, nil, err
}
if plan.chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, nil, nil
}
name := nameFromSuffixes(plan.suffixes())
@@ -224,18 +215,24 @@ func (ftp *fsTablePersister) ConjoinAll(ctx context.Context, sources chunkSource
return temp.Name(), nil
}()
if err != nil {
return nil, err
return nil, nil, err
}
err = file.Rename(tempName, filepath.Join(ftp.dir, name.String()))
if err != nil {
return nil, err
return nil, nil, err
}
return ftp.Open(ctx, name, plan.chunkCount, stats)
cs, err := ftp.Open(ctx, name, plan.chunkCount, stats)
if err != nil {
return nil, nil, err
}
return cs, func() {
for _, s := range sources {
file.Remove(filepath.Join(ftp.dir, s.hash().String()))
}
}, nil
}
func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manifestContents, mtime time.Time) error {
@@ -247,12 +244,6 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manif
return err
}
err = ftp.fc.ShrinkCache()
if err != nil {
return err
}
ea := make(gcErrAccum)
for _, info := range fileInfos {
if info.IsDir() {
+17 -118
@@ -35,57 +35,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestFSTableCacheOnOpen(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
names := []addr{}
cacheSize := 2
fc := newFDCache(cacheSize)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
// Create some tables manually, load them into the cache
func() {
for i := 0; i < cacheSize; i++ {
name, err := writeTableData(dir, []byte{byte(i)})
require.NoError(t, err)
names = append(names, name)
}
for _, name := range names {
tr, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer tr.close()
}
}()
// Tables should still be cached and on disk
for i, name := range names {
src, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer src.close()
h := computeAddr([]byte{byte(i)})
assert.True(src.has(h))
}
// Kick a table out of the cache
name, err := writeTableData(dir, []byte{0xff})
require.NoError(t, err)
tr, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer tr.close()
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, cacheSize)
err = fc.ShrinkCache()
require.NoError(t, err)
err = removeTables(dir, names...)
require.NoError(t, err)
}
func makeTempDir(t *testing.T) string {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
@@ -122,9 +71,7 @@ func TestFSTablePersisterPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
@@ -163,9 +110,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -175,41 +120,6 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
assert.True(os.IsNotExist(err), "%v", err)
}
func TestFSTablePersisterCacheOnPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
fc := newFDCache(1)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
defer file.RemoveAll(dir)
var name addr
func() {
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
defer src.close()
name = src.hash()
}()
// Table should still be cached
src, err := fts.Open(context.Background(), name, uint32(len(testChunks)), nil)
require.NoError(t, err)
defer src.close()
assertChunksInReader(testChunks, src, assert)
// Evict |name| from cache
tr, err := persistTableData(fts, []byte{0xff})
require.NoError(t, err)
defer tr.close()
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, 1)
err = removeTables(dir, name)
require.NoError(t, err)
}
func TestFSTablePersisterConjoinAll(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
@@ -218,9 +128,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(len(sources))
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
for i, c := range testChunks {
randChunk := make([]byte, (i+1)*13)
@@ -237,7 +145,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
}
}()
src, err := fts.ConjoinAll(ctx, sources, &Stats{})
src, _, err := fts.ConjoinAll(ctx, sources, &Stats{})
require.NoError(t, err)
defer src.close()
@@ -251,10 +159,6 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
defer tr.close()
assertChunksInReader(testChunks, tr, assert)
}
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, len(sources))
}
func TestFSTablePersisterConjoinAllDups(t *testing.T) {
@@ -262,29 +166,24 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
reps := 3
sources := make(chunkSources, reps)
for i := 0; i < reps; i++ {
mt := newMemTable(1 << 10)
for _, c := range testChunks {
mt.addChunk(computeAddr(c), c)
}
var err error
sources[i], err = fts.Persist(ctx, mt, nil, &Stats{})
require.NoError(t, err)
mt := newMemTable(1 << 10)
for _, c := range testChunks {
mt.addChunk(computeAddr(c), c)
}
defer func() {
for _, s := range sources {
s.close()
}
}()
src, err := fts.ConjoinAll(ctx, sources, &Stats{})
var err error
sources[0], err = fts.Persist(ctx, mt, nil, &Stats{})
require.NoError(t, err)
sources[1], err = sources[0].clone()
require.NoError(t, err)
sources[2], err = sources[0].clone()
require.NoError(t, err)
src, _, err := fts.ConjoinAll(ctx, sources, &Stats{})
require.NoError(t, err)
defer src.close()
+39 -44
@@ -33,8 +33,7 @@ import (
type fileTableReader struct {
tableReader
fc *fdCache
h addr
h addr
}
const (
@@ -52,16 +51,15 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
return err == nil, err
}
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
var f *os.File
index, sz, err := func() (ti onHeapTableIndex, sz int64, err error) {
// Be careful with how |f| is used below. |RefFile| returns a cached
// os.File pointer so the code needs to use f in a concurrency-safe
// manner. Moving the file offset is BAD.
var f *os.File
f, err = fc.RefFile(path)
f, err = os.Open(path)
if err != nil {
return
}
@@ -103,14 +101,6 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
return
}
defer func() {
unrefErr := fc.UnrefFile(path)
if unrefErr != nil && err == nil {
q.ReleaseQuotaBytes(len(b))
err = unrefErr
}
}()
ti, err = parseTableIndex(ctx, b, q)
if err != nil {
q.ReleaseQuotaBytes(len(b))
@@ -120,72 +110,77 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
return
}()
if err != nil {
if f != nil {
f.Close()
}
return nil, err
}
if chunkCount != index.chunkCount() {
index.Close()
f.Close()
return nil, errors.New("unexpected chunk count")
}
tr, err := newTableReader(index, &cacheReaderAt{path, fc, sz}, fileBlockSize)
tr, err := newTableReader(index, &fileReaderAt{f, path, sz}, fileBlockSize)
if err != nil {
index.Close()
f.Close()
return nil, err
}
return &fileTableReader{
tr,
fc,
h,
}, nil
}
func (mmtr *fileTableReader) hash() addr {
return mmtr.h
func (ftr *fileTableReader) hash() addr {
return ftr.h
}
func (mmtr *fileTableReader) close() error {
return mmtr.tableReader.close()
func (ftr *fileTableReader) Close() error {
return ftr.tableReader.close()
}
func (mmtr *fileTableReader) clone() (chunkSource, error) {
tr, err := mmtr.tableReader.clone()
func (ftr *fileTableReader) clone() (chunkSource, error) {
tr, err := ftr.tableReader.clone()
if err != nil {
return &fileTableReader{}, err
}
return &fileTableReader{tr, mmtr.fc, mmtr.h}, nil
return &fileTableReader{tr, ftr.h}, nil
}
type cacheReaderAt struct {
type fileReaderAt struct {
f *os.File
path string
fc *fdCache
sz int64
}
func (cra *cacheReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return io.NopCloser(io.LimitReader(&readerAdapter{cra, 0, ctx}, cra.sz)), nil
func (fra *fileReaderAt) clone() (tableReaderAt, error) {
f, err := os.Open(fra.path)
if err != nil {
return nil, err
}
return &fileReaderAt{
f,
fra.path,
fra.sz,
}, nil
}
func (cra *cacheReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
var r io.ReaderAt
func (fra *fileReaderAt) Close() error {
return fra.f.Close()
}
func (fra *fileReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return os.Open(fra.path)
}
func (fra *fileReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
t1 := time.Now()
if r, err = cra.fc.RefFile(cra.path); err != nil {
return
}
defer func() {
stats.FileBytesPerRead.Sample(uint64(len(p)))
stats.FileReadLatency.SampleTimeSince(t1)
}()
defer func() {
unrefErr := cra.fc.UnrefFile(cra.path)
if err == nil {
err = unrefErr
}
}()
return r.ReadAt(p, off)
return fra.f.ReadAt(p, off)
}
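The fileReaderAt above replaces the old cacheReaderAt/fdCache pairing: each reader owns its *os.File, Close releases it, and clone opens an independent descriptor so cloned tableReaders never share (or double-close) a handle. A self-contained toy illustrating that ownership model; fileHandle is a hypothetical analog, not the real type.

package main

import (
	"fmt"
	"os"
)

// fileHandle is a toy analog of fileReaderAt after this change: it owns an
// open *os.File, and clone() opens a fresh descriptor on the same path, so
// each clone can be closed independently -- no shared fd cache required.
type fileHandle struct {
	f    *os.File
	path string
}

func openHandle(path string) (*fileHandle, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	return &fileHandle{f: f, path: path}, nil
}

func (h *fileHandle) clone() (*fileHandle, error) {
	return openHandle(h.path)
}

func (h *fileHandle) Close() error { return h.f.Close() }

func main() {
	tmp, err := os.CreateTemp("", "tbl")
	if err != nil {
		panic(err)
	}
	tmp.Close()
	defer os.Remove(tmp.Name())

	a, err := openHandle(tmp.Name())
	if err != nil {
		panic(err)
	}
	b, err := a.clone()
	if err != nil {
		panic(err)
	}
	// Closing one handle leaves the other usable.
	fmt.Println(a.Close(), b.Close()) // <nil> <nil>
}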
+1 -4
@@ -40,9 +40,6 @@ func TestMmapTableReader(t *testing.T) {
require.NoError(t, err)
defer file.RemoveAll(dir)
fc := newFDCache(1)
defer fc.Drop()
chunks := [][]byte{
[]byte("hello2"),
[]byte("goodbye2"),
@@ -54,7 +51,7 @@ func TestMmapTableReader(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666)
require.NoError(t, err)
trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{}, fc)
trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{})
require.NoError(t, err)
defer trc.close()
assertChunksInReader(chunks, trc, assert)
-231
@@ -1,231 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"errors"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/util/sizecache"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
type tableCache interface {
checkout(h addr) (io.ReaderAt, error)
checkin(h addr) error
store(h addr, data io.Reader, size uint64) error
}
type fsTableCache struct {
dir string
cache *sizecache.SizeCache
fd *fdCache
}
func newFSTableCache(dir string, cacheSize uint64, maxOpenFds int) (*fsTableCache, error) {
ftc := &fsTableCache{dir: dir, fd: newFDCache(maxOpenFds)}
ftc.cache = sizecache.NewWithExpireCallback(cacheSize, func(elm interface{}) {
ftc.expire(elm.(addr))
})
err := ftc.init(maxOpenFds)
if err != nil {
return nil, err
}
return ftc, nil
}
func (ftc *fsTableCache) init(concurrency int) error {
type finfo struct {
path string
h addr
size uint64
}
infos := make(chan finfo)
errc := make(chan error, 1)
go func() {
isTableFile := func(info os.FileInfo) bool {
return info.Mode().IsRegular() && ValidateAddr(info.Name())
}
isTempTableFile := func(info os.FileInfo) bool {
return info.Mode().IsRegular() && strings.HasPrefix(info.Name(), tempTablePrefix)
}
defer close(errc)
defer close(infos)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(ftc.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if path == ftc.dir {
return nil
}
if isTempTableFile(info) {
// ignore failure to remove temp file
_ = file.Remove(path)
return nil
}
if !isTableFile(info) {
return errors.New(path + " is not a table file; cache dir must contain only table files")
}
ad, err := parseAddr(info.Name())
if err != nil {
return err
}
infos <- finfo{path, ad, uint64(info.Size())}
return nil
})
}()
ae := atomicerr.New()
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for info := range infos {
if ae.IsSet() {
break
}
ftc.cache.Add(info.h, info.size, true)
_, err := ftc.fd.RefFile(info.path)
if err != nil {
ae.SetIfError(err)
break
}
err = ftc.fd.UnrefFile(info.path)
if err != nil {
ae.SetIfError(err)
break
}
}
}()
}
wg.Wait()
err := <-errc
if err != nil {
return err
}
if err := ae.Get(); err != nil {
return err
}
return nil
}
func (ftc *fsTableCache) checkout(h addr) (io.ReaderAt, error) {
if _, ok := ftc.cache.Get(h); !ok {
return nil, nil
}
fd, err := ftc.fd.RefFile(filepath.Join(ftc.dir, h.String()))
if err != nil {
return nil, err
}
return fd, nil
}
func (ftc *fsTableCache) checkin(h addr) error {
return ftc.fd.UnrefFile(filepath.Join(ftc.dir, h.String()))
}
func (ftc *fsTableCache) store(h addr, data io.Reader, size uint64) error {
path := filepath.Join(ftc.dir, h.String())
tempName, err := func() (name string, ferr error) {
var temp *os.File
temp, ferr = tempfiles.MovableTempFileProvider.NewFile(ftc.dir, tempTablePrefix)
if ferr != nil {
return "", ferr
}
defer func() {
closeErr := temp.Close()
if ferr == nil {
ferr = closeErr
}
}()
_, ferr = io.Copy(temp, data)
if ferr != nil {
return "", ferr
}
return temp.Name(), nil
}()
if err != nil {
return err
}
err = ftc.fd.ShrinkCache()
if err != nil {
return err
}
err = file.Rename(tempName, path)
if err != nil {
return err
}
ftc.cache.Add(h, size, true)
// Prime the file in the fd cache ignore err
if _, err = ftc.fd.RefFile(path); err == nil {
err := ftc.fd.UnrefFile(path)
if err != nil {
return err
}
}
return nil
}
func (ftc *fsTableCache) expire(h addr) error {
return file.Remove(filepath.Join(ftc.dir, h.String()))
}
-140
@@ -1,140 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"bytes"
"io"
"os"
"path/filepath"
"sort"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFSTableCache(t *testing.T) {
datas := [][]byte{[]byte("hello"), []byte("world"), []byte("goodbye")}
sort.SliceStable(datas, func(i, j int) bool { return len(datas[i]) < len(datas[j]) })
t.Run("ExpireLRU", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
sum := 0
for _, s := range datas[1:] {
sum += len(s)
}
tc, err := newFSTableCache(dir, uint64(sum), len(datas))
require.NoError(t, err)
for _, d := range datas {
err := tc.store(computeAddr(d), bytes.NewReader(d), uint64(len(d)))
require.NoError(t, err)
}
expiredName := computeAddr(datas[0])
r, err := tc.checkout(expiredName)
require.NoError(t, err)
assert.Nil(t, r)
_, fserr := os.Stat(filepath.Join(dir, expiredName.String()))
assert.True(t, os.IsNotExist(fserr))
for _, d := range datas[1:] {
name := computeAddr(d)
r, err := tc.checkout(name)
require.NoError(t, err)
assert.NotNil(t, r)
assertDataInReaderAt(t, d, r)
_, fserr := os.Stat(filepath.Join(dir, name.String()))
assert.False(t, os.IsNotExist(fserr))
}
})
t.Run("Init", func(t *testing.T) {
t.Run("Success", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
assert := assert.New(t)
var names []addr
for i := byte(0); i < 4; i++ {
name := computeAddr([]byte{i})
require.NoError(t, os.WriteFile(filepath.Join(dir, name.String()), nil, 0666))
names = append(names, name)
}
ftc, err := newFSTableCache(dir, 1024, 4)
require.NoError(t, err)
assert.NotNil(ftc)
for _, name := range names {
assert.NotNil(ftc.checkout(name))
}
})
t.Run("BadFile", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
require.NoError(t, os.WriteFile(filepath.Join(dir, "boo"), nil, 0666))
_, err := newFSTableCache(dir, 1024, 4)
assert.Error(t, err)
})
t.Run("ClearTempFile", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
tempFile := filepath.Join(dir, tempTablePrefix+"boo")
require.NoError(t, os.WriteFile(tempFile, nil, 0666))
_, err := newFSTableCache(dir, 1024, 4)
require.NoError(t, err)
_, fserr := os.Stat(tempFile)
assert.True(t, os.IsNotExist(fserr))
})
t.Run("Dir", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
require.NoError(t, os.Mkdir(filepath.Join(dir, "sub"), 0777))
_, err := newFSTableCache(dir, 1024, 4)
assert.Error(t, err)
})
})
}
func assertDataInReaderAt(t *testing.T, data []byte, r io.ReaderAt) {
p := make([]byte, len(data))
n, err := r.ReadAt(p, 0)
require.NoError(t, err)
assert.Equal(t, len(data), n)
assert.Equal(t, data, p)
}
+1 -1
@@ -197,7 +197,7 @@ func (j *chunkJournal) Persist(ctx context.Context, mt *memTable, haver chunkRea
}
// ConjoinAll implements tablePersister.
func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (j *chunkJournal) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
return j.persister.ConjoinAll(ctx, sources, stats)
}
+1 -1
View File
@@ -36,7 +36,7 @@ func makeTestChunkJournal(t *testing.T) *chunkJournal {
m, err := getFileManifest(ctx, dir, syncFlush)
require.NoError(t, err)
q := NewUnlimitedMemQuotaProvider()
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
nbf := types.Format_Default.VersionString()
j, err := newChunkJournal(ctx, nbf, dir, m, p.(*fsTablePersister))
require.NoError(t, err)
+8
@@ -190,6 +190,14 @@ func tableReaderAtFromBytes(b []byte) tableReaderAt {
return tableReaderAtAdapter{bytes.NewReader(b)}
}
func (adapter tableReaderAtAdapter) Close() error {
return nil
}
func (adapter tableReaderAtAdapter) clone() (tableReaderAt, error) {
return adapter, nil
}
func (adapter tableReaderAtAdapter) Reader(ctx context.Context) (io.ReadCloser, error) {
r := *adapter.br
return io.NopCloser(&r), nil
+6 -6
@@ -528,12 +528,12 @@ func (ftp fakeTablePersister) Persist(ctx context.Context, mt *memTable, haver c
return chunkSourceAdapter{cs, name}, nil
}
func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error) {
func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error) {
name, data, chunkCount, err := compactSourcesToBuffer(sources)
if err != nil {
return nil, err
return nil, nil, err
} else if chunkCount == 0 {
return emptyChunkSource{}, nil
return emptyChunkSource{}, func() {}, nil
}
ftp.mu.Lock()
@@ -542,14 +542,14 @@ func (ftp fakeTablePersister) ConjoinAll(ctx context.Context, sources chunkSourc
ti, err := parseTableIndexByCopy(ctx, data, ftp.q)
if err != nil {
return nil, err
return nil, nil, err
}
cs, err := newTableReader(ti, tableReaderAtFromBytes(data), fileBlockSize)
if err != nil {
return nil, err
return nil, nil, err
}
return chunkSourceAdapter{cs, name}, nil
return chunkSourceAdapter{cs, name}, func() {}, nil
}
func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunkCount uint32, err error) {
+8
View File
@@ -60,6 +60,14 @@ type s3svc interface {
PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
}
func (s3tra *s3TableReaderAt) Close() error {
return nil
}
func (s3tra *s3TableReaderAt) clone() (tableReaderAt, error) {
return s3tra, nil
}
func (s3tra *s3TableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return s3tra.s3.Reader(ctx, s3tra.h)
}
+2 -5
View File
@@ -68,14 +68,11 @@ const (
var (
cacheOnce = sync.Once{}
makeManifestManager func(manifest) manifestManager
globalFDCache *fdCache
)
var tracer = otel.Tracer("github.com/dolthub/dolt/go/store/nbs")
func makeGlobalCaches() {
globalFDCache = newFDCache(defaultMaxTables)
manifestCache := newManifestCache(defaultManifestCacheSize)
manifestLocks := newManifestLocks()
makeManifestManager = func(m manifest) manifestManager { return manifestManager{m, manifestCache, manifestLocks} }
@@ -479,7 +476,7 @@ func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
if err != nil {
return nil, err
}
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
c := conjoinStrategy(inlineConjoiner{maxTables})
return newNomsBlockStore(ctx, nbfVerStr, makeManifestManager(m), p, q, c, memTableSize)
@@ -495,7 +492,7 @@ func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQ
if err != nil {
return nil, err
}
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister))
if err != nil {
+20 -7
View File
@@ -58,14 +58,14 @@ func makeTestLocalStore(t *testing.T, maxTableFiles int) (st *NomsBlockStore, no
type fileToData map[string][]byte
func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fileToData {
func writeLocalTableFiles(t *testing.T, st *NomsBlockStore, numTableFiles, seed int) (map[string]int, fileToData) {
ctx := context.Background()
fileToData := make(fileToData, numTableFiles)
fileIDToNumChunks := make(map[string]int)
fileIDToNumChunks := make(map[string]int, numTableFiles)
for i := 0; i < numTableFiles; i++ {
var chunkData [][]byte
for j := 0; j < i+1; j++ {
chunkData = append(chunkData, []byte(fmt.Sprintf("%d:%d", i, j)))
chunkData = append(chunkData, []byte(fmt.Sprintf("%d:%d:%d", i, j, seed)))
}
data, addr, err := buildTable(chunkData)
require.NoError(t, err)
@@ -77,9 +77,14 @@ func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fil
})
require.NoError(t, err)
}
return fileIDToNumChunks, fileToData
}
func populateLocalStore(t *testing.T, st *NomsBlockStore, numTableFiles int) fileToData {
ctx := context.Background()
fileIDToNumChunks, fileToData := writeLocalTableFiles(t, st, numTableFiles, 0)
err := st.AddTableFilesToManifest(ctx, fileIDToNumChunks)
require.NoError(t, err)
return fileToData
}
@@ -190,8 +195,10 @@ func TestNBSPruneTableFiles(t *testing.T) {
numTableFiles := 64
maxTableFiles := 16
st, nomsDir, _ := makeTestLocalStore(t, maxTableFiles)
fileToData := populateLocalStore(t, st, numTableFiles)
defer st.Close()
fileToData := populateLocalStore(t, st, numTableFiles)
_, toDeleteToData := writeLocalTableFiles(t, st, numTableFiles, 32)
// add a chunk and flush to trigger a conjoin
c := chunks.NewChunk([]byte("it's a boy!"))
@@ -212,6 +219,9 @@ func TestNBSPruneTableFiles(t *testing.T) {
// assert some input table files were conjoined
assert.NotEmpty(t, absent)
toDelete := tfSet.findAbsent(toDeleteToData)
assert.Len(t, toDelete, len(toDeleteToData))
currTableFiles := func(dirName string) *set.StrSet {
infos, err := os.ReadDir(dirName)
require.NoError(t, err)
@@ -228,7 +238,7 @@ func TestNBSPruneTableFiles(t *testing.T) {
for _, tf := range sources {
assert.True(t, preGC.Contains(tf.FileID()))
}
for _, fileName := range absent {
for _, fileName := range toDelete {
assert.True(t, preGC.Contains(fileName))
}
@@ -237,11 +247,14 @@ func TestNBSPruneTableFiles(t *testing.T) {
postGC := currTableFiles(nomsDir)
for _, tf := range sources {
assert.True(t, preGC.Contains(tf.FileID()))
assert.True(t, postGC.Contains(tf.FileID()))
}
for _, fileName := range absent {
assert.False(t, postGC.Contains(fileName))
}
for _, fileName := range toDelete {
assert.False(t, postGC.Contains(fileName))
}
infos, err := os.ReadDir(nomsDir)
require.NoError(t, err)
+6 -2
View File
@@ -34,6 +34,8 @@ import (
var errCacheMiss = errors.New("index cache miss")
type cleanupFunc func()
// tablePersister allows interaction with persistent storage. It provides
// primitives for pushing the contents of a memTable to persistent storage,
// opening persistent tables for reading, and conjoining a number of existing
@@ -45,8 +47,10 @@ type tablePersister interface {
Persist(ctx context.Context, mt *memTable, haver chunkReader, stats *Stats) (chunkSource, error)
// ConjoinAll conjoins all chunks in |sources| into a single, new
// chunkSource.
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, error)
// chunkSource. It returns a |cleanupFunc| which can be called to
// potentially release resources associated with the |sources| once
// they are no longer needed.
ConjoinAll(ctx context.Context, sources chunkSources, stats *Stats) (chunkSource, cleanupFunc, error)
// Open a table named |name|, containing |chunkCount| chunks.
Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error)
+14 -14
View File
@@ -131,6 +131,8 @@ func (ir indexResult) Length() uint32 {
type tableReaderAt interface {
ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error)
Reader(ctx context.Context) (io.ReadCloser, error)
Close() error
clone() (tableReaderAt, error)
}
// tableReader implements get & has queries against a single nbs table. goroutine safe.
@@ -663,7 +665,12 @@ func (tr tableReader) currentSize() uint64 {
}
func (tr tableReader) close() error {
return tr.idx.Close()
err := tr.idx.Close()
if err != nil {
tr.r.Close()
return err
}
return tr.r.Close()
}
func (tr tableReader) clone() (tableReader, error) {
@@ -671,22 +678,15 @@ func (tr tableReader) clone() (tableReader, error) {
if err != nil {
return tableReader{}, err
}
r, err := tr.r.clone()
if err != nil {
idx.Close()
return tableReader{}, err
}
return tableReader{
prefixes: tr.prefixes,
idx: idx,
r: tr.r,
r: r,
blockSize: tr.blockSize,
}, nil
}
type readerAdapter struct {
rat tableReaderAt
off int64
ctx context.Context
}
func (ra *readerAdapter) Read(p []byte) (n int, err error) {
n, err = ra.rat.ReadAtWithStats(ra.ctx, p, ra.off, &Stats{})
ra.off += int64(n)
return
}
-4
@@ -182,15 +182,11 @@ var CopiedNomsFiles []CopiedNomsFile = []CopiedNomsFile{
{Path: "store/nbs/dynamo_manifest_test.go", NomsPath: "go/nbs/dynamo_manifest_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/dynamo_table_reader.go", NomsPath: "go/nbs/dynamo_table_reader.go", HadCopyrightNotice: true},
{Path: "store/nbs/dynamo_table_reader_test.go", NomsPath: "go/nbs/dynamo_table_reader_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/fd_cache.go", NomsPath: "go/nbs/fd_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/fd_cache_test.go", NomsPath: "go/nbs/fd_cache_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_manifest.go", NomsPath: "go/nbs/file_manifest.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_manifest_test.go", NomsPath: "go/nbs/file_manifest_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_table_persister.go", NomsPath: "go/nbs/file_table_persister.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_table_persister_test.go", NomsPath: "go/nbs/file_table_persister_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/frag/main.go", NomsPath: "go/nbs/frag/main.go", HadCopyrightNotice: true},
{Path: "store/nbs/fs_table_cache.go", NomsPath: "go/nbs/fs_table_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/fs_table_cache_test.go", NomsPath: "go/nbs/fs_table_cache_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest.go", NomsPath: "go/nbs/manifest.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest_cache.go", NomsPath: "go/nbs/manifest_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest_cache_test.go", NomsPath: "go/nbs/manifest_cache_test.go", HadCopyrightNotice: true},
@@ -304,6 +304,11 @@ skip_if_chunk_journal() {
# leave data in the working set
dolt sql -q "INSERT INTO test VALUES ($(($NUM_COMMITS+1))),($(($NUM_COMMITS+2))),($(($NUM_COMMITS+3)));"
# write a garbage file which looks like an old table file
for i in `seq 0 100`; do
dolt --help >> .dolt/noms/b0f6n6b1ej7a9ovalt0rr80bsentq807
done
BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g')
run dolt gc --shallow
[ "$status" -eq 0 ]