remove panics from ChunkStore::Get anad GetMany

This commit is contained in:
Brian Hendriks
2019-06-25 11:33:17 -07:00
parent b734b7dc7f
commit b232b28fd8
14 changed files with 120 additions and 58 deletions
@@ -98,22 +98,26 @@ func (dcs *DoltChunkStore) getRepoId() *remotesapi.RepoId {
}
// Get the Chunk for the value of the hash in the store. If the hash is absent from the store EmptyChunk is returned.
func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
func (dcs *DoltChunkStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
hashes := hash.HashSet{h: struct{}{}}
foundChan := make(chan *chunks.Chunk, 1)
dcs.GetMany(ctx, hashes, foundChan)
err := dcs.GetMany(ctx, hashes, foundChan)
if err != nil {
return chunks.EmptyChunk, err
}
select {
case ch := <-foundChan:
return *ch
return *ch, nil
default:
return chunks.EmptyChunk
return chunks.EmptyChunk, nil
}
}
// GetMany gets the Chunks with |hashes| from the store. On return, |foundChunks| will have been fully sent all chunks
// which have been found. Any non-present chunks will silently be ignored.
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
hashToChunk := dcs.cache.Get(hashes)
notCached := make([]hash.Hash, 0, len(hashes))
@@ -131,10 +135,11 @@ func (dcs *DoltChunkStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
err := dcs.readChunksAndCache(ctx, hashes, notCached, foundChunks)
if err != nil {
//follow noms convention
panic(err)
return err
}
}
return nil
}
const (
+2 -2
View File
@@ -16,12 +16,12 @@ import (
type ChunkStore interface {
// Get the Chunk for the value of the hash in the store. If the hash is
// absent from the store EmptyChunk is returned.
Get(ctx context.Context, h hash.Hash) Chunk
Get(ctx context.Context, h hash.Hash) (Chunk, error)
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk)
GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error
// Returns true iff the value at the address |h| is contained in the
// store
+2 -1
View File
@@ -69,7 +69,8 @@ func (suite *ChunkStoreTestSuite) TestChunkStoreCommitPut() {
func (suite *ChunkStoreTestSuite) TestChunkStoreGetNonExisting() {
store := suite.Factory.CreateStore(context.Background(), "ns")
h := hash.Parse("11111111111111111111111111111111")
c := store.Get(context.Background(), h)
c, err := store.Get(context.Background(), h)
suite.NoError(err)
suite.True(c.IsEmpty())
}
+14 -7
View File
@@ -32,13 +32,13 @@ func (ms *MemoryStorage) NewView() ChunkStore {
// Get retrieves the Chunk with the Hash h, returning EmptyChunk if it's not
// present.
func (ms *MemoryStorage) Get(ctx context.Context, h hash.Hash) Chunk {
func (ms *MemoryStorage) Get(ctx context.Context, h hash.Hash) (Chunk, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if c, ok := ms.data[h]; ok {
return c
return c, nil
}
return EmptyChunk
return EmptyChunk, nil
}
// Has returns true if the Chunk with the Hash h is present in ms.data, false
@@ -96,22 +96,29 @@ type MemoryStoreView struct {
storage *MemoryStorage
}
func (ms *MemoryStoreView) Get(ctx context.Context, h hash.Hash) Chunk {
func (ms *MemoryStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
if c, ok := ms.pending[h]; ok {
return c
return c, nil
}
return ms.storage.Get(ctx, h)
}
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) {
func (ms *MemoryStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error {
for h := range hashes {
c := ms.Get(ctx, h)
c, err := ms.Get(ctx, h)
if err != nil {
return err
}
if !c.IsEmpty() {
foundChunks <- &c
}
}
return nil
}
func (ms *MemoryStoreView) Has(ctx context.Context, h hash.Hash) bool {
+7 -5
View File
@@ -12,13 +12,15 @@ import (
)
func assertInputInStore(input string, h hash.Hash, s ChunkStore, assert *assert.Assertions) {
chunk := s.Get(context.Background(), h)
chunk, err := s.Get(context.Background(), h)
assert.NoError(err)
assert.False(chunk.IsEmpty(), "Shouldn't get empty chunk for %s", h.String())
assert.Equal(input, string(chunk.Data()))
}
func assertInputNotInStore(input string, h hash.Hash, s ChunkStore, assert *assert.Assertions) {
chunk := s.Get(context.Background(), h)
chunk, err := s.Get(context.Background(), h)
assert.NoError(err)
assert.True(chunk.IsEmpty(), "Shouldn't get non-empty chunk for %s: %v", h.String(), chunk)
}
@@ -37,14 +39,14 @@ type TestStoreView struct {
Writes int
}
func (s *TestStoreView) Get(ctx context.Context, h hash.Hash) Chunk {
func (s *TestStoreView) Get(ctx context.Context, h hash.Hash) (Chunk, error) {
s.Reads++
return s.ChunkStore.Get(ctx, h)
}
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) {
func (s *TestStoreView) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *Chunk) error {
s.Reads += len(hashes)
s.ChunkStore.GetMany(ctx, hashes, foundChunks)
return s.ChunkStore.GetMany(ctx, hashes, foundChunks)
}
func (s *TestStoreView) Has(ctx context.Context, h hash.Hash) bool {
+4 -1
View File
@@ -103,7 +103,10 @@ func getChunks(ctx context.Context, srcDB Database, batch hash.HashSlice, sample
go func() {
defer close(found)
srcDB.chunkStore().GetMany(ctx, batch.HashSet(), found)
err := srcDB.chunkStore().GetMany(ctx, batch.HashSet(), found)
// TODO: fix panics
d.PanicIfError(err)
}()
for c := range found {
@@ -7,6 +7,7 @@ package main
import (
"context"
"fmt"
"github.com/liquidata-inc/ld/dolt/go/store/d"
"sync"
"github.com/liquidata-inc/ld/dolt/go/store/chunks"
@@ -65,7 +66,9 @@ func verifyChunk(h hash.Hash, c chunks.Chunk) {
func benchmarkRead(openStore storeOpenFn, hashes hashSlice, src *dataSource, t assert.TestingT) {
store := openStore()
for _, h := range hashes {
verifyChunk(h, store.Get(context.Background(), h))
c, err := store.Get(context.Background(), h)
assert.NoError(t, err)
verifyChunk(h, c)
}
assert.NoError(t, store.Close())
}
@@ -104,7 +107,11 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
wg.Add(1)
go func(hashes hash.HashSlice) {
chunkChan := make(chan *chunks.Chunk, len(hashes))
store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
err := store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
// TODO: fix panics
d.PanicIfError(err)
close(chunkChan)
verifyChunks(hashes, chunkChan)
wg.Done()
@@ -117,7 +124,11 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
if len(batch) > 0 {
chunkChan := make(chan *chunks.Chunk, len(batch))
store.GetMany(context.Background(), batch.HashSet(), chunkChan)
err := store.GetMany(context.Background(), batch.HashSet(), chunkChan)
// TODO: fix panics
d.PanicIfError(err)
close(chunkChan)
verifyChunks(batch, chunkChan)
+2 -2
View File
@@ -24,11 +24,11 @@ func newFileBlockStore(w io.WriteCloser) chunks.ChunkStore {
return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w}
}
func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
panic("not impl")
}
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
func (fb fileBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
panic("not impl")
}
+2 -2
View File
@@ -17,11 +17,11 @@ func newNullBlockStore() chunks.ChunkStore {
return nullBlockStore{}
}
func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
panic("not impl")
}
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
func (nb nullBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
panic("not impl")
}
+11 -6
View File
@@ -140,14 +140,17 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() {
func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
inputs := [][]byte{make([]byte, testMemTableSize/2+1), make([]byte, testMemTableSize/2+1), []byte("abc")}
rand.Read(inputs[0])
rand.Read(inputs[1])
_, err := rand.Read(inputs[0])
suite.NoError(err)
_, err = rand.Read(inputs[1])
suite.NoError(err)
chnx := make([]chunks.Chunk, len(inputs))
for i, data := range inputs {
chnx[i] = chunks.NewChunk(data)
suite.store.Put(context.Background(), chnx[i])
}
suite.store.Commit(context.Background(), chnx[0].Hash(), suite.store.Root(context.Background())) // Commit writes
_, err = suite.store.Commit(context.Background(), chnx[0].Hash(), suite.store.Root(context.Background())) // Commit writes
suite.NoError(err)
hashes := make(hash.HashSlice, len(chnx))
for i, c := range chnx {
@@ -155,7 +158,7 @@ func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
}
chunkChan := make(chan *chunks.Chunk, len(hashes))
suite.store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
err = suite.store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
close(chunkChan)
found := make(hash.HashSlice, 0)
@@ -425,13 +428,15 @@ func (fc *fakeConjoiner) Conjoin(ctx context.Context, upstream manifestContents,
}
func assertInputInStore(input []byte, h hash.Hash, s chunks.ChunkStore, assert *assert.Assertions) {
c := s.Get(context.Background(), h)
c, err := s.Get(context.Background(), h)
assert.NoError(err)
assert.False(c.IsEmpty(), "Shouldn't get empty chunk for %s", h.String())
assert.Zero(bytes.Compare(input, c.Data()), "%s != %s", string(input), string(c.Data()))
}
func (suite *BlockStoreSuite) TestChunkStoreGetNonExisting() {
h := hash.Parse("11111111111111111111111111111111")
c := suite.store.Get(context.Background(), h)
c, err := suite.store.Get(context.Background(), h)
suite.NoError(err)
suite.True(c.IsEmpty())
}
+3 -3
View File
@@ -50,15 +50,15 @@ func (nbc *NomsBlockCache) HasMany(ctx context.Context, hashes hash.HashSet) has
// Get retrieves the chunk referenced by hash. If the chunk is not present,
// Get returns the empty Chunk.
func (nbc *NomsBlockCache) Get(ctx context.Context, hash hash.Hash) chunks.Chunk {
func (nbc *NomsBlockCache) Get(ctx context.Context, hash hash.Hash) (chunks.Chunk, error) {
return nbc.chunks.Get(ctx, hash)
}
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
func (nbc *NomsBlockCache) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
nbc.chunks.GetMany(ctx, hashes, foundChunks)
func (nbc *NomsBlockCache) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
return nbc.chunks.GetMany(ctx, hashes, foundChunks)
}
// ExtractChunks writes the entire contents of the cache to chunkChan. The
+23 -10
View File
@@ -48,14 +48,21 @@ func TestStats(t *testing.T) {
assert.Equal(uint64(3), stats(store).HasLatency.Samples())
assert.Equal(uint64(3), stats(store).AddressesPerHas.Sum())
assert.False(store.Get(context.Background(), c1.Hash()).IsEmpty())
assert.False(store.Get(context.Background(), c2.Hash()).IsEmpty())
assert.False(store.Get(context.Background(), c3.Hash()).IsEmpty())
c, err := store.Get(context.Background(), c1.Hash())
assert.NoError(err)
assert.False(c.IsEmpty())
c, err = store.Get(context.Background(), c2.Hash())
assert.NoError(err)
assert.False(c.IsEmpty())
c, err = store.Get(context.Background(), c3.Hash())
assert.NoError(err)
assert.False(c.IsEmpty())
assert.Equal(uint64(3), stats(store).GetLatency.Samples())
assert.Equal(uint64(0), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(3), stats(store).ChunksPerGet.Sum())
store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
_, err = store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
assert.NoError(err)
// Commit will update the manifest
assert.EqualValues(1, stats(store).WriteManifestLatency.Samples())
@@ -67,9 +74,12 @@ func TestStats(t *testing.T) {
assert.Equal(uint64(131), stats(store).BytesPerPersist.Sum())
// Now some gets that will incur read IO
store.Get(context.Background(), c1.Hash())
store.Get(context.Background(), c2.Hash())
store.Get(context.Background(), c3.Hash())
_, err = store.Get(context.Background(), c1.Hash())
assert.NoError(err)
_, err = store.Get(context.Background(), c2.Hash())
assert.NoError(err)
_, err = store.Get(context.Background(), c3.Hash())
assert.NoError(err)
assert.Equal(uint64(3), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(27), stats(store).FileBytesPerRead.Sum())
@@ -83,16 +93,19 @@ func TestStats(t *testing.T) {
hashes[i] = c.Hash()
}
chunkChan := make(chan *chunks.Chunk, 3)
store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
err = store.GetMany(context.Background(), hashes.HashSet(), chunkChan)
assert.NoError(err)
assert.Equal(uint64(4), stats(store).FileReadLatency.Samples())
assert.Equal(uint64(54), stats(store).FileBytesPerRead.Sum())
// Force a conjoin
store.c = inlineConjoiner{2}
store.Put(context.Background(), c4)
store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
_, err = store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
assert.NoError(err)
store.Put(context.Background(), c5)
store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
_, err = store.Commit(context.Background(), store.Root(context.Background()), store.Root(context.Background()))
assert.NoError(err)
assert.Equal(uint64(1), stats(store).ConjoinLatency.Samples())
// TODO: Once random conjoin hack is out, test other conjoin stats
+10 -7
View File
@@ -314,7 +314,7 @@ func (nbs *NomsBlockStore) addChunk(ctx context.Context, h addr, data []byte) bo
return true
}
func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
t1 := time.Now()
defer func() {
nbs.stats.GetLatency.SampleTimeSince(t1)
@@ -330,17 +330,19 @@ func (nbs *NomsBlockStore) Get(ctx context.Context, h hash.Hash) chunks.Chunk {
}
return data, nbs.tables
}()
if data != nil {
return chunks.NewChunkWithHash(h, data)
}
if data := tables.get(ctx, a, nbs.stats); data != nil {
return chunks.NewChunkWithHash(h, data)
return chunks.NewChunkWithHash(h, data), nil
}
return chunks.EmptyChunk
if data := tables.get(ctx, a, nbs.stats); data != nil {
return chunks.NewChunkWithHash(h, data), nil
}
return chunks.EmptyChunk, nil
}
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, foundChunks chan *chunks.Chunk) error {
t1 := time.Now()
reqs := toGetRecords(hashes)
@@ -370,6 +372,7 @@ func (nbs *NomsBlockStore) GetMany(ctx context.Context, hashes hash.HashSet, fou
wg.Wait()
}
return nil
}
func toGetRecords(hashes hash.HashSet) []getRecord {
+14 -2
View File
@@ -132,8 +132,13 @@ func (lvs *ValueStore) ReadValue(ctx context.Context, h hash.Hash) Value {
}
return chunks.EmptyChunk
}()
if chunk.IsEmpty() {
chunk = lvs.cs.Get(ctx, h)
var err error
chunk, err = lvs.cs.Get(ctx, h)
// TODO: fix panics
d.PanicIfError(err)
}
if chunk.IsEmpty() {
return nil
@@ -189,7 +194,14 @@ func (lvs *ValueStore) ReadManyValues(ctx context.Context, hashes hash.HashSlice
// Request remaining hashes from ChunkStore, processing the found chunks as they come in.
foundChunks := make(chan *chunks.Chunk, 16)
go func() { lvs.cs.GetMany(ctx, remaining, foundChunks); close(foundChunks) }()
go func() {
err := lvs.cs.GetMany(ctx, remaining, foundChunks)
// TODO: fix panics
d.PanicIfError(err)
close(foundChunks)
}()
for c := range foundChunks {
h := c.Hash()
foundValues[h] = decode(h, c)