Clobber ValueStore cache entry on WriteValue (#2804)

ValueStore caches Values that are read out of it, but it doesn't
do the same for Values that are written. This is because we expect
that reading Values shortly after writing them is an uncommon usage
pattern, and because the Chunks that make up novel Values are
generally efficiently retrievable from the BatchStore that backs
a ValueStore. The problem discovered in issue #2802 is that ValueStore
caches non-existence as well as existence of read Values. So, reading
a Value that doesn't exist in the DB would result in the ValueStore
permanently returning nil for that Value -- even if you then go and
write it to the DB.

This patch drops the cache entry for a Value whenever it's written.

Fixes #2802
This commit is contained in:
cmasone-attic
2016-11-04 15:53:26 -07:00
committed by GitHub
parent 0400b0cf3f
commit 12ddb66fc5
6 changed files with 86 additions and 16 deletions

View File

@@ -75,30 +75,30 @@ func (lvs *ValueStore) BatchStore() BatchStore {
}
// ReadValue reads and decodes a value from lvs. It is not considered an error for the requested chunk to be empty; in this case, the function simply returns nil.
func (lvs *ValueStore) ReadValue(r hash.Hash) Value {
if v, ok := lvs.valueCache.Get(r); ok {
func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
if v, ok := lvs.valueCache.Get(h); ok {
if v == nil {
return nil
}
return v.(Value)
}
chunk := lvs.bs.Get(r)
chunk := lvs.bs.Get(h)
if chunk.IsEmpty() {
lvs.valueCache.Add(r, 0, nil)
lvs.valueCache.Add(h, 0, nil)
return nil
}
v := DecodeValue(chunk, lvs)
lvs.valueCache.Add(r, uint64(len(chunk.Data())), v)
lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
var entry chunkCacheEntry = absentChunk{}
if v != nil {
lvs.cacheChunks(v, r)
// r is trivially a hint for v, so consider putting that in the cache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because r already has a hint in the cache. If we later read some other chunk that references v, cacheChunks will overwrite this with a hint pointing to that chunk.
lvs.cacheChunks(v, h)
// h is trivially a hint for v, so consider putting that in the cache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the cache. If we later read some other chunk that references v, cacheChunks will overwrite this with a hint pointing to that chunk.
// If we don't do this, top-level Values that get read but not written -- such as the existing Head of a Database upon a Commit -- can be erroneously left out during a pull.
entry = hintedChunk{v.Type(), r}
entry = hintedChunk{v.Type(), h}
}
if cur := lvs.check(r); cur == nil || cur.Hint().IsEmpty() {
lvs.set(r, entry)
if cur := lvs.check(h); cur == nil || cur.Hint().IsEmpty() {
lvs.set(h, entry)
}
return v
}
@@ -118,6 +118,7 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
hints := lvs.chunkHintsFromCache(v)
lvs.bs.SchedulePut(c, height, hints)
lvs.set(hash, (*presentChunk)(v.Type()))
lvs.valueCache.Drop(hash)
return r
}

View File

@@ -12,6 +12,17 @@ import (
"github.com/attic-labs/testify/assert"
)
func TestValueReadWriteRead(t *testing.T) {
assert := assert.New(t)
s := String("hello")
vs := NewTestValueStore()
assert.Nil(vs.ReadValue(s.Hash())) // nil
r := vs.WriteValue(s)
v := vs.ReadValue(r.TargetHash()) // non-nil
assert.True(s.Equals(v))
}
func TestCheckChunksInCache(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
@@ -89,7 +100,7 @@ func TestCacheOnReadValue(t *testing.T) {
v := cvs2.ReadValue(r.TargetHash())
assert.True(bref.Equals(v))
assert.True(cvs2.isPresent(b.Hash()))
assert.True(cvs2.isPresent(b.Hash()))
assert.True(cvs2.isPresent(bref.Hash()))
}
func TestHintsOnCache(t *testing.T) {

View File

@@ -70,7 +70,7 @@ func (c *SizeCache) Add(key interface{}, size uint64, value interface{}) {
defer c.mu.Unlock()
if _, ok := c.entry(key); ok {
// this value is already in the cache just return
// this value is already in the cache; just return
return
}
@@ -92,3 +92,15 @@ func (c *SizeCache) Add(key interface{}, size uint64, value interface{}) {
}
}
}
// Drop will remove the element associated with the given key from the cache.
func (c *SizeCache) Drop(key interface{}) {
c.mu.Lock()
defer c.mu.Unlock()
if entry, ok := c.entry(key); ok {
c.totalSize -= entry.size
c.lru.Remove(entry.lruEntry)
delete(c.cache, key)
}
}

View File

@@ -69,6 +69,11 @@ func TestSizeCache(t *testing.T) {
assert.False(ok)
_, ok = c.Get(hashFromString("data-5"))
assert.False(ok)
c.Drop(hashFromString("data-10"))
assert.Equal(uint64(800), c.totalSize)
assert.Equal(4, c.lru.Len())
assert.Equal(4, len(c.cache))
}
func concurrencySizeCacheTest(data []string) {

View File

@@ -12,6 +12,9 @@ import ValueStore from './value-store.js';
import List from './list.js';
import {encodeValue} from './codec.js';
import {equals} from './compare.js';
import Hash from './hash.js';
import {getHash} from './get-hash.js';
import {notNull} from './assert.js';
suite('ValueStore', () => {
test('readValue', async () => {
@@ -65,7 +68,7 @@ suite('ValueStore', () => {
test('write coalescing', async () => {
const bs = new BatchStoreAdaptor(new MemoryStore());
const vs = new ValueStore(bs, 1e6);
const vs = new ValueStore(bs, 128);
const r1 = vs.writeValue('hello').targetHash;
(bs: any).schedulePut = () => { assert.fail('unreachable'); };
@@ -76,7 +79,7 @@ suite('ValueStore', () => {
test('read caching', async () => {
const bs = new BatchStoreAdaptor(new MemoryStore());
const vs = new ValueStore(bs, 1e6);
const vs = new ValueStore(bs, 128);
const r1 = vs.writeValue('hello').targetHash;
const v1 = await vs.readValue(r1);
@@ -87,6 +90,31 @@ suite('ValueStore', () => {
await vs.close();
});
test('read nonexistence caching', async () => {
const bs = new BatchStoreAdaptor(new MemoryStore());
const vs = new ValueStore(bs, 128);
const hash = notNull(Hash.parse('rmnjb8cjc5tblj21ed4qs821649eduie'));
const v1 = await vs.readValue(hash);
assert.equal(null, v1);
(bs: any).get = () => { throw new Error(); };
const v2 = await vs.readValue(hash);
assert.equal(null, v2);
await vs.close();
});
test('write clobbers cached nonexistence', async () => {
const vs = new ValueStore(new BatchStoreAdaptor(new MemoryStore()), 128);
const s = 'hello';
const v1 = await vs.readValue(getHash(s)); // undefined
assert.equal(null, v1);
vs.writeValue(s);
const v2 = await vs.readValue(getHash(s)); // "hello"
assert.equal(s, v2);
await vs.close();
});
test('caching eviction', async () => {
const bs = new BatchStoreAdaptor(new MemoryStore());
const vs = new ValueStore(bs, 15);

View File

@@ -90,6 +90,7 @@ export default class ValueStore {
const hints = this._knownHashes.checkChunksInCache(v);
this._bs.schedulePut(chunk, hints);
this._knownHashes.add(hash, new HashCacheEntry(true, t));
this._valueCache.drop(hash);
return ref;
}
@@ -106,6 +107,7 @@ interface Cache<T> { // eslint-disable-line no-undef
entry(hash: Hash): ?CacheEntry<T>; // eslint-disable-line no-undef
get(hash: Hash): ?T; // eslint-disable-line no-undef
add(hash: Hash, size: number, value: T): void; // eslint-disable-line no-undef
drop(hash: Hash): void; // eslint-disable-line no-undef
}
class CacheEntry<T> {
@@ -154,7 +156,7 @@ export class SizeCache<T> {
return entry ? entry.value : undefined;
}
add(hash: Hash, size: number, value: ?T) {
add(hash: Hash, size: number, value: ?T): void {
const key = hash.toString();
if (this._cache.has(key)) {
this._cache.delete(key);
@@ -173,6 +175,15 @@ export class SizeCache<T> {
}
}
}
drop(hash: Hash): void {
const key = hash.toString();
const entry = this._cache.get(key);
if (entry) {
this._cache.delete(key);
this._size -= entry.size;
}
}
}
export class NoopCache<T> {
@@ -180,7 +191,9 @@ export class NoopCache<T> {
get(hash: Hash): ?T {} // eslint-disable-line no-unused-vars
add(hash: Hash, size: number, value: T) {} // eslint-disable-line no-unused-vars
add(hash: Hash, size: number, value: T): void {} // eslint-disable-line no-unused-vars
drop(hash: Hash): void {} // eslint-disable-line no-unused-vars
}