mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-30 10:45:18 -06:00
Merge pull request #94 from arv/read-through-store
Add a ReadThroughStore which caches the values in a caching store
This commit is contained in:
96
chunks/read_through_store.go
Normal file
96
chunks/read_through_store.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/attic-labs/noms/ref"
|
||||
)
|
||||
|
||||
// ReadThroughStore is a store that consists of two other stores. A caching and
|
||||
// a backing store. All reads check the caching store first and if the ref is
|
||||
// present there the caching store is used. If not present the backing store is
|
||||
// used and the value gets cached in the caching store. All writes go directly
|
||||
// to the backing store.
|
||||
type ReadThroughStore struct {
|
||||
cachingStore ChunkStore
|
||||
backingStore ChunkStore
|
||||
}
|
||||
|
||||
func NewReadThroughStore(cachingStore ChunkStore, backingStore ChunkStore) ReadThroughStore {
|
||||
return ReadThroughStore{cachingStore, backingStore}
|
||||
}
|
||||
|
||||
// forwardCloser closes multiple io.Closer objects.
|
||||
type forwardCloser struct {
|
||||
io.Reader
|
||||
cs []io.Closer
|
||||
}
|
||||
|
||||
func (fc forwardCloser) Close() error {
|
||||
for _, c := range fc.cs {
|
||||
if err := c.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rts ReadThroughStore) Get(ref ref.Ref) (io.ReadCloser, error) {
|
||||
r, err := rts.cachingStore.Get(ref)
|
||||
if r != nil && err == nil {
|
||||
return r, err
|
||||
}
|
||||
r, err = rts.backingStore.Get(ref)
|
||||
if r == nil || err != nil {
|
||||
return r, err
|
||||
}
|
||||
|
||||
w := rts.cachingStore.Put()
|
||||
tr := io.TeeReader(r, w)
|
||||
return forwardCloser{tr, []io.Closer{r, w}}, nil
|
||||
}
|
||||
|
||||
type readThroughChunkWriter struct {
|
||||
cws []ChunkWriter
|
||||
}
|
||||
|
||||
func (w readThroughChunkWriter) Ref() (r ref.Ref, err error) {
|
||||
for _, cw := range w.cws {
|
||||
if r, err = cw.Ref(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (w readThroughChunkWriter) Write(p []byte) (n int, err error) {
|
||||
for _, cw := range w.cws {
|
||||
if n, err = cw.Write(p); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (w readThroughChunkWriter) Close() (err error) {
|
||||
for _, cw := range w.cws {
|
||||
if err = cw.Close(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (rts ReadThroughStore) Put() ChunkWriter {
|
||||
bw := rts.backingStore.Put()
|
||||
cw := rts.cachingStore.Put()
|
||||
return readThroughChunkWriter{[]ChunkWriter{bw, cw}}
|
||||
}
|
||||
|
||||
func (rts ReadThroughStore) Root() ref.Ref {
|
||||
return rts.backingStore.Root()
|
||||
}
|
||||
|
||||
func (rts ReadThroughStore) UpdateRoot(current, last ref.Ref) bool {
|
||||
return rts.backingStore.UpdateRoot(current, last)
|
||||
}
|
||||
127
chunks/read_through_store_test.go
Normal file
127
chunks/read_through_store_test.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/ref"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestReadThroughStoreGet(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
bs := &TestStore{}
|
||||
|
||||
// Prepopulate the backing store with "abc".
|
||||
input := "abc"
|
||||
w := bs.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
assert.NoError(err)
|
||||
ref, err := w.Ref()
|
||||
assert.NoError(err)
|
||||
|
||||
// See http://www.di-mgt.com.au/sha_testvectors.html
|
||||
assert.Equal("sha1-a9993e364706816aba3e25717850c26c9cd0d89d", ref.String())
|
||||
|
||||
assert.Equal(1, bs.Len())
|
||||
assert.Equal(1, bs.Writes)
|
||||
assert.Equal(0, bs.Reads)
|
||||
|
||||
cs := &TestStore{}
|
||||
rts := NewReadThroughStore(cs, bs)
|
||||
|
||||
// Now read "abc". It is not yet in the cache so we hit the backing store.
|
||||
reader, err := rts.Get(ref)
|
||||
assert.NoError(err)
|
||||
data, err := ioutil.ReadAll(reader)
|
||||
assert.NoError(err)
|
||||
assert.Equal(input, string(data))
|
||||
reader.Close()
|
||||
|
||||
assert.Equal(1, bs.Len())
|
||||
assert.Equal(1, cs.Len())
|
||||
assert.Equal(1, cs.Writes)
|
||||
assert.Equal(1, bs.Writes)
|
||||
assert.Equal(1, cs.Reads)
|
||||
assert.Equal(1, bs.Reads)
|
||||
|
||||
// Reading it again should not hit the backing store.
|
||||
reader, err = rts.Get(ref)
|
||||
assert.NoError(err)
|
||||
data, err = ioutil.ReadAll(reader)
|
||||
assert.NoError(err)
|
||||
assert.Equal(input, string(data))
|
||||
reader.Close()
|
||||
|
||||
assert.Equal(1, bs.Len())
|
||||
assert.Equal(1, cs.Len())
|
||||
assert.Equal(1, cs.Writes)
|
||||
assert.Equal(1, bs.Writes)
|
||||
assert.Equal(2, cs.Reads)
|
||||
assert.Equal(1, bs.Reads)
|
||||
}
|
||||
|
||||
func TestReadThroughStorePut(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
bs := &TestStore{}
|
||||
cs := &TestStore{}
|
||||
rts := NewReadThroughStore(cs, bs)
|
||||
|
||||
// Storing "abc" should store it to both backing and caching store.
|
||||
input := "abc"
|
||||
w := rts.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
assert.NoError(err)
|
||||
ref, err := w.Ref()
|
||||
assert.NoError(err)
|
||||
|
||||
// See http://www.di-mgt.com.au/sha_testvectors.html
|
||||
assert.Equal("sha1-a9993e364706816aba3e25717850c26c9cd0d89d", ref.String())
|
||||
|
||||
assertInputInStore("abc", ref, bs, assert)
|
||||
assertInputInStore("abc", ref, cs, assert)
|
||||
assertInputInStore("abc", ref, rts, assert)
|
||||
}
|
||||
|
||||
type failPutStore struct {
|
||||
MemoryStore
|
||||
}
|
||||
|
||||
type failChunkWriter struct {
|
||||
memoryChunkWriter
|
||||
}
|
||||
|
||||
func (w *failChunkWriter) Ref() (r ref.Ref, err error) {
|
||||
return ref.Ref{}, errors.New("Failed Ref")
|
||||
}
|
||||
|
||||
func (s *failPutStore) Put() ChunkWriter {
|
||||
mcw := memoryChunkWriter{&s.MemoryStore, &bytes.Buffer{}, ref.Ref{}}
|
||||
return &failChunkWriter{mcw}
|
||||
}
|
||||
|
||||
func TestReadThroughStorePutFails(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
bs := &failPutStore{MemoryStore{}}
|
||||
cs := &TestStore{}
|
||||
rts := NewReadThroughStore(cs, bs)
|
||||
|
||||
// Storing "abc" should store it to both backing and caching store.
|
||||
input := "abc"
|
||||
w := rts.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
assert.NoError(err)
|
||||
_, err = w.Ref()
|
||||
assert.Error(err)
|
||||
|
||||
// See http://www.di-mgt.com.au/sha_testvectors.html
|
||||
ref := ref.MustParse("sha1-a9993e364706816aba3e25717850c26c9cd0d89d")
|
||||
assertInputNotInStore("abc", ref, bs, assert)
|
||||
assertInputNotInStore("abc", ref, cs, assert)
|
||||
assertInputNotInStore("abc", ref, rts, assert)
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/attic-labs/noms/ref"
|
||||
@@ -14,3 +15,25 @@ func assertInputInStore(input string, ref ref.Ref, s ChunkStore, assert *assert.
|
||||
assert.NoError(err)
|
||||
assert.Equal(input, string(data))
|
||||
}
|
||||
|
||||
func assertInputNotInStore(input string, ref ref.Ref, s ChunkStore, assert *assert.Assertions) {
|
||||
reader, err := s.Get(ref)
|
||||
assert.NoError(err)
|
||||
assert.Nil(reader)
|
||||
}
|
||||
|
||||
type TestStore struct {
|
||||
MemoryStore
|
||||
Reads int
|
||||
Writes int
|
||||
}
|
||||
|
||||
func (s *TestStore) Get(ref ref.Ref) (io.ReadCloser, error) {
|
||||
s.Reads++
|
||||
return s.MemoryStore.Get(ref)
|
||||
}
|
||||
|
||||
func (s *TestStore) Put() ChunkWriter {
|
||||
s.Writes++
|
||||
return s.MemoryStore.Put()
|
||||
}
|
||||
|
||||
@@ -20,18 +20,18 @@ func TestResolvedFuture(t *testing.T) {
|
||||
func TestUnresolvedFuture(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
v := NewString("hello")
|
||||
r, _ := WriteValue(v, cs)
|
||||
|
||||
f := futureFromRef(r)
|
||||
v2, err := f.Deref(cs)
|
||||
assert.Equal(1, cs.count)
|
||||
assert.Equal(1, cs.Reads)
|
||||
assert.NoError(err)
|
||||
assert.True(v.Equals(v2))
|
||||
|
||||
v3, err := f.Deref(cs)
|
||||
assert.Equal(1, cs.count)
|
||||
assert.Equal(1, cs.Reads)
|
||||
assert.NoError(err)
|
||||
assert.True(v2.Equals(v3))
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ func isEncodedOutOfLine(v Value) int {
|
||||
|
||||
func TestIncrementalLoadList(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
|
||||
expected := NewList(testVals...)
|
||||
ref, err := WriteValue(expected, cs)
|
||||
@@ -46,24 +46,24 @@ func TestIncrementalLoadList(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
actual := actualVar.(List)
|
||||
|
||||
expectedCount := cs.count
|
||||
expectedCount := cs.Reads
|
||||
assert.Equal(1, expectedCount)
|
||||
for i := uint64(0); i < expected.Len(); i++ {
|
||||
v := actual.Get(i)
|
||||
assert.True(expected.Get(i).Equals(v))
|
||||
|
||||
expectedCount += isEncodedOutOfLine(v)
|
||||
assert.Equal(expectedCount, cs.count)
|
||||
assert.Equal(expectedCount, cs.Reads)
|
||||
|
||||
// Do it again to make sure multiple derefs don't do multiple loads.
|
||||
v = actual.Get(i)
|
||||
assert.Equal(expectedCount, cs.count)
|
||||
assert.Equal(expectedCount, cs.Reads)
|
||||
}
|
||||
}
|
||||
|
||||
func TestIncrementalLoadSet(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
|
||||
expected := NewSet(testVals...)
|
||||
ref, err := WriteValue(expected, cs)
|
||||
@@ -73,18 +73,18 @@ func TestIncrementalLoadSet(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
actual := actualVar.(Set)
|
||||
|
||||
expectedCount := cs.count
|
||||
expectedCount := cs.Reads
|
||||
assert.Equal(1, expectedCount)
|
||||
actual.Iter(func(v Value) (stop bool) {
|
||||
expectedCount += isEncodedOutOfLine(v)
|
||||
assert.Equal(expectedCount, cs.count)
|
||||
assert.Equal(expectedCount, cs.Reads)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
func TestIncrementalLoadMap(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
|
||||
expected := NewMap(testVals...)
|
||||
ref, err := WriteValue(expected, cs)
|
||||
@@ -94,12 +94,12 @@ func TestIncrementalLoadMap(t *testing.T) {
|
||||
assert.NoError(err)
|
||||
actual := actualVar.(Map)
|
||||
|
||||
expectedCount := cs.count
|
||||
expectedCount := cs.Reads
|
||||
assert.Equal(1, expectedCount)
|
||||
actual.Iter(func(k, v Value) (stop bool) {
|
||||
expectedCount += isEncodedOutOfLine(k)
|
||||
expectedCount += isEncodedOutOfLine(v)
|
||||
assert.Equal(expectedCount, cs.count)
|
||||
assert.Equal(expectedCount, cs.Reads)
|
||||
return
|
||||
})
|
||||
}
|
||||
|
||||
@@ -90,30 +90,30 @@ func TestJsonEncode(t *testing.T) {
|
||||
|
||||
func TestGetJSONChildResolvedFuture(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
v := NewString("abc")
|
||||
f := futureFromValue(v)
|
||||
o, err := getChildJSON(f, cs)
|
||||
assert.NoError(err)
|
||||
assert.Equal("abc", o)
|
||||
assert.Equal(0, cs.count)
|
||||
assert.Equal(0, cs.Reads)
|
||||
}
|
||||
|
||||
func TestGetJSONChildUnresolvedFuture(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
s := "sha1-a9993e364706816aba3e25717850c26c9cd0d89d"
|
||||
r := ref.MustParse(s)
|
||||
f := futureFromRef(r)
|
||||
m, err := getChildJSON(f, cs)
|
||||
assert.NoError(err)
|
||||
assert.Equal(s, m.(map[string]interface{})["ref"].(string))
|
||||
assert.Equal(0, cs.count)
|
||||
assert.Equal(0, cs.Reads)
|
||||
}
|
||||
|
||||
func TestFutureCompound(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &testStore{ChunkStore: &chunks.MemoryStore{}}
|
||||
cs := &chunks.TestStore{}
|
||||
|
||||
v := NewString("abc")
|
||||
resolved := futureFromValue(v)
|
||||
@@ -127,19 +127,19 @@ func TestFutureCompound(t *testing.T) {
|
||||
m, err := getJSONList(list, cs)
|
||||
assert.NoError(err)
|
||||
assert.IsType([]interface{}{}, m.(map[string]interface{})["list"])
|
||||
assert.Equal(0, cs.count)
|
||||
assert.Equal(0, cs.Reads)
|
||||
|
||||
set := setFromFutures(futures, cs)
|
||||
assert.NotNil(set)
|
||||
m, err = getJSONSet(set, cs)
|
||||
assert.NoError(err)
|
||||
assert.IsType([]interface{}{}, m.(map[string]interface{})["set"])
|
||||
assert.Equal(0, cs.count)
|
||||
assert.Equal(0, cs.Reads)
|
||||
|
||||
mm := mapFromFutures(futures, cs)
|
||||
assert.NotNil(mm)
|
||||
m, err = getJSONMap(mm, cs)
|
||||
assert.NoError(err)
|
||||
assert.IsType([]interface{}{}, m.(map[string]interface{})["map"])
|
||||
assert.Equal(0, cs.count)
|
||||
assert.Equal(0, cs.Reads)
|
||||
}
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/attic-labs/noms/chunks"
|
||||
"github.com/attic-labs/noms/ref"
|
||||
)
|
||||
|
||||
type testStore struct {
|
||||
chunks.ChunkStore
|
||||
count int
|
||||
}
|
||||
|
||||
func (s *testStore) Get(ref ref.Ref) (io.ReadCloser, error) {
|
||||
s.count += 1
|
||||
return s.ChunkStore.Get(ref)
|
||||
}
|
||||
Reference in New Issue
Block a user