Merge remote-tracking branch 'origin/aaron/nbs-no-fd-cache' into aaron/nbs-fs-table-persister-delete-after-conjoin

This commit is contained in:
Aaron Son
2023-02-27 15:50:08 -08:00
16 changed files with 96 additions and 876 deletions
+8
View File
@@ -234,6 +234,14 @@ type bsTableReaderAt struct {
bs blobstore.Blobstore
}
func (bsTRA *bsTableReaderAt) Close() error {
return nil
}
func (bsTRA *bsTableReaderAt) clone() (tableReaderAt, error) {
return bsTRA, nil
}
func (bsTRA *bsTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
rc, _, err := bsTRA.bs.Get(ctx, bsTRA.key, blobstore.AllRange)
return rc, err
+8
View File
@@ -54,6 +54,14 @@ func (t tableNotInDynamoErr) Error() string {
return fmt.Sprintf("NBS table %s not present in DynamoDB table %s", t.nbs, t.dynamo)
}
func (dtra *dynamoTableReaderAt) Close() error {
return nil
}
func (dtra *dynamoTableReaderAt) clone() (tableReaderAt, error) {
return dtra, nil
}
func (dtra *dynamoTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
data, err := dtra.ddb.ReadTable(ctx, dtra.h, &Stats{})
if err != nil {
-178
View File
@@ -1,178 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"os"
"sort"
"sync"
)
func newFDCache(targetSize int) *fdCache {
return &fdCache{targetSize: targetSize, cache: map[string]fdCacheEntry{}}
}
// fdCache ref-counts open file descriptors, but doesn't keep a hard cap on
// the number of open files. Once the cache's target size is exceeded, opening
// a new file causes the cache to try to get the cache back to the target size
// by closing fds with zero refs. If there aren't enough such fds, fdCache
// gives up and tries again next time a caller refs a file.
type fdCache struct {
targetSize int
mu sync.Mutex
cache map[string]fdCacheEntry
}
type fdCacheEntry struct {
refCount uint32
f *os.File
}
// RefFile returns an opened *os.File for the file at |path|, or an error
// indicating why the file could not be opened. If the cache already had an
// entry for |path|, RefFile increments its refcount and returns the cached
// pointer. If not, it opens the file and caches the pointer for others to
// use. If RefFile returns an error, it's guaranteed that no refCounts were
// changed, so it's an error to make a subsequent call to UnrefFile().
// This is intended for clients that hold fds for extremely short periods.
func (fc *fdCache) RefFile(path string) (f *os.File, err error) {
refFile := func() *os.File {
if ce, present := fc.cache[path]; present {
ce.refCount++
fc.cache[path] = ce
return ce.f
}
return nil
}
f = func() *os.File {
fc.mu.Lock()
defer fc.mu.Unlock()
return refFile()
}()
if f != nil {
return f, nil
}
// Very much want this to be outside the lock, but the downside is that multiple callers may get here concurrently. That means we need to deal with the raciness below.
f, err = os.Open(path)
if err != nil {
return nil, err
}
fc.mu.Lock()
defer fc.mu.Unlock()
if cached := refFile(); cached != nil {
// Someone beat us to it, so close f and return cached fd
f.Close()
return cached, nil
}
// I won the race!
fc.cache[path] = fdCacheEntry{f: f, refCount: 1}
return f, nil
}
// UnrefFile reduces the refcount of the entry at |path|. If the cache is over
// |fc.targetSize|, UnrefFile makes a best effort to shrink the cache by dumping
// entries with a zero refcount. If there aren't enough zero refcount entries
// to drop to get the cache back to |fc.targetSize|, the cache will remain
// over |fc.targetSize| until the next call to UnrefFile().
func (fc *fdCache) UnrefFile(path string) error {
fc.mu.Lock()
defer fc.mu.Unlock()
if ce, present := fc.cache[path]; present {
ce.refCount--
fc.cache[path] = ce
}
if len(fc.cache) > fc.targetSize {
// Sadly, we can't remove items from a map while iterating, so we'll record the stuff we want to drop and then do it after
needed := len(fc.cache) - fc.targetSize
toDrop := make([]string, 0, needed)
for p, ce := range fc.cache {
if ce.refCount != 0 {
continue
}
toDrop = append(toDrop, p)
err := ce.f.Close()
if err != nil {
return err
}
needed--
if needed == 0 {
break
}
}
for _, p := range toDrop {
delete(fc.cache, p)
}
}
return nil
}
// ShrinkCache forcefully removes all file handles with a refcount of zero.
func (fc *fdCache) ShrinkCache() error {
fc.mu.Lock()
defer fc.mu.Unlock()
toDrop := make([]string, 0, len(fc.cache))
for p, ce := range fc.cache {
if ce.refCount != 0 {
continue
}
toDrop = append(toDrop, p)
err := ce.f.Close()
if err != nil {
return err
}
}
for _, p := range toDrop {
delete(fc.cache, p)
}
return nil
}
// Drop dumps the entire cache and closes all currently open files.
func (fc *fdCache) Drop() {
fc.mu.Lock()
defer fc.mu.Unlock()
for _, ce := range fc.cache {
ce.f.Close()
}
fc.cache = map[string]fdCacheEntry{}
}
// reportEntries is meant for testing.
func (fc *fdCache) reportEntries() sort.StringSlice {
fc.mu.Lock()
defer fc.mu.Unlock()
ret := make(sort.StringSlice, 0, len(fc.cache))
for p := range fc.cache {
ret = append(ret, p)
}
sort.Sort(ret)
return ret
}
-136
View File
@@ -1,136 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"fmt"
"os"
"path/filepath"
"sort"
"sync"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFDCache(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
paths := [3]string{}
for i := range paths {
name := fmt.Sprintf("file%d", i)
paths[i] = filepath.Join(dir, name)
err := os.WriteFile(paths[i], []byte(name), 0644)
require.NoError(t, err)
}
refNoError := func(fc *fdCache, p string, assert *assert.Assertions) *os.File {
f, err := fc.RefFile(p)
require.NoError(t, err)
assert.NotNil(f)
return f
}
t.Run("ConcurrentOpen", func(t *testing.T) {
assert := assert.New(t)
concurrency := 3
fc := newFDCache(3)
defer fc.Drop()
trigger := make(chan struct{})
wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
<-trigger
fc.RefFile(paths[0])
}()
}
close(trigger)
wg.Wait()
present := fc.reportEntries()
if assert.Len(present, 1) {
ce := fc.cache[present[0]]
assert.EqualValues(concurrency, ce.refCount)
}
})
t.Run("NoEvictions", func(t *testing.T) {
assert := assert.New(t)
fc := newFDCache(2)
defer fc.Drop()
f := refNoError(fc, paths[0], assert)
f2 := refNoError(fc, paths[1], assert)
assert.NotEqual(f, f2)
dup := refNoError(fc, paths[0], assert)
assert.Equal(f, dup)
})
t.Run("Evictions", func(t *testing.T) {
assert := assert.New(t)
fc := newFDCache(1)
defer fc.Drop()
f0 := refNoError(fc, paths[0], assert)
f1 := refNoError(fc, paths[1], assert)
assert.NotEqual(f0, f1)
// f0 wasn't evicted, because that doesn't happen until UnrefFile()
dup := refNoError(fc, paths[0], assert)
assert.Equal(f0, dup)
expected := sort.StringSlice(paths[:2])
sort.Sort(expected)
assert.EqualValues(expected, fc.reportEntries())
// Unreffing f1 now should evict it
err := fc.UnrefFile(paths[1])
require.NoError(t, err)
assert.EqualValues(paths[:1], fc.reportEntries())
// Bring f1 back so we can test multiple evictions in a row
f1 = refNoError(fc, paths[1], assert)
assert.NotEqual(f0, f1)
// After adding f3, we should be able to evict both f0 and f1
f2 := refNoError(fc, paths[2], assert)
assert.NotEqual(f0, f2)
assert.NotEqual(f1, f2)
err = fc.UnrefFile(paths[0])
require.NoError(t, err)
err = fc.UnrefFile(paths[0])
require.NoError(t, err)
err = fc.UnrefFile(paths[1])
require.NoError(t, err)
assert.EqualValues(paths[2:], fc.reportEntries())
})
}
+3 -17
View File
@@ -33,20 +33,17 @@ import (
"time"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
const tempTablePrefix = "nbs_table_"
func newFSTablePersister(dir string, fc *fdCache, q MemoryQuotaProvider) tablePersister {
d.PanicIfTrue(fc == nil)
return &fsTablePersister{dir, fc, q}
func newFSTablePersister(dir string, q MemoryQuotaProvider) tablePersister {
return &fsTablePersister{dir, q}
}
type fsTablePersister struct {
dir string
fc *fdCache
q MemoryQuotaProvider
}
@@ -54,7 +51,7 @@ var _ tablePersister = &fsTablePersister{}
var _ tableFilePersister = &fsTablePersister{}
func (ftp *fsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q, ftp.fc)
return newFileTableReader(ctx, ftp.dir, name, chunkCount, ftp.q)
}
func (ftp *fsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
@@ -154,11 +151,6 @@ func (ftp *fsTablePersister) persistTable(ctx context.Context, name addr, data [
}
newName := filepath.Join(ftp.dir, name.String())
err = ftp.fc.ShrinkCache()
if err != nil {
return nil, err
}
err = file.Rename(tempName, newName)
@@ -252,12 +244,6 @@ func (ftp *fsTablePersister) PruneTableFiles(ctx context.Context, contents manif
return err
}
err = ftp.fc.ShrinkCache()
if err != nil {
return err
}
ea := make(gcErrAccum)
for _, info := range fileInfos {
if info.IsDir() {
+4 -102
View File
@@ -35,57 +35,6 @@ import (
"github.com/stretchr/testify/require"
)
func TestFSTableCacheOnOpen(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
names := []addr{}
cacheSize := 2
fc := newFDCache(cacheSize)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
// Create some tables manually, load them into the cache
func() {
for i := 0; i < cacheSize; i++ {
name, err := writeTableData(dir, []byte{byte(i)})
require.NoError(t, err)
names = append(names, name)
}
for _, name := range names {
tr, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer tr.close()
}
}()
// Tables should still be cached and on disk
for i, name := range names {
src, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer src.close()
h := computeAddr([]byte{byte(i)})
assert.True(src.has(h))
}
// Kick a table out of the cache
name, err := writeTableData(dir, []byte{0xff})
require.NoError(t, err)
tr, err := fts.Open(context.Background(), name, 1, nil)
require.NoError(t, err)
defer tr.close()
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, cacheSize)
err = fc.ShrinkCache()
require.NoError(t, err)
err = removeTables(dir, names...)
require.NoError(t, err)
}
func makeTempDir(t *testing.T) string {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
@@ -122,9 +71,7 @@ func TestFSTablePersisterPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
@@ -163,9 +110,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
src, err := fts.Persist(context.Background(), mt, existingTable, &Stats{})
require.NoError(t, err)
@@ -175,41 +120,6 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
assert.True(os.IsNotExist(err), "%v", err)
}
func TestFSTablePersisterCacheOnPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
fc := newFDCache(1)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
defer file.RemoveAll(dir)
var name addr
func() {
src, err := persistTableData(fts, testChunks...)
require.NoError(t, err)
defer src.close()
name = src.hash()
}()
// Table should still be cached
src, err := fts.Open(context.Background(), name, uint32(len(testChunks)), nil)
require.NoError(t, err)
defer src.close()
assertChunksInReader(testChunks, src, assert)
// Evict |name| from cache
tr, err := persistTableData(fts, []byte{0xff})
require.NoError(t, err)
defer tr.close()
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, 1)
err = removeTables(dir, name)
require.NoError(t, err)
}
func TestFSTablePersisterConjoinAll(t *testing.T) {
ctx := context.Background()
assert := assert.New(t)
@@ -218,9 +128,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(len(sources))
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
for i, c := range testChunks {
randChunk := make([]byte, (i+1)*13)
@@ -251,10 +159,6 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
defer tr.close()
assertChunksInReader(testChunks, tr, assert)
}
present := fc.reportEntries()
// Since 0 refcount entries are evicted randomly, the only thing we can validate is that fc remains at its target size
assert.Len(present, len(sources))
}
func TestFSTablePersisterConjoinAllDups(t *testing.T) {
@@ -262,9 +166,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(t)
defer file.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, &UnlimitedQuotaProvider{})
fts := newFSTablePersister(dir, &UnlimitedQuotaProvider{})
reps := 3
sources := make(chunkSources, reps)
+39 -44
View File
@@ -33,8 +33,7 @@ import (
type fileTableReader struct {
tableReader
fc *fdCache
h addr
h addr
}
const (
@@ -52,16 +51,15 @@ func tableFileExists(ctx context.Context, dir string, h addr) (bool, error) {
return err == nil, err
}
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider, fc *fdCache) (cs chunkSource, err error) {
func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {
path := filepath.Join(dir, h.String())
var f *os.File
index, sz, err := func() (ti onHeapTableIndex, sz int64, err error) {
// Be careful with how |f| is used below. |RefFile| returns a cached
// os.File pointer so the code needs to use f in a concurrency-safe
// manner. Moving the file offset is BAD.
var f *os.File
f, err = fc.RefFile(path)
f, err = os.Open(path)
if err != nil {
return
}
@@ -103,14 +101,6 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
return
}
defer func() {
unrefErr := fc.UnrefFile(path)
if unrefErr != nil && err == nil {
q.ReleaseQuotaBytes(len(b))
err = unrefErr
}
}()
ti, err = parseTableIndex(ctx, b, q)
if err != nil {
q.ReleaseQuotaBytes(len(b))
@@ -120,72 +110,77 @@ func newFileTableReader(ctx context.Context, dir string, h addr, chunkCount uint
return
}()
if err != nil {
if f != nil {
f.Close()
}
return nil, err
}
if chunkCount != index.chunkCount() {
index.Close()
f.Close()
return nil, errors.New("unexpected chunk count")
}
tr, err := newTableReader(index, &cacheReaderAt{path, fc, sz}, fileBlockSize)
tr, err := newTableReader(index, &fileReaderAt{f, path, sz}, fileBlockSize)
if err != nil {
index.Close()
f.Close()
return nil, err
}
return &fileTableReader{
tr,
fc,
h,
}, nil
}
func (mmtr *fileTableReader) hash() addr {
return mmtr.h
func (ftr *fileTableReader) hash() addr {
return ftr.h
}
func (mmtr *fileTableReader) close() error {
return mmtr.tableReader.close()
func (ftr *fileTableReader) Close() error {
return ftr.tableReader.close()
}
func (mmtr *fileTableReader) clone() (chunkSource, error) {
tr, err := mmtr.tableReader.clone()
func (ftr *fileTableReader) clone() (chunkSource, error) {
tr, err := ftr.tableReader.clone()
if err != nil {
return &fileTableReader{}, err
}
return &fileTableReader{tr, mmtr.fc, mmtr.h}, nil
return &fileTableReader{tr, ftr.h}, nil
}
type cacheReaderAt struct {
type fileReaderAt struct {
f *os.File
path string
fc *fdCache
sz int64
}
func (cra *cacheReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return io.NopCloser(io.LimitReader(&readerAdapter{cra, 0, ctx}, cra.sz)), nil
func (fra *fileReaderAt) clone() (tableReaderAt, error) {
f, err := os.Open(fra.path)
if err != nil {
return nil, err
}
return &fileReaderAt{
f,
fra.path,
fra.sz,
}, nil
}
func (cra *cacheReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
var r io.ReaderAt
func (fra *fileReaderAt) Close() error {
return fra.f.Close()
}
func (fra *fileReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return os.Open(fra.path)
}
func (fra *fileReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
t1 := time.Now()
if r, err = cra.fc.RefFile(cra.path); err != nil {
return
}
defer func() {
stats.FileBytesPerRead.Sample(uint64(len(p)))
stats.FileReadLatency.SampleTimeSince(t1)
}()
defer func() {
unrefErr := cra.fc.UnrefFile(cra.path)
if err == nil {
err = unrefErr
}
}()
return r.ReadAt(p, off)
return fra.f.ReadAt(p, off)
}
+1 -4
View File
@@ -40,9 +40,6 @@ func TestMmapTableReader(t *testing.T) {
require.NoError(t, err)
defer file.RemoveAll(dir)
fc := newFDCache(1)
defer fc.Drop()
chunks := [][]byte{
[]byte("hello2"),
[]byte("goodbye2"),
@@ -54,7 +51,7 @@ func TestMmapTableReader(t *testing.T) {
err = os.WriteFile(filepath.Join(dir, h.String()), tableData, 0666)
require.NoError(t, err)
trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{}, fc)
trc, err := newFileTableReader(ctx, dir, h, uint32(len(chunks)), &UnlimitedQuotaProvider{})
require.NoError(t, err)
defer trc.close()
assertChunksInReader(chunks, trc, assert)
-231
View File
@@ -1,231 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"errors"
"io"
"os"
"path/filepath"
"strings"
"sync"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/util/sizecache"
"github.com/dolthub/dolt/go/store/util/tempfiles"
)
type tableCache interface {
checkout(h addr) (io.ReaderAt, error)
checkin(h addr) error
store(h addr, data io.Reader, size uint64) error
}
type fsTableCache struct {
dir string
cache *sizecache.SizeCache
fd *fdCache
}
func newFSTableCache(dir string, cacheSize uint64, maxOpenFds int) (*fsTableCache, error) {
ftc := &fsTableCache{dir: dir, fd: newFDCache(maxOpenFds)}
ftc.cache = sizecache.NewWithExpireCallback(cacheSize, func(elm interface{}) {
ftc.expire(elm.(addr))
})
err := ftc.init(maxOpenFds)
if err != nil {
return nil, err
}
return ftc, nil
}
func (ftc *fsTableCache) init(concurrency int) error {
type finfo struct {
path string
h addr
size uint64
}
infos := make(chan finfo)
errc := make(chan error, 1)
go func() {
isTableFile := func(info os.FileInfo) bool {
return info.Mode().IsRegular() && ValidateAddr(info.Name())
}
isTempTableFile := func(info os.FileInfo) bool {
return info.Mode().IsRegular() && strings.HasPrefix(info.Name(), tempTablePrefix)
}
defer close(errc)
defer close(infos)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(ftc.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if path == ftc.dir {
return nil
}
if isTempTableFile(info) {
// ignore failure to remove temp file
_ = file.Remove(path)
return nil
}
if !isTableFile(info) {
return errors.New(path + " is not a table file; cache dir must contain only table files")
}
ad, err := parseAddr(info.Name())
if err != nil {
return err
}
infos <- finfo{path, ad, uint64(info.Size())}
return nil
})
}()
ae := atomicerr.New()
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for info := range infos {
if ae.IsSet() {
break
}
ftc.cache.Add(info.h, info.size, true)
_, err := ftc.fd.RefFile(info.path)
if err != nil {
ae.SetIfError(err)
break
}
err = ftc.fd.UnrefFile(info.path)
if err != nil {
ae.SetIfError(err)
break
}
}
}()
}
wg.Wait()
err := <-errc
if err != nil {
return err
}
if err := ae.Get(); err != nil {
return err
}
return nil
}
func (ftc *fsTableCache) checkout(h addr) (io.ReaderAt, error) {
if _, ok := ftc.cache.Get(h); !ok {
return nil, nil
}
fd, err := ftc.fd.RefFile(filepath.Join(ftc.dir, h.String()))
if err != nil {
return nil, err
}
return fd, nil
}
func (ftc *fsTableCache) checkin(h addr) error {
return ftc.fd.UnrefFile(filepath.Join(ftc.dir, h.String()))
}
func (ftc *fsTableCache) store(h addr, data io.Reader, size uint64) error {
path := filepath.Join(ftc.dir, h.String())
tempName, err := func() (name string, ferr error) {
var temp *os.File
temp, ferr = tempfiles.MovableTempFileProvider.NewFile(ftc.dir, tempTablePrefix)
if ferr != nil {
return "", ferr
}
defer func() {
closeErr := temp.Close()
if ferr == nil {
ferr = closeErr
}
}()
_, ferr = io.Copy(temp, data)
if ferr != nil {
return "", ferr
}
return temp.Name(), nil
}()
if err != nil {
return err
}
err = ftc.fd.ShrinkCache()
if err != nil {
return err
}
err = file.Rename(tempName, path)
if err != nil {
return err
}
ftc.cache.Add(h, size, true)
// Prime the file in the fd cache ignore err
if _, err = ftc.fd.RefFile(path); err == nil {
err := ftc.fd.UnrefFile(path)
if err != nil {
return err
}
}
return nil
}
func (ftc *fsTableCache) expire(h addr) error {
return file.Remove(filepath.Join(ftc.dir, h.String()))
}
-140
View File
@@ -1,140 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"bytes"
"io"
"os"
"path/filepath"
"sort"
"testing"
"github.com/dolthub/dolt/go/libraries/utils/file"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFSTableCache(t *testing.T) {
datas := [][]byte{[]byte("hello"), []byte("world"), []byte("goodbye")}
sort.SliceStable(datas, func(i, j int) bool { return len(datas[i]) < len(datas[j]) })
t.Run("ExpireLRU", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
sum := 0
for _, s := range datas[1:] {
sum += len(s)
}
tc, err := newFSTableCache(dir, uint64(sum), len(datas))
require.NoError(t, err)
for _, d := range datas {
err := tc.store(computeAddr(d), bytes.NewReader(d), uint64(len(d)))
require.NoError(t, err)
}
expiredName := computeAddr(datas[0])
r, err := tc.checkout(expiredName)
require.NoError(t, err)
assert.Nil(t, r)
_, fserr := os.Stat(filepath.Join(dir, expiredName.String()))
assert.True(t, os.IsNotExist(fserr))
for _, d := range datas[1:] {
name := computeAddr(d)
r, err := tc.checkout(name)
require.NoError(t, err)
assert.NotNil(t, r)
assertDataInReaderAt(t, d, r)
_, fserr := os.Stat(filepath.Join(dir, name.String()))
assert.False(t, os.IsNotExist(fserr))
}
})
t.Run("Init", func(t *testing.T) {
t.Run("Success", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
assert := assert.New(t)
var names []addr
for i := byte(0); i < 4; i++ {
name := computeAddr([]byte{i})
require.NoError(t, os.WriteFile(filepath.Join(dir, name.String()), nil, 0666))
names = append(names, name)
}
ftc, err := newFSTableCache(dir, 1024, 4)
require.NoError(t, err)
assert.NotNil(ftc)
for _, name := range names {
assert.NotNil(ftc.checkout(name))
}
})
t.Run("BadFile", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
require.NoError(t, os.WriteFile(filepath.Join(dir, "boo"), nil, 0666))
_, err := newFSTableCache(dir, 1024, 4)
assert.Error(t, err)
})
t.Run("ClearTempFile", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
tempFile := filepath.Join(dir, tempTablePrefix+"boo")
require.NoError(t, os.WriteFile(tempFile, nil, 0666))
_, err := newFSTableCache(dir, 1024, 4)
require.NoError(t, err)
_, fserr := os.Stat(tempFile)
assert.True(t, os.IsNotExist(fserr))
})
t.Run("Dir", func(t *testing.T) {
t.Parallel()
dir := makeTempDir(t)
defer file.RemoveAll(dir)
require.NoError(t, os.Mkdir(filepath.Join(dir, "sub"), 0777))
_, err := newFSTableCache(dir, 1024, 4)
assert.Error(t, err)
})
})
}
func assertDataInReaderAt(t *testing.T, data []byte, r io.ReaderAt) {
p := make([]byte, len(data))
n, err := r.ReadAt(p, 0)
require.NoError(t, err)
assert.Equal(t, len(data), n)
assert.Equal(t, data, p)
}
+1 -1
View File
@@ -36,7 +36,7 @@ func makeTestChunkJournal(t *testing.T) *chunkJournal {
m, err := getFileManifest(ctx, dir, syncFlush)
require.NoError(t, err)
q := NewUnlimitedMemQuotaProvider()
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
nbf := types.Format_Default.VersionString()
j, err := newChunkJournal(ctx, nbf, dir, m, p.(*fsTablePersister))
require.NoError(t, err)
+8
View File
@@ -190,6 +190,14 @@ func tableReaderAtFromBytes(b []byte) tableReaderAt {
return tableReaderAtAdapter{bytes.NewReader(b)}
}
func (adapter tableReaderAtAdapter) Close() error {
return nil
}
func (adapter tableReaderAtAdapter) clone() (tableReaderAt, error) {
return adapter, nil
}
func (adapter tableReaderAtAdapter) Reader(ctx context.Context) (io.ReadCloser, error) {
r := *adapter.br
return io.NopCloser(&r), nil
+8
View File
@@ -60,6 +60,14 @@ type s3svc interface {
PutObjectWithContext(ctx aws.Context, input *s3.PutObjectInput, opts ...request.Option) (*s3.PutObjectOutput, error)
}
func (s3tra *s3TableReaderAt) Close() error {
return nil
}
func (s3tra *s3TableReaderAt) clone() (tableReaderAt, error) {
return s3tra, nil
}
func (s3tra *s3TableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
return s3tra.s3.Reader(ctx, s3tra.h)
}
+2 -5
View File
@@ -68,14 +68,11 @@ const (
var (
cacheOnce = sync.Once{}
makeManifestManager func(manifest) manifestManager
globalFDCache *fdCache
)
var tracer = otel.Tracer("github.com/dolthub/dolt/go/store/nbs")
func makeGlobalCaches() {
globalFDCache = newFDCache(defaultMaxTables)
manifestCache := newManifestCache(defaultManifestCacheSize)
manifestLocks := newManifestLocks()
makeManifestManager = func(m manifest) manifestManager { return manifestManager{m, manifestCache, manifestLocks} }
@@ -479,7 +476,7 @@ func newLocalStore(ctx context.Context, nbfVerStr string, dir string, memTableSi
if err != nil {
return nil, err
}
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
c := conjoinStrategy(inlineConjoiner{maxTables})
return newNomsBlockStore(ctx, nbfVerStr, makeManifestManager(m), p, q, c, memTableSize)
@@ -495,7 +492,7 @@ func NewLocalJournalingStore(ctx context.Context, nbfVers, dir string, q MemoryQ
if err != nil {
return nil, err
}
p := newFSTablePersister(dir, globalFDCache, q)
p := newFSTablePersister(dir, q)
journal, err := newChunkJournal(ctx, nbfVers, dir, m, p.(*fsTablePersister))
if err != nil {
+14 -14
View File
@@ -131,6 +131,8 @@ func (ir indexResult) Length() uint32 {
type tableReaderAt interface {
ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error)
Reader(ctx context.Context) (io.ReadCloser, error)
Close() error
clone() (tableReaderAt, error)
}
// tableReader implements get & has queries against a single nbs table. goroutine safe.
@@ -663,7 +665,12 @@ func (tr tableReader) currentSize() uint64 {
}
func (tr tableReader) close() error {
return tr.idx.Close()
err := tr.idx.Close()
if err != nil {
tr.r.Close()
return err
}
return tr.r.Close()
}
func (tr tableReader) clone() (tableReader, error) {
@@ -671,22 +678,15 @@ func (tr tableReader) clone() (tableReader, error) {
if err != nil {
return tableReader{}, err
}
r, err := tr.r.clone()
if err != nil {
idx.Close()
return tableReader{}, err
}
return tableReader{
prefixes: tr.prefixes,
idx: idx,
r: tr.r,
r: r,
blockSize: tr.blockSize,
}, nil
}
type readerAdapter struct {
rat tableReaderAt
off int64
ctx context.Context
}
func (ra *readerAdapter) Read(p []byte) (n int, err error) {
n, err = ra.rat.ReadAtWithStats(ra.ctx, p, ra.off, &Stats{})
ra.off += int64(n)
return
}
-4
View File
@@ -182,15 +182,11 @@ var CopiedNomsFiles []CopiedNomsFile = []CopiedNomsFile{
{Path: "store/nbs/dynamo_manifest_test.go", NomsPath: "go/nbs/dynamo_manifest_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/dynamo_table_reader.go", NomsPath: "go/nbs/dynamo_table_reader.go", HadCopyrightNotice: true},
{Path: "store/nbs/dynamo_table_reader_test.go", NomsPath: "go/nbs/dynamo_table_reader_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/fd_cache.go", NomsPath: "go/nbs/fd_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/fd_cache_test.go", NomsPath: "go/nbs/fd_cache_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_manifest.go", NomsPath: "go/nbs/file_manifest.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_manifest_test.go", NomsPath: "go/nbs/file_manifest_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_table_persister.go", NomsPath: "go/nbs/file_table_persister.go", HadCopyrightNotice: true},
{Path: "store/nbs/file_table_persister_test.go", NomsPath: "go/nbs/file_table_persister_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/frag/main.go", NomsPath: "go/nbs/frag/main.go", HadCopyrightNotice: true},
{Path: "store/nbs/fs_table_cache.go", NomsPath: "go/nbs/fs_table_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/fs_table_cache_test.go", NomsPath: "go/nbs/fs_table_cache_test.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest.go", NomsPath: "go/nbs/manifest.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest_cache.go", NomsPath: "go/nbs/manifest_cache.go", HadCopyrightNotice: true},
{Path: "store/nbs/manifest_cache_test.go", NomsPath: "go/nbs/manifest_cache_test.go", HadCopyrightNotice: true},