Compress big chunks in DynamoStore

It turns out that many large chunks are quite compressible, and
writing smaller chunks to DynamoDB saves time, and allows more
headroom before hitting the provisioned capacity on the backing
table. Compressed chunks are tagged with the algorithm used to
compress them, though we treat untagged chunks as uncompressed for
backward compatibility.
This commit is contained in:
Chris Masone
2016-02-18 17:28:41 -08:00
parent 5f724df918
commit 3d9ebdf61e
5 changed files with 118 additions and 41 deletions
+63 -16
View File
@@ -1,8 +1,11 @@
package chunks
import (
"bytes"
"compress/gzip"
"flag"
"fmt"
"io"
"sync"
"time"
@@ -16,13 +19,17 @@ import (
)
const (
dynamoMaxGetCount = 100
dynamoMaxPutCount = 25
dynamoMaxPutSize = 400 * 1024 // 400K
dynamoMaxGetCount = 100
dynamoMaxPutCount = 25
dynamoMaxPutSize = 400 * 1024 // 400K
dynamoWriteUnitSize = 1024 // 1K
dynamoTableName = "noms"
refAttr = "ref"
chunkAttr = "chunk"
compAttr = "comp"
noneValue = "none"
gzipValue = "gzip"
)
var (
@@ -46,9 +53,12 @@ type DynamoStore struct {
rootKey []byte
ddbsvc ddbsvc
writeTime int64
writeCount int64
writeBatchCount uint64
writeCount uint64
writeTotal uint64
writeCompTotal uint64
readTime int64
readCount int64
readBatchCount int64
readQueue chan readRequest
writeQueue chan Chunk
finishedChan chan struct{}
@@ -186,7 +196,7 @@ func (s *DynamoStore) Put(c Chunk) {
func (s *DynamoStore) sendGetRequests(req readRequest) {
n := time.Now().UnixNano()
defer func() {
s.readCount++
s.readBatchCount++
s.readTime += time.Now().UnixNano() - n
}()
batch := readBatch{}
@@ -245,7 +255,16 @@ func (s *DynamoStore) processResponses(responses []map[string]*dynamodb.Attribut
r := ref.FromSlice(s.removeNamespace(p.B))
p = item[chunkAttr]
d.Chk.NotNil(p)
c := NewChunkWithRef(r, p.B)
b := p.B
if p = item[compAttr]; p != nil && *p.S == gzipValue {
gr, err := gzip.NewReader(bytes.NewReader(b))
d.Chk.NoError(err)
buf := &bytes.Buffer{}
_, err = io.Copy(buf, gr)
d.Chk.NoError(err)
b = buf.Bytes()
}
c := NewChunkWithRef(r, b)
for _, reqChan := range batch[r] {
reqChan.Satisfy(c)
}
@@ -256,7 +275,7 @@ func (s *DynamoStore) processResponses(responses []map[string]*dynamodb.Attribut
func (s *DynamoStore) sendWriteRequests(first Chunk) {
n := time.Now().UnixNano()
defer func() {
s.writeCount++
s.writeBatchCount++
s.writeTime += time.Now().UnixNano() - n
}()
chunks := []Chunk{}
@@ -300,14 +319,32 @@ func (s *DynamoStore) sendWriteRequests(first Chunk) {
func chunkItemSize(c Chunk) int {
r := c.Ref()
return len(refAttr) + len(r.DigestSlice()) + len(chunkAttr) + len(c.Data())
return len(refAttr) + len(r.DigestSlice()) + len(chunkAttr) + len(c.Data()) + len(compAttr) + len(noneValue)
}
func (s *DynamoStore) buildWriteRequests(chunks []Chunk) map[string][]*dynamodb.WriteRequest {
chunkToItem := func(c Chunk) map[string]*dynamodb.AttributeValue {
chunkData := c.Data()
chunkDataLen := uint64(len(chunkData))
compDataLen := chunkDataLen
compression := noneValue
if chunkItemSize(c) > dynamoWriteUnitSize {
compression = gzipValue
buf := &bytes.Buffer{}
gw := gzip.NewWriter(buf)
_, err := io.Copy(gw, bytes.NewReader(chunkData))
d.Chk.NoError(err)
gw.Close()
chunkData = buf.Bytes()
compDataLen = uint64(buf.Len())
}
s.writeCount++
s.writeTotal += chunkDataLen
s.writeCompTotal += compDataLen
return map[string]*dynamodb.AttributeValue{
refAttr: {B: s.makeNamespacedKey(c.Ref())},
chunkAttr: {B: c.Data()},
chunkAttr: {B: chunkData},
compAttr: {S: aws.String(compression)},
}
}
var requests []*dynamodb.WriteRequest
@@ -332,11 +369,15 @@ func (s *DynamoStore) Close() error {
close(s.readQueue)
close(s.writeQueue)
if s.readCount > 0 {
fmt.Printf("Read batch count: %d, Read batch latency: %d\n", s.readCount, s.readTime/s.readCount/1e6)
if s.readBatchCount > 0 {
fmt.Printf("Read batch count: %d, Read batch latency: %dms\n", s.readBatchCount, s.readTime/s.readBatchCount/1e6)
}
if s.writeBatchCount > 0 {
fmt.Printf("Write batch count: %d, Write batch latency: %dms\n", s.writeBatchCount, uint64(s.writeTime)/s.writeBatchCount/1e6)
}
if s.writeCount > 0 {
fmt.Printf("Write batch count: %d, Write batch latency: %d\n", s.writeCount, s.writeTime/s.writeCount/1e6)
fmt.Printf("Write chunk count: %d, Avg chunk size: %.3fK\n", s.writeCount, float64(s.writeTotal)/float64(s.writeCount)/1024.0)
fmt.Printf("Avg compression ratio: %.2fx, Avg compressed chunk size: %.3fK\n", float64(s.writeTotal)/float64(s.writeCompTotal), float64(s.writeCompTotal)/float64(s.writeCount)/1024.0)
}
return nil
}
@@ -354,7 +395,13 @@ func (s *DynamoStore) Root() ref.Ref {
return ref.Ref{}
}
d.Chk.Equal(len(result.Item), 2)
itemLen := len(result.Item)
d.Chk.True(itemLen == 2 || itemLen == 3)
if itemLen == 3 {
d.Chk.NotNil(result.Item[compAttr])
d.Chk.NotNil(result.Item[compAttr].S)
d.Chk.Equal(noneValue, *result.Item[compAttr].S)
}
return ref.FromSlice(result.Item[chunkAttr].B)
}
@@ -366,10 +413,11 @@ func (s *DynamoStore) UpdateRoot(current, last ref.Ref) bool {
Item: map[string]*dynamodb.AttributeValue{
refAttr: {B: s.rootKey},
chunkAttr: {B: current.DigestSlice()},
// compAttr: {S: aws.String(noneValue)}, // We want to add this, but old versions of the code assert that items have only 2 elements.
},
}
if (last == ref.Ref{}) {
if last.IsEmpty() {
putArgs.ConditionExpression = aws.String(valueNotExistsExpression)
} else {
putArgs.ConditionExpression = aws.String(valueEqualsExpression)
@@ -384,7 +432,6 @@ func (s *DynamoStore) UpdateRoot(current, last ref.Ref) bool {
if awsErr.Code() == "ConditionalCheckFailedException" {
return false
}
d.Chk.NoError(awsErr)
} else {
d.Chk.NoError(err)
+38 -21
View File
@@ -3,6 +3,7 @@ package chunks
import (
"bytes"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/stretchr/testify/assert"
)
@@ -15,13 +16,19 @@ func (m mockAWSError) Message() string { return string(m) }
func (m mockAWSError) OrigErr() error { return nil }
type fakeDDB struct {
data map[string][]byte
numPuts int
assert *assert.Assertions
data map[string]record
assert *assert.Assertions
numPuts int
numCompPuts int
}
type record struct {
chunk []byte
comp string
}
func createFakeDDB(a *assert.Assertions) *fakeDDB {
return &fakeDDB{map[string][]byte{}, 0, a}
return &fakeDDB{data: map[string]record{}, assert: a}
}
func (m *fakeDDB) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
@@ -31,12 +38,13 @@ func (m *fakeDDB) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamodb.Bat
out.Responses[tableName] = nil
for _, keyMap := range keysAndAttrs.Keys {
key := keyMap[refAttr].B
value := m.get(key)
value, comp := m.get(key)
if value != nil {
item := map[string]*dynamodb.AttributeValue{
refAttr: &dynamodb.AttributeValue{B: key},
chunkAttr: &dynamodb.AttributeValue{B: value},
refAttr: {B: key},
chunkAttr: {B: value},
compAttr: {S: aws.String(comp)},
}
out.Responses[tableName] = append(out.Responses[tableName], item)
}
@@ -54,11 +62,16 @@ func (m *fakeDDB) BatchWriteItem(input *dynamodb.BatchWriteItemInput) (*dynamodb
m.assert.NotNil(putReq)
key := putReq.Item[refAttr].B
value := putReq.Item[chunkAttr].B
comp := putReq.Item[compAttr].S
m.assert.NotNil(key, "key should have been a blob: %+v", putReq.Item[refAttr])
m.assert.NotNil(value, "value should have been a blob: %+v", putReq.Item[chunkAttr])
m.assert.NotNil(comp, "comp should have been a string: %+v", putReq.Item[compAttr])
m.assert.False(bytes.Equal(key, dynamoRootKey), "Can't batch-write the root!")
m.put(key, value)
m.put(key, value, *comp)
if *comp != noneValue {
m.numCompPuts++
}
m.numPuts++
}
}
@@ -68,24 +81,27 @@ func (m *fakeDDB) BatchWriteItem(input *dynamodb.BatchWriteItemInput) (*dynamodb
func (m *fakeDDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
key := input.Key[refAttr].B
m.assert.NotNil(key, "key should have been a blob: %+v", input.Key[refAttr])
value := m.get(key)
value, comp := m.get(key)
item := map[string]*dynamodb.AttributeValue{}
if value != nil {
item[refAttr] = &dynamodb.AttributeValue{B: key}
item[chunkAttr] = &dynamodb.AttributeValue{B: value}
item = map[string]*dynamodb.AttributeValue{
refAttr: {B: key},
chunkAttr: {B: value},
compAttr: {S: aws.String(comp)},
}
}
return &dynamodb.GetItemOutput{
Item: item,
}, nil
}
func (m *fakeDDB) get(k []byte) []byte {
return m.data[string(k)]
func (m *fakeDDB) get(k []byte) ([]byte, string) {
return m.data[string(k)].chunk, m.data[string(k)].comp
}
func (m *fakeDDB) put(k, v []byte) {
m.data[string(k)] = v
func (m *fakeDDB) put(k, v []byte, c string) {
m.data[string(k)] = record{v, c}
}
func (m *fakeDDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
@@ -99,11 +115,11 @@ func (m *fakeDDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput
if mustNotExist && present {
return nil, mockAWSError("ConditionalCheckFailedException")
} else if !mustNotExist && !bytes.Equal(current, input.ExpressionAttributeValues[":prev"].B) {
} else if !mustNotExist && !bytes.Equal(current.chunk, input.ExpressionAttributeValues[":prev"].B) {
return nil, mockAWSError("ConditionalCheckFailedException")
}
m.data[string(key)] = value
m.put(key, value, noneValue)
if !bytes.HasSuffix(key, dynamoRootKey) {
m.numPuts++
}
@@ -117,7 +133,7 @@ type lowCapFakeDDB struct {
}
func createLowCapFakeDDB(a *assert.Assertions) *lowCapFakeDDB {
return &lowCapFakeDDB{fakeDDB{map[string][]byte{}, 0, a}, true}
return &lowCapFakeDDB{fakeDDB{data: map[string]record{}, assert: a}, true}
}
func (m *lowCapFakeDDB) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamodb.BatchGetItemOutput, error) {
@@ -132,11 +148,12 @@ func (m *lowCapFakeDDB) BatchGetItem(input *dynamodb.BatchGetItemInput) (*dynamo
out.Responses[tableName] = nil
key := keysAndAttrs.Keys[0][refAttr].B
value := m.get(key)
value, comp := m.get(key)
if value != nil {
item := map[string]*dynamodb.AttributeValue{
refAttr: &dynamodb.AttributeValue{B: key},
chunkAttr: &dynamodb.AttributeValue{B: value},
refAttr: {B: key},
chunkAttr: {B: value},
compAttr: {S: aws.String(comp)},
}
out.Responses[tableName] = append(out.Responses[tableName], item)
}
+15 -3
View File
@@ -13,13 +13,14 @@ func TestDynamoStoreTestSuite(t *testing.T) {
type DynamoStoreTestSuite struct {
ChunkStoreTestSuite
ddb *fakeDDB
}
func (suite *DynamoStoreTestSuite) SetupTest() {
ddb := createFakeDDB(suite.Assert())
suite.Store = newDynamoStoreFromDDBsvc("table", "namespace", ddb)
suite.ddb = createFakeDDB(suite.Assert())
suite.Store = newDynamoStoreFromDDBsvc("table", "namespace", suite.ddb)
suite.putCountFn = func() int {
return ddb.numPuts
return suite.ddb.numPuts
}
}
@@ -38,3 +39,14 @@ func TestGetRetrying(t *testing.T) {
assert.True(store.Has(c1.Ref()))
store.Close()
}
func (suite *DynamoStoreTestSuite) TestChunkCompression() {
c1 := NewChunk(make([]byte, dynamoWriteUnitSize+1))
suite.Store.Put(c1)
suite.Store.UpdateRoot(c1.Ref(), suite.Store.Root()) // Commit writes
suite.True(suite.Store.Has(c1.Ref()))
suite.Equal(1, suite.ddb.numCompPuts)
roundTrip := suite.Store.Get(c1.Ref())
suite.Equal(c1.Data(), roundTrip.Data())
}
+1
View File
@@ -8,6 +8,7 @@ import (
"github.com/attic-labs/noms/types"
)
// KindSlice is an alias for []types.NomsKind. It's needed because types.NomsKind are really just 8 bit unsigned ints, which are what Go uses to represent 'byte', and this confuses the Go JSON marshal/unmarshal code -- it treats them as byte arrays and base64 encodes them!
type KindSlice []types.NomsKind
func (ks KindSlice) MarshalJSON() ([]byte, error) {
+1 -1
View File
@@ -130,7 +130,7 @@ func Read(res io.Reader, structName, header string, kinds KindSlice, comma rune,
r.FieldsPerRecord = 0 // Let first row determine the number of fields.
typeRef, typeDef = MakeStructTypeFromHeader(r, structName, kinds)
valueChan := make(chan types.Value)
valueChan := make(chan types.Value, 128) // TODO: Make this a function param?
listType := types.MakeCompoundType(types.ListKind, typeRef)
listChan := types.NewStreamingTypedList(listType, cs, valueChan)