From b1e918d1d47f9c880048bfafc77ac057cb09f1ff Mon Sep 17 00:00:00 2001 From: cmasone-attic Date: Wed, 22 Feb 2017 13:41:23 -0800 Subject: [PATCH] Share s3, dynamodb clients (#3212) These objects manage their own pools of HTTP connections and other resources, so it's generally best to share them process-wide if you can. Fixes #3027 --- go/chunks/dynamo_store.go | 11 ++++------- go/chunks/dynamo_store_test.go | 4 ++-- go/nbs/benchmarks/main.go | 5 +++-- go/nbs/frag/main.go | 4 +++- go/nbs/store.go | 17 +++++++++-------- go/spec/spec.go | 7 +++++-- 6 files changed, 26 insertions(+), 22 deletions(-) diff --git a/go/chunks/dynamo_store.go b/go/chunks/dynamo_store.go index e6fe2fdb44..01023506ac 100644 --- a/go/chunks/dynamo_store.go +++ b/go/chunks/dynamo_store.go @@ -81,12 +81,8 @@ type DynamoStore struct { } // NewDynamoStore returns a new DynamoStore instance pointed at a DynamoDB table in the given region. All keys used to access items are prefixed with the given namespace. -// Uses credentials from the AWS session parameter. -func NewDynamoStore(table, namespace string, sess *session.Session, showStats bool) *DynamoStore { - return newDynamoStoreFromDDBsvc(table, namespace, dynamodb.New(sess), showStats) -} - -func newDynamoStoreFromDDBsvc(table, namespace string, ddb ddbsvc, showStats bool) *DynamoStore { +// Uses the given ddbsvc object to access DynamoDB. +func NewDynamoStore(table, namespace string, ddb ddbsvc, showStats bool) *DynamoStore { store := &DynamoStore{ table: table, namespace: []byte(namespace), @@ -538,7 +534,8 @@ func (f DynamoStoreFlags) CreateStore(ns string) ChunkStore { if *f.awsKey != "" { config = config.WithCredentials(credentials.NewStaticCredentials(*f.awsKey, *f.awsSecret, "")) } - return NewDynamoStore(*f.dynamoTable, ns, session.Must(session.NewSession(config)), *f.dynamoStats) + sess := session.Must(session.NewSession(config)) + return NewDynamoStore(*f.dynamoTable, ns, dynamodb.New(sess), *f.dynamoStats) } return nil } diff --git a/go/chunks/dynamo_store_test.go b/go/chunks/dynamo_store_test.go index 4a0032a20b..d3914f9e6a 100644 --- a/go/chunks/dynamo_store_test.go +++ b/go/chunks/dynamo_store_test.go @@ -22,7 +22,7 @@ type DynamoStoreTestSuite struct { func (suite *DynamoStoreTestSuite) SetupTest() { suite.ddb = createFakeDDB(suite.Assert()) - suite.Store = newDynamoStoreFromDDBsvc("table", "namespace", suite.ddb, false) + suite.Store = NewDynamoStore("table", "namespace", suite.ddb, false) suite.putCountFn = func() int { return suite.ddb.numPuts } @@ -34,7 +34,7 @@ func (suite *DynamoStoreTestSuite) TearDownTest() { func TestGetRetrying(t *testing.T) { assert := assert.New(t) - store := newDynamoStoreFromDDBsvc("table", "namespace", createLowCapFakeDDB(assert), false) + store := NewDynamoStore("table", "namespace", createLowCapFakeDDB(assert), false) c1 := NewChunk([]byte("abc")) diff --git a/go/nbs/benchmarks/main.go b/go/nbs/benchmarks/main.go index ac3364b35e..9698501ee7 100644 --- a/go/nbs/benchmarks/main.go +++ b/go/nbs/benchmarks/main.go @@ -20,6 +20,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/s3" "github.com/dustin/go-humanize" flag "github.com/juju/gnuflag" ) @@ -96,7 +97,7 @@ func main() { } else if *toAWS != "" { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) open = func() types.BatchStore { - return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, sess, bufSize) + return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize) } reset = func() { ddb := dynamodb.New(sess) @@ -121,7 +122,7 @@ func main() { } else if *useAWS != "" { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) open = func() types.BatchStore { - return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, sess, bufSize) + return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize) } } writeDB = func() {} diff --git a/go/nbs/frag/main.go b/go/nbs/frag/main.go index f9b89880a0..75d34aad43 100644 --- a/go/nbs/frag/main.go +++ b/go/nbs/frag/main.go @@ -18,6 +18,8 @@ import ( "github.com/attic-labs/noms/go/util/profile" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/s3" "github.com/dustin/go-humanize" flag "github.com/juju/gnuflag" ) @@ -51,7 +53,7 @@ func main() { *dbName = *dir } else if *table != "" && *bucket != "" && *dbName != "" { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) - store = nbs.NewAWSStore(*table, *dbName, *bucket, sess, memTableSize) + store = nbs.NewAWSStore(*table, *dbName, *bucket, s3.New(sess), dynamodb.New(sess), memTableSize) } else { log.Fatalf("Must set either --dir or ALL of --table, --bucket and --db\n") } diff --git a/go/nbs/store.go b/go/nbs/store.go index df800b429c..0a57eac9de 100644 --- a/go/nbs/store.go +++ b/go/nbs/store.go @@ -64,7 +64,8 @@ type NomsBlockStore struct { } type AWSStoreFactory struct { - sess *session.Session + s3 s3svc + ddb ddbsvc table, bucket string indexCache *indexCache readRl chan struct{} @@ -75,11 +76,11 @@ func NewAWSStoreFactory(sess *session.Session, table, bucket string, indexCacheS if indexCacheSize > 0 { indexCache = newIndexCache(indexCacheSize) } - return &AWSStoreFactory{sess, table, bucket, indexCache, make(chan struct{}, defaultAWSReadLimit)} + return &AWSStoreFactory{s3.New(sess), dynamodb.New(sess), table, bucket, indexCache, make(chan struct{}, defaultAWSReadLimit)} } func (asf *AWSStoreFactory) CreateStore(ns string) chunks.ChunkStore { - return newAWSStore(asf.table, ns, asf.bucket, asf.sess, defaultMemTableSize, asf.indexCache, asf.readRl) + return newAWSStore(asf.table, ns, asf.bucket, asf.s3, asf.ddb, defaultMemTableSize, asf.indexCache, asf.readRl) } func (asf *AWSStoreFactory) Shutter() { @@ -109,14 +110,14 @@ func (lsf *LocalStoreFactory) CreateStore(ns string) chunks.ChunkStore { func (lsf *LocalStoreFactory) Shutter() { } -func NewAWSStore(table, ns, bucket string, sess *session.Session, memTableSize uint64) *NomsBlockStore { +func NewAWSStore(table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize uint64) *NomsBlockStore { indexCacheOnce.Do(makeGlobalIndexCache) - return newAWSStore(table, ns, bucket, sess, memTableSize, globalIndexCache, make(chan struct{}, 32)) + return newAWSStore(table, ns, bucket, s3, ddb, memTableSize, globalIndexCache, make(chan struct{}, 32)) } -func newAWSStore(table, ns, bucket string, sess *session.Session, memTableSize uint64, indexCache *indexCache, readRl chan struct{}) *NomsBlockStore { - mm := newDynamoManifest(table, ns, dynamodb.New(sess)) - ts := newS3TableSet(s3.New(sess), bucket, indexCache, readRl) +func newAWSStore(table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize uint64, indexCache *indexCache, readRl chan struct{}) *NomsBlockStore { + mm := newDynamoManifest(table, ns, ddb) + ts := newS3TableSet(s3, bucket, indexCache, readRl) return newNomsBlockStore(mm, ts, memTableSize, defaultMaxTables) } diff --git a/go/spec/spec.go b/go/spec/spec.go index 809fd61628..41e08983a5 100644 --- a/go/spec/spec.go +++ b/go/spec/spec.go @@ -19,6 +19,8 @@ import ( "github.com/attic-labs/noms/go/types" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/s3" ) const Separator = "::" @@ -179,10 +181,11 @@ func parseAWSSpec(awsURL string) chunks.ChunkStore { u, _ := url.Parse(awsURL) parts := strings.SplitN(u.Host, ":", 2) // [table] [, bucket]? sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) + ddb := dynamodb.New(sess) if len(parts) == 1 { - return chunks.NewDynamoStore(parts[0], u.Path, sess, false) + return chunks.NewDynamoStore(parts[0], u.Path, ddb, false) } - return nbs.NewAWSStore(parts[0], u.Path, parts[1], sess, 1<<28) + return nbs.NewAWSStore(parts[0], u.Path, parts[1], s3.New(sess), ddb, 1<<28) } // GetDataset returns the current Dataset instance for this Spec's Database.