NBS: s3TablePersister caches tables locally on write (#3507)

This commit is contained in:
cmasone-attic
2017-05-31 12:10:37 -07:00
committed by Rafael Weinstein
parent b98562656a
commit e014edfa66
28 changed files with 355 additions and 131 deletions

View File

@@ -180,7 +180,7 @@ func conjoinTables(p tablePersister, upstream []tableSpec, stats *Stats) (conjoi
toConjoin, toKeep := chooseConjoinees(sources)
conjoinedSrc := p.ConjoinAll(toConjoin, stats)
stats.ConjoinLatency.SampleTime(time.Since(t1))
stats.ConjoinLatency.SampleTime(roundedSince(t1))
stats.TablesPerConjoin.SampleLen(len(toConjoin))
stats.ChunksPerConjoin.Sample(uint64(conjoinedSrc.count()))

View File

@@ -54,7 +54,7 @@ func (dm dynamoManifest) Name() string {
func (dm dynamoManifest) ParseIfExists(stats *Stats, readHook func()) (exists bool, contents manifestContents) {
t1 := time.Now()
defer func() { stats.ReadManifestLatency.SampleTime(time.Since(t1)) }()
defer func() { stats.ReadManifestLatency.SampleTime(roundedSince(t1)) }()
result, err := dm.ddbsvc.GetItem(&dynamodb.GetItemInput{
ConsistentRead: aws.Bool(true), // This doubles the cost :-(
@@ -105,7 +105,8 @@ func (dm dynamoManifest) Update(lastLock addr, newContents manifestContents, sta
}
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTime(time.Since(t1)) }()
defer func() { stats.WriteManifestLatency.SampleTime(roundedSince(t1)) }()
putArgs := dynamodb.PutItemInput{
TableName: aws.String(dm.table),
Item: map[string]*dynamodb.AttributeValue{

View File

@@ -18,6 +18,7 @@ import (
const (
defaultAWSReadLimit = 1024
awsMaxOpenFiles = 8192
awsMaxTables = 128
)
@@ -33,11 +34,16 @@ type AWSStoreFactory struct {
// NewAWSStoreFactory returns a ChunkStore factory that vends NomsBlockStore
// instances that store manifests in the named DynamoDB table, and chunk data
// in the named S3 bucket. All connections to AWS services share |sess|.
func NewAWSStoreFactory(sess *session.Session, table, bucket string, indexCacheSize uint64) chunks.Factory {
func NewAWSStoreFactory(sess *session.Session, table, bucket string, indexCacheSize, tableCacheSize uint64, tableCacheDir string) chunks.Factory {
var indexCache *indexCache
if indexCacheSize > 0 {
indexCache = newIndexCache(indexCacheSize)
}
var tc *fsTableCache
if tableCacheSize > 0 {
tc = newFSTableCache(tableCacheDir, tableCacheSize, awsMaxOpenFiles)
}
return &AWSStoreFactory{
dynamodb.New(sess),
&s3TablePersister{
@@ -48,6 +54,7 @@ func NewAWSStoreFactory(sess *session.Session, table, bucket string, indexCacheS
maxS3PartSize,
indexCache,
make(chan struct{}, defaultAWSReadLimit),
tc,
},
table,
newManifestCache(defaultManifestCacheSize),

View File

@@ -16,7 +16,7 @@ import (
func TestLocalStoreFactory(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
f := NewLocalStoreFactory(dir, 0, 8)

View File

@@ -17,7 +17,7 @@ import (
)
func TestFDCache(t *testing.T) {
dir := makeTempDir(assert.New(t))
dir := makeTempDir(t)
defer os.RemoveAll(dir)
paths := [3]string{}

View File

@@ -50,7 +50,7 @@ func (fm fileManifest) Name() string {
// This is to allow for race condition testing.
func (fm fileManifest) ParseIfExists(stats *Stats, readHook func()) (exists bool, contents manifestContents) {
t1 := time.Now()
defer func() { stats.ReadManifestLatency.SampleTime(time.Since(t1)) }()
defer func() { stats.ReadManifestLatency.SampleTime(roundedSince(t1)) }()
// !exists(lockFileName) => uninitialized store
if l := openIfExists(filepath.Join(fm.dir, lockFileName)); l != nil {
@@ -111,7 +111,8 @@ func (fm fileManifest) Update(lastLock addr, newContents manifestContents, stats
}
t1 := time.Now()
defer func() { stats.WriteManifestLatency.SampleTime(time.Since(t1)) }()
defer func() { stats.WriteManifestLatency.SampleTime(roundedSince(t1)) }()
// Write a temporary manifest file, to be renamed over manifestFileName upon success.
// The closure here ensures this file is closed before moving on.
tempManifestPath := func() string {

View File

@@ -5,7 +5,6 @@
package nbs
import (
"bytes"
"crypto/rand"
"fmt"
"io/ioutil"
@@ -19,7 +18,7 @@ import (
func TestFSTableCacheOnOpen(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
names := []addr{}
@@ -58,6 +57,12 @@ func TestFSTableCacheOnOpen(t *testing.T) {
assert.Len(present, cacheSize)
}
// makeTempDir creates a fresh temporary directory for a test and returns its
// path. The caller is responsible for removing it (typically via defer
// os.RemoveAll). Creation failure is reported through the test's assertions.
func makeTempDir(t *testing.T) string {
	tempDir, tempErr := ioutil.TempDir("", "")
	assert.NoError(t, tempErr)
	return tempDir
}
func writeTableData(dir string, chunx ...[]byte) (name addr, err error) {
var tableData []byte
tableData, name = buildTable(chunx)
@@ -85,7 +90,7 @@ func contains(s sort.StringSlice, e string) bool {
func TestFSTablePersisterPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
@@ -96,7 +101,7 @@ func TestFSTablePersisterPersist(t *testing.T) {
if assert.True(src.count() > 0) {
buff, err := ioutil.ReadFile(filepath.Join(dir, src.hash().String()))
assert.NoError(err)
tr := newTableReader(parseTableIndex(buff), bytes.NewReader(buff), fileBlockSize)
tr := newTableReader(parseTableIndex(buff), tableReaderAtFromBytes(buff), fileBlockSize)
assertChunksInReader(testChunks, tr, assert)
}
}
@@ -121,7 +126,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
assert.True(existingTable.addChunk(computeAddr(c), c))
}
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
@@ -136,7 +141,7 @@ func TestFSTablePersisterPersistNoData(t *testing.T) {
func TestFSTablePersisterCacheOnPersist(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
dir := makeTempDir(t)
fc := newFDCache(1)
defer fc.Drop()
fts := newFSTablePersister(dir, fc, nil)
@@ -168,7 +173,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
assert.True(len(testChunks) > 1, "Whoops, this test isn't meaningful")
sources := make(chunkSources, len(testChunks))
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
fc := newFDCache(len(sources))
defer fc.Drop()
@@ -188,7 +193,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
if assert.True(src.count() > 0) {
buff, err := ioutil.ReadFile(filepath.Join(dir, src.hash().String()))
assert.NoError(err)
tr := newTableReader(parseTableIndex(buff), bytes.NewReader(buff), fileBlockSize)
tr := newTableReader(parseTableIndex(buff), tableReaderAtFromBytes(buff), fileBlockSize)
assertChunksInReader(testChunks, tr, assert)
}
@@ -199,7 +204,7 @@ func TestFSTablePersisterConjoinAll(t *testing.T) {
func TestFSTablePersisterConjoinAllDups(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
dir := makeTempDir(t)
defer os.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
@@ -219,7 +224,7 @@ func TestFSTablePersisterConjoinAllDups(t *testing.T) {
if assert.True(src.count() > 0) {
buff, err := ioutil.ReadFile(filepath.Join(dir, src.hash().String()))
assert.NoError(err)
tr := newTableReader(parseTableIndex(buff), bytes.NewReader(buff), fileBlockSize)
tr := newTableReader(parseTableIndex(buff), tableReaderAtFromBytes(buff), fileBlockSize)
assertChunksInReader(testChunks, tr, assert)
assert.EqualValues(reps*len(testChunks), tr.count())
}

118
go/nbs/fs_table_cache.go Normal file
View File

@@ -0,0 +1,118 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package nbs
import (
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/util/sizecache"
)
// fsTableCache is a local, filesystem-backed cache of NBS table files. A
// size-bounded sizecache tracks which tables are resident (and triggers
// expire() on eviction), while an fdCache bounds the number of open file
// descriptors kept for cached tables.
type fsTableCache struct {
	dir   string               // directory holding the cached table files
	cache *sizecache.SizeCache // membership + size accounting; evictions delete files via expire()
	fd    *fdCache             // bounded cache of open fds for cached table files
}
// newFSTableCache returns an fsTableCache rooted at |dir|, bounded at
// |cacheSize| bytes, keeping at most |maxOpenFds| file descriptors open.
// Any table files already present in |dir| are loaded into the cache.
func newFSTableCache(dir string, cacheSize uint64, maxOpenFds int) *fsTableCache {
	tc := &fsTableCache{dir: dir, fd: newFDCache(maxOpenFds)}
	onExpire := func(elm interface{}) {
		tc.expire(elm.(addr))
	}
	tc.cache = sizecache.NewWithExpireCallback(cacheSize, onExpire)
	tc.init(maxOpenFds)
	return tc
}
// init primes the cache from table files already present in ftc.dir. One
// goroutine walks the directory and streams file info over |infos|, while
// |concurrency| worker goroutines register each file with the size cache and
// warm the fd cache. Panics if the directory contains anything that is not a
// valid table file, or if the walk fails.
func (ftc *fsTableCache) init(concurrency int) {
	type finfo struct {
		path string
		h    addr
		size uint64
	}
	infos := make(chan finfo)
	errc := make(chan error, 1)
	go func() {
		isTableFile := func(info os.FileInfo) bool {
			return info.Mode().IsRegular() && ValidateAddr(info.Name())
		}
		defer close(errc)
		defer close(infos)
		// No select needed for this send, since errc is buffered.
		errc <- filepath.Walk(ftc.dir, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if path == ftc.dir {
				// Walk visits the root itself first; skip it.
				return nil
			}
			if !isTableFile(info) {
				return errors.New(path + " is not a table file; cache dir must contain only table files")
			}
			// File names are table addresses; size is the cached-byte count.
			infos <- finfo{path, ParseAddr([]byte(info.Name())), uint64(info.Size())}
			return nil
		})
	}()
	wg := sync.WaitGroup{}
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		go func() {
			defer wg.Done()
			for info := range infos {
				ftc.cache.Add(info.h, info.size, true)
				// Ref/Unref primes the file in the fd cache without
				// holding a reference past init.
				ftc.fd.RefFile(info.path)
				ftc.fd.UnrefFile(info.path)
			}
		}()
	}
	wg.Wait()
	// Walk goroutine has closed infos by now; surface any walk error.
	d.PanicIfError(<-errc)
}
// checkout returns an io.ReaderAt over the locally cached table |h|, or nil
// when the table is not cached (or its file cannot be opened). A non-nil
// return must be balanced by a later checkin(h) to release the fd reference.
func (ftc *fsTableCache) checkout(h addr) io.ReaderAt {
	_, cached := ftc.cache.Get(h)
	if !cached {
		return nil
	}
	fd, err := ftc.fd.RefFile(filepath.Join(ftc.dir, h.String()))
	if err != nil {
		return nil
	}
	return fd
}
// checkin releases the fd reference taken by a successful checkout(h).
func (ftc *fsTableCache) checkin(h addr) {
	path := filepath.Join(ftc.dir, h.String())
	ftc.fd.UnrefFile(path)
}
// store writes |data| (declared to be |size| bytes) into the cache as the
// table named |h|. The bytes are first streamed to a temp file in the cache
// dir and then renamed into place, so concurrent readers never observe a
// partially written table. Panics on any I/O failure.
func (ftc *fsTableCache) store(h addr, data io.Reader, size uint64) {
	path := filepath.Join(ftc.dir, h.String())
	tempName := func() string {
		temp, err := ioutil.TempFile(ftc.dir, "nbs_table_")
		d.PanicIfError(err)
		defer checkClose(temp)
		// The copy error must not be dropped: a short or failed copy would
		// otherwise rename a truncated file into the cache as a valid table.
		_, err = io.Copy(temp, data)
		d.PanicIfError(err)
		return temp.Name()
	}()
	err := os.Rename(tempName, path)
	d.PanicIfError(err)
	ftc.cache.Add(h, size, true)
	ftc.fd.RefFile(path) // Prime the file in the fd cache
	ftc.fd.UnrefFile(path)
}
// expire deletes the backing file for table |h|. It is invoked by the size
// cache's eviction callback; panics if the file cannot be removed.
func (ftc *fsTableCache) expire(h addr) {
	path := filepath.Join(ftc.dir, h.String())
	d.PanicIfError(os.Remove(path))
}

View File

@@ -84,20 +84,32 @@ func TestMemTableWrite(t *testing.T) {
td1, _ := buildTable(chunks[1:2])
td2, _ := buildTable(chunks[2:])
tr1 := newTableReader(parseTableIndex(td1), bytes.NewReader(td1), fileBlockSize)
tr2 := newTableReader(parseTableIndex(td2), bytes.NewReader(td2), fileBlockSize)
tr1 := newTableReader(parseTableIndex(td1), tableReaderAtFromBytes(td1), fileBlockSize)
tr2 := newTableReader(parseTableIndex(td2), tableReaderAtFromBytes(td2), fileBlockSize)
assert.True(tr1.has(computeAddr(chunks[1])))
assert.True(tr2.has(computeAddr(chunks[2])))
_, data, count := mt.write(chunkReaderGroup{tr1, tr2}, &Stats{})
assert.Equal(uint32(1), count)
outReader := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
outReader := newTableReader(parseTableIndex(data), tableReaderAtFromBytes(data), fileBlockSize)
assert.True(outReader.has(computeAddr(chunks[0])))
assert.False(outReader.has(computeAddr(chunks[1])))
assert.False(outReader.has(computeAddr(chunks[2])))
}
// tableReaderAtAdapter adapts an in-memory *bytes.Reader to the tableReaderAt
// interface for tests; stats are ignored since no real I/O occurs.
type tableReaderAtAdapter struct {
	*bytes.Reader
}

// tableReaderAtFromBytes wraps |b| in a tableReaderAt backed entirely by memory.
func tableReaderAtFromBytes(b []byte) tableReaderAt {
	return tableReaderAtAdapter{bytes.NewReader(b)}
}

// ReadAtWithStats satisfies tableReaderAt by delegating to the embedded
// Reader's ReadAt; the stats argument is intentionally unused.
func (adapter tableReaderAtAdapter) ReadAtWithStats(p []byte, off int64, stats *Stats) (n int, err error) {
	return adapter.ReadAt(p, off)
}
func TestMemTableSnappyWriteOutOfLine(t *testing.T) {
assert := assert.New(t)
mt := newMemTable(1024)

View File

@@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"strconv"
"time"
"golang.org/x/sys/unix"
@@ -86,11 +87,19 @@ type cacheReaderAt struct {
fc *fdCache
}
func (cra *cacheReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
func (cra *cacheReaderAt) ReadAtWithStats(p []byte, off int64, stats *Stats) (n int, err error) {
var r io.ReaderAt
t1 := time.Now()
if r, err = cra.fc.RefFile(cra.path); err != nil {
return
}
defer func() {
stats.FileBytesPerRead.Sample(uint64(len(p)))
stats.FileReadLatency.SampleTime(roundedSince(t1))
}()
defer cra.fc.UnrefFile(cra.path)
return r.ReadAt(p, off)
}

View File

@@ -31,7 +31,7 @@ func newPersistingChunkSource(mt *memTable, haver chunkReader, p tablePersister,
<-rl
if cs.count() > 0 {
stats.PersistLatency.SampleTime(time.Since(t1))
stats.PersistLatency.SampleTime(roundedSince(t1))
}
}()
return ccs

View File

@@ -5,7 +5,6 @@
package nbs
import (
"bytes"
"fmt"
"sync"
"testing"
@@ -207,18 +206,21 @@ func newFakeTableSet() tableSet {
}
func newFakeTablePersister() tablePersister {
return fakeTablePersister{map[addr]tableReader{}}
return fakeTablePersister{map[addr]tableReader{}, &sync.RWMutex{}}
}
type fakeTablePersister struct {
sources map[addr]tableReader
mu *sync.RWMutex
}
func (ftp fakeTablePersister) Persist(mt *memTable, haver chunkReader, stats *Stats) chunkSource {
if mt.count() > 0 {
name, data, chunkCount := mt.write(haver, stats)
if chunkCount > 0 {
ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
ftp.mu.Lock()
defer ftp.mu.Unlock()
ftp.sources[name] = newTableReader(parseTableIndex(data), tableReaderAtFromBytes(data), fileBlockSize)
return chunkSourceAdapter{ftp.sources[name], name}
}
}
@@ -228,7 +230,9 @@ func (ftp fakeTablePersister) Persist(mt *memTable, haver chunkReader, stats *St
func (ftp fakeTablePersister) ConjoinAll(sources chunkSources, stats *Stats) chunkSource {
name, data, chunkCount := compactSourcesToBuffer(sources)
if chunkCount > 0 {
ftp.sources[name] = newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
ftp.mu.Lock()
defer ftp.mu.Unlock()
ftp.sources[name] = newTableReader(parseTableIndex(data), tableReaderAtFromBytes(data), fileBlockSize)
return chunkSourceAdapter{ftp.sources[name], name}
}
return emptyChunkSource{}
@@ -277,6 +281,8 @@ func compactSourcesToBuffer(sources chunkSources) (name addr, data []byte, chunk
}
func (ftp fakeTablePersister) Open(name addr, chunkCount uint32) chunkSource {
ftp.mu.RLock()
defer ftp.mu.RUnlock()
return chunkSourceAdapter{ftp.sources[name], name}
}

View File

@@ -57,7 +57,7 @@ func (m *fakeS3) readerForTable(name addr) chunkReader {
m.mu.Lock()
defer m.mu.Unlock()
if buff, present := m.data[name.String()]; present {
return newTableReader(parseTableIndex(buff), bytes.NewReader(buff), s3BlockSize)
return newTableReader(parseTableIndex(buff), tableReaderAtFromBytes(buff), s3BlockSize)
}
return nil
}

View File

@@ -32,10 +32,11 @@ type s3TablePersister struct {
targetPartSize, minPartSize, maxPartSize uint64
indexCache *indexCache
readRl chan struct{}
tc *fsTableCache
}
func (s3p s3TablePersister) Open(name addr, chunkCount uint32) chunkSource {
return newS3TableReader(s3p.s3, s3p.bucket, name, chunkCount, s3p.indexCache, s3p.readRl)
return newS3TableReader(s3p.s3, s3p.bucket, name, chunkCount, s3p.indexCache, s3p.readRl, s3p.tc)
}
type s3UploadedPart struct {
@@ -52,6 +53,9 @@ func (s3p s3TablePersister) persistTable(name addr, data []byte, chunkCount uint
return emptyChunkSource{}
}
t1 := time.Now()
if s3p.tc != nil {
go s3p.tc.store(name, bytes.NewReader(data), uint64(len(data)))
}
s3p.multipartUpload(data, name.String())
verbose.Log("Compacted table of %d Kb in %s", len(data)/1024, time.Since(t1))
@@ -209,9 +213,23 @@ func (s3p s3TablePersister) ConjoinAll(sources chunkSources, stats *Stats) chunk
s3p.executeCompactionPlan(plan, name.String())
verbose.Log("Compacted table of %d Kb in %s", plan.totalCompressedData/1024, time.Since(t1))
if s3p.tc != nil {
go s3p.loadIntoCache(name) // load conjoined table to the cache
}
return s3p.newReaderFromIndexData(plan.mergedIndex, name)
}
// loadIntoCache fetches the table named |name| from S3 and stores it in the
// local fs table cache; used to warm the cache with freshly conjoined tables.
// Panics if the S3 read fails.
func (s3p s3TablePersister) loadIntoCache(name addr) {
	input := &s3.GetObjectInput{
		Bucket: aws.String(s3p.bucket),
		Key:    aws.String(name.String()),
	}
	result, err := s3p.s3.GetObject(input)
	d.PanicIfError(err)
	// The response Body is an io.ReadCloser and must be closed, or the
	// underlying HTTP connection leaks.
	defer result.Body.Close()
	s3p.tc.store(name, result.Body, uint64(*result.ContentLength))
}
func (s3p s3TablePersister) executeCompactionPlan(plan compactionPlan, key string) {
uploadID := s3p.startMultipartUpload(key)
multipartUpload, err := s3p.assembleTable(plan, key, uploadID)

View File

@@ -5,7 +5,6 @@
package nbs
import (
"bytes"
"math/rand"
"sync"
"testing"
@@ -23,12 +22,12 @@ func TestS3TablePersisterPersist(t *testing.T) {
}
s3svc := makeFakeS3(assert)
cache := newIndexCache(1024)
ic := newIndexCache(1024)
sz := calcPartSize(mt, 3)
s3p := s3TablePersister{s3: s3svc, bucket: "bucket", targetPartSize: sz, indexCache: cache}
s3p := s3TablePersister{s3: s3svc, bucket: "bucket", targetPartSize: sz, indexCache: ic}
src := s3p.Persist(mt, nil, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -168,10 +167,14 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
targetPartSize := uint64(1024)
minPartSize, maxPartSize := targetPartSize, 5*targetPartSize
cache := newIndexCache(1024)
ic := newIndexCache(1024)
rl := make(chan struct{}, 8)
defer close(rl)
newPersister := func(s3svc s3svc) s3TablePersister {
return s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, ic, rl, nil}
}
smallChunks := [][]byte{}
rnd := rand.New(rand.NewSource(0))
for smallChunkTotal := uint64(0); smallChunkTotal <= uint64(minPartSize); {
@@ -195,12 +198,12 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
t.Run("TotalUnderMinSize", func(t *testing.T) {
assert := assert.New(t)
s3svc := makeFakeS3(assert)
s3p := s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, cache, rl}
s3p := newPersister(s3svc)
chunks := smallChunks[:len(smallChunks)-1]
sources := makeSources(s3p, chunks)
src := s3p.ConjoinAll(sources, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -212,11 +215,11 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
t.Run("TotalOverMinSize", func(t *testing.T) {
assert := assert.New(t)
s3svc := makeFakeS3(assert)
s3p := s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, cache, rl}
s3p := newPersister(s3svc)
sources := makeSources(s3p, smallChunks)
src := s3p.ConjoinAll(sources, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -237,7 +240,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
t.Run("AllOverMax", func(t *testing.T) {
assert := assert.New(t)
s3svc := makeFakeS3(assert)
s3p := s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, cache, rl}
s3p := newPersister(s3svc)
// Make 2 chunk sources that each have >maxPartSize chunk data
sources := make(chunkSources, 2)
@@ -249,7 +252,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
sources[i] = s3p.Persist(mt, nil, &Stats{})
}
src := s3p.ConjoinAll(sources, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -262,7 +265,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
t.Run("SomeOverMax", func(t *testing.T) {
assert := assert.New(t)
s3svc := makeFakeS3(assert)
s3p := s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, cache, rl}
s3p := newPersister(s3svc)
// Add one chunk source that has >maxPartSize data
mtb := newMemTable(uint64(2 * maxPartSize))
@@ -281,7 +284,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
sources := chunkSources{s3p.Persist(mt, nil, &Stats{}), s3p.Persist(mtb, nil, &Stats{})}
src := s3p.ConjoinAll(sources, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -294,7 +297,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
t.Run("Mix", func(t *testing.T) {
assert := assert.New(t)
s3svc := makeFakeS3(assert)
s3p := s3TablePersister{s3svc, "bucket", targetPartSize, minPartSize, maxPartSize, cache, rl}
s3p := newPersister(s3svc)
// Start with small tables. Since total > minPartSize, will require more than one part to upload.
sources := make(chunkSources, len(smallChunks))
@@ -322,7 +325,7 @@ func TestS3TablePersisterConjoinAll(t *testing.T) {
sources = append(sources, s3p.Persist(mt, nil, &Stats{}))
src := s3p.ConjoinAll(sources, &Stats{})
assert.NotNil(cache.get(src.hash()))
assert.NotNil(ic.get(src.hash()))
if assert.True(src.count() > 0) {
if r := s3svc.readerForTable(src.hash()); assert.NotNil(r) {
@@ -347,6 +350,6 @@ func bytesToChunkSource(bs ...[]byte) chunkSource {
}
tableSize, name := tw.finish()
data := buff[:tableSize]
rdr := newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize)
rdr := newTableReader(parseTableIndex(data), tableReaderAtFromBytes(data), fileBlockSize)
return chunkSourceAdapter{rdr, name}
}

View File

@@ -30,6 +30,7 @@ type s3TableReader struct {
bucket string
h addr
readRl chan struct{}
tc *fsTableCache
}
type s3svc interface {
@@ -42,8 +43,8 @@ type s3svc interface {
PutObject(input *s3.PutObjectInput) (*s3.PutObjectOutput, error)
}
func newS3TableReader(s3 s3svc, bucket string, h addr, chunkCount uint32, indexCache *indexCache, readRl chan struct{}) chunkSource {
source := &s3TableReader{s3: s3, bucket: bucket, h: h, readRl: readRl}
func newS3TableReader(s3 s3svc, bucket string, h addr, chunkCount uint32, indexCache *indexCache, readRl chan struct{}, tc *fsTableCache) chunkSource {
source := &s3TableReader{s3: s3, bucket: bucket, h: h, readRl: readRl, tc: tc}
var index tableIndex
found := false
@@ -74,7 +75,25 @@ func (s3tr *s3TableReader) hash() addr {
return s3tr.h
}
func (s3tr *s3TableReader) ReadAt(p []byte, off int64) (n int, err error) {
func (s3tr *s3TableReader) ReadAtWithStats(p []byte, off int64, stats *Stats) (n int, err error) {
t1 := time.Now()
if s3tr.tc != nil {
r := s3tr.tc.checkout(s3tr.hash())
if r != nil {
defer func() {
stats.FileBytesPerRead.Sample(uint64(len(p)))
stats.FileReadLatency.SampleTime(roundedSince(t1))
}()
defer s3tr.tc.checkin(s3tr.hash())
return r.ReadAt(p, off)
}
}
defer func() {
stats.S3BytesPerRead.Sample(uint64(len(p)))
stats.S3ReadLatency.SampleTime(roundedSince(t1))
}()
return s3tr.readRange(p, s3RangeHeader(off, int64(len(p))))
}

View File

@@ -29,7 +29,7 @@ func TestS3TableReader(t *testing.T) {
tableData, h := buildTable(chunks)
s3.data[h.String()] = tableData
trc := newS3TableReader(s3, "bucket", h, uint32(len(chunks)), nil, nil)
trc := newS3TableReader(s3, "bucket", h, uint32(len(chunks)), nil, nil, nil)
assertChunksInReader(chunks, trc, assert)
}
@@ -51,7 +51,7 @@ func TestS3TableReaderIndexCache(t *testing.T) {
cache := newIndexCache(1024)
cache.put(h, index)
trc := newS3TableReader(s3, "bucket", h, uint32(len(chunks)), cache, nil)
trc := newS3TableReader(s3, "bucket", h, uint32(len(chunks)), cache, nil, nil)
assert.Equal(0, s3.getCount) // constructing the table shouldn't have resulted in any reads
@@ -72,7 +72,7 @@ func TestS3TableReaderFails(t *testing.T) {
fake.data[h.String()] = tableData
trc := newS3TableReader(makeFlakyS3(fake), "bucket", h, uint32(len(chunks)), nil, nil)
trc := newS3TableReader(makeFlakyS3(fake), "bucket", h, uint32(len(chunks)), nil, nil, nil)
assert.Equal(2, fake.getCount) // constructing the table should have resulted in 2 reads
assertChunksInReader(chunks, trc, assert)

View File

@@ -14,9 +14,10 @@ type Stats struct {
GetLatency metrics.Histogram
ChunksPerGet metrics.Histogram
ReadLatency metrics.Histogram
BytesPerRead metrics.Histogram
ChunksPerRead metrics.Histogram
FileReadLatency metrics.Histogram
FileBytesPerRead metrics.Histogram
S3ReadLatency metrics.Histogram
S3BytesPerRead metrics.Histogram
HasLatency metrics.Histogram
AddressesPerHas metrics.Histogram
@@ -39,8 +40,10 @@ type Stats struct {
func NewStats() *Stats {
return &Stats{
GetLatency: metrics.NewTimeHistogram(),
ReadLatency: metrics.NewTimeHistogram(),
BytesPerRead: metrics.NewByteHistogram(),
FileReadLatency: metrics.NewTimeHistogram(),
FileBytesPerRead: metrics.NewByteHistogram(),
S3ReadLatency: metrics.NewTimeHistogram(),
S3BytesPerRead: metrics.NewByteHistogram(),
HasLatency: metrics.NewTimeHistogram(),
PutLatency: metrics.NewTimeHistogram(),
PersistLatency: metrics.NewTimeHistogram(),
@@ -56,9 +59,11 @@ func (s *Stats) Add(other Stats) {
s.GetLatency.Add(other.GetLatency)
s.ChunksPerGet.Add(other.ChunksPerGet)
s.ReadLatency.Add(other.ReadLatency)
s.BytesPerRead.Add(other.BytesPerRead)
s.ChunksPerRead.Add(other.ChunksPerRead)
s.FileReadLatency.Add(other.FileReadLatency)
s.FileBytesPerRead.Add(other.FileBytesPerRead)
s.S3ReadLatency.Add(other.S3ReadLatency)
s.S3BytesPerRead.Add(other.S3BytesPerRead)
s.HasLatency.Add(other.HasLatency)
s.AddressesPerHas.Add(other.AddressesPerHas)
@@ -80,9 +85,11 @@ func (s Stats) Delta(other Stats) Stats {
s.GetLatency.Delta(other.GetLatency),
s.ChunksPerGet.Delta(other.ChunksPerGet),
s.ReadLatency.Delta(other.ReadLatency),
s.BytesPerRead.Delta(other.BytesPerRead),
s.ChunksPerRead.Delta(other.ChunksPerRead),
s.FileReadLatency.Delta(other.FileReadLatency),
s.FileBytesPerRead.Delta(other.FileBytesPerRead),
s.S3ReadLatency.Delta(other.S3ReadLatency),
s.S3BytesPerRead.Delta(other.S3BytesPerRead),
s.HasLatency.Delta(other.HasLatency),
s.AddressesPerHas.Delta(other.AddressesPerHas),
@@ -107,9 +114,10 @@ func (s Stats) String() string {
return fmt.Sprintf(`---NBS Stats---
GetLatency: %s
ChunksPerGet: %s
ReadLatency: %s
ChunksPerRead: %s
BytesPerRead: %s
FileReadLatency: %s
FileBytesPerRead: %s
S3ReadLatency: %s
S3BytesPerRead: %s
HasLatency: %s
AddressesHasGet: %s
PutLatency: %s
@@ -126,9 +134,11 @@ WriteManifestLatency: %s
s.GetLatency,
s.ChunksPerGet,
s.ReadLatency,
s.ChunksPerRead,
s.BytesPerRead,
s.FileReadLatency,
s.FileBytesPerRead,
s.S3ReadLatency,
s.S3BytesPerRead,
s.HasLatency,
s.AddressesPerHas,

View File

@@ -49,7 +49,7 @@ func TestStats(t *testing.T) {
assert.False(store.Get(c2.Hash()).IsEmpty())
assert.False(store.Get(c3.Hash()).IsEmpty())
assert.Equal(uint64(3), stats(store).GetLatency.Samples())
assert.Equal(uint64(0), stats(store).ReadLatency.Samples())
assert.Equal(uint64(0), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(4), stats(store).ChunksPerGet.Sum())
store.Commit(store.Root(), store.Root())
@@ -66,9 +66,8 @@ func TestStats(t *testing.T) {
store.Get(c1.Hash())
store.Get(c2.Hash())
store.Get(c3.Hash())
assert.Equal(uint64(3), stats(store).ReadLatency.Samples())
assert.Equal(uint64(36), stats(store).BytesPerRead.Sum())
assert.Equal(uint64(4), stats(store).ChunksPerRead.Sum())
assert.Equal(uint64(3), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(36), stats(store).FileBytesPerRead.Sum())
// Try A GetMany
chnx := make([]chunks.Chunk, 3)
@@ -81,9 +80,8 @@ func TestStats(t *testing.T) {
}
chunkChan := make(chan *chunks.Chunk, 3)
store.GetMany(hashes.HashSet(), chunkChan)
assert.Equal(uint64(4), stats(store).ReadLatency.Samples())
assert.Equal(uint64(60), stats(store).BytesPerRead.Sum())
assert.Equal(uint64(7), stats(store).ChunksPerRead.Sum())
assert.Equal(uint64(4), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(60), stats(store).FileBytesPerRead.Sum())
// Force a conjoin
store.c = newAsyncConjoiner(2)

View File

@@ -75,6 +75,7 @@ func NewAWSStore(table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize ui
maxS3PartSize,
globalIndexCache,
make(chan struct{}, 32),
nil,
}
return newAWSStore(table, ns, ddb, globalManifestCache, p, globalConjoiner, memTableSize)
}
@@ -122,11 +123,7 @@ func (nbs *NomsBlockStore) Put(c chunks.Chunk) {
d.PanicIfFalse(nbs.addChunk(a, c.Data()))
nbs.putCount++
dur := time.Since(t1) // This actually was 0 sometimes: see attic BUG 1787
if dur == 0 {
dur = time.Duration(1)
}
nbs.stats.PutLatency.SampleTime(dur)
nbs.stats.PutLatency.SampleTime(roundedSince(t1))
}
// TODO: figure out if there's a non-error reason for this to return false. If not, get rid of return value.
@@ -147,7 +144,7 @@ func (nbs *NomsBlockStore) addChunk(h addr, data []byte) bool {
func (nbs *NomsBlockStore) Get(h hash.Hash) chunks.Chunk {
t1 := time.Now()
defer func() {
nbs.stats.GetLatency.SampleTime(time.Since(t1))
nbs.stats.GetLatency.SampleTime(roundedSince(t1))
nbs.stats.ChunksPerGet.Sample(1)
}()
@@ -176,7 +173,7 @@ func (nbs *NomsBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks
defer func() {
if len(hashes) > 0 {
nbs.stats.GetLatency.SampleTime(time.Since(t1))
nbs.stats.GetLatency.SampleTime(roundedSince(t1))
nbs.stats.ChunksPerGet.Sample(uint64(len(reqs)))
}
}()
@@ -266,7 +263,7 @@ func (nbs *NomsBlockStore) Count() uint32 {
func (nbs *NomsBlockStore) Has(h hash.Hash) bool {
t1 := time.Now()
defer func() {
nbs.stats.HasLatency.SampleTime(time.Since(t1))
nbs.stats.HasLatency.SampleTime(roundedSince(t1))
nbs.stats.AddressesPerHas.Sample(1)
}()
@@ -304,7 +301,7 @@ func (nbs *NomsBlockStore) HasMany(hashes hash.HashSet) hash.HashSet {
}
if len(hashes) > 0 {
nbs.stats.HasLatency.SampleTime(time.Since(t1))
nbs.stats.HasLatency.SampleTime(roundedSince(t1))
nbs.stats.AddressesPerHas.SampleLen(len(reqs))
}

View File

@@ -155,6 +155,11 @@ func ParseAddr(b []byte) (h addr) {
return
}
// ValidateAddr reports whether |s| is a well-formed encoded table address.
func ValidateAddr(s string) bool {
	if _, err := encoding.DecodeString(s); err != nil {
		return false
	}
	return true
}
type addrSlice []addr
func (hs addrSlice) Len() int { return len(hs) }

View File

@@ -17,7 +17,8 @@ import (
// tablePersister allows interaction with persistent storage. It provides
// primitives for pushing the contents of a memTable to persistent storage,
// opening persistent tables for reading, and conjoining a number of existing
// chunkSources into one.
// chunkSources into one. A tablePersister implementation must be goroutine-
// safe.
type tablePersister interface {
// Persist makes the contents of mt durable. Chunks already present in
// |haver| may be dropped in the process.
@@ -27,9 +28,7 @@ type tablePersister interface {
// chunkSource.
ConjoinAll(sources chunkSources, stats *Stats) chunkSource
// Open a table named |name|, containing |chunkCount| chunks. The
// tablePersister is responsible for managing the lifetime of the returned
// chunkSource. TODO: Is that actually true? Or can we get rid of explicit 'close'
// Open a table named |name|, containing |chunkCount| chunks.
Open(name addr, chunkCount uint32) chunkSource
}

View File

@@ -5,7 +5,6 @@
package nbs
import (
"bytes"
"testing"
"github.com/attic-labs/testify/assert"
@@ -27,7 +26,7 @@ func TestPlanCompaction(t *testing.T) {
totalUnc += uint64(len(chnk))
}
data, name := buildTable(content)
src := chunkSourceAdapter{newTableReader(parseTableIndex(data), bytes.NewReader(data), fileBlockSize), name}
src := chunkSourceAdapter{newTableReader(parseTableIndex(data), tableReaderAtFromBytes(data), fileBlockSize), name}
dataLens = append(dataLens, uint64(len(data))-indexSize(src.count())-footerSize)
sources = append(sources, src)
}
@@ -45,7 +44,7 @@ func TestPlanCompaction(t *testing.T) {
assert.Equal(totalChunks, idx.chunkCount)
assert.Equal(totalUnc, idx.totalUncompressedData)
tr := newTableReader(idx, bytes.NewReader(nil), fileBlockSize)
tr := newTableReader(idx, tableReaderAtFromBytes(nil), fileBlockSize)
for _, content := range tableContents {
assertChunksInReader(content, tr, assert)
}

View File

@@ -26,10 +26,14 @@ type tableIndex struct {
suffixes []byte
}
type tableReaderAt interface {
ReadAtWithStats(p []byte, off int64, stats *Stats) (n int, err error)
}
// tableReader implements get & has queries against a single nbs table. goroutine safe.
type tableReader struct {
tableIndex
r io.ReaderAt
r tableReaderAt
blockSize uint64
}
@@ -139,7 +143,7 @@ func (ti tableIndex) lookupOrdinal(h addr) uint32 {
}
// newTableReader parses a valid nbs table byte stream and returns a reader. buff must end with an NBS index and footer, though it may contain an unspecified number of bytes before that data. r should allow retrieving any desired range of bytes from the table.
func newTableReader(index tableIndex, r io.ReaderAt, blockSize uint64) tableReader {
func newTableReader(index tableIndex, r tableReaderAt, blockSize uint64) tableReader {
return tableReader{index, r, blockSize}
}
@@ -215,12 +219,7 @@ func (tr tableReader) get(h addr, stats *Stats) (data []byte) {
length := uint64(tr.lengths[ordinal])
buff := make([]byte, length) // TODO: Avoid this allocation for every get
t1 := time.Now()
n, err := tr.r.ReadAt(buff, int64(offset))
stats.BytesPerRead.Sample(length)
stats.ChunksPerRead.SampleLen(1)
stats.ReadLatency.SampleTime(roundedSince(t1))
n, err := tr.r.ReadAtWithStats(buff, int64(offset), stats)
d.Chk.NoError(err)
d.Chk.True(n == int(length))
data = tr.parseChunk(buff)
@@ -260,11 +259,7 @@ func (tr tableReader) readAtOffsets(
readLength := readEnd - readStart
buff := make([]byte, readLength)
t1 := time.Now()
n, err := tr.r.ReadAt(buff, int64(readStart))
stats.BytesPerRead.Sample(readLength)
stats.ChunksPerRead.SampleLen(len(offsets))
stats.ReadLatency.SampleTime(roundedSince(t1))
n, err := tr.r.ReadAtWithStats(buff, int64(readStart), stats)
d.Chk.NoError(err)
d.Chk.True(uint64(n) == readLength)
@@ -471,7 +466,7 @@ func (tr tableReader) extract(chunks chan<- extractRecord) {
}
chunkLen := tr.offsets[tr.chunkCount-1] + uint64(tr.lengths[tr.chunkCount-1])
buff := make([]byte, chunkLen)
n, err := tr.r.ReadAt(buff, int64(tr.offsets[0]))
n, err := tr.r.ReadAtWithStats(buff, int64(tr.offsets[0]), &Stats{})
d.Chk.NoError(err)
d.Chk.True(uint64(n) == chunkLen)
@@ -490,12 +485,12 @@ func (tr tableReader) reader() io.Reader {
}
type readerAdapter struct {
rat io.ReaderAt
rat tableReaderAt
off int64
}
func (ra *readerAdapter) Read(p []byte) (n int, err error) {
n, err = ra.rat.ReadAt(p, ra.off)
n, err = ra.rat.ReadAtWithStats(p, ra.off, &Stats{})
ra.off += int64(n)
return
}

View File

@@ -5,8 +5,6 @@
package nbs
import (
"io/ioutil"
"os"
"testing"
"github.com/attic-labs/testify/assert"
@@ -108,19 +106,9 @@ func TestTableSetExtract(t *testing.T) {
}
}
func makeTempDir(assert *assert.Assertions) string {
dir, err := ioutil.TempDir("", "")
assert.NoError(err)
return dir
}
func TestTableSetRebase(t *testing.T) {
assert := assert.New(t)
dir := makeTempDir(assert)
defer os.RemoveAll(dir)
fc := newFDCache(defaultMaxTables)
defer fc.Drop()
persister := newFSTablePersister(dir, fc, nil)
persister := newFakeTablePersister()
insert := func(ts tableSet, chunks ...[]byte) tableSet {
for _, c := range chunks {

View File

@@ -10,8 +10,6 @@ import (
"sort"
"testing"
"bytes"
"sync"
"github.com/attic-labs/noms/go/chunks"
@@ -49,7 +47,7 @@ func TestSimple(t *testing.T) {
}
tableData, _ := buildTable(chunks)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
assertChunksInReader(chunks, tr, assert)
@@ -92,7 +90,7 @@ func TestHasMany(t *testing.T) {
}
tableData, _ := buildTable(chunks)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
hasAddrs := []hasRecord{
@@ -138,7 +136,7 @@ func TestHasManySequentialPrefix(t *testing.T) {
length, _ := tw.finish()
buff = buff[:length]
tr := newTableReader(parseTableIndex(buff), bytes.NewReader(buff), fileBlockSize)
tr := newTableReader(parseTableIndex(buff), tableReaderAtFromBytes(buff), fileBlockSize)
hasAddrs := make([]hasRecord, 2)
// Leave out the first address
@@ -162,7 +160,7 @@ func TestGetMany(t *testing.T) {
}
tableData, _ := buildTable(data)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
addrs := addrSlice{computeAddr(data[0]), computeAddr(data[1]), computeAddr(data[2])}
getBatch := []getRecord{
@@ -197,7 +195,7 @@ func TestCalcReads(t *testing.T) {
}
tableData, _ := buildTable(chunks)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), 0)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), 0)
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
getBatch := []getRecord{
{&addrs[0], binary.BigEndian.Uint64(addrs[0][:addrPrefixSize]), false},
@@ -228,7 +226,7 @@ func TestExtract(t *testing.T) {
}
tableData, _ := buildTable(chunks)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}
@@ -258,7 +256,7 @@ func Test65k(t *testing.T) {
}
tableData, _ := buildTable(chunks)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
for i := 0; i < count; i++ {
data := dataFn(i)
@@ -303,7 +301,7 @@ func doTestNGetMany(t *testing.T, count int) {
}
tableData, _ := buildTable(data)
tr := newTableReader(parseTableIndex(tableData), bytes.NewReader(tableData), fileBlockSize)
tr := newTableReader(parseTableIndex(tableData), tableReaderAtFromBytes(tableData), fileBlockSize)
getBatch := make([]getRecord, len(data))
for i := 0; i < count; i++ {

View File

@@ -27,10 +27,25 @@ type SizeCache struct {
mu sync.Mutex
lru list.List
cache map[interface{}]sizeCacheEntry
expireCb func(elm interface{})
}
type ExpireCallback func(key interface{})
// New creates a SizeCache that will hold up to |maxSize| item data.
func New(maxSize uint64) *SizeCache {
return &SizeCache{maxSize: maxSize, cache: map[interface{}]sizeCacheEntry{}}
return NewWithExpireCallback(maxSize, nil)
}
// NewWithExpireCallback creates a SizeCache that will hold up to |maxSize|
// item data, and will call cb(key) when the item corresponding with that key
// expires.
func NewWithExpireCallback(maxSize uint64, cb ExpireCallback) *SizeCache {
return &SizeCache{
maxSize: maxSize,
cache: map[interface{}]sizeCacheEntry{},
expireCb: cb,
}
}
// entry() checks if the value is in the cache. If not in the cache, it returns an
@@ -88,6 +103,9 @@ func (c *SizeCache) Add(key interface{}, size uint64, value interface{}) {
delete(c.cache, key1)
c.totalSize -= ce.size
c.lru.Remove(el)
if c.expireCb != nil {
c.expireCb(key1)
}
el = next
}
}

View File

@@ -6,6 +6,7 @@ package sizecache
import (
"fmt"
"sort"
"sync"
"testing"
@@ -76,6 +77,23 @@ func TestSizeCache(t *testing.T) {
assert.Equal(4, len(c.cache))
}
func TestSizeCacheWithExpiry(t *testing.T) {
expired := []string{}
expire := func(key interface{}) {
expired = append(expired, key.(string))
}
c := NewWithExpireCallback(5, expire)
data := []string{"a", "b", "c", "d", "e"}
for i, k := range data {
c.Add(k, 1, i)
}
c.Add("big", 5, "thing")
sort.Sort(sort.StringSlice(expired))
assert.Equal(t, data, expired)
}
func concurrencySizeCacheTest(data []string) {
dchan := make(chan string, 128)
go func() {