mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 19:29:58 -05:00
Merge pull request #6884 from dolthub/aaron/remove-dynamodb-table-persister
[no-release-notes] go/store/nbs: Remove unused functionality which previously allowed storing small table files, containing very few chunks, in DynamoDB instead of S3.
This commit is contained in:
@@ -28,18 +28,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
|
||||
if al.tableMayBeInDynamo(chunkCount) {
|
||||
data, err := ddb.ReadTable(ctx, name, nil)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if data == nil {
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func tableExistsInChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (bool, error) {
|
||||
magic := make([]byte, magicNumberSize)
|
||||
n, _, err := s3.ReadFromEnd(ctx, name, magic, stats)
|
||||
if err != nil {
|
||||
@@ -51,28 +40,9 @@ func tableExistsInChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3Obj
|
||||
return bytes.Equal(magic, []byte(magicNumber)), nil
|
||||
}
|
||||
|
||||
func newAWSChunkSource(ctx context.Context, ddb *ddbTableStore, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
|
||||
func newAWSChunkSource(ctx context.Context, s3 *s3ObjectReader, al awsLimits, name addr, chunkCount uint32, q MemoryQuotaProvider, stats *Stats) (cs chunkSource, err error) {
|
||||
var tra tableReaderAt
|
||||
index, err := loadTableIndex(ctx, stats, chunkCount, q, func(p []byte) error {
|
||||
if al.tableMayBeInDynamo(chunkCount) {
|
||||
data, err := ddb.ReadTable(ctx, name, stats)
|
||||
if data == nil && err == nil { // There MUST be either data or an error
|
||||
return errors.New("no data available")
|
||||
}
|
||||
if data != nil {
|
||||
if len(p) > len(data) {
|
||||
return errors.New("not enough data for chunk count")
|
||||
}
|
||||
indexBytes := data[len(data)-len(p):]
|
||||
copy(p, indexBytes)
|
||||
tra = &dynamoTableReaderAt{ddb: ddb, h: name}
|
||||
return nil
|
||||
}
|
||||
if _, ok := err.(tableNotInDynamoErr); !ok {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
n, _, err := s3.ReadFromEnd(ctx, name, p, stats)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -39,17 +39,14 @@ func TestAWSChunkSource(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
s3 := makeFakeS3(t)
|
||||
ddb := makeFakeDDB(t)
|
||||
|
||||
s3or := &s3ObjectReader{s3, "bucket", nil, ""}
|
||||
dts := &ddbTableStore{ddb, "table", nil, nil}
|
||||
|
||||
makeSrc := func(chunkMax int) chunkSource {
|
||||
cs, err := newAWSChunkSource(
|
||||
context.Background(),
|
||||
dts,
|
||||
s3or,
|
||||
awsLimits{itemMax: maxDynamoItemSize, chunkMax: uint32(chunkMax)},
|
||||
awsLimits{},
|
||||
h,
|
||||
uint32(len(chunks)),
|
||||
NewUnlimitedMemQuotaProvider(),
|
||||
@@ -61,16 +58,6 @@ func TestAWSChunkSource(t *testing.T) {
|
||||
return cs
|
||||
}
|
||||
|
||||
t.Run("Dynamo", func(t *testing.T) {
|
||||
ddb.putData(fmtTableName(h), tableData)
|
||||
|
||||
t.Run("Has Chunks", func(t *testing.T) {
|
||||
src := makeSrc(len(chunks) + 1)
|
||||
assertChunksInReader(chunks, src, assert.New(t))
|
||||
src.close()
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("S3", func(t *testing.T) {
|
||||
s3.data[h.String()] = tableData
|
||||
|
||||
|
||||
@@ -47,12 +47,6 @@ const (
|
||||
maxS3PartSize = 64 * 1 << 20 // 64MiB
|
||||
maxS3Parts = 10000
|
||||
|
||||
// Disable persisting tables in DynamoDB. This is currently unused by
|
||||
// Dolthub and keeping it requires provisioning DynamoDB throughout for
|
||||
// the noop reads.
|
||||
maxDynamoChunks = 0
|
||||
maxDynamoItemSize = 0
|
||||
|
||||
defaultS3PartSize = minS3PartSize // smallest allowed by S3 allows for most throughput
|
||||
)
|
||||
|
||||
@@ -60,7 +54,6 @@ type awsTablePersister struct {
|
||||
s3 s3iface.S3API
|
||||
bucket string
|
||||
rl chan struct{}
|
||||
ddb *ddbTableStore
|
||||
limits awsLimits
|
||||
ns string
|
||||
q MemoryQuotaProvider
|
||||
@@ -71,25 +64,11 @@ var _ tableFilePersister = awsTablePersister{}
|
||||
|
||||
type awsLimits struct {
|
||||
partTarget, partMin, partMax uint64
|
||||
itemMax int
|
||||
chunkMax uint32
|
||||
}
|
||||
|
||||
func (al awsLimits) tableFitsInDynamo(name addr, dataLen int, chunkCount uint32) bool {
|
||||
calcItemSize := func(n addr, dataLen int) int {
|
||||
return len(dbAttr) + len(tablePrefix) + len(n.String()) + len(dataAttr) + dataLen
|
||||
}
|
||||
return chunkCount <= al.chunkMax && calcItemSize(name, dataLen) < al.itemMax
|
||||
}
|
||||
|
||||
func (al awsLimits) tableMayBeInDynamo(chunkCount uint32) bool {
|
||||
return chunkCount <= al.chunkMax
|
||||
}
|
||||
|
||||
func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (chunkSource, error) {
|
||||
return newAWSChunkSource(
|
||||
ctx,
|
||||
s3p.ddb,
|
||||
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
|
||||
s3p.limits,
|
||||
name,
|
||||
@@ -102,7 +81,6 @@ func (s3p awsTablePersister) Open(ctx context.Context, name addr, chunkCount uin
|
||||
func (s3p awsTablePersister) Exists(ctx context.Context, name addr, chunkCount uint32, stats *Stats) (bool, error) {
|
||||
return tableExistsInChunkSource(
|
||||
ctx,
|
||||
s3p.ddb,
|
||||
&s3ObjectReader{s3: s3p.s3, bucket: s3p.bucket, readRl: s3p.rl, ns: s3p.ns},
|
||||
s3p.limits,
|
||||
name,
|
||||
@@ -122,19 +100,6 @@ func (s3p awsTablePersister) CopyTableFile(ctx context.Context, r io.ReadCloser,
|
||||
}
|
||||
}()
|
||||
|
||||
name, err := parseAddr(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if s3p.limits.tableFitsInDynamo(name, int(fileSz), chunkCount) {
|
||||
data, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s3p.ddb.Write(ctx, name, data)
|
||||
}
|
||||
|
||||
return s3p.multipartUpload(ctx, r, fileSz, fileId)
|
||||
}
|
||||
|
||||
@@ -165,16 +130,6 @@ func (s3p awsTablePersister) Persist(ctx context.Context, mt *memTable, haver ch
|
||||
return emptyChunkSource{}, nil
|
||||
}
|
||||
|
||||
if s3p.limits.tableFitsInDynamo(name, len(data), chunkCount) {
|
||||
err := s3p.ddb.Write(ctx, name, data)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newReaderFromIndexData(ctx, s3p.q, data, name, &dynamoTableReaderAt{ddb: s3p.ddb, h: name}, s3BlockSize)
|
||||
}
|
||||
|
||||
err = s3p.multipartUpload(ctx, bytes.NewReader(data), uint64(len(data)), name.String())
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -35,8 +35,6 @@ import (
|
||||
"github.com/aws/aws-sdk-go/service/s3/s3iface"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/util/sizecache"
|
||||
)
|
||||
|
||||
func randomChunks(t *testing.T, r *rand.Rand, sz int) [][]byte {
|
||||
@@ -71,9 +69,6 @@ func TestRandomChunks(t *testing.T) {
|
||||
|
||||
func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
calcPartSize := func(rdr chunkReader, maxPartNum uint64) uint64 {
|
||||
return maxTableSize(uint64(mustUint32(rdr.count())), mustUint64(rdr.uncompressedLen())) / maxPartNum
|
||||
}
|
||||
|
||||
r := rand.New(rand.NewSource(1024))
|
||||
const sz15mb = 1 << 20 * 15
|
||||
@@ -90,8 +85,8 @@ func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
testIt := func(t *testing.T, ns string) {
|
||||
t.Run("InMultipleParts", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
@@ -108,8 +103,8 @@ func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
t.Run("InSinglePart", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits64mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
@@ -133,8 +128,8 @@ func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
assert.Equal(existingTable.addChunk(computeAddr(c), c), chunkAdded)
|
||||
}
|
||||
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, existingTable, &Stats{})
|
||||
require.NoError(t, err)
|
||||
@@ -149,8 +144,7 @@ func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
s3svc := &failingFakeS3{makeFakeS3(t), sync.Mutex{}, 1}
|
||||
ddb := makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: ddb, limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", limits: limits5mb, ns: ns, q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
_, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
assert.Error(err)
|
||||
@@ -163,96 +157,6 @@ func TestAWSTablePersisterPersist(t *testing.T) {
|
||||
testIt(t, "a-namespace-here")
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("PersistToDynamo", func(t *testing.T) {
|
||||
t.Run("Success", func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
assert := assert.New(t)
|
||||
|
||||
ddb := makeFakeDDB(t)
|
||||
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
|
||||
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
if assert.True(mustUint32(src.count()) > 0) {
|
||||
if r, err := ddb.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) {
|
||||
assertChunksInReader(testChunks, r, assert)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("CacheOnOpen", func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
assert := assert.New(t)
|
||||
|
||||
tc := sizecache.New(maxDynamoItemSize)
|
||||
ddb := makeFakeDDB(t)
|
||||
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, tc)
|
||||
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 2 * mustUint32(mt.count())}
|
||||
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
tableData, name, err := buildTable(testChunks)
|
||||
require.NoError(t, err)
|
||||
ddb.putData(fmtTableName(name), tableData)
|
||||
|
||||
src, err := s3p.Open(context.Background(), name, uint32(len(testChunks)), &Stats{})
|
||||
require.NoError(t, err)
|
||||
if assert.True(mustUint32(src.count()) > 0) {
|
||||
if r, err := ddb.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) {
|
||||
assertChunksInReader(testChunks, r, assert)
|
||||
}
|
||||
if data, present := tc.Get(name); assert.True(present) {
|
||||
assert.Equal(tableData, data.([]byte))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("FailTooManyChunks", func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
assert := assert.New(t)
|
||||
|
||||
ddb := makeFakeDDB(t)
|
||||
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
|
||||
limits := awsLimits{itemMax: maxDynamoItemSize, chunkMax: 1, partTarget: calcPartSize(mt, 1)}
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
if assert.True(mustUint32(src.count()) > 0) {
|
||||
if r, err := ddb.readerForTable(ctx, src.hash()); assert.Nil(r) && assert.NoError(err) {
|
||||
if r, err := s3svc.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) {
|
||||
assertChunksInReader(testChunks, r, assert)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("FailItemTooBig", func(t *testing.T) {
|
||||
t.SkipNow()
|
||||
assert := assert.New(t)
|
||||
|
||||
ddb := makeFakeDDB(t)
|
||||
s3svc, dts := makeFakeS3(t), makeFakeDTS(ddb, nil)
|
||||
limits := awsLimits{itemMax: 0, chunkMax: 2 * mustUint32(mt.count()), partTarget: calcPartSize(mt, 1)}
|
||||
s3p := awsTablePersister{s3: s3svc, bucket: "bucket", ddb: dts, limits: limits, ns: "", q: &UnlimitedQuotaProvider{}}
|
||||
|
||||
src, err := s3p.Persist(context.Background(), mt, nil, &Stats{})
|
||||
require.NoError(t, err)
|
||||
if assert.True(mustUint32(src.count()) > 0) {
|
||||
if r, err := ddb.readerForTable(ctx, src.hash()); assert.Nil(r) && assert.NoError(err) {
|
||||
if r, err := s3svc.readerForTable(ctx, src.hash()); assert.NotNil(r) && assert.NoError(err) {
|
||||
assertChunksInReader(testChunks, r, assert)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
func makeFakeDTS(ddb ddbsvc, tc *sizecache.SizeCache) *ddbTableStore {
|
||||
return &ddbTableStore{ddb, "table", nil, tc}
|
||||
}
|
||||
|
||||
type waitOnStoreTableCache struct {
|
||||
@@ -367,18 +271,16 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
const sz5mb = 1 << 20 * 5
|
||||
targetPartSize := uint64(sz5mb)
|
||||
minPartSize, maxPartSize := targetPartSize, 5*targetPartSize
|
||||
maxItemSize, maxChunkCount := int(targetPartSize/2), uint32(4)
|
||||
|
||||
rl := make(chan struct{}, 8)
|
||||
defer close(rl)
|
||||
|
||||
newPersister := func(s3svc s3iface.S3API, ddb *ddbTableStore) awsTablePersister {
|
||||
newPersister := func(s3svc s3iface.S3API) awsTablePersister {
|
||||
return awsTablePersister{
|
||||
s3svc,
|
||||
"bucket",
|
||||
rl,
|
||||
ddb,
|
||||
awsLimits{targetPartSize, minPartSize, maxPartSize, maxItemSize, maxChunkCount},
|
||||
awsLimits{targetPartSize, minPartSize, maxPartSize},
|
||||
"",
|
||||
&UnlimitedQuotaProvider{},
|
||||
}
|
||||
@@ -411,8 +313,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
|
||||
t.Run("TotalUnderMinSize", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := newPersister(s3svc, ddb)
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := newPersister(s3svc)
|
||||
|
||||
chunks := smallChunks[:len(smallChunks)-1]
|
||||
sources := makeSources(s3p, chunks)
|
||||
@@ -433,8 +335,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
|
||||
t.Run("TotalOverMinSize", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := newPersister(s3svc, ddb)
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := newPersister(s3svc)
|
||||
|
||||
sources := makeSources(s3p, smallChunks)
|
||||
src, _, err := s3p.ConjoinAll(context.Background(), sources, &Stats{})
|
||||
@@ -463,8 +365,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
|
||||
t.Run("AllOverMax", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := newPersister(s3svc, ddb)
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := newPersister(s3svc)
|
||||
|
||||
// Make 2 chunk sources that each have >maxPartSize chunk data
|
||||
sources := make(chunkSources, 2)
|
||||
@@ -496,8 +398,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
|
||||
t.Run("SomeOverMax", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := newPersister(s3svc, ddb)
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := newPersister(s3svc)
|
||||
|
||||
// Add one chunk source that has >maxPartSize data
|
||||
mtb := newMemTable(uint64(2 * maxPartSize))
|
||||
@@ -537,8 +439,8 @@ func TestAWSTablePersisterConjoinAll(t *testing.T) {
|
||||
|
||||
t.Run("Mix", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s3svc, ddb := makeFakeS3(t), makeFakeDTS(makeFakeDDB(t), nil)
|
||||
s3p := newPersister(s3svc, ddb)
|
||||
s3svc := makeFakeS3(t)
|
||||
s3p := newPersister(s3svc)
|
||||
|
||||
// Start with small tables. Since total > minPartSize, will require more than one part to upload.
|
||||
sources := make(chunkSources, len(smallChunks))
|
||||
|
||||
@@ -23,7 +23,6 @@ package nbs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
@@ -53,26 +52,6 @@ func makeFakeDDB(t *testing.T) *fakeDDB {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *fakeDDB) readerForTable(ctx context.Context, name addr) (chunkReader, error) {
|
||||
if i, present := m.data[fmtTableName(name)]; present {
|
||||
buff, ok := i.([]byte)
|
||||
assert.True(m.t, ok)
|
||||
ti, err := parseTableIndex(ctx, buff, &UnlimitedQuotaProvider{})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tr, err := newTableReader(ti, tableReaderAtFromBytes(buff), fileBlockSize)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tr, nil
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *fakeDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInput, opts ...request.Option) (*dynamodb.GetItemOutput, error) {
|
||||
key := input.Key[dbAttr].S
|
||||
assert.NotNil(m.t, key, "key should have been a String: %+v", input.Key[dbAttr])
|
||||
@@ -92,8 +71,6 @@ func (m *fakeDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInp
|
||||
if e.appendix != "" {
|
||||
item[appendixAttr] = &dynamodb.AttributeValue{S: aws.String(e.appendix)}
|
||||
}
|
||||
case []byte:
|
||||
item[dataAttr] = &dynamodb.AttributeValue{B: e}
|
||||
}
|
||||
}
|
||||
atomic.AddInt64(&m.numGets, 1)
|
||||
@@ -113,12 +90,6 @@ func (m *fakeDDB) PutItemWithContext(ctx aws.Context, input *dynamodb.PutItemInp
|
||||
assert.NotNil(m.t, input.Item[dbAttr].S, "key should have been a String: %+v", input.Item[dbAttr])
|
||||
key := *input.Item[dbAttr].S
|
||||
|
||||
if input.Item[dataAttr] != nil {
|
||||
assert.NotNil(m.t, input.Item[dataAttr].B, "data should have been a blob: %+v", input.Item[dataAttr])
|
||||
m.putData(key, input.Item[dataAttr].B)
|
||||
return &dynamodb.PutItemOutput{}, nil
|
||||
}
|
||||
|
||||
assert.NotNil(m.t, input.Item[nbsVersAttr], "%s should have been present", nbsVersAttr)
|
||||
assert.NotNil(m.t, input.Item[nbsVersAttr].S, "nbsVers should have been a String: %+v", input.Item[nbsVersAttr])
|
||||
assert.Equal(m.t, AWSStorageVersion, *input.Item[nbsVersAttr].S)
|
||||
|
||||
@@ -1,172 +0,0 @@
|
||||
// Copyright 2019 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// This file incorporates work covered by the following copyright and
|
||||
// permission notice:
|
||||
//
|
||||
// Copyright 2017 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package nbs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/util/sizecache"
|
||||
"github.com/dolthub/dolt/go/store/util/verbose"
|
||||
)
|
||||
|
||||
const (
|
||||
dataAttr = "data"
|
||||
tablePrefix = "*" // I want to use NBS table names as keys when they are written to DynamoDB, but a bare table name is a legal Noms Database name as well. To avoid collisions, dynamoTableReader prepends this prefix (which is not a legal character in a Noms Database name).
|
||||
)
|
||||
|
||||
// dynamoTableReaderAt assumes the existence of a DynamoDB table whose primary partition key is in String format and named `db`.
|
||||
type dynamoTableReaderAt struct {
|
||||
ddb *ddbTableStore
|
||||
h addr
|
||||
}
|
||||
|
||||
type tableNotInDynamoErr struct {
|
||||
nbs, dynamo string
|
||||
}
|
||||
|
||||
func (t tableNotInDynamoErr) Error() string {
|
||||
return fmt.Sprintf("NBS table %s not present in DynamoDB table %s", t.nbs, t.dynamo)
|
||||
}
|
||||
|
||||
func (dtra *dynamoTableReaderAt) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dtra *dynamoTableReaderAt) clone() (tableReaderAt, error) {
|
||||
return dtra, nil
|
||||
}
|
||||
|
||||
func (dtra *dynamoTableReaderAt) Reader(ctx context.Context) (io.ReadCloser, error) {
|
||||
data, err := dtra.ddb.ReadTable(ctx, dtra.h, &Stats{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return io.NopCloser(bytes.NewReader(data)), nil
|
||||
}
|
||||
|
||||
func (dtra *dynamoTableReaderAt) ReadAtWithStats(ctx context.Context, p []byte, off int64, stats *Stats) (n int, err error) {
|
||||
data, err := dtra.ddb.ReadTable(ctx, dtra.h, stats)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
n = copy(p, data[off:])
|
||||
if n < len(p) {
|
||||
err = io.ErrUnexpectedEOF
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type ddbTableStore struct {
|
||||
ddb ddbsvc
|
||||
table string
|
||||
readRl chan struct{}
|
||||
cache *sizecache.SizeCache // TODO: merge this with tableCache as part of BUG 3601
|
||||
}
|
||||
|
||||
func (dts *ddbTableStore) ReadTable(ctx context.Context, name addr, stats *Stats) (data []byte, err error) {
|
||||
t1 := time.Now()
|
||||
if dts.cache != nil {
|
||||
if i, present := dts.cache.Get(name); present {
|
||||
data = i.([]byte)
|
||||
defer func() {
|
||||
stats.MemBytesPerRead.Sample(uint64(len(data)))
|
||||
stats.MemReadLatency.SampleTimeSince(t1)
|
||||
}()
|
||||
return data, nil
|
||||
}
|
||||
}
|
||||
|
||||
data, err = dts.readTable(ctx, name)
|
||||
if data != nil {
|
||||
defer func() {
|
||||
stats.DynamoBytesPerRead.Sample(uint64(len(data)))
|
||||
stats.DynamoReadLatency.SampleTimeSince(t1)
|
||||
}()
|
||||
}
|
||||
|
||||
if dts.cache != nil && err == nil {
|
||||
dts.cache.Add(name, uint64(len(data)), data)
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
func (dts *ddbTableStore) readTable(ctx context.Context, name addr) (data []byte, err error) {
|
||||
try := func(input *dynamodb.GetItemInput) (data []byte, err error) {
|
||||
if dts.readRl != nil {
|
||||
dts.readRl <- struct{}{}
|
||||
defer func() {
|
||||
<-dts.readRl
|
||||
}()
|
||||
}
|
||||
result, rerr := dts.ddb.GetItemWithContext(ctx, input)
|
||||
if rerr != nil {
|
||||
return nil, rerr
|
||||
} else if len(result.Item) == 0 {
|
||||
return nil, tableNotInDynamoErr{name.String(), dts.table}
|
||||
} else if result.Item[dataAttr] == nil || result.Item[dataAttr].B == nil {
|
||||
return nil, fmt.Errorf("NBS table %s in DynamoDB table %s is malformed", name, dts.table)
|
||||
}
|
||||
return result.Item[dataAttr].B, nil
|
||||
}
|
||||
|
||||
input := dynamodb.GetItemInput{
|
||||
TableName: aws.String(dts.table),
|
||||
Key: map[string]*dynamodb.AttributeValue{
|
||||
dbAttr: {S: aws.String(fmtTableName(name))},
|
||||
},
|
||||
}
|
||||
data, err = try(&input)
|
||||
if _, isNotFound := err.(tableNotInDynamoErr); isNotFound {
|
||||
verbose.Logger(ctx).Sugar().Debugf("Eventually consistent read for %s failed; trying fully-consistent", name)
|
||||
input.ConsistentRead = aws.Bool(true)
|
||||
return try(&input)
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
func fmtTableName(name addr) string {
|
||||
return tablePrefix + name.String()
|
||||
}
|
||||
|
||||
func (dts *ddbTableStore) Write(ctx context.Context, name addr, data []byte) error {
|
||||
_, err := dts.ddb.PutItemWithContext(ctx, &dynamodb.PutItemInput{
|
||||
TableName: aws.String(dts.table),
|
||||
Item: map[string]*dynamodb.AttributeValue{
|
||||
dbAttr: {S: aws.String(fmtTableName(name))},
|
||||
dataAttr: {B: data},
|
||||
},
|
||||
})
|
||||
|
||||
if dts.cache != nil && err == nil {
|
||||
dts.cache.Add(name, uint64(len(data)), data)
|
||||
}
|
||||
return err
|
||||
}
|
||||
@@ -1,145 +0,0 @@
|
||||
// Copyright 2019 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// This file incorporates work covered by the following copyright and
|
||||
// permission notice:
|
||||
//
|
||||
// Copyright 2017 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package nbs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/util/sizecache"
|
||||
)
|
||||
|
||||
func TestDynamoTableReaderAt(t *testing.T) {
|
||||
ddb := makeFakeDDB(t)
|
||||
|
||||
chunks := [][]byte{
|
||||
[]byte("hello2"),
|
||||
[]byte("goodbye2"),
|
||||
[]byte("badbye2"),
|
||||
}
|
||||
|
||||
tableData, h, err := buildTable(chunks)
|
||||
require.NoError(t, err)
|
||||
ddb.putData(fmtTableName(h), tableData)
|
||||
|
||||
t.Run("ddbTableStore", func(t *testing.T) {
|
||||
t.Run("ReadTable", func(t *testing.T) {
|
||||
test := func(dts *ddbTableStore) {
|
||||
assert := assert.New(t)
|
||||
data, err := dts.ReadTable(context.Background(), h, &Stats{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(tableData, data)
|
||||
|
||||
data, err = dts.ReadTable(context.Background(), computeAddr([]byte{}), &Stats{})
|
||||
assert.Error(err)
|
||||
assert.IsType(tableNotInDynamoErr{}, err)
|
||||
assert.Nil(data)
|
||||
}
|
||||
|
||||
t.Run("EventuallyConsistentSuccess", func(t *testing.T) {
|
||||
test(&ddbTableStore{ddb, "table", nil, nil})
|
||||
})
|
||||
|
||||
t.Run("EventuallyConsistentFailure", func(t *testing.T) {
|
||||
test(&ddbTableStore{&eventuallyConsistentDDB{ddb}, "table", nil, nil})
|
||||
})
|
||||
|
||||
t.Run("WithCache", func(t *testing.T) {
|
||||
tc := sizecache.New(uint64(2 * len(tableData)))
|
||||
dts := &ddbTableStore{ddb, "table", nil, tc}
|
||||
test(dts)
|
||||
|
||||
// Table should have been cached on read
|
||||
baseline := ddb.NumGets()
|
||||
_, err := dts.ReadTable(context.Background(), h, &Stats{})
|
||||
require.NoError(t, err)
|
||||
assert.Zero(t, ddb.NumGets()-baseline)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("WriteTable", func(t *testing.T) {
|
||||
t.Run("WithoutCache", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
dts := &ddbTableStore{makeFakeDDB(t), "table", nil, nil}
|
||||
require.NoError(t, dts.Write(context.Background(), h, tableData))
|
||||
|
||||
data, err := dts.ReadTable(context.Background(), h, &Stats{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(tableData, data)
|
||||
})
|
||||
|
||||
t.Run("WithCache", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
tc := sizecache.New(uint64(2 * len(tableData)))
|
||||
dts := &ddbTableStore{makeFakeDDB(t), "table", nil, tc}
|
||||
require.NoError(t, dts.Write(context.Background(), h, tableData))
|
||||
|
||||
// Table should have been cached on write
|
||||
baseline := ddb.NumGets()
|
||||
data, err := dts.ReadTable(context.Background(), h, &Stats{})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(tableData, data)
|
||||
assert.Zero(ddb.NumGets() - baseline)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("ReadAtWithCache", func(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
stats := &Stats{}
|
||||
|
||||
tc := sizecache.New(uint64(2 * len(tableData)))
|
||||
tra := &dynamoTableReaderAt{&ddbTableStore{ddb, "table", nil, tc}, h}
|
||||
|
||||
// First, read when table is not yet cached
|
||||
scratch := make([]byte, len(tableData)/4)
|
||||
baseline := ddb.NumGets()
|
||||
_, err := tra.ReadAtWithStats(context.Background(), scratch, 0, stats)
|
||||
require.NoError(t, err)
|
||||
assert.True(ddb.NumGets() > baseline)
|
||||
|
||||
// Table should have been cached on read so read again, a different slice this time
|
||||
baseline = ddb.NumGets()
|
||||
_, err = tra.ReadAtWithStats(context.Background(), scratch, int64(len(scratch)), stats)
|
||||
require.NoError(t, err)
|
||||
assert.Zero(ddb.NumGets() - baseline)
|
||||
})
|
||||
}
|
||||
|
||||
type eventuallyConsistentDDB struct {
|
||||
ddbsvc
|
||||
}
|
||||
|
||||
func (ec *eventuallyConsistentDDB) GetItemWithContext(ctx aws.Context, input *dynamodb.GetItemInput, opts ...request.Option) (*dynamodb.GetItemOutput, error) {
|
||||
if input.ConsistentRead != nil && *(input.ConsistentRead) {
|
||||
return ec.ddbsvc.GetItemWithContext(ctx, input)
|
||||
}
|
||||
return &dynamodb.GetItemOutput{}, nil
|
||||
}
|
||||
@@ -229,12 +229,12 @@ func (m *fakeS3) CompleteMultipartUploadWithContext(ctx aws.Context, input *s3.C
|
||||
}
|
||||
|
||||
func (m *fakeS3) GetObjectWithContext(ctx aws.Context, input *s3.GetObjectInput, opts ...request.Option) (*s3.GetObjectOutput, error) {
|
||||
m.getCount++
|
||||
m.assert.NotNil(input.Bucket, "Bucket is a required field")
|
||||
m.assert.NotNil(input.Key, "Key is a required field")
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.getCount++
|
||||
obj, present := m.data[*input.Key]
|
||||
if !present {
|
||||
return nil, mockAWSError("NoSuchKey")
|
||||
|
||||
@@ -491,8 +491,7 @@ func NewAWSStoreWithMMapIndex(ctx context.Context, nbfVerStr string, table, ns,
|
||||
s3,
|
||||
bucket,
|
||||
readRateLimiter,
|
||||
&ddbTableStore{ddb, table, readRateLimiter, nil},
|
||||
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks},
|
||||
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize},
|
||||
ns,
|
||||
q,
|
||||
}
|
||||
@@ -507,8 +506,7 @@ func NewAWSStore(ctx context.Context, nbfVerStr string, table, ns, bucket string
|
||||
s3,
|
||||
bucket,
|
||||
readRateLimiter,
|
||||
&ddbTableStore{ddb, table, readRateLimiter, nil},
|
||||
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize, maxDynamoItemSize, maxDynamoChunks},
|
||||
awsLimits{defaultS3PartSize, minS3PartSize, maxS3PartSize},
|
||||
ns,
|
||||
q,
|
||||
}
|
||||
|
||||
@@ -178,8 +178,6 @@ var CopiedNomsFiles []CopiedNomsFile = []CopiedNomsFile{
|
||||
{Path: "store/nbs/dynamo_fake_test.go", NomsPath: "go/nbs/dynamo_fake_test.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/dynamo_manifest.go", NomsPath: "go/nbs/dynamo_manifest.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/dynamo_manifest_test.go", NomsPath: "go/nbs/dynamo_manifest_test.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/dynamo_table_reader.go", NomsPath: "go/nbs/dynamo_table_reader.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/dynamo_table_reader_test.go", NomsPath: "go/nbs/dynamo_table_reader_test.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/file_manifest.go", NomsPath: "go/nbs/file_manifest.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/file_manifest_test.go", NomsPath: "go/nbs/file_manifest_test.go", HadCopyrightNotice: true},
|
||||
{Path: "store/nbs/file_table_persister.go", NomsPath: "go/nbs/file_table_persister.go", HadCopyrightNotice: true},
|
||||
|
||||
Reference in New Issue
Block a user