From 44bb16385c5c4f4b997b2144aa4d0bf0bc9a7ecc Mon Sep 17 00:00:00 2001
From: Chris Masone
Date: Thu, 17 Mar 2016 11:20:50 -0700
Subject: [PATCH] Add DataStore 'Type' cache to remember Noms type info as well.

To facilitate validation, DataStore needs to remember which chunks it's
seen, what their refs are, and the Noms type of the Values they encode.
Then, DataStore can look at each Value that comes in via WriteValue() and
validate it by checking every embedded ref (if any) against this cache.

Towards #654
---
 datas/datastore_common.go                     | 110 ++++++++++++++++--
 datas/datastore_test.go                       |  67 ++++++++++-
 ...est.go => has_caching_chunk_store_test.go} |   0
 datas/local_datastore.go                      |   6 +-
 datas/remote_datastore.go                     |   8 +-
 types/enum_test.go                            |   3 +-
 types/sequence_chunker_test.go                |   8 +-
 types/value_store.go                          |   2 +-
 8 files changed, 178 insertions(+), 26 deletions(-)
 rename datas/{caching_chunk_store_test.go => has_caching_chunk_store_test.go} (100%)

diff --git a/datas/datastore_common.go b/datas/datastore_common.go
index 9f56502c6f..c741d37fdd 100644
--- a/datas/datastore_common.go
+++ b/datas/datastore_common.go
@@ -2,6 +2,7 @@ package datas
 
 import (
 	"errors"
+	"sync"
 
 	"github.com/attic-labs/noms/chunks"
 	"github.com/attic-labs/noms/d"
@@ -11,9 +12,16 @@ import (
 )
 
 type dataStoreCommon struct {
-	cs       *hasCachingChunkStore
-	rootRef  ref.Ref
-	datasets *MapOfStringToRefOfCommit
+	cs        chunks.ChunkStore
+	rootRef   ref.Ref
+	datasets  *MapOfStringToRefOfCommit
+	typeCache map[ref.Ref]chunkCacheEntry
+	mu        *sync.Mutex
+}
+
+type chunkCacheEntry interface {
+	Present() bool
+	Type() types.Type
 }
 
 var (
@@ -21,6 +29,10 @@
 	ErrMergeNeeded = errors.New("Dataset head is not ancestor of commit")
 )
 
+func newDataStoreCommon(cs chunks.ChunkStore) dataStoreCommon {
+	return dataStoreCommon{cs: cs, rootRef: cs.Root(), typeCache: map[ref.Ref]chunkCacheEntry{}, mu: &sync.Mutex{}}
+}
+
 func (ds *dataStoreCommon) MaybeHead(datasetID string) (Commit, bool) {
 	if r, ok := ds.Datasets().MaybeGet(datasetID); ok {
 		return r.TargetValue(ds), true
@@ -54,14 +66,72 @@ func (ds *dataStoreCommon) datasetsFromRef(datasetsRef ref.Ref) *MapOfStringToRe
 
 // ReadValue reads and decodes a value from ds. It is not considered an error for the requested chunk to be empty; in this case, the function simply returns nil.
 func (ds *dataStoreCommon) ReadValue(r ref.Ref) types.Value {
-	c := ds.cs.Get(r)
-	return types.DecodeChunk(c, ds)
+	v := types.DecodeChunk(ds.cs.Get(r), ds)
+	checkAndSet := func(reachable ref.Ref, entry chunkCacheEntry) {
+		if ds.checkCache(reachable) == nil {
+			ds.setCache(reachable, entry)
+		}
+	}
+
+	var entry chunkCacheEntry = absentChunk{}
+	if v != nil {
+		entry = presentChunk(v.Type())
+		for _, reachable := range v.Chunks() {
+			checkAndSet(reachable.TargetRef(), presentChunk(getTargetType(reachable)))
+		}
+	}
+	checkAndSet(r, entry)
+	return v
 }
 
-func (ds *dataStoreCommon) WriteValue(v types.Value) ref.Ref {
+// WriteValue takes a Value, schedules it to be written to ds, and returns v.Ref(). v is not guaranteed to be actually written until after a successful Commit().
+func (ds *dataStoreCommon) WriteValue(v types.Value) (r ref.Ref) {
+	if v == nil {
+		return
+	}
+
+	r = v.Ref()
+	if entry := ds.checkCache(r); entry != nil && entry.Present() {
+		return
+	}
+
+	// Encoding v causes any child chunks, e.g. internal nodes if v is a meta sequence, to get written. That needs to happen before we try to validate v.
 	chunk := types.EncodeValue(v, ds)
-	ds.cs.Put(chunk)
-	return chunk.Ref()
+
+	for _, reachable := range v.Chunks() {
+		entry := ds.checkCache(reachable.TargetRef())
+		d.Chk.True(entry != nil && entry.Present(), "Value to write contains ref %s, which points to a non-existent Value.", reachable.TargetRef())
+
+		// BUG 1121
+		// It's possible that entry.Type() will be simply 'Value', but that 'reachable' is actually a properly-typed object -- that is, a Ref to some specific Type. The Chk below would fail, though it's possible that the Type is actually correct. We wouldn't be able to verify without reading it, though, so we'll dig into this later.
+		targetType := getTargetType(reachable)
+		if targetType.Equals(types.MakePrimitiveType(types.ValueKind)) {
+			continue
+		}
+		d.Chk.True(entry.Type().Equals(targetType), "Value to write contains ref %s, which points to a value of a different type: %+v != %+v", reachable.TargetRef(), entry.Type(), targetType)
+	}
+	ds.cs.Put(chunk) // TODO: DataStore should manage batching and backgrounding Puts.
+	ds.setCache(r, presentChunk(v.Type()))
+
+	return
+}
+
+func getTargetType(refBase types.RefBase) types.Type {
+	refType := refBase.Type()
+	d.Chk.Equal(types.RefKind, refType.Kind())
+	return refType.Desc.(types.CompoundDesc).ElemTypes[0]
+}
+
+func (ds *dataStoreCommon) checkCache(r ref.Ref) chunkCacheEntry {
+	ds.mu.Lock()
+	defer ds.mu.Unlock()
+	return ds.typeCache[r]
+}
+
+func (ds *dataStoreCommon) setCache(r ref.Ref, entry chunkCacheEntry) {
+	ds.mu.Lock()
+	defer ds.mu.Unlock()
+	ds.typeCache[r] = entry
 }
 
 // Has should really not be exposed on DataStore :-/
@@ -86,11 +156,11 @@ func (ds *dataStoreCommon) CopyMissingChunksP(sourceRef ref.Ref, sink DataStore,
 }
 
 func (ds *dataStoreCommon) transitionalChunkSink() chunks.ChunkSink {
-	return ds.cs
+	return newHasCachingChunkStore(ds.cs)
 }
 
 func (ds *dataStoreCommon) transitionalChunkStore() chunks.ChunkStore {
-	return ds.cs
+	return newHasCachingChunkStore(ds.cs)
 }
 
 func (ds *dataStoreCommon) commit(datasetID string, commit Commit) error {
@@ -172,3 +242,23 @@ func getAncestors(commits SetOfRefOfCommit, vr types.ValueReader) SetOfRefOfComm
 	})
 	return ancestors
 }
+
+type presentChunk types.Type
+
+func (t presentChunk) Present() bool {
+	return true
+}
+
+func (t presentChunk) Type() types.Type {
+	return types.Type(t)
+}
+
+type absentChunk struct{}
+
+func (a absentChunk) Present() bool {
+	return false
+}
+
+func (a absentChunk) Type() types.Type {
+	panic("Not reached. Should never call Type() on an absentChunk.")
+}
diff --git a/datas/datastore_test.go b/datas/datastore_test.go
index 01dee2a0f0..9fc09112a0 100644
--- a/datas/datastore_test.go
+++ b/datas/datastore_test.go
@@ -9,6 +9,71 @@ import (
 	"github.com/stretchr/testify/assert"
 )
 
+func TestReadWriteCache(t *testing.T) {
+	assert := assert.New(t)
+	cs := chunks.NewTestStore()
+	ds := NewDataStore(cs)
+
+	var v types.Value = types.Bool(true)
+	assert.NotEqual(ref.Ref{}, ds.WriteValue(v))
+	assert.Equal(1, cs.Writes)
+	r := ds.WriteValue(v)
+	assert.Equal(1, cs.Writes)
+
+	v = ds.ReadValue(r)
+	assert.True(v.Equals(types.Bool(true)))
+}
+
+func TestWriteRefToNonexistentValue(t *testing.T) {
+	assert := assert.New(t)
+	cs := chunks.NewTestStore()
+	ds := NewDataStore(cs)
+
+	assert.Panics(func() { ds.WriteValue(types.NewRef(types.Bool(true).Ref())) })
+}
+
+func TestWriteWrongTypeRef(t *testing.T) {
+	assert := assert.New(t)
+	cs := chunks.NewTestStore()
+	ds := NewDataStore(cs)
+
+	b := types.Bool(true)
+	assert.NotEqual(ref.Ref{}, ds.WriteValue(b))
+
+	assert.Panics(func() { ds.WriteValue(types.NewRefOfBlob(b.Ref())) })
+}
+
+func TestWriteValueTypeRef(t *testing.T) {
+	assert := assert.New(t)
+	cs := chunks.NewTestStore()
+	ds := NewDataStore(cs)
+
+	b := types.Bool(true)
+	assert.NotEqual(ref.Ref{}, ds.WriteValue(b))
+
+	assert.NotPanics(func() { ds.WriteValue(types.NewRef(b.Ref())) })
+}
+
+func TestReadValueTypeRefPanics_BUG1121(t *testing.T) {
+	assert := assert.New(t)
+	cs := chunks.NewTestStore()
+	ds := NewDataStore(cs)
+
+	b := types.NewEmptyBlob()
+	assert.NotEqual(ref.Ref{}, ds.WriteValue(b))
+
+	datasetID := "ds1"
+	aCommit := NewCommit().SetValue(types.NewRef(b.Ref()))
+	ds2, err := ds.Commit(datasetID, aCommit)
+	assert.NoError(err)
+
+	_, ok := ds2.MaybeHead(datasetID)
+	assert.True(ok)
+	// Fix BUG 1121 and then uncomment this line and delete the one after
+	// assert.NotPanics(func() { ds2.WriteValue(types.NewRefOfBlob(b.Ref())) })
+	assert.Panics(func() { ds2.WriteValue(types.NewRefOfBlob(b.Ref())) })
+}
+
 func TestTolerateUngettableRefs(t *testing.T) {
 	assert := assert.New(t)
 	ds := NewDataStore(chunks.NewTestStore())
@@ -31,7 +96,7 @@ func TestDataStoreCommit(t *testing.T) {
 	ds2, err := ds.Commit(datasetID, aCommit)
 	assert.NoError(err)
 
-	// The old datastore still still has no head.
+	// The old datastore still has no head.
 	_, ok := ds.MaybeHead(datasetID)
 	assert.False(ok)
 
diff --git a/datas/caching_chunk_store_test.go b/datas/has_caching_chunk_store_test.go
similarity index 100%
rename from datas/caching_chunk_store_test.go
rename to datas/has_caching_chunk_store_test.go
diff --git a/datas/local_datastore.go b/datas/local_datastore.go
index b7146f9817..99541d030f 100644
--- a/datas/local_datastore.go
+++ b/datas/local_datastore.go
@@ -15,17 +15,17 @@ type LocalDataStore struct {
 }
 
 func newLocalDataStore(cs chunks.ChunkStore) *LocalDataStore {
-	return &LocalDataStore{dataStoreCommon{newHasCachingChunkStore(cs), cs.Root(), nil}}
+	return &LocalDataStore{newDataStoreCommon(cs)}
 }
 
 func (lds *LocalDataStore) Commit(datasetID string, commit Commit) (DataStore, error) {
 	err := lds.commit(datasetID, commit)
-	return newLocalDataStore(lds.cs.Backing()), err
+	return newLocalDataStore(lds.cs), err
 }
 
 func (lds *LocalDataStore) Delete(datasetID string) (DataStore, error) {
 	err := lds.doDelete(datasetID)
-	return newLocalDataStore(lds.cs.Backing()), err
+	return newLocalDataStore(lds.cs), err
 }
 
 // CopyReachableChunksP copies to |sink| all chunks reachable from (and including) |r|, but that are not in the subtree rooted at |exclude|
diff --git a/datas/remote_datastore.go b/datas/remote_datastore.go
index e4353931f0..4a5dccd150 100644
--- a/datas/remote_datastore.go
+++ b/datas/remote_datastore.go
@@ -20,21 +20,21 @@ type RemoteDataStore struct {
 }
 
 func newRemoteDataStore(cs chunks.ChunkStore) *RemoteDataStore {
-	return &RemoteDataStore{dataStoreCommon{newHasCachingChunkStore(cs), cs.Root(), nil}}
+	return &RemoteDataStore{newDataStoreCommon(cs)}
 }
 
 func (rds *RemoteDataStore) host() *url.URL {
-	return rds.dataStoreCommon.cs.Backing().(*chunks.HTTPStore).Host()
+	return rds.cs.(*chunks.HTTPStore).Host()
 }
 
 func (rds *RemoteDataStore) Commit(datasetID string, commit Commit) (DataStore, error) {
 	err := rds.commit(datasetID, commit)
-	return newRemoteDataStore(rds.cs.Backing()), err
+	return newRemoteDataStore(rds.cs), err
 }
 
 func (rds *RemoteDataStore) Delete(datasetID string) (DataStore, error) {
 	err := rds.doDelete(datasetID)
-	return newRemoteDataStore(rds.cs.Backing()), err
+	return newRemoteDataStore(rds.cs), err
 }
 
 // CopyReachableChunksP copies to |sink| all chunks in rds that are reachable from (and including) |r|, but that are not in the subtree rooted at |exclude|. This implementation asks the remote server to return the desired chunks and writes them to |sink|.
diff --git a/types/enum_test.go b/types/enum_test.go
index 6bbc6baa4a..6c11c40de1 100644
--- a/types/enum_test.go
+++ b/types/enum_test.go
@@ -3,14 +3,13 @@ package types
 import (
 	"testing"
 
-	"github.com/attic-labs/noms/chunks"
 	"github.com/attic-labs/noms/ref"
 	"github.com/stretchr/testify/assert"
 )
 
 func TestGenericEnumWriteRead(t *testing.T) {
 	assert := assert.New(t)
-	vs := newValueStore(chunks.NewMemoryStore())
+	vs := NewTestValueStore()
 
 	typeDefA := MakeEnumType("EA", "aA", "bA")
 	typeDefB := MakeEnumType("EB", "aB", "bB")
diff --git a/types/sequence_chunker_test.go b/types/sequence_chunker_test.go
index 1c13a8a113..76704942c7 100644
--- a/types/sequence_chunker_test.go
+++ b/types/sequence_chunker_test.go
@@ -3,7 +3,6 @@ package types
 import (
 	"testing"
 
-	"github.com/attic-labs/noms/chunks"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -19,7 +18,7 @@ func (b modBoundaryChecker) WindowSize() int {
 	return 1
 }
 
-func listFromInts(cs chunks.ChunkStore, ints []int) List {
+func listFromInts(ints []int) List {
 	vals := make([]Value, len(ints))
 	for i, v := range ints {
 		vals[i] = Int64(v)
@@ -30,7 +29,6 @@
 
 func TestSequenceChunkerMod(t *testing.T) {
 	assert := assert.New(t)
-	cs := chunks.NewMemoryStore()
 
 	sumChunker := func(items []sequenceItem) (sequenceItem, Value) {
 		sum := 0
@@ -40,7 +38,7 @@
 			ints[i] = v
 			sum += v
 		}
-		return sum, listFromInts(cs, ints)
+		return sum, listFromInts(ints)
 	}
 
 	testChunking := func(expect []int, from, to int) {
@@ -49,7 +47,7 @@
 			seq.Append(i)
 		}
 
-		assert.True(listFromInts(cs, expect).Equals(seq.Done()))
+		assert.True(listFromInts(expect).Equals(seq.Done()))
 	}
 
 	// [1] is not a chunk boundary, so it won't chunk.
diff --git a/types/value_store.go b/types/value_store.go
index 46430c84d9..c3c3edb822 100644
--- a/types/value_store.go
+++ b/types/value_store.go
@@ -12,7 +12,7 @@ type ValueStore struct {
 
 // NewTestValueStore creates a simple struct that satisfies ValueReadWriter and is backed by a chunks.TestStore.
 func NewTestValueStore() *ValueStore {
-	return &ValueStore{chunks.NewTestStore()}
+	return newValueStore(chunks.NewTestStore())
 }
 
 func newValueStore(cs chunks.ChunkStore) *ValueStore {
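
For reviewers who want to see the new behavior end to end, here is an illustrative Go sketch (not part of the patch) of how WriteValue validation is expected to behave from a caller's point of view. It uses only constructors that appear in the tests above (chunks.NewTestStore, NewDataStore, types.NewRef, types.NewRefOfBlob); the package main wrapper and the types.Int64(42) value are assumptions made for the example.

// Illustrative sketch only: exercises the DataStore type cache added in this patch.
package main

import (
	"github.com/attic-labs/noms/chunks"
	"github.com/attic-labs/noms/datas"
	"github.com/attic-labs/noms/types"
)

func main() {
	cs := chunks.NewTestStore()
	ds := datas.NewDataStore(cs)

	// The first write puts the chunk and records its type in the cache;
	// writing the same value again is a cache hit and writes nothing.
	b := types.Bool(true)
	ds.WriteValue(b)
	ds.WriteValue(b)

	// A ref to a value the DataStore has already seen passes validation.
	ds.WriteValue(types.NewRef(b.Ref()))

	// These would fail the d.Chk assertions inside WriteValue (and panic):
	// a ref to a value that was never written to ds...
	// ds.WriteValue(types.NewRef(types.Int64(42).Ref()))
	// ...or a ref whose declared target type disagrees with the cached type.
	// ds.WriteValue(types.NewRefOfBlob(b.Ref()))
}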