Go: Back datas.unwrittenPutCache with a LevelDB (#1442)

* Go: Back datas.unwrittenPutCache with a LevelDB

httpBatchStore was caching as-yet-unwritten Chunks both in memory and
on disk. To avoid this, the in-memory cache is now backed by a
LevelDB, which handles spilling to disk when it needs to. When it's
time to send Chunks to the server, the cache is enumerated in
insert-order so that the payload of the write request is properly
structured.

Fixes #1348
This commit is contained in:
cmasone-attic
2016-05-09 09:29:31 -07:00
parent e2f7776614
commit 57320aad8e
3 changed files with 302 additions and 45 deletions
+25 -29
View File
@@ -7,7 +7,6 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
@@ -70,7 +69,7 @@ type httpDoer interface {
}
type writeRequest struct {
c chunks.Chunk
hash ref.Ref
hints types.Hints
}
@@ -93,6 +92,7 @@ func (bhcs *httpBatchStore) Flush() {
func (bhcs *httpBatchStore) Close() (e error) {
close(bhcs.finishedChan)
bhcs.unwrittenPuts.Destroy()
bhcs.requestWg.Wait()
bhcs.workerWg.Wait()
@@ -203,7 +203,7 @@ func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, hints types.Hints) {
}
bhcs.requestWg.Add(1)
bhcs.writeQueue <- writeRequest{c, hints}
bhcs.writeQueue <- writeRequest{c.Ref(), hints}
}
func (bhcs *httpBatchStore) batchPutRequests() {
@@ -211,14 +211,10 @@ func (bhcs *httpBatchStore) batchPutRequests() {
go func() {
defer bhcs.workerWg.Done()
numChunks := 0
hints := types.Hints{}
buf := makeBuffer()
gw := gzip.NewWriter(buf)
sz := chunks.NewSerializer(gw)
hashes := ref.RefSlice{}
handleRequest := func(wr writeRequest) {
numChunks++
sz.Put(wr.c)
hashes = append(hashes, wr.hash)
for hint := range wr.hints {
hints[hint] = struct{}{}
}
@@ -242,15 +238,10 @@ func (bhcs *httpBatchStore) batchPutRequests() {
handleRequest(wr)
default:
drained = true
d.Chk.NoError(sz.Close())
d.Chk.NoError(gw.Close())
bhcs.sendWriteRequests(buf, numChunks, hints) // Takes ownership of buf, hints
bhcs.sendWriteRequests(hashes, hints) // Takes ownership of hashes, hints
numChunks = 0
hints = types.Hints{}
buf = makeBuffer()
gw = gzip.NewWriter(buf)
sz = chunks.NewSerializer(gw)
hashes = ref.RefSlice{}
}
}
}
@@ -258,27 +249,31 @@ func (bhcs *httpBatchStore) batchPutRequests() {
}()
}
func makeBuffer() *os.File {
f, err := ioutil.TempFile("", "http_hinted_chunk_store_")
d.Chk.NoError(err, "Cannot create filesystem buffer for Chunks.")
return f
}
func (bhcs *httpBatchStore) sendWriteRequests(serializedChunks *os.File, numChunks int, hints types.Hints) {
func (bhcs *httpBatchStore) sendWriteRequests(hashes ref.RefSlice, hints types.Hints) {
if len(hashes) == 0 {
return
}
bhcs.rateLimit <- struct{}{}
go func() {
defer func() {
<-bhcs.rateLimit
bhcs.unwrittenPuts = newUnwrittenPutCache()
bhcs.requestWg.Add(-numChunks)
d.Chk.NoError(serializedChunks.Close(), "Cannot close filesystem buffer.")
d.Chk.NoError(os.Remove(serializedChunks.Name()), "Cannot remove filesystem buffer.")
bhcs.unwrittenPuts.Clear(hashes)
bhcs.requestWg.Add(-len(hashes))
}()
var res *http.Response
var err error
for tryAgain := true; tryAgain; {
_, err := serializedChunks.Seek(0, 0)
d.Chk.NoError(err, "Could not reset filesystem buffer to offset 0.")
serializedChunks, pw := io.Pipe()
errChan := make(chan error)
go func() {
gw := gzip.NewWriter(pw)
err := bhcs.unwrittenPuts.ExtractChunks(hashes[0], hashes[len(hashes)-1], gw)
// The ordering of these is important. Close the gzipper to flush data to pw, then close pw so that the HTTP stack which is reading from serializedChunks knows it has everything, and only THEN block on errChan.
gw.Close()
pw.Close()
errChan <- err
}()
body := buildWriteValueRequest(serializedChunks, hints)
url := *bhcs.host
@@ -291,6 +286,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(serializedChunks *os.File, numChun
res, err = bhcs.httpClient.Do(req)
d.Exp.NoError(err)
d.Exp.NoError(<-errChan)
defer closeResponse(res)
if tryAgain = res.StatusCode == httpStatusTooManyRequests; tryAgain {
+105 -16
View File
@@ -1,52 +1,141 @@
package datas
import (
"bytes"
"encoding/binary"
"io"
"io/ioutil"
"os"
"sync"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
func newUnwrittenPutCache() *unwrittenPutCache {
return &unwrittenPutCache{map[ref.Ref]chunks.Chunk{}, &sync.Mutex{}}
dir, err := ioutil.TempDir("", "")
d.Exp.NoError(err)
db, err := leveldb.OpenFile(dir, &opt.Options{
Compression: opt.NoCompression,
Filter: filter.NewBloomFilter(10), // 10 bits/key
OpenFilesCacheCapacity: 24,
WriteBuffer: 1 << 24, // 16MiB,
})
d.Chk.NoError(err, "opening put cache in %s", dir)
return &unwrittenPutCache{
orderedChunks: db,
chunkIndex: map[ref.Ref][]byte{},
dbDir: dir,
mu: &sync.Mutex{},
}
}
type unwrittenPutCache struct {
unwrittenPuts map[ref.Ref]chunks.Chunk
orderedChunks *leveldb.DB
chunkIndex map[ref.Ref][]byte
dbDir string
mu *sync.Mutex
next uint64
}
func (p *unwrittenPutCache) Add(c chunks.Chunk) bool {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.unwrittenPuts[c.Ref()]; !ok {
p.unwrittenPuts[c.Ref()] = c
hash := c.Ref()
dbKey, present := func() (dbKey []byte, present bool) {
p.mu.Lock()
defer p.mu.Unlock()
if _, present = p.chunkIndex[hash]; !present {
dbKey = toDbKey(p.next)
p.next++
p.chunkIndex[hash] = dbKey
}
return
}()
if !present {
buf := &bytes.Buffer{}
sz := chunks.NewSerializer(buf)
sz.Put(c)
sz.Close()
d.Chk.NoError(p.orderedChunks.Put(dbKey, buf.Bytes(), nil))
return true
}
return false
}
func (p *unwrittenPutCache) Has(c chunks.Chunk) (has bool) {
func (p *unwrittenPutCache) Has(hash ref.Ref) (has bool) {
p.mu.Lock()
defer p.mu.Unlock()
_, has = p.unwrittenPuts[c.Ref()]
_, has = p.chunkIndex[hash]
return
}
func (p *unwrittenPutCache) Get(r ref.Ref) chunks.Chunk {
func (p *unwrittenPutCache) Get(hash ref.Ref) chunks.Chunk {
// Don't use defer p.mu.Unlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
p.mu.Lock()
defer p.mu.Unlock()
if c, ok := p.unwrittenPuts[r]; ok {
return c
dbKey, ok := p.chunkIndex[hash]
p.mu.Unlock()
if !ok {
return chunks.EmptyChunk
}
return chunks.EmptyChunk
data, err := p.orderedChunks.Get(dbKey, nil)
d.Chk.NoError(err)
reader := bytes.NewReader(data)
chunkChan := make(chan chunks.Chunk)
go chunks.DeserializeToChan(reader, chunkChan)
return <-chunkChan
}
func (p *unwrittenPutCache) Clear(hashes ref.RefSlice) {
deleteBatch := &leveldb.Batch{}
p.mu.Lock()
defer p.mu.Unlock()
for _, hash := range hashes {
delete(p.unwrittenPuts, hash)
deleteBatch.Delete(p.chunkIndex[hash])
delete(p.chunkIndex, hash)
}
p.mu.Unlock()
d.Chk.NoError(p.orderedChunks.Write(deleteBatch, nil))
return
}
// toDbKey encodes idx as a fixed-width, 8-byte big-endian key for the
// orderedChunks DB.
//
// Big-endian encoding means that comparing two keys byte-by-byte gives the
// same result as comparing the indices numerically, which is what lets a DB
// iterator visit chunks in insertion order (assuming the DB uses a bytewise
// key comparator; no custom comparer is configured when the DB is opened).
//
// The bytes are written directly rather than going through binary.Write,
// which uses reflection and returns an error that can never occur when
// writing a uint64 to an in-memory buffer.
func toDbKey(idx uint64) []byte {
	key := make([]byte, 8)
	binary.BigEndian.PutUint64(key, idx)
	return key
}
// fromDbKey decodes the big-endian uint64 index held in the leading bytes of
// key; it is the inverse of toDbKey. A key that cannot be decoded (for
// example an empty or too-short slice) fails the d.Chk.NoError check.
func fromDbKey(key []byte) (idx uint64) {
	keyReader := bytes.NewReader(key)
	decodeErr := binary.Read(keyReader, binary.BigEndian, &idx)
	d.Chk.NoError(decodeErr)
	return idx
}
// ExtractChunks writes the stored (already serialized) bytes of every cached
// chunk whose DB key lies between the keys of start and end, inclusive, to w
// in DB key order.
//
// Both start and end are looked up in chunkIndex. If end is not present its
// key is nil and fromDbKey fails its d.Chk check. A missing start yields a nil
// range Start, which presumably makes the iterator begin at the first key in
// the DB; verify against goleveldb's util.Range semantics before relying on
// that.
//
// It returns the first error reported by w.Write, or, if every write
// succeeded, any error the iterator hit while reading from the DB.
func (p *unwrittenPutCache) ExtractChunks(start, end ref.Ref, w io.Writer) error {
	// Only the index lookup needs the mutex. Scoping the lock to a closure
	// with a deferred Unlock guarantees it is released even if fromDbKey
	// panics on a missing key.
	iterRange := func() *util.Range {
		p.mu.Lock()
		defer p.mu.Unlock()
		return &util.Range{
			Start: p.chunkIndex[start],
			// Limit is exclusive, so step one past end's index to include it.
			Limit: toDbKey(fromDbKey(p.chunkIndex[end]) + 1),
		}
	}()

	iter := p.orderedChunks.NewIterator(iterRange, nil)
	defer iter.Release()
	for iter.Next() {
		if _, err := w.Write(iter.Value()); err != nil {
			return err
		}
	}
	// Next() returning false can mean "exhausted" or "failed"; without this
	// check a read error would silently truncate the extracted payload.
	return iter.Error()
}
// Destroy closes the backing DB and then removes the directory that holds it.
// A failure to close the DB trips d.Chk.NoError; the returned error is
// whatever os.RemoveAll reports for the directory.
func (p *unwrittenPutCache) Destroy() error {
	closeErr := p.orderedChunks.Close()
	d.Chk.NoError(closeErr)

	removeErr := os.RemoveAll(p.dbDir)
	return removeErr
}
+172
View File
@@ -0,0 +1,172 @@
package datas
import (
"bytes"
"sync"
"testing"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
"github.com/stretchr/testify/suite"
)
func TestLevelDBPutCacheSuite(t *testing.T) {
suite.Run(t, &LevelDBPutCacheSuite{})
}
// LevelDBPutCacheSuite holds the per-test fixture for the unwrittenPutCache
// tests. SetupTest rebuilds every field before each test method runs.
type LevelDBPutCacheSuite struct {
	suite.Suite

	// cache is the put cache under test; TearDownTest destroys it.
	cache *unwrittenPutCache
	// values are the sample values the fixture chunks are encoded from.
	values []types.Value
	// chnx maps each sample value's ref to its encoded chunk.
	chnx map[ref.Ref]chunks.Chunk
}
// SetupTest gives each test a fresh cache plus five small string values and
// their encoded chunks, keyed by ref.
func (suite *LevelDBPutCacheSuite) SetupTest() {
	suite.cache = newUnwrittenPutCache()

	var vals []types.Value
	for _, s := range []string{"abc", "def", "ghi", "jkl", "mno"} {
		vals = append(vals, types.NewString(s))
	}
	suite.values = vals

	encoded := map[ref.Ref]chunks.Chunk{}
	for i := range suite.values {
		val := suite.values[i]
		encoded[val.Ref()] = types.EncodeValue(val, nil)
	}
	suite.chnx = encoded
}
// TearDownTest destroys the cache created by SetupTest. The error returned by
// Destroy is asserted on so that a failure to remove the cache's backing
// directory shows up as a test failure instead of silently leaking temp
// directories across test runs.
func (suite *LevelDBPutCacheSuite) TearDownTest() {
	suite.NoError(suite.cache.Destroy())
}
// TestAddTwice checks that Add reports true the first time a chunk is added
// and false when the same chunk is added again.
func (suite *LevelDBPutCacheSuite) TestAddTwice() {
	firstHash := suite.values[0].Ref()
	c := suite.chnx[firstHash]

	addedFirst := suite.cache.Add(c)
	suite.True(addedFirst)

	addedAgain := suite.cache.Add(c)
	suite.False(addedAgain)
}
// TestAddParallel adds every fixture chunk from its own goroutine and checks
// that each one is visible through Has once its Add call has returned.
func (suite *LevelDBPutCacheSuite) TestAddParallel() {
	added := make(chan ref.Ref)
	for _, c := range suite.chnx {
		go func(c chunks.Chunk) {
			suite.cache.Add(c)
			added <- c.Ref()
		}(c)
	}

	// Expect exactly one hash per fixture value; tick each off as it arrives.
	for remaining := len(suite.values); remaining > 0; remaining-- {
		h := <-added
		suite.True(suite.cache.Has(h))
		delete(suite.chnx, h)
	}
	close(added)

	// Every fixture chunk must have been reported exactly once.
	suite.Len(suite.chnx, 0)
}
// TestGetParallel fills the cache, fetches every chunk concurrently with Get,
// and checks that the chunks returned cover the whole fixture set.
func (suite *LevelDBPutCacheSuite) TestGetParallel() {
	for _, chunk := range suite.chnx {
		suite.cache.Add(chunk)
	}

	fetched := make(chan chunks.Chunk)
	for h := range suite.chnx {
		go func(h ref.Ref) {
			fetched <- suite.cache.Get(h)
		}(h)
	}

	// One result is expected per fixture value; remove each from the
	// fixture map by the ref of the chunk that actually came back.
	for remaining := len(suite.values); remaining > 0; remaining-- {
		got := <-fetched
		delete(suite.chnx, got.Ref())
	}
	close(fetched)

	suite.Len(suite.chnx, 0)
}
// TestClearParallel fills the cache, clears two disjoint groups of hashes from
// separate goroutines, and checks that only the single hash left out of both
// groups is still present.
func (suite *LevelDBPutCacheSuite) TestClearParallel() {
	// Index of the one hash that neither Clear call is given.
	const keepIdx = 2

	hashes := ref.RefSlice{}
	for h, c := range suite.chnx {
		hashes = append(hashes, h)
		suite.cache.Add(c)
	}

	var wg sync.WaitGroup
	wg.Add(2)
	clearAsync := func(hs ref.RefSlice) {
		suite.cache.Clear(hs)
		wg.Done()
	}
	go clearAsync(hashes[:keepIdx])
	go clearAsync(hashes[keepIdx+1:])
	wg.Wait()

	for i, hash := range hashes {
		stillCached := suite.cache.Has(hash)
		if i == keepIdx {
			suite.True(stillCached)
		} else {
			suite.False(stillCached)
		}
	}
}
// TestReaderSubset checks that extracting the range bounded by the first and
// second inserted hashes yields exactly those two chunks, in insertion order.
func (suite *LevelDBPutCacheSuite) TestReaderSubset() {
	// Map iteration order is arbitrary, so record the order in which chunks
	// were actually inserted into the cache.
	inserted := ref.RefSlice{}
	for h, c := range suite.chnx {
		inserted = append(inserted, h)
		suite.cache.Add(c)
	}

	// Only iterate over the first 2 elements in the DB.
	seen := 0
	for c := range suite.extractChunks(inserted[0], inserted[1]) {
		suite.Equal(inserted[seen], c.Ref())
		seen++
	}

	// Everything past the extracted prefix must be all but two of the hashes.
	suite.Len(inserted[seen:], len(inserted)-2)
}
// TestReaderSnapshot checks that chunks extracted from the cache are still
// delivered to the reader after those chunks have been cleared from the cache.
//
// NOTE(review): the extractChunks helper calls ExtractChunks synchronously
// into a buffer before returning, so by the time Clear runs here the data has
// already been copied out of the DB. This does not exercise clearing while an
// iterator is still open; confirm whether that stronger case was intended.
func (suite *LevelDBPutCacheSuite) TestReaderSnapshot() {
	inserted := ref.RefSlice{}
	for h, c := range suite.chnx {
		inserted = append(inserted, h)
		suite.cache.Add(c)
	}

	first, last := inserted[0], inserted[len(inserted)-1]
	extracted := suite.extractChunks(first, last)

	// Clear chunks from suite.cache. Should still be enumerated by reader.
	suite.cache.Clear(inserted)

	for c := range extracted {
		delete(suite.chnx, c.Ref())
	}
	suite.Len(suite.chnx, 0)
}
// TestExtractChunksOrder checks that extracting the full range of the cache
// yields every chunk, in the same order the chunks were inserted.
func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() {
	// Map iteration order is arbitrary, so record the actual insertion order.
	inserted := ref.RefSlice{}
	for h, c := range suite.chnx {
		inserted = append(inserted, h)
		suite.cache.Add(c)
	}

	seen := 0
	for c := range suite.extractChunks(inserted[0], inserted[len(inserted)-1]) {
		suite.Equal(inserted[seen], c.Ref())
		seen++
	}

	// No inserted hash may be left unmatched.
	suite.Len(inserted[seen:], 0)
}
// extractChunks is a test helper: it extracts the start..end range from the
// cache into an in-memory buffer, asserts that extraction succeeded, and
// returns a channel fed by a goroutine that deserializes that buffer.
// Callers range over the channel, so DeserializeToChan is presumably
// responsible for closing it once the buffer is exhausted.
func (suite *LevelDBPutCacheSuite) extractChunks(start, end ref.Ref) <-chan chunks.Chunk {
	var serialized bytes.Buffer
	extractErr := suite.cache.ExtractChunks(start, end, &serialized)
	suite.NoError(extractErr)

	out := make(chan chunks.Chunk)
	go chunks.DeserializeToChan(&serialized, out)
	return out
}