Go: In httpBatchStore, order outgoing Chunks by ref-height (#1519)

* Go: In httpBatchStore, order outgoing Chunks by ref-height

Rather than sending chunks to the server in the order that callers put
them into an httpBatchStore, order them by ref-height at the time that
Flush() is called. This change requires that the ref-height for a given
chunk be passed in through the SchedulePut() API, because Chunks (by
design) don't carry much metadata about themselves.

Toward #1510
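
As a rough sketch (inside the types package, mirroring the ValueStore.WriteValue change further down; scheduleWrite is a made-up helper, not part of this commit), the writer side now derives a chunk's ref-height from the value it encoded and threads it through the new SchedulePut signature:

// Sketch only: compare ValueStore.WriteValue in the diff below.
func scheduleWrite(bs BatchStore, v Value) {
	c := EncodeValue(v, nil)
	height := maxChunkHeight(v) + 1 // a chunk sits one level above its tallest child
	bs.SchedulePut(c, height, Hints{})
	bs.Flush() // outgoing chunks are ordered by ref-height when flushed
}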
cmasone-attic
2016-05-18 08:33:49 -07:00
parent df1c91843d
commit cefb014d15
11 changed files with 155 additions and 116 deletions

View File

@@ -41,7 +41,7 @@ type Database interface {
// This interface exists solely to allow RemoteDatabaseClient to pass back a gross side-channel thing for the purposes of pull.
type batchSink interface {
SchedulePut(c chunks.Chunk, hints types.Hints)
SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints)
Flush()
io.Closer
}

View File

@@ -39,7 +39,7 @@ type httpBatchStore struct {
rateLimit chan struct{}
requestWg *sync.WaitGroup
workerWg *sync.WaitGroup
unwrittenPuts *unwrittenPutCache
unwrittenPuts *orderedChunkCache
}
func newHTTPBatchStore(baseURL, auth string) *httpBatchStore {
@@ -57,7 +57,7 @@ func newHTTPBatchStore(baseURL, auth string) *httpBatchStore {
rateLimit: make(chan struct{}, httpChunkSinkConcurrency),
requestWg: &sync.WaitGroup{},
workerWg: &sync.WaitGroup{},
unwrittenPuts: newUnwrittenPutCache(),
unwrittenPuts: newOrderedChunkCache(),
}
buffSink.batchGetRequests()
buffSink.batchPutRequests()
@@ -138,16 +138,16 @@ func (bhcs *httpBatchStore) batchGetRequests() {
func (bhcs *httpBatchStore) sendGetRequests(req chunks.ReadRequest) {
batch := chunks.ReadBatch{}
refs := map[ref.Ref]struct{}{}
hashes := hashSet{}
addReq := func(req chunks.ReadRequest) {
r := req.Ref()
batch[r] = append(batch[r], req.Outstanding())
refs[r] = struct{}{}
hash := req.Ref()
batch[hash] = append(batch[hash], req.Outstanding())
hashes.Insert(hash)
}
addReq(req)
for drained := false; !drained && len(refs) < readBufferSize; {
for drained := false; !drained && len(hashes) < readBufferSize; {
select {
case req := <-bhcs.readQueue:
addReq(req)
@@ -164,17 +164,17 @@ func (bhcs *httpBatchStore) sendGetRequests(req chunks.ReadRequest) {
batch.Close()
}()
bhcs.getRefs(refs, &batch)
bhcs.getRefs(hashes, &batch)
<-bhcs.rateLimit
}()
}
func (bhcs *httpBatchStore) getRefs(refs types.Hints, cs chunks.ChunkSink) {
func (bhcs *httpBatchStore) getRefs(hashes hashSet, cs chunks.ChunkSink) {
// POST http://<host>/getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent.
u := *bhcs.host
u.Path = httprouter.CleanPath(bhcs.host.Path + constants.GetRefsPath)
req := newRequest("POST", bhcs.auth, u.String(), buildGetRefsRequest(refs), http.Header{
req := newRequest("POST", bhcs.auth, u.String(), buildGetRefsRequest(hashes), http.Header{
"Accept-Encoding": {"gzip"},
"Content-Type": {"application/x-www-form-urlencoded"},
})
@@ -196,8 +196,8 @@ func (bhcs *httpBatchStore) getRefs(refs types.Hints, cs chunks.ChunkSink) {
chunks.Deserialize(reader, cs, rl)
}
func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, hints types.Hints) {
if !bhcs.unwrittenPuts.Add(c) {
func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
if !bhcs.unwrittenPuts.Insert(c, refHeight) {
return
}
@@ -211,9 +211,13 @@ func (bhcs *httpBatchStore) batchPutRequests() {
defer bhcs.workerWg.Done()
hints := types.Hints{}
hashes := ref.RefSlice{}
hashes := hashSet{}
handleRequest := func(wr writeRequest) {
hashes = append(hashes, wr.hash)
if hashes.Has(wr.hash) {
bhcs.requestWg.Done() // Already have a put enqueued for wr.hash.
} else {
hashes.Insert(wr.hash)
}
for hint := range wr.hints {
hints[hint] = struct{}{}
}
@@ -238,9 +242,8 @@ func (bhcs *httpBatchStore) batchPutRequests() {
default:
drained = true
bhcs.sendWriteRequests(hashes, hints) // Takes ownership of hashes, hints
hints = types.Hints{}
hashes = ref.RefSlice{}
hashes = hashSet{}
}
}
}
@@ -248,7 +251,7 @@ func (bhcs *httpBatchStore) batchPutRequests() {
}()
}
func (bhcs *httpBatchStore) sendWriteRequests(hashes ref.RefSlice, hints types.Hints) {
func (bhcs *httpBatchStore) sendWriteRequests(hashes hashSet, hints types.Hints) {
if len(hashes) == 0 {
return
}
@@ -267,7 +270,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes ref.RefSlice, hints types.H
errChan := make(chan error)
go func() {
gw := gzip.NewWriter(pw)
err := bhcs.unwrittenPuts.ExtractChunks(hashes[0], hashes[len(hashes)-1], gw)
err := bhcs.unwrittenPuts.ExtractChunks(hashes, gw)
// The ordering of these is important. Close the gzipper to flush data to pw, then close pw so that the HTTP stack which is reading from serializedChunks knows it has everything, and only THEN block on errChan.
gw.Close()
pw.Close()
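
The close-ordering called out in that comment applies to any gzip-into-io.Pipe upload. A standalone sketch of the same shape (uploadCompressed, produceBody, and endpoint are placeholder names, not part of this change; assumes imports of compress/gzip, io, and net/http):

// Standalone sketch of the ordering described in the comment above.
func uploadCompressed(endpoint string, produceBody func(io.Writer) error) error {
	pr, pw := io.Pipe()
	errChan := make(chan error, 1)
	go func() {
		gw := gzip.NewWriter(pw)
		err := produceBody(gw) // stand-in for unwrittenPuts.ExtractChunks(hashes, gw)
		gw.Close()             // 1) flush compressed data through to pw
		pw.Close()             // 2) give the HTTP stack reading pr its EOF
		errChan <- err         // 3) only then report the producer's result
	}()
	resp, err := http.Post(endpoint, "application/octet-stream", pr)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	return <-errChan // block on errChan only after the request body is consumed
}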

View File

@@ -83,7 +83,7 @@ func (suite *HTTPBatchStoreSuite) TearDownTest() {
func (suite *HTTPBatchStoreSuite) TestPutChunk() {
c := types.EncodeValue(types.NewString("abc"), nil)
suite.store.SchedulePut(c, types.Hints{})
suite.store.SchedulePut(c, 1, types.Hints{})
suite.store.Flush()
suite.Equal(1, suite.cs.Writes)
@@ -96,10 +96,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() {
}
l := types.NewList()
for _, val := range vals {
suite.store.SchedulePut(types.EncodeValue(val, nil), types.Hints{})
suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{})
l = l.Append(types.NewRef(val))
}
suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
suite.store.Flush()
suite.Equal(3, suite.cs.Writes)
@@ -117,7 +117,7 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
suite.NoError(suite.cs.PutMany(chnx))
l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1]))
suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{
chnx[0].Ref(): struct{}{},
chnx[1].Ref(): struct{}{},
})
@@ -163,10 +163,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksBackpressure() {
}
l := types.NewList()
for _, v := range vals {
bs.SchedulePut(types.EncodeValue(v, nil), types.Hints{})
bs.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
l = l.Append(types.NewRef(v))
}
bs.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
bs.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
bs.Flush()
suite.Equal(6, suite.cs.Writes)

View File

@@ -26,7 +26,7 @@ type notABatchSink struct {
rateLimit chan struct{}
requestWg *sync.WaitGroup
workerWg *sync.WaitGroup
unwrittenPuts *unwrittenPutCache
unwrittenPuts *orderedChunkCache
}
func newNotABatchSink(host *url.URL, auth string) *notABatchSink {
@@ -40,7 +40,7 @@ func newNotABatchSink(host *url.URL, auth string) *notABatchSink {
rateLimit: make(chan struct{}, httpChunkSinkConcurrency),
requestWg: &sync.WaitGroup{},
workerWg: &sync.WaitGroup{},
unwrittenPuts: newUnwrittenPutCache(),
unwrittenPuts: newOrderedChunkCache(),
}
sink.batchPutRequests()
return sink
@@ -63,8 +63,8 @@ func (bhcs *notABatchSink) Close() (e error) {
return
}
func (bhcs *notABatchSink) SchedulePut(c chunks.Chunk, hints types.Hints) {
if !bhcs.unwrittenPuts.Add(c) {
func (bhcs *notABatchSink) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
if !bhcs.unwrittenPuts.Insert(c, refHeight) {
return
}
@@ -119,7 +119,7 @@ func (bhcs *notABatchSink) batchPutRequests() {
func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
bhcs.rateLimit <- struct{}{}
go func() {
hashes := make(ref.RefSlice, len(chnx))
hashes := make(hashSet, len(chnx))
defer func() {
bhcs.unwrittenPuts.Clear(hashes)
bhcs.requestWg.Add(-len(chnx))
@@ -128,8 +128,8 @@ func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
body := &bytes.Buffer{}
gw := gzip.NewWriter(body)
sz := chunks.NewSerializer(gw)
for i, chunk := range chnx {
hashes[i] = chunk.Ref()
for _, chunk := range chnx {
hashes.Insert(chunk.Ref())
sz.Put(chunk)
}
sz.Close()

View File

@@ -53,10 +53,10 @@ func (suite *NotABatchSinkSuite) TestPutChunks() {
}
l := types.NewList()
for _, v := range vals {
suite.store.SchedulePut(types.EncodeValue(v, nil), types.Hints{})
suite.store.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
l = l.Append(types.NewRef(v))
}
suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
suite.store.Flush()
suite.Equal(3, suite.cs.Writes)

View File

@@ -25,8 +25,8 @@ func CopyReachableChunksP(source, sink Database, sourceRef, exclude types.Ref, c
mu := sync.Mutex{}
excludeCallback := func(r types.Ref) bool {
mu.Lock()
defer mu.Unlock()
excludeRefs[r.TargetRef()] = true
mu.Unlock()
return false
}
@@ -42,8 +42,8 @@ func CopyReachableChunksP(source, sink Database, sourceRef, exclude types.Ref, c
func copyWorker(source, sink Database, sourceRef types.Ref, stopCb walk.SomeChunksStopCallback, concurrency int) {
bs := sink.batchSink()
walk.SomeChunksP(sourceRef, source.batchStore(), stopCb, func(c chunks.Chunk) {
bs.SchedulePut(c, types.Hints{})
walk.SomeChunksP(sourceRef, source.batchStore(), stopCb, func(r types.Ref, c chunks.Chunk) {
bs.SchedulePut(c, r.Height(), types.Hints{})
}, concurrency)
bs.Flush()

View File

@@ -14,10 +14,9 @@ import (
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
func newUnwrittenPutCache() *unwrittenPutCache {
func newOrderedChunkCache() *orderedChunkCache {
dir, err := ioutil.TempDir("", "")
d.Exp.NoError(err)
db, err := leveldb.OpenFile(dir, &opt.Options{
@@ -27,30 +26,41 @@ func newUnwrittenPutCache() *unwrittenPutCache {
WriteBuffer: 1 << 24, // 16MiB,
})
d.Chk.NoError(err, "opening put cache in %s", dir)
return &unwrittenPutCache{
return &orderedChunkCache{
orderedChunks: db,
chunkIndex: map[ref.Ref][]byte{},
dbDir: dir,
mu: &sync.Mutex{},
mu: &sync.RWMutex{},
}
}
type unwrittenPutCache struct {
// orderedChunkCache holds Chunks, allowing them to be retrieved by hash or enumerated in ref-height order.
type orderedChunkCache struct {
orderedChunks *leveldb.DB
chunkIndex map[ref.Ref][]byte
dbDir string
mu *sync.Mutex
next uint64
mu *sync.RWMutex
}
func (p *unwrittenPutCache) Add(c chunks.Chunk) bool {
type hashSet map[ref.Ref]struct{}
func (hs hashSet) Insert(hash ref.Ref) {
hs[hash] = struct{}{}
}
func (hs hashSet) Has(hash ref.Ref) (has bool) {
_, has = hs[hash]
return
}
// Insert can be called from any goroutine to store c in the cache. If c is successfully added to the cache, Insert returns true. If c was already in the cache, Insert returns false.
func (p *orderedChunkCache) Insert(c chunks.Chunk, refHeight uint64) bool {
hash := c.Ref()
dbKey, present := func() (dbKey []byte, present bool) {
p.mu.Lock()
defer p.mu.Unlock()
if _, present = p.chunkIndex[hash]; !present {
dbKey = toDbKey(p.next)
p.next++
dbKey = toDbKey(refHeight, c.Ref())
p.chunkIndex[hash] = dbKey
}
return
@@ -67,18 +77,19 @@ func (p *unwrittenPutCache) Add(c chunks.Chunk) bool {
return false
}
func (p *unwrittenPutCache) Has(hash ref.Ref) (has bool) {
p.mu.Lock()
defer p.mu.Unlock()
func (p *orderedChunkCache) has(hash ref.Ref) (has bool) {
p.mu.RLock()
defer p.mu.RUnlock()
_, has = p.chunkIndex[hash]
return
}
func (p *unwrittenPutCache) Get(hash ref.Ref) chunks.Chunk {
// Don't use defer p.mu.Unlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
p.mu.Lock()
// Get can be called from any goroutine to retrieve the chunk referenced by hash. If the chunk is not present, Get returns the empty Chunk.
func (p *orderedChunkCache) Get(hash ref.Ref) chunks.Chunk {
// Don't use defer p.mu.RUnlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
p.mu.RLock()
dbKey, ok := p.chunkIndex[hash]
p.mu.Unlock()
p.mu.RUnlock()
if !ok {
return chunks.EmptyChunk
@@ -91,10 +102,11 @@ func (p *unwrittenPutCache) Get(hash ref.Ref) chunks.Chunk {
return <-chunkChan
}
func (p *unwrittenPutCache) Clear(hashes ref.RefSlice) {
// Clear can be called from any goroutine to remove chunks referenced by the given hashes from the cache.
func (p *orderedChunkCache) Clear(hashes hashSet) {
deleteBatch := &leveldb.Batch{}
p.mu.Lock()
for _, hash := range hashes {
for hash := range hashes {
deleteBatch.Delete(p.chunkIndex[hash])
delete(p.chunkIndex, hash)
}
@@ -103,30 +115,39 @@ func (p *unwrittenPutCache) Clear(hashes ref.RefSlice) {
return
}
func toDbKey(idx uint64) []byte {
buf := &bytes.Buffer{}
err := binary.Write(buf, binary.BigEndian, idx)
var uint64Size = binary.Size(uint64(0))
// toDbKey takes a refHeight and a hash and returns a binary key suitable for use with LevelDB. The default sort order used by LevelDB ensures that these keys (and their associated values) will be iterated in ref-height order.
func toDbKey(refHeight uint64, hash ref.Ref) []byte {
digest := hash.DigestSlice()
buf := bytes.NewBuffer(make([]byte, 0, uint64Size+binary.Size(digest)))
err := binary.Write(buf, binary.BigEndian, refHeight)
d.Chk.NoError(err)
err = binary.Write(buf, binary.BigEndian, digest)
d.Chk.NoError(err)
return buf.Bytes()
}
func fromDbKey(key []byte) (idx uint64) {
err := binary.Read(bytes.NewReader(key), binary.BigEndian, &idx)
func fromDbKey(key []byte) (uint64, ref.Ref) {
refHeight := uint64(0)
r := bytes.NewReader(key)
err := binary.Read(r, binary.BigEndian, &refHeight)
d.Chk.NoError(err)
return
digest := ref.Sha1Digest{}
err = binary.Read(r, binary.BigEndian, &digest)
d.Chk.NoError(err)
return refHeight, ref.New(digest)
}
func (p *unwrittenPutCache) ExtractChunks(start, end ref.Ref, w io.Writer) error {
p.mu.Lock()
iterRange := &util.Range{
Start: p.chunkIndex[start],
Limit: toDbKey(fromDbKey(p.chunkIndex[end]) + 1),
}
p.mu.Unlock()
iter := p.orderedChunks.NewIterator(iterRange, nil)
// ExtractChunks can be called from any goroutine to write Chunks referenced by the given hashes to w. The chunks are ordered by ref-height. Chunks of the same height are written in an unspecified order, relative to one another.
func (p *orderedChunkCache) ExtractChunks(hashes hashSet, w io.Writer) error {
iter := p.orderedChunks.NewIterator(nil, nil)
defer iter.Release()
for iter.Next() {
_, hash := fromDbKey(iter.Key())
if !hashes.Has(hash) {
continue
}
_, err := w.Write(iter.Value())
if err != nil {
return err
@@ -135,7 +156,7 @@ func (p *unwrittenPutCache) ExtractChunks(start, end ref.Ref, w io.Writer) error
return nil
}
func (p *unwrittenPutCache) Destroy() error {
func (p *orderedChunkCache) Destroy() error {
d.Chk.NoError(p.orderedChunks.Close())
return os.RemoveAll(p.dbDir)
}
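
The ref-height ordering comes for free from LevelDB's default bytewise comparator: because the big-endian height precedes the digest in every key, lexicographic key order is ref-height order. A small sketch of that property, assuming access to the unexported toDbKey inside this package (bytes.Compare stands in for LevelDB's comparator):

// Sketch: keys built by toDbKey sort bytewise in ascending ref-height order.
leaf := chunks.NewChunk([]byte("some leaf data"))
parent := chunks.NewChunk([]byte("some parent data"))
low, high := toDbKey(1, leaf.Ref()), toDbKey(2, parent.Ref())
fmt.Println(bytes.Compare(low, high) < 0) // true: the iterator yields leaf first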

View File

@@ -2,6 +2,7 @@ package datas
import (
"bytes"
"math/rand"
"sync"
"testing"
@@ -17,13 +18,13 @@ func TestLevelDBPutCacheSuite(t *testing.T) {
type LevelDBPutCacheSuite struct {
suite.Suite
cache *unwrittenPutCache
cache *orderedChunkCache
values []types.Value
chnx map[ref.Ref]chunks.Chunk
}
func (suite *LevelDBPutCacheSuite) SetupTest() {
suite.cache = newUnwrittenPutCache()
suite.cache = newOrderedChunkCache()
suite.values = []types.Value{
types.NewString("abc"),
types.NewString("def"),
@@ -43,22 +44,22 @@ func (suite *LevelDBPutCacheSuite) TearDownTest() {
func (suite *LevelDBPutCacheSuite) TestAddTwice() {
chunk := suite.chnx[suite.values[0].Ref()]
suite.True(suite.cache.Add(chunk))
suite.False(suite.cache.Add(chunk))
suite.True(suite.cache.Insert(chunk, 1))
suite.False(suite.cache.Insert(chunk, 1))
}
func (suite *LevelDBPutCacheSuite) TestAddParallel() {
hashes := make(chan ref.Ref)
for _, chunk := range suite.chnx {
go func(c chunks.Chunk) {
suite.cache.Add(c)
suite.cache.Insert(c, 1)
hashes <- c.Ref()
}(chunk)
}
for i := 0; i < len(suite.values); i++ {
hash := <-hashes
suite.True(suite.cache.Has(hash))
suite.True(suite.cache.has(hash))
delete(suite.chnx, hash)
}
close(hashes)
@@ -67,7 +68,7 @@ func (suite *LevelDBPutCacheSuite) TestAddParallel() {
func (suite *LevelDBPutCacheSuite) TestGetParallel() {
for _, c := range suite.chnx {
suite.cache.Add(c)
suite.cache.Insert(c, 1)
}
chunkChan := make(chan chunks.Chunk)
@@ -86,57 +87,65 @@ func (suite *LevelDBPutCacheSuite) TestGetParallel() {
}
func (suite *LevelDBPutCacheSuite) TestClearParallel() {
hashes := ref.RefSlice{}
for h, c := range suite.chnx {
hashes = append(hashes, h)
suite.cache.Add(c)
keepIdx := 2
toClear1, toClear2 := hashSet{}, hashSet{}
for i, v := range suite.values {
suite.cache.Insert(types.EncodeValue(v, nil), 1)
if i < keepIdx {
toClear1.Insert(v.Ref())
} else if i > keepIdx {
toClear2.Insert(v.Ref())
}
}
wg := &sync.WaitGroup{}
wg.Add(2)
clear := func(hs ref.RefSlice) {
clear := func(hs hashSet) {
suite.cache.Clear(hs)
wg.Done()
}
keepIdx := 2
go clear(hashes[:keepIdx])
go clear(hashes[keepIdx+1:])
go clear(toClear1)
go clear(toClear2)
wg.Wait()
for i, hash := range hashes {
for i, v := range suite.values {
if i == keepIdx {
suite.True(suite.cache.Has(hash))
suite.True(suite.cache.has(v.Ref()))
continue
}
suite.False(suite.cache.Has(hash))
suite.False(suite.cache.has(v.Ref()))
}
}
func (suite *LevelDBPutCacheSuite) TestReaderSubset() {
orderedHashes := ref.RefSlice{}
toExtract := hashSet{}
for hash, c := range suite.chnx {
orderedHashes = append(orderedHashes, hash)
suite.cache.Add(c)
if len(toExtract) < 2 {
toExtract.Insert(hash)
}
suite.cache.Insert(c, 1)
}
// Only iterate over the first 2 elements in the DB
chunkChan := suite.extractChunks(orderedHashes[0], orderedHashes[1])
origLen := len(orderedHashes)
chunkChan := suite.extractChunks(toExtract)
count := 0
for c := range chunkChan {
suite.Equal(orderedHashes[0], c.Ref())
orderedHashes = orderedHashes[1:]
if suite.Contains(toExtract, c.Ref()) {
count++
}
}
suite.Len(orderedHashes, origLen-2)
suite.Equal(len(toExtract), count)
}
func (suite *LevelDBPutCacheSuite) TestReaderSnapshot() {
hashes := ref.RefSlice{}
hashes := hashSet{}
for h, c := range suite.chnx {
hashes = append(hashes, h)
suite.cache.Add(c)
hashes.Insert(h)
suite.cache.Insert(c, 1)
}
chunkChan := suite.extractChunks(hashes[0], hashes[len(hashes)-1])
chunkChan := suite.extractChunks(hashes)
// Clear chunks from suite.cache. Should still be enumerated by reader
suite.cache.Clear(hashes)
@@ -147,13 +156,18 @@ func (suite *LevelDBPutCacheSuite) TestReaderSnapshot() {
}
func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() {
orderedHashes := ref.RefSlice{}
maxHeight := len(suite.chnx)
orderedHashes := make(ref.RefSlice, maxHeight)
toExtract := hashSet{}
heights := rand.Perm(maxHeight)
for hash, c := range suite.chnx {
orderedHashes = append(orderedHashes, hash)
suite.cache.Add(c)
toExtract.Insert(hash)
orderedHashes[heights[0]] = hash
suite.cache.Insert(c, uint64(heights[0]))
heights = heights[1:]
}
chunkChan := suite.extractChunks(orderedHashes[0], orderedHashes[len(orderedHashes)-1])
chunkChan := suite.extractChunks(toExtract)
for c := range chunkChan {
suite.Equal(orderedHashes[0], c.Ref())
orderedHashes = orderedHashes[1:]
@@ -161,9 +175,9 @@ func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() {
suite.Len(orderedHashes, 0)
}
func (suite *LevelDBPutCacheSuite) extractChunks(start, end ref.Ref) <-chan chunks.Chunk {
func (suite *LevelDBPutCacheSuite) extractChunks(hashes hashSet) <-chan chunks.Chunk {
buf := &bytes.Buffer{}
err := suite.cache.ExtractChunks(start, end, buf)
err := suite.cache.ExtractChunks(hashes, buf)
suite.NoError(err)
chunkChan := make(chan chunks.Chunk)

View File

@@ -12,9 +12,9 @@ type BatchStore interface {
// Get returns from the store the Value Chunk by r. If r is absent from the store, chunks.EmptyChunk is returned.
Get(r ref.Ref) chunks.Chunk
// SchedulePut enqueues a write for the Chunk c, using the provided hints to assist in validation. It may or may not block until c is persisted. Validation requires checking that all refs embedded in c are themselves valid, which could naively be done by resolving each one. Instead, hints provides a (smaller) set of refs that point to Chunks that themselves contain many of c's refs. Thus, by checking only the hinted Chunks, c can be validated with fewer read operations.
// SchedulePut enqueues a write for the Chunk c with the given refHeight. Typically, the Value which was encoded to provide c can also be queried for its refHeight. The call may or may not block until c is persisted. The provided hints are used to assist in validation. Validation requires checking that all refs embedded in c are themselves valid, which could naively be done by resolving each one. Instead, hints provides a (smaller) set of refs that point to Chunks that themselves contain many of c's refs. Thus, by checking only the hinted Chunks, c can be validated with fewer read operations.
// c may or may not be persisted when Put() returns, but is guaranteed to be persistent after a call to Flush() or Close().
SchedulePut(c chunks.Chunk, hints Hints)
SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints)
// Flush causes enqueued Puts to be persisted.
Flush()
@@ -40,7 +40,7 @@ func (lbs *BatchStoreAdaptor) Get(ref ref.Ref) chunks.Chunk {
}
// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
func (lbs *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, hints Hints) {
func (lbs *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
lbs.cs.Put(c)
}
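
A hedged illustration of the hint mechanism described in that comment, mirroring TestPutChunkWithHints earlier in this diff (vals holds already-persisted leaf values and chnx their chunks, as in that test):

// Sketch: schedule a parent chunk with hints naming persisted chunks that
// already contain its embedded refs, so validation needs fewer reads.
l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1]))
store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{
	chnx[0].Ref(): struct{}{},
	chnx[1].Ref(): struct{}{},
})
store.Flush()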

View File

@@ -67,12 +67,13 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
c := EncodeValue(v, lvs)
d.Chk.False(c.IsEmpty())
hash := c.Ref()
r := constructRef(MakeRefType(v.Type()), hash, maxChunkHeight(v)+1)
height := maxChunkHeight(v) + 1
r := constructRef(MakeRefType(v.Type()), hash, height)
if lvs.isPresent(hash) {
return r
}
hints := lvs.checkChunksInCache(v)
lvs.bs.SchedulePut(c, hints)
lvs.bs.SchedulePut(c, height, hints)
lvs.set(hash, hintedChunk{v.Type(), hash})
return r
}

View File

@@ -95,8 +95,8 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren
// SomeChunksStopCallback is called for every unique types.Ref |r|. Return true to stop walking beyond |r|.
type SomeChunksStopCallback func(r types.Ref) bool
// SomeChunksChunkCallback is called for every unique chunks.Chunk |c| which wasn't stopped from SomeChunksStopCallback.
type SomeChunksChunkCallback func(c chunks.Chunk)
// SomeChunksChunkCallback is called for every unique chunks.Chunk |c| which wasn't stopped from SomeChunksStopCallback. |r| is a types.Ref referring to |c|.
type SomeChunksChunkCallback func(r types.Ref, c chunks.Chunk)
// SomeChunksP invokes callbacks on every unique chunk reachable from |r| in top-down order. Callbacks are invoked only once for each chunk regardless of how many times the chunk appears.
//
@@ -130,7 +130,7 @@ func SomeChunksP(r types.Ref, bs types.BatchStore, stopCb SomeChunksStopCallback
d.Chk.False(c.IsEmpty())
if chunkCb != nil {
chunkCb(c)
chunkCb(r, c)
}
}
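
For completeness, a minimal consumer of the revised callback has the same shape as copyWorker in the pull code above (rootRef, source, sink, stopCb, and concurrency are illustrative names):

// Sketch: the walk now hands back the Ref alongside the Chunk, so callers
// can forward r.Height() straight into SchedulePut.
walk.SomeChunksP(rootRef, source.batchStore(), stopCb, func(r types.Ref, c chunks.Chunk) {
	sink.SchedulePut(c, r.Height(), types.Hints{})
}, concurrency)
sink.Flush()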