Make compaction async (#2934)
Introduce a compactingChunkSource, which knows how to compact its memTable in the background. It satisfies get/has requests from the in-memory table until compaction is complete; once compaction is done, it drops the in-memory table and serves solely from the newly persisted table. Fixes #2879
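For orientation, a rough usage sketch of the new type, written as a hypothetical test against the package-internal helpers that appear in the test files of this diff (newMemTable, computeAddr, newFakeTablePersister, testMemTableSize). It is illustrative only and not part of the commit:

// Hypothetical sketch; relies on the nbs test helpers exercised below.
func TestCompactingChunkSourceSketch(t *testing.T) {
	mt := newMemTable(testMemTableSize)
	chunk := []byte("some chunk")
	mt.addChunk(computeAddr(chunk), chunk)

	rl := make(chan struct{}, 1) // limits concurrent background compactions
	ccs := newCompactingChunkSource(mt, nil, newFakeTablePersister(), rl)

	// Until the background Compact/Open completes, reads are answered from mt.
	if !ccs.has(computeAddr(chunk)) {
		t.Fatal("expected chunk to be visible immediately")
	}

	// hash() waits on the WaitGroup, i.e. until the persistent table is ready.
	_ = ccs.hash()
}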
@@ -0,0 +1,120 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
	"sync"

	"github.com/attic-labs/noms/go/d"
)

func newCompactingChunkSource(mt *memTable, haver chunkReader, p tablePersister, rl chan struct{}) *compactingChunkSource {
	ccs := &compactingChunkSource{mt: mt}
	ccs.wg.Add(1)
	rl <- struct{}{}
	go func() {
		defer ccs.wg.Done()

		var cs chunkSource = emptyChunkSource{}
		if tableHash, chunkCount := p.Compact(mt, haver); chunkCount > 0 {
			cs = p.Open(tableHash, chunkCount)
		}
		ccs.mu.Lock()
		defer ccs.mu.Unlock()
		ccs.cs = cs
		ccs.mt = nil
		<-rl
	}()
	return ccs
}

type compactingChunkSource struct {
	mu sync.RWMutex
	mt *memTable

	wg sync.WaitGroup
	cs chunkSource
}

func (ccs *compactingChunkSource) getReader() chunkReader {
	ccs.mu.RLock()
	defer ccs.mu.RUnlock()
	if ccs.mt != nil {
		return ccs.mt
	}
	return ccs.cs
}

func (ccs *compactingChunkSource) has(h addr) bool {
	cr := ccs.getReader()
	d.Chk.True(cr != nil)
	return cr.has(h)
}

func (ccs *compactingChunkSource) hasMany(addrs []hasRecord) bool {
	cr := ccs.getReader()
	d.Chk.True(cr != nil)
	return cr.hasMany(addrs)
}

func (ccs *compactingChunkSource) get(h addr) []byte {
	cr := ccs.getReader()
	d.Chk.True(cr != nil)
	return cr.get(h)
}

func (ccs *compactingChunkSource) getMany(reqs []getRecord) bool {
	cr := ccs.getReader()
	d.Chk.True(cr != nil)
	return cr.getMany(reqs)
}

func (ccs *compactingChunkSource) close() error {
	if ccs.cs != nil {
		return ccs.cs.close()
	}
	return nil
}

func (ccs *compactingChunkSource) count() uint32 {
	cr := ccs.getReader()
	d.Chk.True(cr != nil)
	return cr.count()
}

func (ccs *compactingChunkSource) hash() addr {
	ccs.wg.Wait()
	d.Chk.True(ccs.cs != nil)
	return ccs.cs.hash()
}

type emptyChunkSource struct{}

func (ecs emptyChunkSource) has(h addr) bool {
	return false
}

func (ecs emptyChunkSource) hasMany(addrs []hasRecord) bool {
	return true
}

func (ecs emptyChunkSource) get(h addr) []byte {
	return nil
}

func (ecs emptyChunkSource) getMany(reqs []getRecord) bool {
	return true
}

func (ecs emptyChunkSource) close() error {
	return nil
}

func (ecs emptyChunkSource) count() uint32 {
	return 0
}

func (ecs emptyChunkSource) hash() addr {
	return addr{} // TODO: is this legal?
}
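The rl channel passed to newCompactingChunkSource behaves as a counting semaphore: the constructor sends to claim a slot before spawning the compaction goroutine, and the goroutine receives to release the slot when it finishes, so at most cap(rl) compactions run concurrently (the table-set constructors later in this diff create it with capacity 5). A generic, self-contained sketch of that pattern, separate from the nbs types:

package main

import (
	"fmt"
	"sync"
)

func main() {
	sem := make(chan struct{}, 2) // at most 2 background jobs in flight
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		sem <- struct{}{} // acquire a slot; blocks while 2 jobs are running
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when finished
			fmt.Println("compacting table", n)
		}(i)
	}
	wg.Wait()
}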
@@ -0,0 +1,59 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
	"os"
	"testing"

	"github.com/attic-labs/testify/assert"
)

func TestCompactingChunkStoreEmpty(t *testing.T) {
	mt := newMemTable(testMemTableSize)
	ccs := newCompactingChunkSource(mt, nil, newFakeTablePersister(), make(chan struct{}, 1))
	assert.Equal(t, addr{}, ccs.hash())
	assert.Zero(t, ccs.count())
}

type pausingFakeTablePersister struct {
	tablePersister
	trigger <-chan struct{}
}

func (ftp pausingFakeTablePersister) Compact(mt *memTable, haver chunkReader) (name addr, count uint32) {
	<-ftp.trigger
	return ftp.tablePersister.Compact(mt, haver)
}

func TestCompactingChunkStore(t *testing.T) {
	assert := assert.New(t)
	dir := makeTempDir(assert)
	defer os.RemoveAll(dir)

	mt := newMemTable(testMemTableSize)

	for _, c := range testChunks {
		assert.True(mt.addChunk(computeAddr(c), c))
	}

	trigger := make(chan struct{})
	ccs := newCompactingChunkSource(mt, nil, pausingFakeTablePersister{fsTablePersister{dir}, trigger}, make(chan struct{}, 1))

	assertChunksInReader(testChunks, ccs, assert)
	assert.EqualValues(mt.count(), ccs.count())
	close(trigger)

	assert.NotEqual(addr{}, ccs.hash())
	assert.EqualValues(len(testChunks), ccs.count())
	assertChunksInReader(testChunks, ccs, assert)

	assert.Nil(ccs.mt)

	newChunk := []byte("additional")
	mt.addChunk(computeAddr(newChunk), newChunk)
	assert.NotEqual(mt.count(), ccs.count())
	assert.False(ccs.has(computeAddr(newChunk)))
}
@@ -120,7 +120,6 @@ func (crg chunkReaderGroup) hasMany(addrs []hasRecord) (remaining bool) {
			return false
		}
	}

	return true
}

@@ -130,6 +129,12 @@ func (crg chunkReaderGroup) getMany(reqs []getRecord) (remaining bool) {
			return false
		}
	}

	return true
}

func (crg chunkReaderGroup) count() (count uint32) {
	for _, haver := range crg {
		count += haver.count()
	}
	return
}

@@ -184,7 +184,11 @@ func (fm *fakeManifest) set(version string, root hash.Hash, specs []tableSpec) {
}

func newFakeTableSet() tableSet {
	return tableSet{p: fakeTablePersister{map[addr]*memTable{}}}
	return tableSet{p: newFakeTablePersister(), rl: make(chan struct{}, 1)}
}

func newFakeTablePersister() tablePersister {
	return fakeTablePersister{map[addr]*memTable{}} // TODO: Make this a better fake. Need to move count() to tableReader, make chunkSourceAdapter take a tableReader
}

type fakeTablePersister struct {
@@ -42,45 +42,6 @@ type NomsBlockStore struct {
	putCount uint64
}

type chunkSources []chunkSource

func (css chunkSources) has(h addr) bool {
	for _, haver := range css {
		if haver.has(h) {
			return true
		}
	}
	return false
}

func (css chunkSources) hasMany(addrs []hasRecord) (remaining bool) {
	for _, haver := range css {
		if !haver.hasMany(addrs) {
			return false
		}
	}
	return true
}

func (css chunkSources) get(h addr) []byte {
	for _, haver := range css {
		if data := haver.get(h); data != nil {
			return data
		}
	}
	return nil
}

func (css chunkSources) getMany(reqs []getRecord) (remaining bool) {
	for _, haver := range css {
		if !haver.getMany(reqs) {
			return false
		}
	}

	return true
}

func NewAWSStore(table, ns, bucket string, sess *session.Session, memTableSize uint64) *NomsBlockStore {
	mm := newDynamoManifest(table, ns, dynamodb.New(sess))
	ts := newS3TableSet(s3.New(sess), bucket)
@@ -203,11 +203,11 @@ type chunkReader interface {
	hasMany(addrs []hasRecord) bool
	get(h addr) []byte
	getMany(reqs []getRecord) bool
	count() uint32
}

type chunkSource interface {
	chunkReader
	close() error
	count() uint32
	hash() addr
}
@@ -0,0 +1,198 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0

package nbs

import (
	"bytes"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"sync"

	"github.com/attic-labs/noms/go/d"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

const defaultS3PartSize = 5 * 1 << 20 // 5MiB, smallest allowed by S3

type tablePersister interface {
	Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32)
	Open(name addr, chunkCount uint32) chunkSource
}

type s3TablePersister struct {
	s3       s3svc
	bucket   string
	partSize int
}

func (s3p s3TablePersister) Open(name addr, chunkCount uint32) chunkSource {
	return newS3TableReader(s3p.s3, s3p.bucket, name, chunkCount)
}

type s3UploadedPart struct {
	idx  int64
	etag string
}

func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32) {
	name, data, chunkCount := mt.write(haver)

	if chunkCount > 0 {
		result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
			Bucket: aws.String(s3p.bucket),
			Key:    aws.String(name.String()),
		})
		d.PanicIfError(err)
		uploadID := *result.UploadId

		multipartUpload, err := s3p.uploadParts(data, name.String(), uploadID)
		if err != nil {
			_, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
				Bucket:   aws.String(s3p.bucket),
				Key:      aws.String(name.String()),
				UploadId: aws.String(uploadID),
			})
			d.Chk.NoError(abrtErr)
			panic(err) // TODO: Better error handling here
		}

		_, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
			Bucket:          aws.String(s3p.bucket),
			Key:             aws.String(name.String()),
			MultipartUpload: multipartUpload,
			UploadId:        aws.String(uploadID),
		})
		d.Chk.NoError(err)
	}
	return name, chunkCount
}

func (s3p s3TablePersister) uploadParts(data []byte, key, uploadID string) (*s3.CompletedMultipartUpload, error) {
	sent, failed, done := make(chan s3UploadedPart), make(chan error), make(chan struct{})

	numParts := getNumParts(len(data), s3p.partSize)
	var wg sync.WaitGroup
	wg.Add(numParts)
	sendPart := func(partNum int) {
		defer wg.Done()

		// Check if upload has been terminated
		select {
		case <-done:
			return
		default:
		}
		// Upload the desired part
		start, end := (partNum-1)*s3p.partSize, partNum*s3p.partSize
		if partNum == numParts { // If this is the last part, make sure it includes any overflow
			end = len(data)
		}
		result, err := s3p.s3.UploadPart(&s3.UploadPartInput{
			Bucket:     aws.String(s3p.bucket),
			Key:        aws.String(key),
			PartNumber: aws.Int64(int64(partNum)),
			UploadId:   aws.String(uploadID),
			Body:       bytes.NewReader(data[start:end]),
		})
		if err != nil {
			failed <- err
			return
		}
		// Try to send along part info. In the case that the upload was aborted, reading from done allows this worker to exit correctly.
		select {
		case sent <- s3UploadedPart{int64(partNum), *result.ETag}:
		case <-done:
			return
		}
	}
	for i := 1; i <= numParts; i++ {
		go sendPart(i)
	}
	go func() {
		wg.Wait()
		close(sent)
		close(failed)
	}()

	multipartUpload := &s3.CompletedMultipartUpload{}
	var lastFailure error
	for cont := true; cont; {
		select {
		case sentPart, open := <-sent:
			if open {
				multipartUpload.Parts = append(multipartUpload.Parts, &s3.CompletedPart{
					ETag:       aws.String(sentPart.etag),
					PartNumber: aws.Int64(sentPart.idx),
				})
			}
			cont = open

		case err := <-failed:
			if err != nil { // nil err may happen when failed gets closed
				lastFailure = err
				close(done)
			}
		}
	}

	if lastFailure == nil {
		close(done)
	}
	sort.Sort(partsByPartNum(multipartUpload.Parts))
	return multipartUpload, lastFailure
}

func getNumParts(dataLen, partSize int) int {
	numParts := dataLen / partSize
	if numParts == 0 {
		numParts = 1
	}
	return numParts
}

type partsByPartNum []*s3.CompletedPart

func (s partsByPartNum) Len() int {
	return len(s)
}

func (s partsByPartNum) Less(i, j int) bool {
	return *s[i].PartNumber < *s[j].PartNumber
}

func (s partsByPartNum) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type fsTablePersister struct {
	dir string
}

func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32) {
	tempName, name, chunkCount := func() (string, addr, uint32) {
		temp, err := ioutil.TempFile(ftp.dir, "nbs_table_")
		d.PanicIfError(err)
		defer checkClose(temp)

		name, data, chunkCount := mt.write(haver)
		io.Copy(temp, bytes.NewReader(data))
		return temp.Name(), name, chunkCount
	}()
	if chunkCount > 0 {
		err := os.Rename(tempName, filepath.Join(ftp.dir, name.String()))
		d.PanicIfError(err)
	} else {
		os.Remove(tempName)
	}
	return name, chunkCount
}

func (ftp fsTablePersister) Open(name addr, chunkCount uint32) chunkSource {
	return newMmapTableReader(ftp.dir, name, chunkCount)
}
@@ -4,56 +4,86 @@

package nbs

import (
	"bytes"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"sort"
	"sync"

	"github.com/attic-labs/noms/go/d"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

const defaultS3PartSize = 5 * 1 << 20 // 5MiB, smallest allowed by S3

func newS3TableSet(s3 s3svc, bucket string) tableSet {
	return tableSet{p: s3TablePersister{s3, bucket, defaultS3PartSize}}
	return tableSet{p: s3TablePersister{s3, bucket, defaultS3PartSize}, rl: make(chan struct{}, 5)}
}

func newFSTableSet(dir string) tableSet {
	return tableSet{p: fsTablePersister{dir}}
	return tableSet{p: fsTablePersister{dir}, rl: make(chan struct{}, 5)}
}

// tableSet is an immutable set of persistable chunkSources.
type tableSet struct {
	chunkSources
	p tablePersister
	p  tablePersister
	rl chan struct{}
}

type chunkSources []chunkSource

func (css chunkSources) has(h addr) bool {
	for _, haver := range css {
		if haver.has(h) {
			return true
		}
	}
	return false
}

func (css chunkSources) hasMany(addrs []hasRecord) (remaining bool) {
	for _, haver := range css {
		if !haver.hasMany(addrs) {
			return false
		}
	}
	return true
}

func (css chunkSources) get(h addr) []byte {
	for _, haver := range css {
		if data := haver.get(h); data != nil {
			return data
		}
	}
	return nil
}

func (css chunkSources) getMany(reqs []getRecord) (remaining bool) {
	for _, haver := range css {
		if !haver.getMany(reqs) {
			return false
		}
	}

	return true
}

func (css chunkSources) count() (count uint32) {
	for _, haver := range css {
		count += haver.count()
	}
	return
}

// Prepend adds a memTable to an existing tableSet, compacting |mt| and
// returning a new tableSet with newly compacted table added.
func (ts tableSet) Prepend(mt *memTable) tableSet {
	if tableHash, chunkCount := ts.p.Compact(mt, ts); chunkCount > 0 {
		newTables := make(chunkSources, len(ts.chunkSources)+1)
		newTables[0] = ts.p.Open(tableHash, chunkCount)
		copy(newTables[1:], ts.chunkSources)
		return tableSet{newTables, ts.p}
	}
	return ts
	newTables := make(chunkSources, len(ts.chunkSources)+1)
	newTables[0] = newCompactingChunkSource(mt, ts, ts.p, ts.rl)
	copy(newTables[1:], ts.chunkSources)
	return tableSet{newTables, ts.p, ts.rl}
}

// Union returns a new tableSet holding the union of the tables managed by
// |ts| and those specified by |specs|.
func (ts tableSet) Union(specs []tableSpec) tableSet {
	newTables := make(chunkSources, len(ts.chunkSources))
	newTables := make(chunkSources, 0, len(ts.chunkSources))
	known := map[addr]struct{}{}
	for i, t := range ts.chunkSources {
		known[t.hash()] = struct{}{}
		newTables[i] = ts.chunkSources[i]
	for _, t := range ts.chunkSources {
		if t.count() > 0 {
			known[t.hash()] = struct{}{}
			newTables = append(newTables, t)
		}
	}

	for _, t := range specs {
@@ -61,7 +91,7 @@ func (ts tableSet) Union(specs []tableSpec) tableSet {
			newTables = append(newTables, ts.p.Open(t.name, t.chunkCount))
		}
	}
	return tableSet{newTables, ts.p}
	return tableSet{newTables, ts.p, ts.rl}
}

func (ts tableSet) ToSpecs() []tableSpec {
@@ -76,182 +106,6 @@ func (ts tableSet) Close() (err error) {
	for _, t := range ts.chunkSources {
		err = t.close() // TODO: somehow coalesce these errors??
	}
	close(ts.rl)
	return
}

type tablePersister interface {
	Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32)
	Open(name addr, chunkCount uint32) chunkSource
}

type s3TablePersister struct {
	s3       s3svc
	bucket   string
	partSize int
}

func (s3p s3TablePersister) Open(name addr, chunkCount uint32) chunkSource {
	return newS3TableReader(s3p.s3, s3p.bucket, name, chunkCount)
}

type s3UploadedPart struct {
	idx  int64
	etag string
}

func (s3p s3TablePersister) Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32) {
	name, data, chunkCount := mt.write(haver)

	if chunkCount > 0 {
		result, err := s3p.s3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
			Bucket: aws.String(s3p.bucket),
			Key:    aws.String(name.String()),
		})
		d.PanicIfError(err)
		uploadID := *result.UploadId

		multipartUpload, err := s3p.uploadParts(data, name.String(), uploadID)
		if err != nil {
			_, abrtErr := s3p.s3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
				Bucket:   aws.String(s3p.bucket),
				Key:      aws.String(name.String()),
				UploadId: aws.String(uploadID),
			})
			d.Chk.NoError(abrtErr)
			panic(err) // TODO: Better error handling here
		}

		_, err = s3p.s3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
			Bucket:          aws.String(s3p.bucket),
			Key:             aws.String(name.String()),
			MultipartUpload: multipartUpload,
			UploadId:        aws.String(uploadID),
		})
		d.Chk.NoError(err)
	}
	return name, chunkCount
}

func (s3p s3TablePersister) uploadParts(data []byte, key, uploadID string) (*s3.CompletedMultipartUpload, error) {
	sent, failed, done := make(chan s3UploadedPart), make(chan error), make(chan struct{})

	numParts := getNumParts(len(data), s3p.partSize)
	var wg sync.WaitGroup
	wg.Add(numParts)
	sendPart := func(partNum int) {
		defer wg.Done()

		// Check if upload has been terminated
		select {
		case <-done:
			return
		default:
		}
		// Upload the desired part
		start, end := (partNum-1)*s3p.partSize, partNum*s3p.partSize
		if partNum == numParts { // If this is the last part, make sure it includes any overflow
			end = len(data)
		}
		result, err := s3p.s3.UploadPart(&s3.UploadPartInput{
			Bucket:     aws.String(s3p.bucket),
			Key:        aws.String(key),
			PartNumber: aws.Int64(int64(partNum)),
			UploadId:   aws.String(uploadID),
			Body:       bytes.NewReader(data[start:end]),
		})
		if err != nil {
			failed <- err
			return
		}
		// Try to send along part info. In the case that the upload was aborted, reading from done allows this worker to exit correctly.
		select {
		case sent <- s3UploadedPart{int64(partNum), *result.ETag}:
		case <-done:
			return
		}
	}
	for i := 1; i <= numParts; i++ {
		go sendPart(i)
	}
	go func() {
		wg.Wait()
		close(sent)
		close(failed)
	}()

	multipartUpload := &s3.CompletedMultipartUpload{}
	var lastFailure error
	for cont := true; cont; {
		select {
		case sentPart, open := <-sent:
			if open {
				multipartUpload.Parts = append(multipartUpload.Parts, &s3.CompletedPart{
					ETag:       aws.String(sentPart.etag),
					PartNumber: aws.Int64(sentPart.idx),
				})
			}
			cont = open

		case err := <-failed:
			if err != nil { // nil err may happen when failed gets closed
				lastFailure = err
				close(done)
			}
		}
	}

	if lastFailure == nil {
		close(done)
	}
	sort.Sort(partsByPartNum(multipartUpload.Parts))
	return multipartUpload, lastFailure
}

func getNumParts(dataLen, partSize int) int {
	numParts := dataLen / partSize
	if numParts == 0 {
		numParts = 1
	}
	return numParts
}

type partsByPartNum []*s3.CompletedPart

func (s partsByPartNum) Len() int {
	return len(s)
}

func (s partsByPartNum) Less(i, j int) bool {
	return *s[i].PartNumber < *s[j].PartNumber
}

func (s partsByPartNum) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

type fsTablePersister struct {
	dir string
}

func (ftp fsTablePersister) Compact(mt *memTable, haver chunkReader) (name addr, chunkCount uint32) {
	tempName, name, chunkCount := func() (string, addr, uint32) {
		temp, err := ioutil.TempFile(ftp.dir, "nbs_table_")
		d.PanicIfError(err)
		defer checkClose(temp)

		name, data, chunkCount := mt.write(haver)
		io.Copy(temp, bytes.NewReader(data))
		return temp.Name(), name, chunkCount
	}()
	if chunkCount > 0 {
		err := os.Rename(tempName, filepath.Join(ftp.dir, name.String()))
		d.PanicIfError(err)
	} else {
		os.Remove(tempName)
	}
	return name, chunkCount
}

func (ftp fsTablePersister) Open(name addr, chunkCount uint32) chunkSource {
	return newMmapTableReader(ftp.dir, name, chunkCount)
}
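Taken together, Prepend no longer waits for Compact: it immediately returns a tableSet whose first source is a compactingChunkSource, and Union later filters out sources whose count() is zero. A hypothetical sketch of the resulting behavior, leaning on the fake table-set test helpers modified in this commit (illustrative, not part of the diff):

// Hypothetical sketch; newFakeTableSet, newMemTable and testMemTableSize are
// the nbs test helpers touched elsewhere in this commit.
func TestPrependIsAsyncSketch(t *testing.T) {
	ts := newFakeTableSet()
	ts = ts.Prepend(newMemTable(testMemTableSize)) // returns without blocking

	// Even an empty memTable yields one source right away; its hash() would
	// block until the background compaction has resolved it.
	if got := ts.chunkSources[0].count(); got != 0 {
		t.Fatalf("expected empty source, got count %d", got)
	}
}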
@@ -20,7 +20,9 @@ var testChunks = [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2"

func TestTableSetPrependEmpty(t *testing.T) {
	ts := newFakeTableSet().Prepend(newMemTable(testMemTableSize))
	assert.Empty(t, ts.ToSpecs())
	// assert.Empty(t, ts.ToSpecs())
	assert.Len(t, ts.ToSpecs(), 1)
	assert.EqualValues(t, 0, ts.chunkSources[0].count())
}

func TestTableSetPrepend(t *testing.T) {