diff --git a/datas/database.go b/datas/database.go
index ec599fae1b..d0ac898bff 100644
--- a/datas/database.go
+++ b/datas/database.go
@@ -41,7 +41,7 @@ type Database interface {
 
 // This interface exists solely to allow RemoteDatabaseClient to pass back a gross side-channel thing for the purposes of pull.
 type batchSink interface {
-	SchedulePut(c chunks.Chunk, hints types.Hints)
+	SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints)
 	Flush()
 	io.Closer
 }
diff --git a/datas/http_batch_store.go b/datas/http_batch_store.go
index 8fe776a6da..9a132c065f 100644
--- a/datas/http_batch_store.go
+++ b/datas/http_batch_store.go
@@ -39,7 +39,7 @@ type httpBatchStore struct {
 	rateLimit     chan struct{}
 	requestWg     *sync.WaitGroup
 	workerWg      *sync.WaitGroup
-	unwrittenPuts *unwrittenPutCache
+	unwrittenPuts *orderedChunkCache
 }
 
 func newHTTPBatchStore(baseURL, auth string) *httpBatchStore {
@@ -57,7 +57,7 @@ func newHTTPBatchStore(baseURL, auth string) *httpBatchStore {
 		rateLimit:     make(chan struct{}, httpChunkSinkConcurrency),
 		requestWg:     &sync.WaitGroup{},
 		workerWg:      &sync.WaitGroup{},
-		unwrittenPuts: newUnwrittenPutCache(),
+		unwrittenPuts: newOrderedChunkCache(),
 	}
 	buffSink.batchGetRequests()
 	buffSink.batchPutRequests()
@@ -138,16 +138,16 @@ func (bhcs *httpBatchStore) batchGetRequests() {
 
 func (bhcs *httpBatchStore) sendGetRequests(req chunks.ReadRequest) {
 	batch := chunks.ReadBatch{}
-	refs := map[ref.Ref]struct{}{}
+	hashes := hashSet{}
 
 	addReq := func(req chunks.ReadRequest) {
-		r := req.Ref()
-		batch[r] = append(batch[r], req.Outstanding())
-		refs[r] = struct{}{}
+		hash := req.Ref()
+		batch[hash] = append(batch[hash], req.Outstanding())
+		hashes.Insert(hash)
 	}
 
 	addReq(req)
-	for drained := false; !drained && len(refs) < readBufferSize; {
+	for drained := false; !drained && len(hashes) < readBufferSize; {
 		select {
 		case req := <-bhcs.readQueue:
 			addReq(req)
@@ -164,17 +164,17 @@
 			batch.Close()
 		}()
 
-		bhcs.getRefs(refs, &batch)
+		bhcs.getRefs(hashes, &batch)
 		<-bhcs.rateLimit
 	}()
 }
 
-func (bhcs *httpBatchStore) getRefs(refs types.Hints, cs chunks.ChunkSink) {
+func (bhcs *httpBatchStore) getRefs(hashes hashSet, cs chunks.ChunkSink) {
 	// POST http:///getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent.
 	u := *bhcs.host
 	u.Path = httprouter.CleanPath(bhcs.host.Path + constants.GetRefsPath)
-	req := newRequest("POST", bhcs.auth, u.String(), buildGetRefsRequest(refs), http.Header{
+	req := newRequest("POST", bhcs.auth, u.String(), buildGetRefsRequest(hashes), http.Header{
 		"Accept-Encoding": {"gzip"},
 		"Content-Type":    {"application/x-www-form-urlencoded"},
 	})
@@ -196,8 +196,8 @@ func (bhcs *httpBatchStore) getRefs(refs types.Hints, cs chunks.ChunkSink) {
 	chunks.Deserialize(reader, cs, rl)
 }
 
-func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, hints types.Hints) {
-	if !bhcs.unwrittenPuts.Add(c) {
+func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
+	if !bhcs.unwrittenPuts.Insert(c, refHeight) {
 		return
 	}
@@ -211,9 +211,13 @@ func (bhcs *httpBatchStore) batchPutRequests() {
 		defer bhcs.workerWg.Done()
 
 		hints := types.Hints{}
-		hashes := ref.RefSlice{}
+		hashes := hashSet{}
 		handleRequest := func(wr writeRequest) {
-			hashes = append(hashes, wr.hash)
+			if hashes.Has(wr.hash) {
+				bhcs.requestWg.Done() // Already have a put enqueued for wr.hash.
+			} else {
+				hashes.Insert(wr.hash)
+			}
 			for hint := range wr.hints {
 				hints[hint] = struct{}{}
 			}
 		}
@@ -238,9 +242,8 @@
 			default:
 				drained = true
 				bhcs.sendWriteRequests(hashes, hints) // Takes ownership of hashes, hints
-				hints = types.Hints{}
-				hashes = ref.RefSlice{}
+				hashes = hashSet{}
 			}
 		}
 	}
@@ -248,7 +251,7 @@
 	}()
 }
 
-func (bhcs *httpBatchStore) sendWriteRequests(hashes ref.RefSlice, hints types.Hints) {
+func (bhcs *httpBatchStore) sendWriteRequests(hashes hashSet, hints types.Hints) {
 	if len(hashes) == 0 {
 		return
 	}
@@ -267,7 +270,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes ref.RefSlice, hints types.H
 	errChan := make(chan error)
 	go func() {
 		gw := gzip.NewWriter(pw)
-		err := bhcs.unwrittenPuts.ExtractChunks(hashes[0], hashes[len(hashes)-1], gw)
+		err := bhcs.unwrittenPuts.ExtractChunks(hashes, gw)
 		// The ordering of these is important. Close the gzipper to flush data to pw, then close pw so that the HTTP stack which is reading from serializedChunks knows it has everything, and only THEN block on errChan.
 		gw.Close()
 		pw.Close()
diff --git a/datas/http_batch_store_test.go b/datas/http_batch_store_test.go
index effab186b0..dfba5b8b1e 100644
--- a/datas/http_batch_store_test.go
+++ b/datas/http_batch_store_test.go
@@ -83,7 +83,7 @@ func (suite *HTTPBatchStoreSuite) TearDownTest() {
 
 func (suite *HTTPBatchStoreSuite) TestPutChunk() {
 	c := types.EncodeValue(types.NewString("abc"), nil)
-	suite.store.SchedulePut(c, types.Hints{})
+	suite.store.SchedulePut(c, 1, types.Hints{})
 	suite.store.Flush()
 
 	suite.Equal(1, suite.cs.Writes)
@@ -96,10 +96,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() {
 	}
 	l := types.NewList()
 	for _, val := range vals {
-		suite.store.SchedulePut(types.EncodeValue(val, nil), types.Hints{})
+		suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{})
 		l = l.Append(types.NewRef(val))
 	}
-	suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
+	suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
 	suite.store.Flush()
 
 	suite.Equal(3, suite.cs.Writes)
@@ -117,7 +117,7 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
 	suite.NoError(suite.cs.PutMany(chnx))
 	l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1]))
 
-	suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{
+	suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{
 		chnx[0].Ref(): struct{}{},
 		chnx[1].Ref(): struct{}{},
 	})
@@ -163,10 +163,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksBackpressure() {
 	}
 	l := types.NewList()
 	for _, v := range vals {
-		bs.SchedulePut(types.EncodeValue(v, nil), types.Hints{})
+		bs.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
 		l = l.Append(types.NewRef(v))
 	}
-	bs.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
+	bs.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
 	bs.Flush()
 
 	suite.Equal(6, suite.cs.Writes)
diff --git a/datas/not_a_batch_sink.go b/datas/not_a_batch_sink.go
index 9b2ee04069..e66166a7b7 100644
--- a/datas/not_a_batch_sink.go
+++ b/datas/not_a_batch_sink.go
@@ -26,7 +26,7 @@ type notABatchSink struct {
 	rateLimit     chan struct{}
 	requestWg     *sync.WaitGroup
 	workerWg      *sync.WaitGroup
-	unwrittenPuts *unwrittenPutCache
+	unwrittenPuts *orderedChunkCache
 }
 
 func newNotABatchSink(host *url.URL, auth string) *notABatchSink {
@@ -40,7 +40,7 @@
 		rateLimit:     make(chan struct{}, httpChunkSinkConcurrency),
 		requestWg:     &sync.WaitGroup{},
 		workerWg:      &sync.WaitGroup{},
-		unwrittenPuts: newUnwrittenPutCache(),
+		unwrittenPuts: newOrderedChunkCache(),
 	}
 	sink.batchPutRequests()
 	return sink
@@ -63,8 +63,8 @@ func (bhcs *notABatchSink) Close() (e error) {
 	return
 }
 
-func (bhcs *notABatchSink) SchedulePut(c chunks.Chunk, hints types.Hints) {
-	if !bhcs.unwrittenPuts.Add(c) {
+func (bhcs *notABatchSink) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
+	if !bhcs.unwrittenPuts.Insert(c, refHeight) {
 		return
 	}
@@ -119,7 +119,7 @@ func (bhcs *notABatchSink) batchPutRequests() {
 func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
 	bhcs.rateLimit <- struct{}{}
 	go func() {
-		hashes := make(ref.RefSlice, len(chnx))
+		hashes := make(hashSet, len(chnx))
 		defer func() {
 			bhcs.unwrittenPuts.Clear(hashes)
 			bhcs.requestWg.Add(-len(chnx))
@@ -128,8 +128,8 @@ func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
 		body := &bytes.Buffer{}
 		gw := gzip.NewWriter(body)
 		sz := chunks.NewSerializer(gw)
-		for i, chunk := range chnx {
-			hashes[i] = chunk.Ref()
+		for _, chunk := range chnx {
+			hashes.Insert(chunk.Ref())
 			sz.Put(chunk)
 		}
 		sz.Close()
diff --git a/datas/not_a_batch_sink_test.go b/datas/not_a_batch_sink_test.go
index 50762c87c9..8515736c69 100644
--- a/datas/not_a_batch_sink_test.go
+++ b/datas/not_a_batch_sink_test.go
@@ -53,10 +53,10 @@ func (suite *NotABatchSinkSuite) TestPutChunks() {
 	}
 	l := types.NewList()
 	for _, v := range vals {
-		suite.store.SchedulePut(types.EncodeValue(v, nil), types.Hints{})
+		suite.store.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
 		l = l.Append(types.NewRef(v))
 	}
-	suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
+	suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
 	suite.store.Flush()
 
 	suite.Equal(3, suite.cs.Writes)
diff --git a/datas/pull.go b/datas/pull.go
index 9f493b066c..bcdc7f84ac 100644
--- a/datas/pull.go
+++ b/datas/pull.go
@@ -25,8 +25,8 @@ func CopyReachableChunksP(source, sink Database, sourceRef, exclude types.Ref, c
 	mu := sync.Mutex{}
 	excludeCallback := func(r types.Ref) bool {
 		mu.Lock()
+		defer mu.Unlock()
 		excludeRefs[r.TargetRef()] = true
-		mu.Unlock()
 		return false
 	}
 
@@ -42,8 +42,8 @@ func CopyReachableChunksP(source, sink Database, sourceRef, exclude types.Ref, c
 
 func copyWorker(source, sink Database, sourceRef types.Ref, stopCb walk.SomeChunksStopCallback, concurrency int) {
 	bs := sink.batchSink()
-	walk.SomeChunksP(sourceRef, source.batchStore(), stopCb, func(c chunks.Chunk) {
-		bs.SchedulePut(c, types.Hints{})
+	walk.SomeChunksP(sourceRef, source.batchStore(), stopCb, func(r types.Ref, c chunks.Chunk) {
+		bs.SchedulePut(c, r.Height(), types.Hints{})
 	}, concurrency)
 
 	bs.Flush()
diff --git a/datas/put_cache.go b/datas/put_cache.go
index 10cde0ba56..094149fc7a 100644
--- a/datas/put_cache.go
+++ b/datas/put_cache.go
@@ -14,10 +14,9 @@ import (
 	"github.com/syndtr/goleveldb/leveldb"
 	"github.com/syndtr/goleveldb/leveldb/filter"
 	"github.com/syndtr/goleveldb/leveldb/opt"
-	"github.com/syndtr/goleveldb/leveldb/util"
 )
 
-func newUnwrittenPutCache() *unwrittenPutCache {
+func newOrderedChunkCache() *orderedChunkCache {
 	dir, err := ioutil.TempDir("", "")
 	d.Exp.NoError(err)
 	db, err := leveldb.OpenFile(dir, &opt.Options{
@@ -27,30 +26,41 @@ func newUnwrittenPutCache() *unwrittenPutCache {
 		WriteBuffer: 1 << 24, // 16MiB,
 	})
 	d.Chk.NoError(err, "opening put cache in %s", dir)
-	return &unwrittenPutCache{
+	return &orderedChunkCache{
 		orderedChunks: db,
 		chunkIndex:    map[ref.Ref][]byte{},
 		dbDir:         dir,
-		mu:            &sync.Mutex{},
+		mu:            &sync.RWMutex{},
 	}
 }
 
-type unwrittenPutCache struct {
+// orderedChunkCache holds Chunks, allowing them to be retrieved by hash or enumerated in ref-height order.
+type orderedChunkCache struct {
 	orderedChunks *leveldb.DB
 	chunkIndex    map[ref.Ref][]byte
 	dbDir         string
-	mu            *sync.Mutex
-	next          uint64
+	mu            *sync.RWMutex
 }
 
-func (p *unwrittenPutCache) Add(c chunks.Chunk) bool {
+type hashSet map[ref.Ref]struct{}
+
+func (hs hashSet) Insert(hash ref.Ref) {
+	hs[hash] = struct{}{}
+}
+
+func (hs hashSet) Has(hash ref.Ref) (has bool) {
+	_, has = hs[hash]
+	return
+}
+
+// Insert can be called from any goroutine to store c in the cache. If c is successfully added to the cache, Insert returns true. If c was already in the cache, Insert returns false.
+func (p *orderedChunkCache) Insert(c chunks.Chunk, refHeight uint64) bool {
 	hash := c.Ref()
 	dbKey, present := func() (dbKey []byte, present bool) {
 		p.mu.Lock()
 		defer p.mu.Unlock()
 		if _, present = p.chunkIndex[hash]; !present {
-			dbKey = toDbKey(p.next)
-			p.next++
+			dbKey = toDbKey(refHeight, c.Ref())
 			p.chunkIndex[hash] = dbKey
 		}
 		return
@@ -67,18 +77,19 @@
 	return false
 }
 
-func (p *unwrittenPutCache) Has(hash ref.Ref) (has bool) {
-	p.mu.Lock()
-	defer p.mu.Unlock()
+func (p *orderedChunkCache) has(hash ref.Ref) (has bool) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
 	_, has = p.chunkIndex[hash]
 	return
 }
 
-func (p *unwrittenPutCache) Get(hash ref.Ref) chunks.Chunk {
-	// Don't use defer p.mu.Unlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
-	p.mu.Lock()
+// Get can be called from any goroutine to retrieve the chunk referenced by hash. If the chunk is not present, Get returns the empty Chunk.
+func (p *orderedChunkCache) Get(hash ref.Ref) chunks.Chunk {
+	// Don't use defer p.mu.RUnlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
+	p.mu.RLock()
 	dbKey, ok := p.chunkIndex[hash]
-	p.mu.Unlock()
+	p.mu.RUnlock()
 
 	if !ok {
 		return chunks.EmptyChunk
@@ -91,10 +102,11 @@
 	return <-chunkChan
 }
 
-func (p *unwrittenPutCache) Clear(hashes ref.RefSlice) {
+// Clear can be called from any goroutine to remove chunks referenced by the given hashes from the cache.
+func (p *orderedChunkCache) Clear(hashes hashSet) {
 	deleteBatch := &leveldb.Batch{}
 	p.mu.Lock()
-	for _, hash := range hashes {
+	for hash := range hashes {
 		deleteBatch.Delete(p.chunkIndex[hash])
 		delete(p.chunkIndex, hash)
 	}
@@ -103,30 +115,39 @@
 	return
 }
 
-func toDbKey(idx uint64) []byte {
-	buf := &bytes.Buffer{}
-	err := binary.Write(buf, binary.BigEndian, idx)
+var uint64Size = binary.Size(uint64(0))
+
+// toDbKey takes a refHeight and a hash and returns a binary key suitable for use with LevelDB. The default sort order used by LevelDB ensures that these keys (and their associated values) will be iterated in ref-height order.
+func toDbKey(refHeight uint64, hash ref.Ref) []byte {
+	digest := hash.DigestSlice()
+	buf := bytes.NewBuffer(make([]byte, 0, uint64Size+binary.Size(digest)))
+	err := binary.Write(buf, binary.BigEndian, refHeight)
+	d.Chk.NoError(err)
+	err = binary.Write(buf, binary.BigEndian, digest)
 	d.Chk.NoError(err)
 	return buf.Bytes()
 }
 
-func fromDbKey(key []byte) (idx uint64) {
-	err := binary.Read(bytes.NewReader(key), binary.BigEndian, &idx)
+func fromDbKey(key []byte) (uint64, ref.Ref) {
+	refHeight := uint64(0)
+	r := bytes.NewReader(key)
+	err := binary.Read(r, binary.BigEndian, &refHeight)
 	d.Chk.NoError(err)
-	return
+	digest := ref.Sha1Digest{}
+	err = binary.Read(r, binary.BigEndian, &digest)
+	d.Chk.NoError(err)
+	return refHeight, ref.New(digest)
 }
 
-func (p *unwrittenPutCache) ExtractChunks(start, end ref.Ref, w io.Writer) error {
-	p.mu.Lock()
-	iterRange := &util.Range{
-		Start: p.chunkIndex[start],
-		Limit: toDbKey(fromDbKey(p.chunkIndex[end]) + 1),
-	}
-	p.mu.Unlock()
-
-	iter := p.orderedChunks.NewIterator(iterRange, nil)
+// ExtractChunks can be called from any goroutine to write Chunks referenced by the given hashes to w. The chunks are ordered by ref-height. Chunks of the same height are written in an unspecified order, relative to one another.
+func (p *orderedChunkCache) ExtractChunks(hashes hashSet, w io.Writer) error {
+	iter := p.orderedChunks.NewIterator(nil, nil)
 	defer iter.Release()
 	for iter.Next() {
+		_, hash := fromDbKey(iter.Key())
+		if !hashes.Has(hash) {
+			continue
+		}
 		_, err := w.Write(iter.Value())
 		if err != nil {
 			return err
@@ -135,7 +156,7 @@ func (p *unwrittenPutCache) ExtractChunks(start, end ref.Ref, w io.Writer) error
 	return nil
 }
 
-func (p *unwrittenPutCache) Destroy() error {
+func (p *orderedChunkCache) Destroy() error {
 	d.Chk.NoError(p.orderedChunks.Close())
 	return os.RemoveAll(p.dbDir)
 }
diff --git a/datas/put_cache_test.go b/datas/put_cache_test.go
index 0350bfc774..3896fccb30 100644
--- a/datas/put_cache_test.go
+++ b/datas/put_cache_test.go
@@ -2,6 +2,7 @@ package datas
 
 import (
 	"bytes"
+	"math/rand"
 	"sync"
 	"testing"
@@ -17,13 +18,13 @@ func TestLevelDBPutCacheSuite(t *testing.T) {
 
 type LevelDBPutCacheSuite struct {
 	suite.Suite
-	cache  *unwrittenPutCache
+	cache  *orderedChunkCache
 	values []types.Value
 	chnx   map[ref.Ref]chunks.Chunk
 }
 
 func (suite *LevelDBPutCacheSuite) SetupTest() {
-	suite.cache = newUnwrittenPutCache()
+	suite.cache = newOrderedChunkCache()
 	suite.values = []types.Value{
 		types.NewString("abc"),
 		types.NewString("def"),
@@ -43,22 +44,22 @@ func (suite *LevelDBPutCacheSuite) TearDownTest() {
 
 func (suite *LevelDBPutCacheSuite) TestAddTwice() {
 	chunk := suite.chnx[suite.values[0].Ref()]
-	suite.True(suite.cache.Add(chunk))
-	suite.False(suite.cache.Add(chunk))
+	suite.True(suite.cache.Insert(chunk, 1))
+	suite.False(suite.cache.Insert(chunk, 1))
 }
 
 func (suite *LevelDBPutCacheSuite) TestAddParallel() {
 	hashes := make(chan ref.Ref)
 	for _, chunk := range suite.chnx {
 		go func(c chunks.Chunk) {
-			suite.cache.Add(c)
+			suite.cache.Insert(c, 1)
 			hashes <- c.Ref()
 		}(chunk)
 	}
 
 	for i := 0; i < len(suite.values); i++ {
 		hash := <-hashes
-		suite.True(suite.cache.Has(hash))
+		suite.True(suite.cache.has(hash))
 		delete(suite.chnx, hash)
 	}
 	close(hashes)
@@ -67,7 +68,7 @@ func (suite *LevelDBPutCacheSuite) TestGetParallel() {
 	for _, c := range suite.chnx {
-		suite.cache.Add(c)
+		suite.cache.Insert(c, 1)
 	}
 
 	chunkChan := make(chan chunks.Chunk)
@@ -86,57 +87,65 @@ func (suite *LevelDBPutCacheSuite) TestGetParallel() {
 }
 
 func (suite *LevelDBPutCacheSuite) TestClearParallel() {
-	hashes := ref.RefSlice{}
-	for h, c := range suite.chnx {
-		hashes = append(hashes, h)
-		suite.cache.Add(c)
+	keepIdx := 2
+	toClear1, toClear2 := hashSet{}, hashSet{}
+	for i, v := range suite.values {
+		suite.cache.Insert(types.EncodeValue(v, nil), 1)
+		if i < keepIdx {
+			toClear1.Insert(v.Ref())
+		} else if i > keepIdx {
+			toClear2.Insert(v.Ref())
+		}
 	}
 
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
-	clear := func(hs ref.RefSlice) {
+	clear := func(hs hashSet) {
 		suite.cache.Clear(hs)
 		wg.Done()
 	}
-	keepIdx := 2
-	go clear(hashes[:keepIdx])
-	go clear(hashes[keepIdx+1:])
+
+	go clear(toClear1)
+	go clear(toClear2)
 	wg.Wait()
 
-	for i, hash := range hashes {
+	for i, v := range suite.values {
 		if i == keepIdx {
-			suite.True(suite.cache.Has(hash))
+			suite.True(suite.cache.has(v.Ref()))
 			continue
 		}
-		suite.False(suite.cache.Has(hash))
+		suite.False(suite.cache.has(v.Ref()))
 	}
 }
 
 func (suite *LevelDBPutCacheSuite) TestReaderSubset() {
-	orderedHashes := ref.RefSlice{}
+	toExtract := hashSet{}
 	for hash, c := range suite.chnx {
-		orderedHashes = append(orderedHashes, hash)
-		suite.cache.Add(c)
+		if len(toExtract) < 2 {
+			toExtract.Insert(hash)
+		}
+		suite.cache.Insert(c, 1)
 	}
 
 	// Only iterate over the first 2 elements in the DB
-	chunkChan := suite.extractChunks(orderedHashes[0], orderedHashes[1])
-	origLen := len(orderedHashes)
+	chunkChan := suite.extractChunks(toExtract)
+	count := 0
 	for c := range chunkChan {
-		suite.Equal(orderedHashes[0], c.Ref())
-		orderedHashes = orderedHashes[1:]
+		if suite.Contains(toExtract, c.Ref()) {
+			count++
+		}
 	}
-	suite.Len(orderedHashes, origLen-2)
+	suite.Equal(len(toExtract), count)
 }
 
 func (suite *LevelDBPutCacheSuite) TestReaderSnapshot() {
-	hashes := ref.RefSlice{}
+	hashes := hashSet{}
 	for h, c := range suite.chnx {
-		hashes = append(hashes, h)
-		suite.cache.Add(c)
+		hashes.Insert(h)
+		suite.cache.Insert(c, 1)
 	}
 
-	chunkChan := suite.extractChunks(hashes[0], hashes[len(hashes)-1])
+	chunkChan := suite.extractChunks(hashes)
 	// Clear chunks from suite.cache. Should still be enumerated by reader
 	suite.cache.Clear(hashes)
@@ -147,13 +156,18 @@
 }
 
 func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() {
-	orderedHashes := ref.RefSlice{}
+	maxHeight := len(suite.chnx)
+	orderedHashes := make(ref.RefSlice, maxHeight)
+	toExtract := hashSet{}
+	heights := rand.Perm(maxHeight)
 	for hash, c := range suite.chnx {
-		orderedHashes = append(orderedHashes, hash)
-		suite.cache.Add(c)
+		toExtract.Insert(hash)
+		orderedHashes[heights[0]] = hash
+		suite.cache.Insert(c, uint64(heights[0]))
+		heights = heights[1:]
 	}
 
-	chunkChan := suite.extractChunks(orderedHashes[0], orderedHashes[len(orderedHashes)-1])
+	chunkChan := suite.extractChunks(toExtract)
 	for c := range chunkChan {
 		suite.Equal(orderedHashes[0], c.Ref())
 		orderedHashes = orderedHashes[1:]
@@ -161,9 +175,9 @@
 	suite.Len(orderedHashes, 0)
 }
 
-func (suite *LevelDBPutCacheSuite) extractChunks(start, end ref.Ref) <-chan chunks.Chunk {
+func (suite *LevelDBPutCacheSuite) extractChunks(hashes hashSet) <-chan chunks.Chunk {
 	buf := &bytes.Buffer{}
-	err := suite.cache.ExtractChunks(start, end, buf)
+	err := suite.cache.ExtractChunks(hashes, buf)
 	suite.NoError(err)
 
 	chunkChan := make(chan chunks.Chunk)
diff --git a/types/batch_store.go b/types/batch_store.go
index 0eb99ca614..64f35b14c5 100644
--- a/types/batch_store.go
+++ b/types/batch_store.go
@@ -12,9 +12,9 @@ type BatchStore interface {
 	// Get returns from the store the Value Chunk by r. If r is absent from the store, chunks.EmptyChunk is returned.
 	Get(r ref.Ref) chunks.Chunk
 
-	// SchedulePut enqueues a write for the Chunk c, using the provided hints to assist in validation. It may or may not block until c is persisted. Validation requires checking that all refs embedded in c are themselves valid, which could naively be done by resolving each one. Instead, hints provides a (smaller) set of refs that point to Chunks that themselves contain many of c's refs. Thus, by checking only the hinted Chunks, c can be validated with fewer read operations.
+	// SchedulePut enqueues a write for the Chunk c with the given refHeight. Typically, the Value which was encoded to provide c can also be queried for its refHeight. The call may or may not block until c is persisted. The provided hints are used to assist in validation. Validation requires checking that all refs embedded in c are themselves valid, which could naively be done by resolving each one. Instead, hints provides a (smaller) set of refs that point to Chunks that themselves contain many of c's refs. Thus, by checking only the hinted Chunks, c can be validated with fewer read operations.
 	// c may or may not be persisted when Put() returns, but is guaranteed to be persistent after a call to Flush() or Close().
-	SchedulePut(c chunks.Chunk, hints Hints)
+	SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints)
 
 	// Flush causes enqueued Puts to be persisted.
 	Flush()
@@ -40,7 +40,7 @@ func (lbs *BatchStoreAdaptor) Get(ref ref.Ref) chunks.Chunk {
 }
 
 // SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
-func (lbs *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, hints Hints) {
+func (lbs *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
 	lbs.cs.Put(c)
 }
diff --git a/types/value_store.go b/types/value_store.go
index 5ed507aab3..cbb026c63c 100644
--- a/types/value_store.go
+++ b/types/value_store.go
@@ -67,12 +67,13 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
 	c := EncodeValue(v, lvs)
 	d.Chk.False(c.IsEmpty())
 	hash := c.Ref()
-	r := constructRef(MakeRefType(v.Type()), hash, maxChunkHeight(v)+1)
+	height := maxChunkHeight(v) + 1
+	r := constructRef(MakeRefType(v.Type()), hash, height)
 	if lvs.isPresent(hash) {
 		return r
 	}
 	hints := lvs.checkChunksInCache(v)
-	lvs.bs.SchedulePut(c, hints)
+	lvs.bs.SchedulePut(c, height, hints)
 	lvs.set(hash, hintedChunk{v.Type(), hash})
 	return r
 }
diff --git a/walk/walk.go b/walk/walk.go
index abb2e2296d..dd77aa8329 100644
--- a/walk/walk.go
+++ b/walk/walk.go
@@ -95,8 +95,8 @@ func doTreeWalkP(v types.Value, vr types.ValueReader, cb SomeCallback, concurren
 // SomeChunksStopCallback is called for every unique types.Ref |r|. Return true to stop walking beyond |r|.
 type SomeChunksStopCallback func(r types.Ref) bool
 
-// SomeChunksChunkCallback is called for every unique chunks.Chunk |c| which wasn't stopped from SomeChunksStopCallback.
-type SomeChunksChunkCallback func(c chunks.Chunk)
+// SomeChunksChunkCallback is called for every unique chunks.Chunk |c| which wasn't stopped from SomeChunksStopCallback. |r| is a types.Ref referring to |c|.
+type SomeChunksChunkCallback func(r types.Ref, c chunks.Chunk)
 
 // SomeChunksP invokes callbacks on every unique chunk reachable from |r| in top-down order. Callbacks are invoked only once for each chunk regardless of how many times the chunk appears.
 //
@@ -130,7 +130,7 @@ func SomeChunksP(r types.Ref, bs types.BatchStore, stopCb SomeChunksStopCallback
 		d.Chk.False(c.IsEmpty())
 
 		if chunkCb != nil {
-			chunkCb(c)
+			chunkCb(r, c)
 		}
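
Appendix (not part of the patch): a minimal, self-contained sketch of the ordering idea the new orderedChunkCache relies on. toDbKey prefixes each LevelDB key with the chunk's ref-height encoded big-endian, so LevelDB's default byte-wise key comparison iterates entries lowest-height first; within a single height, ordering falls back to the digest bytes, which is why ExtractChunks documents same-height chunks as written in an unspecified order. The code below uses only the Go standard library; makeKey and the sample digests are hypothetical stand-ins for toDbKey and real chunk hashes, not the Noms implementation.

// key_order_sketch.go: illustrative only, not part of the Noms codebase.
package main

import (
	"bytes"
	"crypto/sha1"
	"encoding/binary"
	"fmt"
	"sort"
)

// makeKey (hypothetical) mirrors the toDbKey layout: an 8-byte big-endian
// ref-height followed by the chunk digest. Byte-wise comparison of such keys,
// which is LevelDB's default ordering, therefore sorts by height first.
func makeKey(refHeight uint64, digest [sha1.Size]byte) []byte {
	key := make([]byte, 8+sha1.Size)
	binary.BigEndian.PutUint64(key[:8], refHeight)
	copy(key[8:], digest[:])
	return key
}

func main() {
	keys := [][]byte{
		makeKey(3, sha1.Sum([]byte("commit"))),
		makeKey(1, sha1.Sum([]byte("leaf a"))),
		makeKey(2, sha1.Sum([]byte("interior node"))),
		makeKey(1, sha1.Sum([]byte("leaf b"))),
	}
	// Sorting with bytes.Compare reproduces the order in which a LevelDB
	// iterator would visit these keys.
	sort.Slice(keys, func(i, j int) bool { return bytes.Compare(keys[i], keys[j]) < 0 })
	for _, k := range keys {
		fmt.Println(binary.BigEndian.Uint64(k[:8])) // prints 1, 1, 2, 3
	}
}

Running this prints the heights 1, 1, 2, 3: lower chunks would be streamed ahead of the chunks that reference them, which matches the bottom-up ordering the cache is meant to provide for batched puts.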