Merge BatchStore into ChunkStore (#3403)

BatchStore is dead, long live ChunkStore! Merging the two required
modifying the old ChunkStore contract to make it more BatchStore-like
in places, specifically around Root(), Put() and PutMany().

The first big change is that Root() now returns a cached value for the
root hash of the Store. This is how NBS worked already, so the more
interesting change here is the addition of Rebase(), which loads the
latest persistent root. Any chunks that appeared in backing storage
since the ChunkStore was opened (or last rebased) also become
visible.
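
To make the new semantics concrete, here's a sketch (not code from
this change; storeA and storeB stand in for two ChunkStore handles
opened against the same backing storage):

func demoRebase(storeA, storeB chunks.ChunkStore) {
	c := chunks.NewChunk([]byte("hello"))
	storeA.Put(c)
	storeA.Commit(c.Hash(), storeA.Root()) // persists c and moves the root

	_ = storeB.Root()        // still the root cached at open (or last Rebase)
	storeB.Rebase()          // load the latest persisted root
	_ = storeB.Has(c.Hash()) // c (and the new root) are now visible here
}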

UpdateRoot() has been replaced with Commit(), because UpdateRoot() was
ALREADY doing the work of persisting novel chunks as well as moving
the persisted root hash of the ChunkStore in both NBS and
httpBatchStore. The new name and contract (essentially Flush() +
UpdateRoot()) are a more accurate representation of what's going on.
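
Call sites can now run the usual optimistic-concurrency loop against
it. A sketch (advance() is a hypothetical callback that builds the
desired new root on top of the current one):

func commitWithRetry(cs chunks.ChunkStore, advance func(last hash.Hash) hash.Hash) {
	for {
		last := cs.Root()
		if cs.Commit(advance(last), last) { // Flush() + atomically swap the root
			return
		}
		cs.Rebase() // another writer moved the root; pick it up and retry
	}
}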

As for Put(), the former contract claimed that it blocked until the
chunk was durable. That's no longer the case; indeed, NBS was already
not fulfilling this contract. The new contract reflects reality,
asserting that novel chunks aren't persisted until a Flush() or
Commit(). Novel chunks are immediately visible to Get and Has calls,
however.
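
In sketch form:

func demoPut(cs chunks.ChunkStore) {
	c := chunks.NewChunk([]byte("novel"))
	cs.Put(c)
	_ = cs.Has(c.Hash()) // true right away; c is visible through this store
	// c is only buffered so far; a crash here could lose it.
	cs.Flush() // now c must be durable in backing storage
}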

In addition to this larger change, there are also some tweaks to
ValueStore and Database. ValueStore.Flush() no longer takes a hash,
and instead just persists any and all Chunks it has buffered since the
last time anyone called Flush(). Database.Close() used to have the
side effect of persisting Chunks belonging to any Values the caller
had written -- that is no longer so. Values written to a
Database only become persistent upon a Commit-like operation (Commit,
CommitValue, FastForward, SetHead, or Delete).
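
For example (a sketch; ds and v are placeholders, and the return shape
of CommitValue() is assumed from the current Database interface):

func writeThenPersist(db datas.Database, ds datas.Dataset, v types.Value) (datas.Dataset, error) {
	db.WriteValue(v) // buffered; db.Close() alone no longer persists v
	// Only a Commit-like operation makes buffered Values durable:
	return db.CommitValue(ds, v)
}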

/******** New ChunkStore interface ********/

type ChunkStore interface {
     ChunkSource
     RootTracker
}

// RootTracker allows querying and management of the root of an entire tree of
// references. The "root" is the single mutable variable in a ChunkStore. It
// can store any hash, but it is typically used by higher layers (such as
// Database) to store a hash to a value that represents the current state and
// entire history of a database.
type RootTracker interface {
     // Rebase brings this RootTracker into sync with the persistent storage's
     // current root.
     Rebase()

     // Root returns the currently cached root value.
     Root() hash.Hash

     // Commit atomically attempts to persist all novel Chunks and update the
     // persisted root hash from last to current. If last doesn't match the
     // root in persistent storage, returns false.
     // TODO: is last now redundant? Maybe this should just try to update from
     // the cached root to current?
     // TODO: Does having a separate RootTracker make sense anymore? BUG 3402
     Commit(current, last hash.Hash) bool
}

// ChunkSource is a place chunks live.
type ChunkSource interface {
     // Get the Chunk for the value of the hash in the store. If the hash is
     // absent from the store nil is returned.
     Get(h hash.Hash) Chunk

     // GetMany gets the Chunks with |hashes| from the store. On return,
     // all chunks that were found will have been sent to |foundChunks|.
     // Any non-present chunks are silently ignored.
     GetMany(hashes hash.HashSet, foundChunks chan *Chunk)

     // Returns true iff the value at the address |h| is contained in the
     // source
     Has(h hash.Hash) bool

     // Returns a new HashSet containing any members of |hashes| that are
     // present in the source.
     HasMany(hashes hash.HashSet) (present hash.HashSet)

     // Put caches c in the ChunkSink. Upon return, c must be visible to
     // subsequent Get and Has calls, but must not be persistent until a call
     // to Flush(). Put may be called concurrently with other calls to Put(),
     // PutMany(), Get(), GetMany(), Has() and HasMany().
     Put(c Chunk)

     // PutMany caches chunks in the ChunkSink. Upon return, all members of
     // chunks must be visible to subsequent Get and Has calls, but must not be
     // persistent until a call to Flush(). PutMany may be called concurrently
     // with other calls to Put(), PutMany(), Get(), GetMany(), Has() and
     // HasMany().
     PutMany(chunks []Chunk)

     // Returns the NomsVersion with which this ChunkSource is compatible.
     Version() string

     // On return, any previously Put chunks must be durable. It is not safe to
     // call Flush() concurrently with Put() or PutMany().
     Flush()

     io.Closer
}
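
As an example of driving the GetMany() contract above (a sketch; the
caller owns |foundChunks|, and absent hashes are simply never sent):

func fetchAll(cs chunks.ChunkStore, hashes hash.HashSet) map[hash.Hash]chunks.Chunk {
	found := make(chan *chunks.Chunk)
	go func() {
		defer close(found)
		cs.GetMany(hashes, found) // returns only after all found chunks are sent
	}()
	out := map[hash.Hash]chunks.Chunk{}
	for c := range found {
		out[c.Hash()] = *c
	}
	return out
}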

Fixes #2945
cmasone-attic
2017-04-19 13:31:58 -07:00
committed by GitHub
parent 6ef3a0ddec
commit cb930dee81
42 changed files with 927 additions and 624 deletions

View File

@@ -85,7 +85,7 @@ Continue?
return 0
}
ok = rt.UpdateRoot(h, currRoot)
ok = rt.Commit(h, currRoot)
if !ok {
fmt.Fprintln(os.Stderr, "Optimistic concurrency failure")
return 1

View File

@@ -14,7 +14,6 @@ import (
// ChunkStore implementation for.
type ChunkStore interface {
ChunkSource
ChunkSink
RootTracker
}
@@ -34,11 +33,23 @@ type Factory interface {
// Database) to store a hash to a value that represents the current state and
// entire history of a database.
type RootTracker interface {
// Rebase brings this RootTracker into sync with the persistent storage's
// current root.
Rebase()
// Root returns the currently cached root value.
Root() hash.Hash
UpdateRoot(current, last hash.Hash) bool
// Commit atomically attempts to persist all novel Chunks and update the
// persisted root hash from last to current. If last doesn't match the
// root in persistent storage, returns false.
// TODO: is last now redundant? Maybe this should just try to update from
// the cached root to current?
// TODO: Does having a separate RootTracker make sense anymore? BUG 3402
Commit(current, last hash.Hash) bool
}
// ChunkSource is a place to get chunks from.
// ChunkSource is a place chunks live.
type ChunkSource interface {
// Get the Chunk for the value of the hash in the store. If the hash is
// absent from the store nil is returned.
@@ -57,19 +68,24 @@ type ChunkSource interface {
// present in the source.
HasMany(hashes hash.HashSet) (present hash.HashSet)
// Returns the NomsVersion with which this ChunkSource is compatible.
Version() string
}
// ChunkSink is a place to put chunks.
type ChunkSink interface {
// Put writes c into the ChunkSink, blocking until the operation is complete.
// Put caches c in the ChunkSink. Upon return, c must be visible to
// subsequent Get and Has calls, but must not be persistent until a call
// to Flush(). Put may be called concurrently with other calls to Put(),
// PutMany(), Get(), GetMany(), Has() and HasMany().
Put(c Chunk)
// PutMany writes chunks into the sink, blocking until the operation is complete.
// PutMany caches chunks in the ChunkSink. Upon return, all members of
// chunks must be visible to subsequent Get and Has calls, but must not be
// persistent until a call to Flush(). PutMany may be called concurrently
// with other calls to Put(), PutMany(), Get(), GetMany(), Has() and
// HasMany().
PutMany(chunks []Chunk)
// On return, any previously Put chunks should be durable
// Returns the NomsVersion with which this ChunkSource is compatible.
Version() string
// On return, any previously Put chunks must be durable. It is not safe to
// call Flush() concurrently with Put() or PutMany().
Flush()
io.Closer

View File

@@ -26,7 +26,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
// See http://www.di-mgt.com.au/sha_testvectors.html
suite.Equal("rmnjb8cjc5tblj21ed4qs821649eduie", h.String())
suite.Store.UpdateRoot(h, suite.Store.Root()) // Commit writes
suite.Store.Commit(h, suite.Store.Root()) // Commit writes
// And reading it via the API should work...
assertInputInStore(input, h, suite.Store, suite.Assert())
@@ -39,7 +39,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePut() {
suite.Store.Put(c)
suite.Equal(h, c.Hash())
assertInputInStore(input, h, suite.Store, suite.Assert())
suite.Store.UpdateRoot(h, suite.Store.Root()) // Commit writes
suite.Store.Commit(h, suite.Store.Root()) // Commit writes
if suite.putCountFn != nil {
suite.Equal(2, suite.putCountFn())
@@ -51,7 +51,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStorePutMany() {
c1, c2 := NewChunk([]byte(input1)), NewChunk([]byte(input2))
suite.Store.PutMany([]Chunk{c1, c2})
suite.Store.UpdateRoot(c1.Hash(), suite.Store.Root()) // Commit writes
suite.Store.Commit(c1.Hash(), suite.Store.Root()) // Commit writes
// And reading it via the API should work...
assertInputInStore(input1, c1.Hash(), suite.Store, suite.Assert())
@@ -69,11 +69,11 @@ func (suite *ChunkStoreTestSuite) TestChunkStoreRoot() {
newRoot := hash.Parse("8la6qjbh81v85r6q67lqbfrkmpds14lg")
// Try to update root with bogus oldRoot
result := suite.Store.UpdateRoot(newRoot, bogusRoot)
result := suite.Store.Commit(newRoot, bogusRoot)
suite.False(result)
// Now do a valid root update
result = suite.Store.UpdateRoot(newRoot, oldRoot)
result = suite.Store.Commit(newRoot, oldRoot)
suite.True(result)
}
@@ -87,7 +87,7 @@ func (suite *ChunkStoreTestSuite) TestChunkStoreVersion() {
oldRoot := suite.Store.Root()
suite.True(oldRoot.IsEmpty())
newRoot := hash.Parse("11111222223333344444555556666677")
suite.True(suite.Store.UpdateRoot(newRoot, oldRoot))
suite.True(suite.Store.Commit(newRoot, oldRoot))
suite.Equal(constants.NomsVersion, suite.Store.Version())
}

View File

@@ -1,22 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package chunks
import "github.com/attic-labs/noms/go/hash"
type memoryRootTracker hash.Hash
func (ms *memoryRootTracker) Root() hash.Hash {
return hash.Hash(*ms)
}
func (ms *memoryRootTracker) UpdateRoot(current, last hash.Hash) bool {
if last != hash.Hash(*ms) {
return false
}
*ms = memoryRootTracker(current)
return true
}

View File

@@ -14,9 +14,9 @@ import (
// An in-memory implementation of store.ChunkStore. Useful mainly for tests.
type MemoryStore struct {
data map[hash.Hash]Chunk
memoryRootTracker
mu sync.RWMutex
data map[hash.Hash]Chunk
rootHash hash.Hash
mu sync.RWMutex
}
func NewMemoryStore() *MemoryStore {
@@ -66,6 +66,7 @@ func (ms *MemoryStore) Version() string {
return constants.NomsVersion
}
// TODO: enforce non-persistence of novel chunks BUG 3400
func (ms *MemoryStore) Put(c Chunk) {
ms.mu.Lock()
defer ms.mu.Unlock()
@@ -89,6 +90,21 @@ func (ms *MemoryStore) Len() int {
func (ms *MemoryStore) Flush() {}
func (ms *MemoryStore) Rebase() {}
func (ms *MemoryStore) Root() hash.Hash {
return ms.rootHash
}
func (ms *MemoryStore) Commit(current, last hash.Hash) bool {
if last != ms.rootHash {
return false
}
ms.rootHash = current
return true
}
func (ms *MemoryStore) Close() error {
return nil
}

View File

@@ -43,6 +43,16 @@ type HasRequest struct {
ch chan<- bool
}
func NewHasManyRequest(hashes hash.HashSet, wg *sync.WaitGroup, ch chan<- hash.Hash) HasManyRequest {
return HasManyRequest{hashes, wg, ch}
}
type HasManyRequest struct {
hashes hash.HashSet
wg *sync.WaitGroup
ch chan<- hash.Hash
}
func (g GetRequest) Hashes() hash.HashSet {
return g.hashes
}
@@ -67,8 +77,16 @@ func (h HasRequest) Outstanding() OutstandingRequest {
return OutstandingHas(h.ch)
}
func (h HasManyRequest) Hashes() hash.HashSet {
return h.hashes
}
func (h HasManyRequest) Outstanding() OutstandingRequest {
return OutstandingHasMany{h.wg, h.ch}
}
type OutstandingRequest interface {
Satisfy(c *Chunk)
Satisfy(h hash.Hash, c *Chunk)
Fail()
}
@@ -78,8 +96,12 @@ type OutstandingGetMany struct {
ch chan<- *Chunk
}
type OutstandingHas chan<- bool
type OutstandingHasMany struct {
wg *sync.WaitGroup
ch chan<- hash.Hash
}
func (r OutstandingGet) Satisfy(c *Chunk) {
func (r OutstandingGet) Satisfy(h hash.Hash, c *Chunk) {
r <- c
close(r)
}
@@ -89,7 +111,7 @@ func (r OutstandingGet) Fail() {
close(r)
}
func (ogm OutstandingGetMany) Satisfy(c *Chunk) {
func (ogm OutstandingGetMany) Satisfy(h hash.Hash, c *Chunk) {
ogm.ch <- c
ogm.wg.Done()
}
@@ -98,14 +120,23 @@ func (ogm OutstandingGetMany) Fail() {
ogm.wg.Done()
}
func (h OutstandingHas) Satisfy(c *Chunk) {
h <- true
close(h)
func (oh OutstandingHas) Satisfy(h hash.Hash, c *Chunk) {
oh <- true
close(oh)
}
func (h OutstandingHas) Fail() {
h <- false
close(h)
func (oh OutstandingHas) Fail() {
oh <- false
close(oh)
}
func (ohm OutstandingHasMany) Satisfy(h hash.Hash, c *Chunk) {
ohm.ch <- h
ohm.wg.Done()
}
func (ohm OutstandingHasMany) Fail() {
ohm.wg.Done()
}
// ReadBatch represents a set of queued Get/Has requests, each of which are blocking on a receive channel for a response.

View File

@@ -16,9 +16,9 @@ func TestGetRequestBatch(t *testing.T) {
assert := assert.New(t)
r0 := hash.Parse("00000000000000000000000000000000")
c1 := NewChunk([]byte("abc"))
r1 := c1.Hash()
h1 := c1.Hash()
c2 := NewChunk([]byte("123"))
r2 := c2.Hash()
h2 := c2.Hash()
tally := func(b bool, trueCnt, falseCnt *int) {
if b {
@@ -36,18 +36,18 @@ func TestGetRequestBatch(t *testing.T) {
batch := ReadBatch{
r0: []OutstandingRequest{OutstandingHas(req0chan), OutstandingGet(req1chan)},
r1: []OutstandingRequest{OutstandingHas(req2chan)},
r2: []OutstandingRequest{OutstandingHas(req3chan), OutstandingGet(req4chan)},
h1: []OutstandingRequest{OutstandingHas(req2chan)},
h2: []OutstandingRequest{OutstandingHas(req3chan), OutstandingGet(req4chan)},
}
go func() {
for requestedRef, reqs := range batch {
for requestedHash, reqs := range batch {
for _, req := range reqs {
if requestedRef == r1 {
req.Satisfy(&c1)
delete(batch, r1)
} else if requestedRef == r2 {
req.Satisfy(&c2)
delete(batch, r2)
if requestedHash == h1 {
req.Satisfy(h1, &c1)
delete(batch, h1)
} else if requestedHash == h2 {
req.Satisfy(h2, &c2)
delete(batch, h2)
}
}
}
@@ -104,10 +104,10 @@ func TestGetManyRequestBatch(t *testing.T) {
for reqHash, reqs := range batch {
for _, req := range reqs {
if reqHash == h1 {
req.Satisfy(&c1)
req.Satisfy(h1, &c1)
delete(batch, h1)
} else if reqHash == h2 {
req.Satisfy(&c2)
req.Satisfy(h2, &c2)
delete(batch, h2)
}
}
@@ -121,3 +121,44 @@ func TestGetManyRequestBatch(t *testing.T) {
assert.Len(hashes, 1)
assert.True(hashes.Has(h0))
}
func TestHasManyRequestBatch(t *testing.T) {
assert := assert.New(t)
h0 := hash.Parse("00000000000000000000000000000000")
c1 := NewChunk([]byte("abc"))
h1 := c1.Hash()
c2 := NewChunk([]byte("123"))
h2 := c2.Hash()
found := make(chan hash.Hash)
hashes := hash.NewHashSet(h0, h1, h2)
wg := &sync.WaitGroup{}
wg.Add(len(hashes))
go func() { wg.Wait(); close(found) }()
req := NewHasManyRequest(hashes, wg, found)
batch := ReadBatch{}
for h := range req.Hashes() {
batch[h] = []OutstandingRequest{req.Outstanding()}
}
go func() {
for reqHash, reqs := range batch {
for _, req := range reqs {
if reqHash == h1 {
req.Satisfy(h1, &EmptyChunk)
delete(batch, h1)
} else if reqHash == h2 {
req.Satisfy(h2, &EmptyChunk)
delete(batch, h2)
}
}
}
batch.Close()
}()
for h := range found {
hashes.Remove(h)
}
assert.Len(hashes, 1)
assert.True(hashes.Has(h0))
}

View File

@@ -112,7 +112,7 @@ func (r *Resolver) GetRootTracker(str string) (chunks.RootTracker, error) {
}
var rt chunks.RootTracker = sp.NewChunkStore()
if rt == nil {
rt = datas.NewHTTPBatchStore(sp.String(), "")
rt = datas.NewHTTPChunkStore(sp.String(), "")
}
return rt, nil
}

View File

@@ -0,0 +1,60 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package datas
import (
"testing"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/testify/assert"
)
func TestCompletenessChecker(t *testing.T) {
b := types.Bool(true)
r := types.NewRef(b)
t.Run("Panic", func(t *testing.T) {
badRef := types.NewRef(types.Number(42))
t.Run("AllBad", func(t *testing.T) {
t.Parallel()
cc := newCompletenessChecker()
cc.AddRefs(badRef)
cc.AddRefs(r)
assert.Panics(t, func() { cc.PanicIfDangling(chunks.NewTestStore()) })
})
t.Run("SomeBad", func(t *testing.T) {
t.Parallel()
cs := chunks.NewTestStore()
cs.Put(types.EncodeValue(b, nil))
cc := newCompletenessChecker()
cc.AddRefs(badRef)
cc.AddRefs(r)
assert.Panics(t, func() { cc.PanicIfDangling(cs) })
})
})
t.Run("Success", func(t *testing.T) {
t.Run("PendingChunk", func(t *testing.T) {
t.Parallel()
cs := chunks.NewTestStore()
cs.Put(types.EncodeValue(b, nil))
cc := newCompletenessChecker()
cc.AddRefs(r)
assert.NotPanics(t, func() { cc.PanicIfDangling(cs) })
})
t.Run("ExistingChunk", func(t *testing.T) {
t.Parallel()
cs := chunks.NewTestStore()
cs.Put(types.EncodeValue(b, nil))
cc := newCompletenessChecker()
cc.AddRefs(r)
assert.NotPanics(t, func() { cc.PanicIfDangling(cs) })
})
})
}

View File

@@ -27,7 +27,10 @@ type Database interface {
// WriteValue(). WriteValue() writes v to this Database, though v is not
// guaranteed to be be persistent until after a subsequent Commit(). The
// return value is the Ref of v.
// Written values won't be persisted until a commit-alike
types.ValueReadWriter
// Close must have no side-effects
io.Closer
// Datasets returns the root of the database which is a
@@ -94,11 +97,11 @@ type Database interface {
// Regardless, Datasets() is updated to match backing storage upon return.
FastForward(ds Dataset, newHeadRef types.Ref) (Dataset, error)
// validatingBatchStore returns the BatchStore used to read and write
// validatingChunkStore returns the ChunkStore used to read and write
// groups of values to the database efficiently. This interface is a low-
// level detail of the database that should infrequently be needed by
// clients.
validatingBatchStore() types.BatchStore
validatingChunkStore() chunks.ChunkStore
has(h hash.Hash) bool
}

View File

@@ -27,12 +27,17 @@ var (
ErrMergeNeeded = errors.New("Dataset head is not ancestor of commit")
)
func newDatabaseCommon(cch *cachingChunkHaver, vs *types.ValueStore, rt chunks.RootTracker) databaseCommon {
return databaseCommon{ValueStore: vs, cch: cch, rt: rt, rootHash: rt.Root()}
func newDatabaseCommon(cs chunks.ChunkStore) databaseCommon {
return databaseCommon{
ValueStore: types.NewValueStore(cs),
cch: newCachingChunkHaver(cs),
rt: cs,
rootHash: cs.Root(),
}
}
func (dbc *databaseCommon) validatingBatchStore() types.BatchStore {
return dbc.BatchStore()
func (dbc *databaseCommon) validatingChunkStore() chunks.ChunkStore {
return dbc.ChunkStore()
}
func (dbc *databaseCommon) Datasets() types.Map {
@@ -81,10 +86,10 @@ func (dbc *databaseCommon) doSetHead(ds Dataset, newHeadRef types.Ref) error {
defer func() { dbc.rootHash, dbc.datasets = dbc.rt.Root(), nil }()
currentRootHash, currentDatasets := dbc.getRootAndDatasets()
commitRef := dbc.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
commitRef := dbc.WriteValue(commit) // will be orphaned if the tryCommitChunks() below fails
currentDatasets = currentDatasets.Set(types.String(ds.ID()), types.ToRefOfValue(commitRef))
return dbc.tryUpdateRoot(currentDatasets, currentRootHash)
return dbc.tryCommitChunks(currentDatasets, currentRootHash)
}
func (dbc *databaseCommon) doFastForward(ds Dataset, newHeadRef types.Ref) error {
@@ -98,7 +103,7 @@ func (dbc *databaseCommon) doFastForward(ds Dataset, newHeadRef types.Ref) error
return dbc.doCommit(ds.ID(), commit, nil)
}
// doCommit manages concurrent access the single logical piece of mutable state: the current Root. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootHash is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again. This method will also fail and return an 'ErrMergeNeeded' error if the |commit| is not a descendent of the current dataset head
// doCommit manages concurrent access the single logical piece of mutable state: the current Root. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootHash is the hash of the current head. The call to Commit below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again. This method will also fail and return an 'ErrMergeNeeded' error if the |commit| is not a descendent of the current dataset head
func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct, mergePolicy merge.Policy) error {
if !IsCommit(commit) {
d.Panic("Can't commit a non-Commit struct to dataset %s", datasetID)
@@ -109,7 +114,7 @@ func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct, merge
var err error
for err = ErrOptimisticLockFailed; err == ErrOptimisticLockFailed; {
currentRootHash, currentDatasets := dbc.getRootAndDatasets()
commitRef := dbc.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
commitRef := dbc.WriteValue(commit) // will be orphaned if the tryCommitChunks() below fails
// If there's nothing in the DB yet, skip all this logic.
if !currentRootHash.IsEmpty() {
@@ -142,12 +147,12 @@ func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct, merge
}
}
currentDatasets = currentDatasets.Set(types.String(datasetID), types.ToRefOfValue(commitRef))
err = dbc.tryUpdateRoot(currentDatasets, currentRootHash)
err = dbc.tryCommitChunks(currentDatasets, currentRootHash)
}
return err
}
// doDelete manages concurrent access the single logical piece of mutable state: the current Root. doDelete is optimistic in that it is attempting to update head making the assumption that currentRootHash is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again.
// doDelete manages concurrent access the single logical piece of mutable state: the current Root. doDelete is optimistic in that it is attempting to update head making the assumption that currentRootHash is the hash of the current head. The call to Commit below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again.
func (dbc *databaseCommon) doDelete(datasetIDstr string) error {
defer func() { dbc.rootHash, dbc.datasets = dbc.rt.Root(), nil }()
@@ -163,7 +168,7 @@ func (dbc *databaseCommon) doDelete(datasetIDstr string) error {
var err error
for {
currentDatasets = currentDatasets.Remove(datasetID)
err = dbc.tryUpdateRoot(currentDatasets, currentRootHash)
err = dbc.tryCommitChunks(currentDatasets, currentRootHash)
if err != ErrOptimisticLockFailed {
break
}
@@ -188,12 +193,15 @@ func (dbc *databaseCommon) getRootAndDatasets() (currentRootHash hash.Hash, curr
return
}
func (dbc *databaseCommon) tryUpdateRoot(currentDatasets types.Map, currentRootHash hash.Hash) (err error) {
// TODO: This Map will be orphaned if the UpdateRoot below fails
func (dbc *databaseCommon) tryCommitChunks(currentDatasets types.Map, currentRootHash hash.Hash) (err error) {
// TODO: This Map will be orphaned if the Commit below fails
newRootHash := dbc.WriteValue(currentDatasets).TargetHash()
dbc.Flush(newRootHash)
// TODO: We've always been sorta sad that we Flush() the embedded ValueStore here, and then Commit() the composed ChunkStore below. That leads to two consecutive make-Chunks-durable operations on the underlying ChunkStore, and the latter won't actually have any novel Chunks. The problem arises from the fact that most users of ValueStore don't have access to the underlying ChunkStore, but Database _does_. ValueStore buffers Chunks, and most users who call Flush() need it to dump those Chunks into the ChunkStore and then make those Chunks durable. At this callsite, though, we only want to get those down into the ChunkStore, because we know that we're going to call Commit() later, which will make those Chunks durable. It's a conundrum :-/
dbc.Flush()
// If the root has been updated by another process in the short window since we read it, this call will fail. See issue #404
if !dbc.rt.UpdateRoot(newRootHash, currentRootHash) {
if !dbc.rt.Commit(newRootHash, currentRootHash) {
err = ErrOptimisticLockFailed
}
return

View File

@@ -60,8 +60,7 @@ type RemoteDatabaseSuite struct {
func (suite *RemoteDatabaseSuite) SetupTest() {
suite.cs = chunks.NewTestStore()
suite.makeDb = func(cs chunks.ChunkStore) Database {
hbs := NewHTTPBatchStoreForTest(cs)
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(hbs), types.NewValueStore(hbs), hbs)}
return &RemoteDatabaseClient{newDatabaseCommon(NewHTTPChunkStoreForTest(cs))}
}
suite.db = suite.makeDb(suite.cs)
}
@@ -311,11 +310,11 @@ type waitDuringUpdateRootChunkStore struct {
preUpdateRootHook func()
}
func (w *waitDuringUpdateRootChunkStore) UpdateRoot(current, last hash.Hash) bool {
func (w *waitDuringUpdateRootChunkStore) Commit(current, last hash.Hash) bool {
if w.preUpdateRootHook != nil {
w.preUpdateRootHook()
}
return w.ChunkStore.UpdateRoot(current, last)
return w.ChunkStore.Commit(current, last)
}
func (suite *DatabaseSuite) TestCommitWithConcurrentChunkStoreUse() {

View File

@@ -40,8 +40,7 @@ var customHTTPTransport = http.Transport{
ResponseHeaderTimeout: time.Duration(4) * time.Minute,
}
// httpBatchStore implements types.BatchStore
type httpBatchStore struct {
type httpChunkStore struct {
host *url.URL
httpClient httpDoer
auth string
@@ -54,18 +53,26 @@ type httpBatchStore struct {
cacheMu *sync.RWMutex
unwrittenPuts *nbs.NomsBlockCache
rootMu *sync.RWMutex
root hash.Hash
version string
}
func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore {
func NewHTTPChunkStore(baseURL, auth string) *httpChunkStore {
// Custom http.Client to give control of idle connections and timeouts
return newHTTPChunkStoreWithClient(baseURL, auth, &http.Client{Transport: &customHTTPTransport})
}
func newHTTPChunkStoreWithClient(baseURL, auth string, client httpDoer) *httpChunkStore {
u, err := url.Parse(baseURL)
d.PanicIfError(err)
if u.Scheme != "http" && u.Scheme != "https" {
d.Panic("Unrecognized scheme: %s", u.Scheme)
}
buffSink := &httpBatchStore{
host: u,
// Custom http.Client to give control of idle connections and timeouts
httpClient: &http.Client{Transport: &customHTTPTransport},
hcs := &httpChunkStore{
host: u,
httpClient: client,
auth: auth,
getQueue: make(chan chunks.ReadRequest, readBufferSize),
hasQueue: make(chan chunks.ReadRequest, readBufferSize),
@@ -75,60 +82,66 @@ func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore {
workerWg: &sync.WaitGroup{},
cacheMu: &sync.RWMutex{},
unwrittenPuts: nbs.NewCache(),
rootMu: &sync.RWMutex{},
}
buffSink.batchGetRequests()
buffSink.batchHasRequests()
return buffSink
hcs.root, hcs.version = hcs.getRoot(false)
hcs.batchGetRequests()
hcs.batchHasRequests()
return hcs
}
type httpDoer interface {
Do(req *http.Request) (resp *http.Response, err error)
}
func (bhcs *httpBatchStore) Flush() {
bhcs.sendWriteRequests()
bhcs.requestWg.Wait()
func (hcs *httpChunkStore) Version() string {
return hcs.version
}
func (hcs *httpChunkStore) Flush() {
hcs.sendWriteRequests()
hcs.requestWg.Wait()
return
}
func (bhcs *httpBatchStore) Close() (e error) {
close(bhcs.finishedChan)
bhcs.requestWg.Wait()
bhcs.workerWg.Wait()
func (hcs *httpChunkStore) Close() (e error) {
close(hcs.finishedChan)
hcs.requestWg.Wait()
hcs.workerWg.Wait()
close(bhcs.getQueue)
close(bhcs.hasQueue)
close(bhcs.rateLimit)
close(hcs.getQueue)
close(hcs.hasQueue)
close(hcs.rateLimit)
bhcs.cacheMu.Lock()
defer bhcs.cacheMu.Unlock()
bhcs.unwrittenPuts.Destroy()
hcs.cacheMu.Lock()
defer hcs.cacheMu.Unlock()
hcs.unwrittenPuts.Destroy()
return
}
func (bhcs *httpBatchStore) Get(h hash.Hash) chunks.Chunk {
func (hcs *httpChunkStore) Get(h hash.Hash) chunks.Chunk {
checkCache := func(h hash.Hash) chunks.Chunk {
bhcs.cacheMu.RLock()
defer bhcs.cacheMu.RUnlock()
return bhcs.unwrittenPuts.Get(h)
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
return hcs.unwrittenPuts.Get(h)
}
if pending := checkCache(h); !pending.IsEmpty() {
return pending
}
ch := make(chan *chunks.Chunk)
bhcs.requestWg.Add(1)
bhcs.getQueue <- chunks.NewGetRequest(h, ch)
hcs.requestWg.Add(1)
hcs.getQueue <- chunks.NewGetRequest(h, ch)
return *(<-ch)
}
func (bhcs *httpBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
func (hcs *httpChunkStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
cachedChunks := make(chan *chunks.Chunk)
go func() {
bhcs.cacheMu.RLock()
defer bhcs.cacheMu.RUnlock()
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
defer close(cachedChunks)
bhcs.unwrittenPuts.GetMany(hashes, cachedChunks)
hcs.unwrittenPuts.GetMany(hashes, cachedChunks)
}()
remaining := hash.HashSet{}
for h := range hashes {
@@ -144,53 +157,82 @@ func (bhcs *httpBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunk
}
wg := &sync.WaitGroup{}
wg.Add(len(remaining))
bhcs.requestWg.Add(1)
bhcs.getQueue <- chunks.NewGetManyRequest(remaining, wg, foundChunks)
hcs.requestWg.Add(1)
hcs.getQueue <- chunks.NewGetManyRequest(remaining, wg, foundChunks)
wg.Wait()
}
func (bhcs *httpBatchStore) batchGetRequests() {
bhcs.batchReadRequests(bhcs.getQueue, bhcs.getRefs)
func (hcs *httpChunkStore) batchGetRequests() {
hcs.batchReadRequests(hcs.getQueue, hcs.getRefs)
}
func (bhcs *httpBatchStore) Has(h hash.Hash) bool {
func (hcs *httpChunkStore) Has(h hash.Hash) bool {
checkCache := func(h hash.Hash) bool {
bhcs.cacheMu.RLock()
defer bhcs.cacheMu.RUnlock()
return bhcs.unwrittenPuts.Has(h)
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
return hcs.unwrittenPuts.Has(h)
}
if checkCache(h) {
return true
}
ch := make(chan bool)
bhcs.requestWg.Add(1)
bhcs.hasQueue <- chunks.NewHasRequest(h, ch)
hcs.requestWg.Add(1)
hcs.hasQueue <- chunks.NewHasRequest(h, ch)
return <-ch
}
func (bhcs *httpBatchStore) batchHasRequests() {
bhcs.batchReadRequests(bhcs.hasQueue, bhcs.hasRefs)
func (hcs *httpChunkStore) HasMany(hashes hash.HashSet) (present hash.HashSet) {
func() {
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
present = hcs.unwrittenPuts.HasMany(hashes)
}()
remaining := hash.HashSet{}
for h := range hashes {
if !present.Has(h) {
remaining.Insert(h)
}
}
if len(remaining) == 0 {
return present
}
foundChunks := make(chan hash.Hash)
wg := &sync.WaitGroup{}
wg.Add(len(remaining))
hcs.requestWg.Add(1)
hcs.hasQueue <- chunks.NewHasManyRequest(remaining, wg, foundChunks)
go func() { defer close(foundChunks); wg.Wait() }()
for found := range foundChunks {
present.Insert(found)
}
return present
}
func (hcs *httpChunkStore) batchHasRequests() {
hcs.batchReadRequests(hcs.hasQueue, hcs.hasRefs)
}
type batchGetter func(hashes hash.HashSet, batch chunks.ReadBatch)
func (bhcs *httpBatchStore) batchReadRequests(queue <-chan chunks.ReadRequest, getter batchGetter) {
bhcs.workerWg.Add(1)
func (hcs *httpChunkStore) batchReadRequests(queue <-chan chunks.ReadRequest, getter batchGetter) {
hcs.workerWg.Add(1)
go func() {
defer bhcs.workerWg.Done()
defer hcs.workerWg.Done()
for done := false; !done; {
select {
case req := <-queue:
bhcs.sendReadRequests(req, queue, getter)
case <-bhcs.finishedChan:
hcs.sendReadRequests(req, queue, getter)
case <-hcs.finishedChan:
done = true
}
// Drain queue before returning
select {
case req := <-queue:
bhcs.sendReadRequests(req, queue, getter)
hcs.sendReadRequests(req, queue, getter)
default:
//drained!
}
@@ -198,7 +240,7 @@ func (bhcs *httpBatchStore) batchReadRequests(queue <-chan chunks.ReadRequest, g
}()
}
func (bhcs *httpBatchStore) sendReadRequests(req chunks.ReadRequest, queue <-chan chunks.ReadRequest, getter batchGetter) {
func (hcs *httpChunkStore) sendReadRequests(req chunks.ReadRequest, queue <-chan chunks.ReadRequest, getter batchGetter) {
batch := chunks.ReadBatch{}
hashes := hash.HashSet{}
@@ -221,31 +263,31 @@ func (bhcs *httpBatchStore) sendReadRequests(req chunks.ReadRequest, queue <-cha
}
}
bhcs.rateLimit <- struct{}{}
hcs.rateLimit <- struct{}{}
go func() {
defer func() {
bhcs.requestWg.Add(-count)
hcs.requestWg.Add(-count)
batch.Close()
}()
getter(hashes, batch)
<-bhcs.rateLimit
<-hcs.rateLimit
}()
}
func (bhcs *httpBatchStore) getRefs(hashes hash.HashSet, batch chunks.ReadBatch) {
func (hcs *httpChunkStore) getRefs(hashes hash.HashSet, batch chunks.ReadBatch) {
// POST http://<host>/getRefs/. Post body: ref=hash0&ref=hash1& Response will be chunk data if present, 404 if absent.
u := *bhcs.host
u.Path = httprouter.CleanPath(bhcs.host.Path + constants.GetRefsPath)
u := *hcs.host
u.Path = httprouter.CleanPath(hcs.host.Path + constants.GetRefsPath)
req := newRequest("POST", bhcs.auth, u.String(), buildHashesRequest(hashes), http.Header{
req := newRequest("POST", hcs.auth, u.String(), buildHashesRequest(hashes), http.Header{
"Accept-Encoding": {"x-snappy-framed"},
"Content-Type": {"application/x-www-form-urlencoded"},
})
res, err := bhcs.httpClient.Do(req)
res, err := hcs.httpClient.Do(req)
d.Chk.NoError(err)
expectVersion(res)
expectVersion(hcs.version, res)
reader := resBodyReader(res)
defer closeResponse(reader)
@@ -257,26 +299,27 @@ func (bhcs *httpBatchStore) getRefs(hashes hash.HashSet, batch chunks.ReadBatch)
go func() { defer close(chunkChan); chunks.Deserialize(reader, chunkChan) }()
for c := range chunkChan {
for _, or := range batch[c.Hash()] {
go or.Satisfy(c)
h := c.Hash()
for _, or := range batch[h] {
go or.Satisfy(h, c)
}
delete(batch, c.Hash())
}
}
func (bhcs *httpBatchStore) hasRefs(hashes hash.HashSet, batch chunks.ReadBatch) {
func (hcs *httpChunkStore) hasRefs(hashes hash.HashSet, batch chunks.ReadBatch) {
// POST http://<host>/hasRefs/. Post body: ref=sha1---&ref=sha1---& Response will be text of lines containing "|ref| |bool|".
u := *bhcs.host
u.Path = httprouter.CleanPath(bhcs.host.Path + constants.HasRefsPath)
u := *hcs.host
u.Path = httprouter.CleanPath(hcs.host.Path + constants.HasRefsPath)
req := newRequest("POST", bhcs.auth, u.String(), buildHashesRequest(hashes), http.Header{
req := newRequest("POST", hcs.auth, u.String(), buildHashesRequest(hashes), http.Header{
"Accept-Encoding": {"x-snappy-framed"},
"Content-Type": {"application/x-www-form-urlencoded"},
})
res, err := bhcs.httpClient.Do(req)
res, err := hcs.httpClient.Do(req)
d.Chk.NoError(err)
expectVersion(res)
expectVersion(hcs.version, res)
reader := resBodyReader(res)
defer closeResponse(reader)
@@ -291,8 +334,7 @@ func (bhcs *httpBatchStore) hasRefs(hashes hash.HashSet, batch chunks.ReadBatch)
d.PanicIfFalse(scanner.Scan())
if scanner.Text() == "true" {
for _, outstanding := range batch[h] {
// This is a little gross, but OutstandingHas.Satisfy() expects a chunk. It ignores it, though, and just sends 'true' over the channel it's holding.
outstanding.Satisfy(&chunks.EmptyChunk)
outstanding.Satisfy(h, &chunks.EmptyChunk)
}
} else {
for _, outstanding := range batch[h] {
@@ -316,50 +358,64 @@ func resBodyReader(res *http.Response) (reader io.ReadCloser) {
return
}
func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk) {
bhcs.cacheMu.RLock()
defer bhcs.cacheMu.RUnlock()
bhcs.unwrittenPuts.Insert(c)
func (hcs *httpChunkStore) SchedulePut(c chunks.Chunk) {
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
hcs.unwrittenPuts.Insert(c)
}
func (bhcs *httpBatchStore) sendWriteRequests() {
bhcs.rateLimit <- struct{}{}
defer func() { <-bhcs.rateLimit }()
func (hcs *httpChunkStore) Put(c chunks.Chunk) {
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
hcs.unwrittenPuts.Insert(c)
}
bhcs.cacheMu.Lock()
func (hcs *httpChunkStore) PutMany(chunx []chunks.Chunk) {
hcs.cacheMu.RLock()
defer hcs.cacheMu.RUnlock()
for _, c := range chunx {
hcs.unwrittenPuts.Insert(c)
}
}
func (hcs *httpChunkStore) sendWriteRequests() {
hcs.rateLimit <- struct{}{}
defer func() { <-hcs.rateLimit }()
hcs.cacheMu.Lock()
defer func() {
bhcs.cacheMu.Unlock()
hcs.cacheMu.Unlock()
}()
count := bhcs.unwrittenPuts.Count()
count := hcs.unwrittenPuts.Count()
if count == 0 {
return
}
defer func() {
bhcs.unwrittenPuts.Destroy()
bhcs.unwrittenPuts = nbs.NewCache()
hcs.unwrittenPuts.Destroy()
hcs.unwrittenPuts = nbs.NewCache()
}()
verbose.Log("Sending %d chunks", count)
chunkChan := make(chan *chunks.Chunk, 1024)
go func() {
bhcs.unwrittenPuts.ExtractChunks(chunkChan)
hcs.unwrittenPuts.ExtractChunks(chunkChan)
close(chunkChan)
}()
body := buildWriteValueRequest(chunkChan)
url := *bhcs.host
url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)
url := *hcs.host
url.Path = httprouter.CleanPath(hcs.host.Path + constants.WriteValuePath)
// TODO: Make this accept snappy encoding
req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
req := newRequest("POST", hcs.auth, url.String(), body, http.Header{
"Accept-Encoding": {"gzip"},
"Content-Encoding": {"x-snappy-framed"},
"Content-Type": {"application/octet-stream"},
})
res, err := bhcs.httpClient.Do(req)
res, err := hcs.httpClient.Do(req)
d.PanicIfError(err)
expectVersion(res)
expectVersion(hcs.version, res)
defer closeResponse(res.Body)
if http.StatusCreated != res.StatusCode {
@@ -368,33 +424,54 @@ func (bhcs *httpBatchStore) sendWriteRequests() {
verbose.Log("Finished sending %d hashes", count)
}
func (bhcs *httpBatchStore) Root() hash.Hash {
func (hcs *httpChunkStore) Root() hash.Hash {
hcs.rootMu.RLock()
defer hcs.rootMu.RUnlock()
return hcs.root
}
func (hcs *httpChunkStore) Rebase() {
root, _ := hcs.getRoot(true)
hcs.rootMu.Lock()
defer hcs.rootMu.Unlock()
hcs.root = root
}
func (hcs *httpChunkStore) getRoot(checkVers bool) (root hash.Hash, vers string) {
// GET http://<host>/root. Response will be ref of root.
res := bhcs.requestRoot("GET", hash.Hash{}, hash.Hash{})
expectVersion(res)
res := hcs.requestRoot("GET", hash.Hash{}, hash.Hash{})
if checkVers {
expectVersion(hcs.version, res)
}
defer closeResponse(res.Body)
if http.StatusOK != res.StatusCode {
d.Panic("Unexpected response: %s", http.StatusText(res.StatusCode))
}
data, err := ioutil.ReadAll(res.Body)
d.Chk.NoError(err)
return hash.Parse(string(data))
d.PanicIfError(err)
return hash.Parse(string(data)), res.Header.Get(NomsVersionHeader)
}
// UpdateRoot flushes outstanding writes to the backing ChunkStore before updating its Root, because it's almost certainly the case that the caller wants to point that root at some recently-Put Chunk.
func (bhcs *httpBatchStore) UpdateRoot(current, last hash.Hash) bool {
// POST http://<host>/root?current=<ref>&last=<ref>. Response will be 200 on success, 409 if current is outdated.
bhcs.Flush()
func (hcs *httpChunkStore) Commit(current, last hash.Hash) bool {
hcs.rootMu.Lock()
defer hcs.rootMu.Unlock()
hcs.Flush()
res := bhcs.requestRoot("POST", current, last)
expectVersion(res)
// POST http://<host>/root?current=<ref>&last=<ref>. Response will be 200 on success, 409 if current is outdated.
res := hcs.requestRoot("POST", current, last)
expectVersion(hcs.version, res)
defer closeResponse(res.Body)
switch res.StatusCode {
case http.StatusOK:
hcs.root = current
return true
case http.StatusConflict:
data, err := ioutil.ReadAll(res.Body)
d.PanicIfError(err)
hcs.root = hash.Parse(string(data))
return false
default:
buf := bytes.Buffer{}
@@ -408,9 +485,9 @@ func (bhcs *httpBatchStore) UpdateRoot(current, last hash.Hash) bool {
}
}
func (bhcs *httpBatchStore) requestRoot(method string, current, last hash.Hash) *http.Response {
u := *bhcs.host
u.Path = httprouter.CleanPath(bhcs.host.Path + constants.RootPath)
func (hcs *httpChunkStore) requestRoot(method string, current, last hash.Hash) *http.Response {
u := *hcs.host
u.Path = httprouter.CleanPath(hcs.host.Path + constants.RootPath)
if method == "POST" {
if current.IsEmpty() {
d.Panic("Unexpected empty value")
@@ -421,9 +498,9 @@ func (bhcs *httpBatchStore) requestRoot(method string, current, last hash.Hash)
u.RawQuery = params.Encode()
}
req := newRequest(method, bhcs.auth, u.String(), nil, nil)
req := newRequest(method, hcs.auth, u.String(), nil, nil)
res, err := bhcs.httpClient.Do(req)
res, err := hcs.httpClient.Do(req)
d.PanicIfError(err)
return res
@@ -450,17 +527,17 @@ func formatErrorResponse(res *http.Response) string {
return fmt.Sprintf("%s:\n%s\n", res.Status, data)
}
func expectVersion(res *http.Response) {
func expectVersion(expected string, res *http.Response) {
dataVersion := res.Header.Get(NomsVersionHeader)
if constants.NomsVersion != dataVersion {
if expected != dataVersion {
b, _ := ioutil.ReadAll(res.Body)
res.Body.Close()
d.PanicIfError(fmt.Errorf(
"Version mismatch\n\r"+
"\tSDK version '%s' is incompatible with data of version: '%s'\n\r"+
d.Panic(
"Version skew\n\r"+
"\tServer data version changed from '%s' to '%s'\n\r"+
"\tHTTP Response: %d (%s): %s\n",
constants.NomsVersion, dataVersion,
res.StatusCode, res.Status, string(b)))
expected, dataVersion,
res.StatusCode, res.Status, string(b))
}
}

View File

@@ -21,14 +21,14 @@ import (
const testAuthToken = "aToken123"
func TestHTTPBatchStore(t *testing.T) {
suite.Run(t, &HTTPBatchStoreSuite{})
func TestHTTPChunkStore(t *testing.T) {
suite.Run(t, &HTTPChunkStoreSuite{})
}
type HTTPBatchStoreSuite struct {
type HTTPChunkStoreSuite struct {
suite.Suite
cs *chunks.TestStore
store *httpBatchStore
store *httpChunkStore
}
type inlineServer struct {
@@ -47,12 +47,12 @@ func (serv inlineServer) Do(req *http.Request) (resp *http.Response, err error)
nil
}
func (suite *HTTPBatchStoreSuite) SetupTest() {
func (suite *HTTPChunkStoreSuite) SetupTest() {
suite.cs = chunks.NewTestStore()
suite.store = NewHTTPBatchStoreForTest(suite.cs)
suite.store = NewHTTPChunkStoreForTest(suite.cs)
}
func NewHTTPBatchStoreForTest(cs chunks.ChunkStore) *httpBatchStore {
func NewHTTPChunkStoreForTest(cs chunks.ChunkStore) *httpChunkStore {
serv := inlineServer{httprouter.New()}
serv.POST(
constants.WriteValuePath,
@@ -84,12 +84,10 @@ func NewHTTPBatchStoreForTest(cs chunks.ChunkStore) *httpBatchStore {
HandleRootGet(w, req, ps, cs)
},
)
hcs := NewHTTPBatchStore("http://localhost:9000", "")
hcs.httpClient = serv
return hcs
return newHTTPChunkStoreWithClient("http://localhost:9000", "", serv)
}
func newAuthenticatingHTTPBatchStoreForTest(suite *HTTPBatchStoreSuite, hostUrl string) *httpBatchStore {
func newAuthenticatingHTTPChunkStoreForTest(suite *HTTPChunkStoreSuite, hostUrl string) *httpChunkStore {
authenticate := func(req *http.Request) {
suite.Equal(testAuthToken, req.URL.Query().Get("access_token"))
}
@@ -102,12 +100,16 @@ func newAuthenticatingHTTPBatchStoreForTest(suite *HTTPBatchStoreSuite, hostUrl
HandleRootPost(w, req, ps, suite.cs)
},
)
hcs := NewHTTPBatchStore(hostUrl, "")
hcs.httpClient = serv
return hcs
serv.GET(
constants.RootPath,
func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
HandleRootGet(w, req, ps, suite.cs)
},
)
return newHTTPChunkStoreWithClient(hostUrl, "", serv)
}
func newBadVersionHTTPBatchStoreForTest(suite *HTTPBatchStoreSuite) *httpBatchStore {
func newBadVersionHTTPChunkStoreForTest(suite *HTTPChunkStoreSuite) *httpChunkStore {
serv := inlineServer{httprouter.New()}
serv.POST(
constants.RootPath,
@@ -116,17 +118,21 @@ func newBadVersionHTTPBatchStoreForTest(suite *HTTPBatchStoreSuite) *httpBatchSt
w.Header().Set(NomsVersionHeader, "BAD")
},
)
hcs := NewHTTPBatchStore("http://localhost", "")
hcs.httpClient = serv
return hcs
serv.GET(
constants.RootPath,
func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
HandleRootGet(w, req, ps, suite.cs)
},
)
return newHTTPChunkStoreWithClient("http://localhost", "", serv)
}
func (suite *HTTPBatchStoreSuite) TearDownTest() {
func (suite *HTTPChunkStoreSuite) TearDownTest() {
suite.store.Close()
suite.cs.Close()
}
func (suite *HTTPBatchStoreSuite) TestPutChunk() {
func (suite *HTTPChunkStoreSuite) TestPutChunk() {
c := types.EncodeValue(types.String("abc"), nil)
suite.store.SchedulePut(c)
suite.store.Flush()
@@ -134,7 +140,7 @@ func (suite *HTTPBatchStoreSuite) TestPutChunk() {
suite.Equal(1, suite.cs.Writes)
}
func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() {
func (suite *HTTPChunkStoreSuite) TestPutChunksInOrder() {
vals := []types.Value{
types.String("abc"),
types.String("def"),
@@ -150,39 +156,50 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() {
suite.Equal(3, suite.cs.Writes)
}
func (suite *HTTPBatchStoreSuite) TestRoot() {
func (suite *HTTPChunkStoreSuite) TestRebase() {
suite.Equal(hash.Hash{}, suite.store.Root())
c := types.EncodeValue(types.NewMap(), nil)
suite.cs.Put(c)
suite.True(suite.store.UpdateRoot(c.Hash(), hash.Hash{}))
suite.True(suite.cs.Commit(c.Hash(), hash.Hash{})) // change happens behind our backs
suite.Equal(hash.Hash{}, suite.store.Root()) // shouldn't be visible yet
suite.store.Rebase()
suite.Equal(c.Hash(), suite.cs.Root())
}
func (suite *HTTPBatchStoreSuite) TestVersionMismatch() {
store := newBadVersionHTTPBatchStoreForTest(suite)
func (suite *HTTPChunkStoreSuite) TestRoot() {
c := types.EncodeValue(types.NewMap(), nil)
suite.cs.Put(c)
suite.True(suite.store.Commit(c.Hash(), hash.Hash{}))
suite.Equal(c.Hash(), suite.cs.Root())
}
func (suite *HTTPChunkStoreSuite) TestVersionMismatch() {
store := newBadVersionHTTPChunkStoreForTest(suite)
defer store.Close()
c := types.EncodeValue(types.NewMap(), nil)
suite.cs.Put(c)
suite.Panics(func() { store.UpdateRoot(c.Hash(), hash.Hash{}) })
suite.Panics(func() { store.Commit(c.Hash(), hash.Hash{}) })
}
func (suite *HTTPBatchStoreSuite) TestUpdateRoot() {
func (suite *HTTPChunkStoreSuite) TestCommit() {
c := types.EncodeValue(types.NewMap(), nil)
suite.cs.Put(c)
suite.True(suite.store.UpdateRoot(c.Hash(), hash.Hash{}))
suite.True(suite.store.Commit(c.Hash(), hash.Hash{}))
suite.Equal(c.Hash(), suite.cs.Root())
}
func (suite *HTTPBatchStoreSuite) TestUpdateRootWithParams() {
func (suite *HTTPChunkStoreSuite) TestCommitWithParams() {
u := fmt.Sprintf("http://localhost:9000?access_token=%s&other=19", testAuthToken)
store := newAuthenticatingHTTPBatchStoreForTest(suite, u)
store := newAuthenticatingHTTPChunkStoreForTest(suite, u)
defer store.Close()
c := types.EncodeValue(types.NewMap(), nil)
suite.cs.Put(c)
suite.True(store.UpdateRoot(c.Hash(), hash.Hash{}))
suite.True(store.Commit(c.Hash(), hash.Hash{}))
suite.Equal(c.Hash(), suite.cs.Root())
}
func (suite *HTTPBatchStoreSuite) TestGet() {
func (suite *HTTPChunkStoreSuite) TestGet() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
@@ -194,13 +211,14 @@ func (suite *HTTPBatchStoreSuite) TestGet() {
suite.Equal(chnx[1].Hash(), got.Hash())
}
func (suite *HTTPBatchStoreSuite) TestGetMany() {
func (suite *HTTPChunkStoreSuite) TestGetMany() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
}
notPresent := chunks.NewChunk([]byte("ghi")).Hash()
suite.cs.PutMany(chnx)
suite.cs.Flush()
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), notPresent)
foundChunks := make(chan *chunks.Chunk)
@@ -213,7 +231,7 @@ func (suite *HTTPBatchStoreSuite) TestGetMany() {
suite.True(hashes.Has(notPresent))
}
func (suite *HTTPBatchStoreSuite) TestGetManyAllCached() {
func (suite *HTTPChunkStoreSuite) TestGetManyAllCached() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
@@ -231,14 +249,15 @@ func (suite *HTTPBatchStoreSuite) TestGetManyAllCached() {
suite.Len(hashes, 0)
}
func (suite *HTTPBatchStoreSuite) TestGetManySomeCached() {
func (suite *HTTPChunkStoreSuite) TestGetManySomeCached() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
}
cached := chunks.NewChunk([]byte("ghi"))
suite.cs.PutMany(chnx)
suite.store.SchedulePut(cached)
suite.cs.Flush()
suite.store.Put(cached)
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), cached.Hash())
foundChunks := make(chan *chunks.Chunk)
@@ -250,7 +269,7 @@ func (suite *HTTPBatchStoreSuite) TestGetManySomeCached() {
suite.Len(hashes, 0)
}
func (suite *HTTPBatchStoreSuite) TestGetSame() {
func (suite *HTTPChunkStoreSuite) TestGetSame() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("def")),
chunks.NewChunk([]byte("def")),
@@ -262,7 +281,7 @@ func (suite *HTTPBatchStoreSuite) TestGetSame() {
suite.Equal(chnx[1].Hash(), got.Hash())
}
func (suite *HTTPBatchStoreSuite) TestHas() {
func (suite *HTTPChunkStoreSuite) TestHas() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
@@ -271,3 +290,57 @@ func (suite *HTTPBatchStoreSuite) TestHas() {
suite.True(suite.store.Has(chnx[0].Hash()))
suite.True(suite.store.Has(chnx[1].Hash()))
}
func (suite *HTTPChunkStoreSuite) TestHasMany() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
}
suite.cs.PutMany(chnx)
suite.cs.Flush()
notPresent := chunks.NewChunk([]byte("ghi")).Hash()
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), notPresent)
present := suite.store.HasMany(hashes)
suite.Len(present, len(chnx))
for _, c := range chnx {
suite.True(present.Has(c.Hash()), "%s not present in %v", c.Hash(), present)
}
suite.False(present.Has(notPresent))
}
func (suite *HTTPChunkStoreSuite) TestHasManyAllCached() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
}
suite.store.PutMany(chnx)
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash())
present := suite.store.HasMany(hashes)
suite.Len(present, len(chnx))
for _, c := range chnx {
suite.True(present.Has(c.Hash()), "%s not present in %v", c.Hash(), present)
}
}
func (suite *HTTPChunkStoreSuite) TestHasManySomeCached() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("abc")),
chunks.NewChunk([]byte("def")),
}
cached := chunks.NewChunk([]byte("ghi"))
suite.cs.PutMany(chnx)
suite.cs.Flush()
suite.store.Put(cached)
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), cached.Hash())
present := suite.store.HasMany(hashes)
suite.Len(present, len(chnx)+1)
for _, c := range chnx {
suite.True(present.Has(c.Hash()), "%s not present in %v", c.Hash(), present)
}
}

View File

@@ -1,85 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package datas
import (
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/constants"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
)
type localBatchStore struct {
cs chunks.ChunkStore
once sync.Once
mu sync.Mutex
cc *completenessChecker
}
func newLocalBatchStore(cs chunks.ChunkStore) *localBatchStore {
return &localBatchStore{cs: cs, cc: newCompletenessChecker()}
}
// Get checks the internal Chunk cache, proxying to the backing ChunkStore if
// not present.
func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk {
lbs.once.Do(lbs.expectVersion)
return lbs.cs.Get(h)
}
func (lbs *localBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
lbs.cs.GetMany(hashes, foundChunks)
}
// SchedulePut calls Put on the underlying ChunkStore and adds any refs in c
// to a pool of unresolved refs which are validated against the underlying
// ChunkStore during Flush() or UpdateRoot().
func (lbs *localBatchStore) SchedulePut(c chunks.Chunk) {
lbs.once.Do(lbs.expectVersion)
lbs.cs.Put(c)
lbs.mu.Lock()
defer lbs.mu.Unlock()
lbs.cc.AddRefs(types.DecodeValue(c, nil))
}
func (lbs *localBatchStore) expectVersion() {
dataVersion := lbs.cs.Version()
if constants.NomsVersion != dataVersion {
d.Panic("SDK version %s incompatible with data of version %s", constants.NomsVersion, dataVersion)
}
}
func (lbs *localBatchStore) Root() hash.Hash {
lbs.once.Do(lbs.expectVersion)
return lbs.cs.Root()
}
// UpdateRoot flushes outstanding writes to the backing ChunkStore before
// updating its Root, because it's almost certainly the case that the caller
// wants to point that root at some recently-Put Chunk.
func (lbs *localBatchStore) UpdateRoot(current, last hash.Hash) bool {
lbs.once.Do(lbs.expectVersion)
lbs.Flush()
return lbs.cs.UpdateRoot(current, last)
}
func (lbs *localBatchStore) Flush() {
lbs.once.Do(lbs.expectVersion)
func() {
lbs.mu.Lock()
defer lbs.mu.Unlock()
lbs.cc.PanicIfDangling(lbs.cs)
}()
lbs.cs.Flush()
}
// Close closes the underlying ChunkStore.
func (lbs *localBatchStore) Close() error {
lbs.Flush()
return lbs.cs.Close()
}

View File

@@ -15,10 +15,7 @@ type LocalDatabase struct {
}
func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
bs := newLocalBatchStore(cs)
return &LocalDatabase{
newDatabaseCommon(newCachingChunkHaver(cs), types.NewValueStore(bs), bs),
}
return &LocalDatabase{newDatabaseCommon(newValidatingChunkStore(cs))}
}
func (ldb *LocalDatabase) GetDataset(datasetID string) Dataset {

View File

@@ -31,7 +31,7 @@ const bytesWrittenSampleRate = .10
// TODO: Get rid of this (BUG 2982)
func PullWithFlush(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency int, progressCh chan PullProgress) {
Pull(srcDB, sinkDB, sourceRef, sinkHeadRef, concurrency, progressCh)
sinkDB.validatingBatchStore().Flush()
sinkDB.validatingChunkStore().Flush()
}
// Pull objects that descend from sourceRef from srcDB to sinkDB. sinkHeadRef
@@ -242,16 +242,15 @@ func getChunks(v types.Value) (chunks []types.Ref) {
func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritten bool) traverseSourceResult {
h := srcRef.TargetHash()
if !sinkDB.has(h) {
srcBS := srcDB.validatingBatchStore()
c := srcBS.Get(h)
c := srcDB.validatingChunkStore().Get(h)
v := types.DecodeValue(c, srcDB)
if v == nil {
d.Panic("Expected decoded chunk to be non-nil.")
}
sinkDB.validatingBatchStore().SchedulePut(c)
sinkDB.validatingChunkStore().Put(c)
bytesWritten := 0
if estimateBytesWritten {
// TODO: Probably better to hide this behind the BatchStore abstraction since
// TODO: Probably better to hide this behind the ChunkStore abstraction since
// write size is implementation specific.
bytesWritten = len(snappy.Encode(nil, c.Data()))
}

View File

@@ -85,8 +85,7 @@ func (suite *RemoteToRemoteSuite) SetupTest() {
}
func makeRemoteDb(cs chunks.ChunkStore) Database {
hbs := NewHTTPBatchStoreForTest(cs)
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(hbs), types.NewValueStore(hbs), hbs)}
return &RemoteDatabaseClient{newDatabaseCommon(NewHTTPChunkStoreForTest(cs))}
}
func (suite *PullSuite) sinkIsLocal() bool {
@@ -158,7 +157,7 @@ func (suite *PullSuite) TestPullEverything() {
suite.Equal(0, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.validatingBatchStore().Flush()
suite.sink.validatingChunkStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
suite.NotNil(v)
suite.True(l.Equals(v.Get(ValueField)))
@@ -202,7 +201,7 @@ func (suite *PullSuite) TestPullMultiGeneration() {
suite.Equal(expectedReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.validatingBatchStore().Flush()
suite.sink.validatingChunkStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
suite.NotNil(v)
suite.True(srcL.Equals(v.Get(ValueField)))
@@ -250,7 +249,7 @@ func (suite *PullSuite) TestPullDivergentHistory() {
suite.Equal(preReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.validatingBatchStore().Flush()
suite.sink.validatingChunkStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
suite.NotNil(v)
suite.True(srcL.Equals(v.Get(ValueField)))
@@ -297,7 +296,7 @@ func (suite *PullSuite) TestPullUpdates() {
suite.Equal(expectedReads, suite.sinkCS.Reads)
pt.Validate(suite)
suite.sink.validatingBatchStore().Flush()
suite.sink.validatingChunkStore().Flush()
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
suite.NotNil(v)
suite.True(srcL.Equals(v.Get(ValueField)))

View File

@@ -15,8 +15,7 @@ type RemoteDatabaseClient struct {
}
func NewRemoteDatabase(baseURL, auth string) *RemoteDatabaseClient {
httpBS := NewHTTPBatchStore(baseURL, auth)
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(httpBS), types.NewValueStore(httpBS), httpBS)}
return &RemoteDatabaseClient{newDatabaseCommon(NewHTTPChunkStore(baseURL, auth))}
}
func (rdb *RemoteDatabaseClient) GetDataset(datasetID string) Dataset {

View File

@@ -284,7 +284,7 @@ func handleGetBlob(w http.ResponseWriter, req *http.Request, ps URLParams, cs ch
d.Panic("h failed to parse")
}
vs := types.NewValueStore(types.NewBatchStoreAdaptor(cs))
vs := types.NewValueStore(cs)
v := vs.ReadValue(h)
b, ok := v.(types.Blob)
if !ok {
@@ -342,8 +342,7 @@ func handleRootGet(w http.ResponseWriter, req *http.Request, ps URLParams, rt ch
d.Panic("Expected get method.")
}
rootRef := rt.Root()
fmt.Fprintf(w, "%v", rootRef.String())
fmt.Fprintf(w, "%v", rt.Root().String())
w.Header().Add("content-type", "text/plain")
}
@@ -364,7 +363,7 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
}
current := hash.Parse(tokens[0])
-vs := types.NewValueStore(types.NewBatchStoreAdaptor(cs))
+vs := types.NewValueStore(cs)
// Ensure that proposed new Root is present in cs
proposed := vs.ReadValue(current)
@@ -377,7 +376,7 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
if !last.IsEmpty() {
lastVal := vs.ReadValue(last)
if lastVal == nil {
d.Panic("Can't UpdateRoot from a non-present Chunk")
d.Panic("Can't Commit from a non-present Chunk")
}
datasets = lastVal.(types.Map)
@@ -391,8 +390,10 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
assertMapOfStringToRefOfCommit(m, datasets, vs)
}
-if !cs.UpdateRoot(current, last) {
+if !cs.Commit(current, last) {
w.WriteHeader(http.StatusConflict)
w.Header().Add("content-type", "text/plain")
fmt.Fprintf(w, "%v", cs.Root().String())
return
}
}
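The 409 path above is the whole optimistic-locking protocol: a failed Commit() means last no longer matches the persisted root, and the response body carries the winning root so the client can reconcile and retry. Against a local ChunkStore the same retry shape looks roughly like this (a sketch; commitWithRetry is a hypothetical helper):

func commitWithRetry(cs chunks.ChunkStore, proposed hash.Hash) bool {
    if cs.Commit(proposed, cs.Root()) {
        return true
    }
    cs.Rebase() // adopt the root that won the race
    // NB: a real caller would recompute 'proposed' against the new root
    // before retrying; this sketch only retries a fast-forward.
    return cs.Commit(proposed, cs.Root())
}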

View File

@@ -302,7 +302,7 @@ func TestHandleGetRoot(t *testing.T) {
cs := chunks.NewTestStore()
c := chunks.NewChunk([]byte("abc"))
cs.Put(c)
-assert.True(cs.UpdateRoot(c.Hash(), hash.Hash{}))
+assert.True(cs.Commit(c.Hash(), hash.Hash{}))
w := httptest.NewRecorder()
HandleRootGet(w, newRequest("GET", "", "", nil, nil), params{}, cs)
@@ -318,7 +318,7 @@ func TestHandleGetBase(t *testing.T) {
cs := chunks.NewTestStore()
c := chunks.NewChunk([]byte("abc"))
cs.Put(c)
-assert.True(cs.UpdateRoot(c.Hash(), hash.Hash{}))
+assert.True(cs.Commit(c.Hash(), hash.Hash{}))
w := httptest.NewRecorder()
HandleBaseGet(w, newRequest("GET", "", "", nil, nil), params{}, cs)
@@ -331,18 +331,18 @@ func TestHandleGetBase(t *testing.T) {
func TestHandlePostRoot(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
-vs := types.NewValueStore(types.NewBatchStoreAdaptor(cs))
+vs := types.NewValueStore(cs)
commit := buildTestCommit(types.String("head"))
commitRef := vs.WriteValue(commit)
firstHead := types.NewMap(types.String("dataset1"), types.ToRefOfValue(commitRef))
firstHeadRef := vs.WriteValue(firstHead)
-vs.Flush(firstHeadRef.TargetHash())
+vs.Flush()
commit = buildTestCommit(types.String("second"), commitRef)
newHead := types.NewMap(types.String("dataset1"), types.ToRefOfValue(vs.WriteValue(commit)))
newHeadRef := vs.WriteValue(newHead)
-vs.Flush(newHeadRef.TargetHash())
+vs.Flush()
// First attempt should fail, as 'last' won't match.
u := &url.URL{}
@@ -357,7 +357,7 @@ func TestHandlePostRoot(t *testing.T) {
assert.Equal(http.StatusConflict, w.Code, "Handler error:\n%s", string(w.Body.Bytes()))
// Now, update the root manually to 'last' and try again.
-assert.True(cs.UpdateRoot(firstHeadRef.TargetHash(), hash.Hash{}))
+assert.True(cs.Commit(firstHeadRef.TargetHash(), hash.Hash{}))
w = httptest.NewRecorder()
HandleRootPost(w, newRequest("POST", "", url, nil, nil), params{}, cs)
assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes()))

View File

@@ -0,0 +1,65 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package datas
import (
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
)
type validatingChunkStore struct {
chunks.ChunkStore
mu sync.Mutex
cc *completenessChecker
}
func newValidatingChunkStore(cs chunks.ChunkStore) *validatingChunkStore {
return &validatingChunkStore{ChunkStore: cs, cc: newCompletenessChecker()}
}
// Put calls Put on the underlying ChunkStore and adds any refs in c
// to a pool of unresolved refs which are validated against the underlying
// ChunkStore during Flush() or Commit().
func (vcs *validatingChunkStore) Put(c chunks.Chunk) {
vcs.ChunkStore.Put(c)
vcs.mu.Lock()
defer vcs.mu.Unlock()
vcs.cc.AddRefs(types.DecodeValue(c, nil))
}
// PutMany calls PutMany on the underlying ChunkStore and adds any refs in c
// to a pool of unresolved refs which are validated against the underlying
// ChunkStore during Flush() or Commit().
func (vcs *validatingChunkStore) PutMany(chunks []chunks.Chunk) {
vcs.ChunkStore.PutMany(chunks)
vcs.mu.Lock()
defer vcs.mu.Unlock()
for _, c := range chunks {
vcs.cc.AddRefs(types.DecodeValue(c, nil))
}
}
// Commit validates pending chunks for ref-completeness before calling
// Commit() on the underlying ChunkStore.
func (vcs *validatingChunkStore) Commit(current, last hash.Hash) bool {
vcs.validate()
return vcs.ChunkStore.Commit(current, last)
}
// Flush validates pending chunks for ref-completeness before calling
// Flush() on the underlying ChunkStore.
func (vcs *validatingChunkStore) Flush() {
vcs.validate()
vcs.ChunkStore.Flush()
}
func (vcs *validatingChunkStore) validate() {
vcs.mu.Lock()
defer vcs.mu.Unlock()
vcs.cc.PanicIfDangling(vcs.ChunkStore)
}
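validatingChunkStore is what keeps Database honest under the new Put() contract: refs accumulate per Put/PutMany and are only checked for danglers at the persistence points. A minimal usage sketch, assuming a test store and an arbitrary value:

vcs := newValidatingChunkStore(chunks.NewTestStore())
vcs.Put(types.EncodeValue(types.String("noms"), nil)) // a String has no refs, so the pool stays empty
vcs.Flush() // panics via PanicIfDangling if any recorded ref is absent from the backing store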

View File

@@ -10,11 +10,10 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/testify/assert"
)
-type storeOpenFn func() types.BatchStore
+type storeOpenFn func() chunks.ChunkStore
func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.TestingT) bool {
store := refreshStore()
@@ -23,17 +22,17 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes
return true
}
-func writeToEmptyStore(store types.BatchStore, src *dataSource, t assert.TestingT) {
+func writeToEmptyStore(store chunks.ChunkStore, src *dataSource, t assert.TestingT) {
root := store.Root()
assert.Equal(t, hash.Hash{}, root)
chunx := goReadChunks(src)
for c := range chunx {
-store.SchedulePut(*c)
+store.Put(*c)
}
newRoot := chunks.NewChunk([]byte("root"))
-store.SchedulePut(newRoot)
-assert.True(t, store.UpdateRoot(newRoot.Hash(), root))
+store.Put(newRoot)
+assert.True(t, store.Commit(newRoot.Hash(), root))
}
func goReadChunks(src *dataSource) <-chan *chunks.Chunk {
@@ -49,7 +48,7 @@ func benchmarkNoRefreshWrite(openStore storeOpenFn, src *dataSource, t assert.Te
store := openStore()
chunx := goReadChunks(src)
for c := range chunx {
-store.SchedulePut(*c)
+store.Put(*c)
}
assert.NoError(t, store.Close())
}

View File

@@ -11,7 +11,6 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
"github.com/dustin/go-humanize"
)
@@ -20,7 +19,7 @@ type fileBlockStore struct {
w io.WriteCloser
}
-func newFileBlockStore(w io.WriteCloser) types.BatchStore {
+func newFileBlockStore(w io.WriteCloser) chunks.ChunkStore {
return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w}
}
@@ -32,10 +31,28 @@ func (fb fileBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C
panic("not impl")
}
-func (fb fileBlockStore) SchedulePut(c chunks.Chunk) {
+func (fb fileBlockStore) Has(h hash.Hash) bool {
+panic("not impl")
+}
+func (fb fileBlockStore) HasMany(hashes hash.HashSet) (present hash.HashSet) {
+panic("not impl")
+}
+func (fb fileBlockStore) Put(c chunks.Chunk) {
io.Copy(fb.bw, bytes.NewReader(c.Data()))
}
+func (fb fileBlockStore) PutMany(chunks []chunks.Chunk) {
+for _, c := range chunks {
+fb.Put(c)
+}
+}
+func (fb fileBlockStore) Version() string {
+panic("not impl")
+}
func (fb fileBlockStore) Flush() {}
func (fb fileBlockStore) Close() error {
@@ -43,11 +60,13 @@ func (fb fileBlockStore) Close() error {
return nil
}
+func (fb fileBlockStore) Rebase() {}
func (fb fileBlockStore) Root() hash.Hash {
return hash.Hash{}
}
-func (fb fileBlockStore) UpdateRoot(current, last hash.Hash) bool {
+func (fb fileBlockStore) Commit(current, last hash.Hash) bool {
fb.bw.Flush()
return true
}
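fileBlockStore now stubs the full chunks.ChunkStore surface instead of types.BatchStore. A compile-time assertion (not in this change, just a sketch) would catch any missed method:

var _ chunks.ChunkStore = fileBlockStore{}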

View File

@@ -12,9 +12,9 @@ import (
"sort"
"time"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/nbs"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/testify/assert"
"github.com/aws/aws-sdk-go/aws"
@@ -75,19 +75,19 @@ func main() {
open := newNullBlockStore
wrote := false
var writeDB func()
-var refresh func() types.BatchStore
+var refresh func() chunks.ChunkStore
if *toNBS != "" || *toFile != "" || *toAWS != "" {
var reset func()
if *toNBS != "" {
dir := makeTempDir(*toNBS, pb)
defer os.RemoveAll(dir)
-open = func() types.BatchStore { return nbs.NewLocalStore(dir, bufSize) }
+open = func() chunks.ChunkStore { return nbs.NewLocalStore(dir, bufSize) }
reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) }
} else if *toFile != "" {
dir := makeTempDir(*toFile, pb)
defer os.RemoveAll(dir)
-open = func() types.BatchStore {
+open = func() chunks.ChunkStore {
f, err := ioutil.TempFile(dir, "")
d.Chk.NoError(err)
return newFileBlockStore(f)
@@ -96,7 +96,7 @@ func main() {
} else if *toAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
-open = func() types.BatchStore {
+open = func() chunks.ChunkStore {
return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize)
}
reset = func() {
@@ -112,21 +112,21 @@ func main() {
}
writeDB = func() { wrote = ensureNovelWrite(wrote, open, src, pb) }
-refresh = func() types.BatchStore {
+refresh = func() chunks.ChunkStore {
reset()
return open()
}
} else {
if *useNBS != "" {
-open = func() types.BatchStore { return nbs.NewLocalStore(*useNBS, bufSize) }
+open = func() chunks.ChunkStore { return nbs.NewLocalStore(*useNBS, bufSize) }
} else if *useAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
-open = func() types.BatchStore {
+open = func() chunks.ChunkStore {
return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize)
}
}
writeDB = func() {}
refresh = func() types.BatchStore { panic("WriteNovel unsupported with --useLDB and --useNBS") }
refresh = func() chunks.ChunkStore { panic("WriteNovel unsupported with --useLDB and --useNBS") }
}
benchmarks := []struct {

View File

@@ -7,14 +7,13 @@ package main
import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
)
type nullBlockStore struct {
bogus int32
}
-func newNullBlockStore() types.BatchStore {
+func newNullBlockStore() chunks.ChunkStore {
return nullBlockStore{}
}
@@ -26,7 +25,21 @@ func (nb nullBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C
panic("not impl")
}
-func (nb nullBlockStore) SchedulePut(c chunks.Chunk) {}
+func (nb nullBlockStore) Has(h hash.Hash) bool {
+panic("not impl")
+}
+func (nb nullBlockStore) HasMany(hashes hash.HashSet) (present hash.HashSet) {
+panic("not impl")
+}
+func (nb nullBlockStore) Put(c chunks.Chunk) {}
+func (nb nullBlockStore) PutMany(chunks []chunks.Chunk) {}
+func (nb nullBlockStore) Version() string {
+panic("not impl")
+}
func (nb nullBlockStore) Flush() {}
@@ -34,10 +47,12 @@ func (nb nullBlockStore) Close() error {
return nil
}
+func (nb nullBlockStore) Rebase() {}
func (nb nullBlockStore) Root() hash.Hash {
return hash.Hash{}
}
-func (nb nullBlockStore) UpdateRoot(current, last hash.Hash) bool {
+func (nb nullBlockStore) Commit(current, last hash.Hash) bool {
return true
}

View File

@@ -67,7 +67,7 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {
// See http://www.di-mgt.com.au/sha_testvectors.html
suite.Equal("rmnjb8cjc5tblj21ed4qs821649eduie", h.String())
-suite.store.UpdateRoot(h, suite.store.Root()) // Commit writes
+suite.store.Commit(h, suite.store.Root()) // Commit writes
// And reading it via the API should work...
assertInputInStore(input, h, suite.store, suite.Assert())
@@ -80,7 +80,7 @@ func (suite *BlockStoreSuite) TestChunkStorePut() {
suite.store.Put(c)
suite.Equal(h, c.Hash())
assertInputInStore(input, h, suite.store, suite.Assert())
-suite.store.UpdateRoot(h, suite.store.Root()) // Commit writes
+suite.store.Commit(h, suite.store.Root()) // Commit writes
if suite.putCountFn != nil {
suite.Equal(2, suite.putCountFn())
@@ -92,7 +92,7 @@ func (suite *BlockStoreSuite) TestChunkStorePutMany() {
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
suite.store.PutMany([]chunks.Chunk{c1, c2})
-suite.store.UpdateRoot(c1.Hash(), suite.store.Root()) // Commit writes
+suite.store.Commit(c1.Hash(), suite.store.Root()) // Commit writes
// And reading it via the API should work...
assertInputInStore(input1, c1.Hash(), suite.store, suite.Assert())
@@ -109,7 +109,7 @@ func (suite *BlockStoreSuite) TestChunkStorePutMoreThanMemTable() {
c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
suite.store.PutMany([]chunks.Chunk{c1, c2})
-suite.store.UpdateRoot(c1.Hash(), suite.store.Root()) // Commit writes
+suite.store.Commit(c1.Hash(), suite.store.Root()) // Commit writes
// And reading it via the API should work...
assertInputInStore(input1, c1.Hash(), suite.store, suite.Assert())
@@ -129,7 +129,7 @@ func (suite *BlockStoreSuite) TestChunkStoreGetMany() {
chnx[i] = chunks.NewChunk(data)
}
suite.store.PutMany(chnx)
-suite.store.UpdateRoot(chnx[0].Hash(), suite.store.Root()) // Commit writes
+suite.store.Commit(chnx[0].Hash(), suite.store.Root()) // Commit writes
hashes := make(hash.HashSlice, len(chnx))
for i, c := range chnx {
@@ -184,12 +184,52 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
// And so should reading c1 via the API
assertInputInStore(input1, c1.Hash(), suite.store, suite.Assert())
-suite.True(interloper.UpdateRoot(c1.Hash(), interloper.Root())) // Commit root
+suite.True(interloper.Commit(c1.Hash(), interloper.Root())) // Commit root
// Updating from stale root should fail...
-suite.False(suite.store.UpdateRoot(c2.Hash(), root))
+suite.False(suite.store.Commit(c2.Hash(), root))
// ...but new root should succeed
-suite.True(suite.store.UpdateRoot(c2.Hash(), suite.store.Root()))
+suite.True(suite.store.Commit(c2.Hash(), suite.store.Root()))
}
+func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
+input1, input2 := []byte("abc"), []byte("def")
+c1, c2 := chunks.NewChunk(input1), chunks.NewChunk(input2)
+root := suite.store.Root()
+interloper := NewLocalStore(suite.dir, testMemTableSize)
+interloper.Put(c1)
+interloper.Flush()
+suite.store.Put(c2)
+// Reading c2 via the API should work pre-rebase
+assertInputInStore(input2, c2.Hash(), suite.store, suite.Assert())
+// Shouldn't have c1 yet.
+suite.False(suite.store.Has(c1.Hash()))
+suite.store.Rebase()
+// Reading c2 via the API should work post-rebase
+assertInputInStore(input2, c2.Hash(), suite.store, suite.Assert())
+// And so should reading c1 via the API
+assertInputInStore(input1, c1.Hash(), suite.store, suite.Assert())
+// Commit interloper root
+suite.True(interloper.Commit(c1.Hash(), interloper.Root()))
+// suite.store should still have its initial root
+suite.EqualValues(root, suite.store.Root())
+suite.store.Rebase()
+// Rebase grabbed the new root, so updating should now succeed!
+suite.True(suite.store.Commit(c2.Hash(), suite.store.Root()))
+// Interloper shouldn't see c2 yet....
+suite.False(interloper.Has(c2.Hash()))
+interloper.Rebase()
+// ...but post-rebase it must
+assertInputInStore(input2, c2.Hash(), interloper, suite.Assert())
+}
func (suite *BlockStoreSuite) TestCompactOnUpdateRoot() {
@@ -204,7 +244,7 @@ func (suite *BlockStoreSuite) TestCompactOnUpdateRoot() {
root := smallTableStore.Root()
smallTableStore.PutMany(chunx[:testMaxTables])
-suite.True(smallTableStore.UpdateRoot(chunx[0].Hash(), root)) // Commit write
+suite.True(smallTableStore.Commit(chunx[0].Hash(), root)) // Commit write
exists, _, _, mRoot, specs := mm.ParseIfExists(nil)
suite.True(exists)
@@ -213,7 +253,7 @@ func (suite *BlockStoreSuite) TestCompactOnUpdateRoot() {
root = smallTableStore.Root()
smallTableStore.PutMany(chunx[testMaxTables:])
-suite.True(smallTableStore.UpdateRoot(chunx[testMaxTables].Hash(), root)) // Should compact
+suite.True(smallTableStore.Commit(chunx[testMaxTables].Hash(), root)) // Should compact
exists, _, _, mRoot, specs = mm.ParseIfExists(nil)
suite.True(exists)

View File

@@ -41,6 +41,12 @@ func (nbc *NomsBlockCache) Has(hash hash.Hash) bool {
return nbc.chunks.Has(hash)
}
+// HasMany returns a set containing the members of hashes present in the
+// cache.
+func (nbc *NomsBlockCache) HasMany(hashes hash.HashSet) hash.HashSet {
+return nbc.chunks.HasMany(hashes)
+}
// Get retrieves the chunk referenced by hash. If the chunk is not present,
// Get returns the empty Chunk.
func (nbc *NomsBlockCache) Get(hash hash.Hash) chunks.Chunk {

View File

@@ -20,7 +20,7 @@ func TestChunkStoreZeroValue(t *testing.T) {
_, _, store := makeStoreWithFakes(t)
defer store.Close()
-// No manifest file gets written until the first call to UpdateRoot(). Prior to that, Root() will simply return hash.Hash{}.
+// No manifest file gets written until the first call to Commit(). Prior to that, Root() will simply return hash.Hash{}.
assert.Equal(hash.Hash{}, store.Root())
assert.Equal(constants.NomsVersion, store.Version())
}
@@ -32,11 +32,34 @@ func TestChunkStoreVersion(t *testing.T) {
assert.Equal(constants.NomsVersion, store.Version())
newRoot := hash.Of([]byte("new root"))
-if assert.True(store.UpdateRoot(newRoot, hash.Hash{})) {
+if assert.True(store.Commit(newRoot, hash.Hash{})) {
assert.Equal(constants.NomsVersion, store.Version())
}
}
+func TestChunkStoreRebase(t *testing.T) {
+assert := assert.New(t)
+fm, tt, store := makeStoreWithFakes(t)
+defer store.Close()
+assert.Equal(hash.Hash{}, store.Root())
+assert.Equal(constants.NomsVersion, store.Version())
+// Simulate another process writing a manifest behind store's back.
+newRoot, chunks := interloperWrite(fm, tt, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
+// state in store shouldn't change
+assert.Equal(hash.Hash{}, store.Root())
+assert.Equal(constants.NomsVersion, store.Version())
+store.Rebase()
+// NOW it should
+assert.Equal(newRoot, store.Root())
+assert.Equal(constants.NomsVersion, store.Version())
+assertDataInStore(chunks, store, assert)
+}
func TestChunkStoreUpdateRoot(t *testing.T) {
assert := assert.New(t)
_, _, store := makeStoreWithFakes(t)
@@ -47,7 +70,7 @@ func TestChunkStoreUpdateRoot(t *testing.T) {
newRootChunk := chunks.NewChunk([]byte("new root"))
newRoot := newRootChunk.Hash()
store.Put(newRootChunk)
-if assert.True(store.UpdateRoot(newRoot, hash.Hash{})) {
+if assert.True(store.Commit(newRoot, hash.Hash{})) {
assert.True(store.Has(newRoot))
assert.Equal(newRoot, store.Root())
}
@@ -55,7 +78,7 @@ func TestChunkStoreUpdateRoot(t *testing.T) {
secondRootChunk := chunks.NewChunk([]byte("newer root"))
secondRoot := secondRootChunk.Hash()
store.Put(secondRootChunk)
-if assert.True(store.UpdateRoot(secondRoot, newRoot)) {
+if assert.True(store.Commit(secondRoot, newRoot)) {
assert.Equal(secondRoot, store.Root())
assert.True(store.Has(newRoot))
assert.True(store.Has(secondRoot))
@@ -70,11 +93,8 @@ func TestChunkStoreManifestAppearsAfterConstruction(t *testing.T) {
assert.Equal(hash.Hash{}, store.Root())
assert.Equal(constants.NomsVersion, store.Version())
-// Simulate another process writing a manifest after construction.
-chunks := [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
-newLock, newRoot := computeAddr([]byte("locker")), hash.Of([]byte("new root"))
-src := tt.p.Compact(createMemTable(chunks), nil)
-fm.set(constants.NomsVersion, newLock, newRoot, []tableSpec{{src.hash(), uint32(len(chunks))}})
+// Simulate another process writing a manifest behind store's back.
+interloperWrite(fm, tt, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
// state in store shouldn't change
assert.Equal(hash.Hash{}, store.Root())
@@ -86,11 +106,8 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) {
fm := &fakeManifest{}
tt := newFakeTableSet()
-// Simulate another process having already written a manifest.
-chunks := [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
-newLock, newRoot := computeAddr([]byte("locker")), hash.Of([]byte("new root"))
-src := tt.p.Compact(createMemTable(chunks), nil)
-fm.set(constants.NomsVersion, newLock, newRoot, []tableSpec{{src.hash(), uint32(len(chunks))}})
+// Simulate another process writing a manifest behind store's back.
+newRoot, chunks := interloperWrite(fm, tt, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
store := newNomsBlockStore(fm, tt, defaultMemTableSize, defaultMaxTables)
defer store.Close()
@@ -106,15 +123,12 @@ func TestChunkStoreUpdateRootOptimisticLockFail(t *testing.T) {
defer store.Close()
// Simulate another process writing a manifest behind store's back.
-chunks := [][]byte{[]byte("hello2"), []byte("goodbye2"), []byte("badbye2")}
-newLock, newRoot := computeAddr([]byte("locker")), hash.Of([]byte("new root"))
-src := tt.p.Compact(createMemTable(chunks), nil)
-fm.set(constants.NomsVersion, newLock, newRoot, []tableSpec{{src.hash(), uint32(len(chunks))}})
+newRoot, chunks := interloperWrite(fm, tt, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
newRoot2 := hash.Of([]byte("new root 2"))
-assert.False(store.UpdateRoot(newRoot2, hash.Hash{}))
+assert.False(store.Commit(newRoot2, hash.Hash{}))
assertDataInStore(chunks, store, assert)
-assert.True(store.UpdateRoot(newRoot2, newRoot))
+assert.True(store.Commit(newRoot2, newRoot))
}
func makeStoreWithFakes(t *testing.T) (fm *fakeManifest, tt tableSet, store *NomsBlockStore) {
@@ -124,6 +138,15 @@ func makeStoreWithFakes(t *testing.T) (fm *fakeManifest, tt tableSet, store *Nom
return
}
+// Simulate another process writing a manifest behind store's back.
+func interloperWrite(fm *fakeManifest, tt tableSet, rootChunk []byte, chunks ...[]byte) (newRoot hash.Hash, persisted [][]byte) {
+newLock, newRoot := computeAddr([]byte("locker")), hash.Of(rootChunk)
+persisted = append(chunks, rootChunk)
+src := tt.p.Compact(createMemTable(persisted), nil)
+fm.set(constants.NomsVersion, newLock, newRoot, []tableSpec{{src.hash(), uint32(len(chunks))}})
+return
+}
func createMemTable(chunks [][]byte) *memTable {
mt := newMemTable(1 << 10)
for _, c := range chunks {

View File

@@ -356,13 +356,24 @@ func fromHasRecords(reqs []hasRecord) hash.HashSet {
return present
}
+func (nbs *NomsBlockStore) Rebase() {
+if exists, vers, lock, root, tableSpecs := nbs.mm.ParseIfExists(nil); exists {
+nbs.mu.Lock()
+defer nbs.mu.Unlock()
+nbs.nomsVersion, nbs.manifestLock, nbs.root = vers, lock, root
+var dropped chunkSources
+nbs.tables, dropped = nbs.tables.Rebase(tableSpecs)
+dropped.close()
+}
+}
func (nbs *NomsBlockStore) Root() hash.Hash {
nbs.mu.RLock()
defer nbs.mu.RUnlock()
return nbs.root
}
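Root() is now a cheap, cached read under an RLock; only Rebase() goes back to the manifest. From a caller's point of view (a sketch; the path and buffer size are made up):

store := nbs.NewLocalStore("/path/to/db", 1<<27)
r := store.Root() // cached; stable even while another process Commits
store.Rebase()    // reload the persisted root and table specs
r = store.Root()  // now reflects the interloper's Commit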
-func (nbs *NomsBlockStore) UpdateRoot(current, last hash.Hash) bool {
+func (nbs *NomsBlockStore) Commit(current, last hash.Hash) bool {
b := &backoff.Backoff{
Min: 128 * time.Microsecond,
Max: 10 * time.Second,

View File

@@ -12,7 +12,6 @@ import (
"testing"
"github.com/attic-labs/graphql"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/marshal"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/test"
@@ -30,8 +29,7 @@ func TestQueryGraphQL(t *testing.T) {
}
func (suite *QueryGraphQLSuite) SetupTest() {
-cs := chunks.NewTestStore()
-suite.vs = types.NewValueStore(types.NewBatchStoreAdaptor(cs))
+suite.vs = types.NewTestValueStore()
}
func (suite *QueryGraphQLSuite) assertQueryResult(v types.Value, q, expect string) {

View File

@@ -1,102 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"io"
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/constants"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
)
// BatchStore provides an interface similar to chunks.ChunkStore, but batch-
// oriented. Instead of Put(), it provides SchedulePut(), which enqueues a
// Chunk to be sent at a possibly later time.
type BatchStore interface {
// Get returns the Chunk with the hash h from the store. If h is absent
// from the store, chunks.EmptyChunk is returned.
Get(h hash.Hash) chunks.Chunk
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk)
// SchedulePut enqueues a write for the Chunk c with the given refHeight.
// c must be visible to subsequent Get() calls upon return. Typically, the
// Value which was encoded to provide c can also be queried for its
// refHeight. The call may or may not block until c is persisted, but c is
// guaranteed to be persistent after a call to Flush() or UpdateRoot().
SchedulePut(c chunks.Chunk)
// Flush causes enqueued Puts to be persisted.
Flush()
chunks.RootTracker
io.Closer
}
// BatchStoreAdaptor provides a naive implementation of BatchStore should only
// be used with ChunkStores that can Put relatively quickly. It provides no
// actual batching or validation. Its intended use is for adapting a
// ChunkStore for use in something that requires a BatchStore.
type BatchStoreAdaptor struct {
cs chunks.ChunkStore
once sync.Once
}
// NewBatchStoreAdaptor returns a BatchStore instance backed by a ChunkStore.
// Takes ownership of cs and manages its lifetime; calling Close on the
// returned BatchStore will Close cs.
func NewBatchStoreAdaptor(cs chunks.ChunkStore) BatchStore {
return &BatchStoreAdaptor{cs: cs}
}
// Get simply proxies to the backing ChunkStore
func (bsa *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk {
bsa.once.Do(bsa.expectVersion)
return bsa.cs.Get(h)
}
// GetMany simply proxies to the backing ChunkStore
func (bsa *BatchStoreAdaptor) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
bsa.once.Do(bsa.expectVersion)
bsa.cs.GetMany(hashes, foundChunks)
}
// SchedulePut simply calls Put on the underlying ChunkStore.
func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk) {
bsa.once.Do(bsa.expectVersion)
bsa.cs.Put(c)
}
func (bsa *BatchStoreAdaptor) expectVersion() {
dataVersion := bsa.cs.Version()
if constants.NomsVersion != dataVersion {
d.Panic("SDK version %s incompatible with data of version %s", constants.NomsVersion, dataVersion)
}
}
func (bsa *BatchStoreAdaptor) Root() hash.Hash {
return bsa.cs.Root()
}
func (bsa *BatchStoreAdaptor) UpdateRoot(current, last hash.Hash) bool {
bsa.once.Do(bsa.expectVersion)
return bsa.cs.UpdateRoot(current, last)
}
func (bsa *BatchStoreAdaptor) Flush() {
bsa.once.Do(bsa.expectVersion)
bsa.cs.Flush()
}
// Close closes the underlying ChunkStore
func (bsa *BatchStoreAdaptor) Close() error {
bsa.Flush()
return bsa.cs.Close()
}

View File

@@ -36,11 +36,11 @@ func isEncodedOutOfLine(v Value) int {
func TestIncrementalLoadList(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
-vs := newLocalValueStore(cs)
+vs := NewValueStore(cs)
expected := NewList(testVals...)
hash := vs.WriteValue(expected).TargetHash()
-vs.Flush(hash)
+vs.Flush()
actualVar := vs.ReadValue(hash)
actual := actualVar.(List)
@@ -65,7 +65,7 @@ func TestIncrementalLoadList(t *testing.T) {
func SkipTestIncrementalLoadSet(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
-vs := newLocalValueStore(cs)
+vs := NewValueStore(cs)
expected := NewSet(testVals...)
ref := vs.WriteValue(expected).TargetHash()
@@ -85,7 +85,7 @@ func SkipTestIncrementalLoadSet(t *testing.T) {
func SkipTestIncrementalLoadMap(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
-vs := newLocalValueStore(cs)
+vs := NewValueStore(cs)
expected := NewMap(testVals...)
ref := vs.WriteValue(expected).TargetHash()
@@ -106,7 +106,7 @@ func SkipTestIncrementalLoadMap(t *testing.T) {
func SkipTestIncrementalAddRef(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
-vs := newLocalValueStore(cs)
+vs := NewValueStore(cs)
expectedItem := Number(42)
ref := vs.WriteValue(expectedItem)

View File

@@ -945,19 +945,19 @@ func TestListDiffLargeWithSameMiddle(t *testing.T) {
assert := assert.New(t)
cs1 := chunks.NewTestStore()
-vs1 := newLocalValueStore(cs1)
+vs1 := NewValueStore(cs1)
nums1 := generateNumbersAsValues(4000)
l1 := NewList(nums1...)
hash1 := vs1.WriteValue(l1).TargetHash()
-vs1.Flush(hash1)
+vs1.Flush()
refList1 := vs1.ReadValue(hash1).(List)
cs2 := chunks.NewTestStore()
-vs2 := newLocalValueStore(cs2)
+vs2 := NewValueStore(cs2)
nums2 := generateNumbersAsValuesFromToBy(5, 3550, 1)
l2 := NewList(nums2...)
hash2 := vs2.WriteValue(l2).TargetHash()
-vs2.Flush(hash2)
+vs2.Flush()
refList2 := vs2.ReadValue(hash2).(List)
// diff lists without value store

View File

@@ -98,7 +98,7 @@ func (suite *diffTestSuite) TestDiff() {
rw := func(col Collection) Collection {
vs := NewTestValueStore()
h := vs.WriteValue(col).TargetHash()
-vs.Flush(h)
+vs.Flush()
return vs.ReadValue(h).(Collection)
}
newSetAsColRw := func(vs []Value) Collection { return rw(newSetAsCol(vs)) }

View File

@@ -9,7 +9,6 @@ import (
"fmt"
"testing"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/testify/assert"
)
@@ -429,7 +428,7 @@ func TestPathTarget(t *testing.T) {
s := NewStruct("", StructData{
"foo": String("bar"),
})
-vs := NewValueStore(NewBatchStoreAdaptor(chunks.NewMemoryStore()))
+vs := NewTestValueStore()
r := vs.WriteValue(s)
s2 := NewStruct("", StructData{
"ref": r,

View File

@@ -14,9 +14,7 @@ type ValidatingDecoder struct {
}
func NewValidatingDecoder(cs chunks.ChunkStore) *ValidatingDecoder {
return &ValidatingDecoder{
newLocalValueStore(cs),
}
return &ValidatingDecoder{NewValueStore(cs)}
}
// DecodedChunk holds a pointer to a Chunk and the Value that results from

View File

@@ -22,7 +22,7 @@ func TestValidatingBatchingSinkDecode(t *testing.T) {
func assertPanicsOnInvalidChunk(t *testing.T, data []interface{}) {
cs := chunks.NewTestStore()
-vs := newLocalValueStore(cs)
+vs := NewValueStore(cs)
r := &nomsTestReader{data, 0}
dec := newValueDecoder(r, vs)
v := dec.readValue()

View File

@@ -8,6 +8,7 @@ import (
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/constants"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/util/sizecache"
@@ -37,22 +38,25 @@ type ValueReadWriter interface {
opCache() opCache
}
-// ValueStore provides methods to read and write Noms Values to a BatchStore.
+// ValueStore provides methods to read and write Noms Values to a ChunkStore.
// It minimally validates Values as they're written, but does not guarantee
-// that these Values are persisted through the BatchStore until a subsequent
+// that these Values are persisted through the ChunkStore until a subsequent
// Flush.
// Currently, WriteValue validates the following properties of a Value v:
// - v can be correctly serialized and its Ref taken
type ValueStore struct {
-bs BatchStore
+cs chunks.ChunkStore
bufferMu sync.RWMutex
bufferedChunks map[hash.Hash]chunks.Chunk
bufferedChunksMax uint64
bufferedChunkSize uint64
withBufferedChildren map[hash.Hash]uint64 // chunk Hash -> ref height
valueCache *sizecache.SizeCache
-opcStore opCacheStore
-once sync.Once
+opcOnce sync.Once
+opcStore opCacheStore
+versOnce sync.Once
}
const (
@@ -63,46 +67,52 @@ const (
// NewTestValueStore creates a simple struct that satisfies ValueReadWriter
// and is backed by a chunks.TestStore.
func NewTestValueStore() *ValueStore {
-return newLocalValueStore(chunks.NewTestStore())
-}
-func newLocalValueStore(cs chunks.ChunkStore) *ValueStore {
-return NewValueStore(NewBatchStoreAdaptor(cs))
+return NewValueStore(chunks.NewTestStore())
}
// NewValueStore returns a ValueStore instance that owns the provided
-// BatchStore and manages its lifetime. Calling Close on the returned
-// ValueStore will Close bs.
-func NewValueStore(bs BatchStore) *ValueStore {
-return NewValueStoreWithCache(bs, defaultValueCacheSize)
+// ChunkStore and manages its lifetime. Calling Close on the returned
+// ValueStore will Close() cs.
+func NewValueStore(cs chunks.ChunkStore) *ValueStore {
+return NewValueStoreWithCache(cs, defaultValueCacheSize)
}
-func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {
-return newValueStoreWithCacheAndPending(bs, cacheSize, defaultPendingPutMax)
+func NewValueStoreWithCache(cs chunks.ChunkStore, cacheSize uint64) *ValueStore {
+return newValueStoreWithCacheAndPending(cs, cacheSize, defaultPendingPutMax)
}
-func newValueStoreWithCacheAndPending(bs BatchStore, cacheSize, pendingMax uint64) *ValueStore {
+func newValueStoreWithCacheAndPending(cs chunks.ChunkStore, cacheSize, pendingMax uint64) *ValueStore {
return &ValueStore{
-bs: bs,
+cs: cs,
bufferMu: sync.RWMutex{},
bufferedChunks: map[hash.Hash]chunks.Chunk{},
bufferedChunksMax: pendingMax,
withBufferedChildren: map[hash.Hash]uint64{},
+opcOnce: sync.Once{},
valueCache: sizecache.New(cacheSize),
-once: sync.Once{},
+versOnce: sync.Once{},
}
}
-func (lvs *ValueStore) BatchStore() BatchStore {
-return lvs.bs
-}
+func (lvs *ValueStore) expectVersion() {
+dataVersion := lvs.cs.Version()
+if constants.NomsVersion != dataVersion {
+d.Panic("SDK version %s incompatible with data of version %s", constants.NomsVersion, dataVersion)
+}
+}
+func (lvs *ValueStore) ChunkStore() chunks.ChunkStore {
+return lvs.cs
+}
// ReadValue reads and decodes a value from lvs. It is not considered an error
// for the requested chunk to be empty; in this case, the function simply
// returns nil.
func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
+lvs.versOnce.Do(lvs.expectVersion)
if v, ok := lvs.valueCache.Get(h); ok {
if v == nil {
return nil
@@ -119,7 +129,7 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
return chunks.EmptyChunk
}()
if chunk.IsEmpty() {
-chunk = lvs.bs.Get(h)
+chunk = lvs.cs.Get(h)
}
if chunk.IsEmpty() {
lvs.valueCache.Add(h, 0, nil)
@@ -135,13 +145,14 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
// return, |foundValues| will have been fully sent all Values which have been
// found. Any non-present Values will silently be ignored.
func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Value) {
+lvs.versOnce.Do(lvs.expectVersion)
decode := func(h hash.Hash, chunk *chunks.Chunk, toPending bool) Value {
v := DecodeValue(*chunk, lvs)
lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
return v
}
-// First, see which hashes can be found in either the Value cache or bufferedChunks. Put the rest into a new HashSet to be requested en masse from the BatchStore.
+// First, see which hashes can be found in either the Value cache or bufferedChunks. Put the rest into a new HashSet to be requested en masse from the ChunkStore.
remaining := hash.HashSet{}
for h := range hashes {
if v, ok := lvs.valueCache.Get(h); ok {
@@ -171,11 +182,11 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va
return
}
-// Request remaining hashes from BatchStore, processing the found chunks as they come in.
+// Request remaining hashes from ChunkStore, processing the found chunks as they come in.
foundChunks := make(chan *chunks.Chunk, 16)
foundHashes := hash.HashSet{}
-go func() { lvs.bs.GetMany(remaining, foundChunks); close(foundChunks) }()
+go func() { lvs.cs.GetMany(remaining, foundChunks); close(foundChunks) }()
for c := range foundChunks {
h := c.Hash()
foundHashes[h] = struct{}{}
@@ -186,7 +197,7 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va
remaining.Remove(h) // Avoid concurrent access with the call to GetMany above
}
-// Any remaining hashes weren't found in the BatchStore should be recorded as not present.
+// Any remaining hashes that weren't found in the ChunkStore should be recorded as not present.
for h := range remaining {
lvs.valueCache.Add(h, 0, nil)
}
@@ -196,6 +207,7 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va
// an appropriately-typed types.Ref. v is not guaranteed to be actually
// written until after Flush().
func (lvs *ValueStore) WriteValue(v Value) Ref {
+lvs.versOnce.Do(lvs.expectVersion)
d.PanicIfFalse(v != nil)
// Encoding v causes any child chunks, e.g. internal nodes if v is a meta sequence, to get written. That needs to happen before we try to validate v.
c := EncodeValue(v, lvs)
@@ -214,7 +226,7 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
// bufferChunk enqueues c (which is the serialization of v) within this
// ValueStore. Buffered chunks are flushed progressively to the underlying
-// BatchStore in a way which attempts to locate children and grandchildren
+// ChunkStore in a way which attempts to locate children and grandchildren
// sequentially together. The following invariants are retained:
//
// 1. For any given chunk currently in the buffer, only direct children of the
@@ -225,12 +237,20 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) {
lvs.bufferMu.Lock()
defer lvs.bufferMu.Unlock()
-h := c.Hash()
d.PanicIfTrue(height == 0)
-lvs.bufferedChunks[h] = c
-lvs.bufferedChunkSize += uint64(len(c.Data()))
+h := c.Hash()
+if _, present := lvs.bufferedChunks[h]; !present {
+lvs.bufferedChunks[h] = c
+lvs.bufferedChunkSize += uint64(len(c.Data()))
+}
-putChildren := func(parent hash.Hash) (dataPut int) {
+put := func(h hash.Hash, c chunks.Chunk) {
+lvs.cs.Put(c)
+lvs.bufferedChunkSize -= uint64(len(c.Data()))
+delete(lvs.bufferedChunks, h)
+}
+putChildren := func(parent hash.Hash) {
pending, isBuffered := lvs.bufferedChunks[parent]
if !isBuffered {
return
@@ -239,9 +259,7 @@ func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) {
pv.WalkRefs(func(grandchildRef Ref) {
gch := grandchildRef.TargetHash()
if pending, present := lvs.bufferedChunks[gch]; present {
-lvs.bs.SchedulePut(pending)
-dataPut += len(pending.Data())
-delete(lvs.bufferedChunks, gch)
+put(gch, pending)
}
})
delete(lvs.withBufferedChildren, parent)
@@ -256,7 +274,7 @@ func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) {
lvs.withBufferedChildren[h] = height
}
if _, hasBufferedChildren := lvs.withBufferedChildren[childHash]; hasBufferedChildren {
-lvs.bufferedChunkSize -= uint64(putChildren(childHash))
+putChildren(childHash)
}
})
}
@@ -277,55 +295,64 @@ func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) {
// Any pendingPut is as good as another in this case, so take the first one
break
}
-lvs.bs.SchedulePut(chunk)
-lvs.bufferedChunkSize -= uint64(len(chunk.Data()))
-delete(lvs.bufferedChunks, tallest)
+put(tallest, chunk)
continue
}
-lvs.bufferedChunkSize -= uint64(putChildren(tallest))
+putChildren(tallest)
}
}
-func (lvs *ValueStore) Flush(root hash.Hash) {
+// Flush() flushes all bufferedChunks to the ChunkStore, with best-effort
+// locality, and then flushes the ChunkStore to make the Chunks durable.
+// TODO: Is flushing the ChunkStore the right semantics?
+func (lvs *ValueStore) Flush() {
func() {
lvs.bufferMu.Lock()
defer lvs.bufferMu.Unlock()
-pending, present := lvs.bufferedChunks[root]
-if !present {
-return
+put := func(h hash.Hash, chunk chunks.Chunk) {
+lvs.cs.Put(chunk)
+delete(lvs.bufferedChunks, h)
+lvs.bufferedChunkSize -= uint64(len(chunk.Data()))
}
-v := DecodeValue(pending, lvs)
-put := func(h hash.Hash, chunk chunks.Chunk) uint64 {
-lvs.bs.SchedulePut(chunk)
-delete(lvs.bufferedChunks, h)
-return uint64(len(chunk.Data()))
-}
-v.WalkRefs(func(reachable Ref) {
-if pending, present := lvs.bufferedChunks[reachable.TargetHash()]; present {
-lvs.bufferedChunkSize -= put(reachable.TargetHash(), pending)
+for parent := range lvs.withBufferedChildren {
+if pending, present := lvs.bufferedChunks[parent]; present {
+v := DecodeValue(pending, lvs)
+// TODO: Consider gathering up the hashes of the referenced, buffered chunks, doing a HasMany() and only writing chunks that aren't in the ChunkStore
+v.WalkRefs(func(reachable Ref) {
+if pending, present := lvs.bufferedChunks[reachable.TargetHash()]; present {
+put(reachable.TargetHash(), pending)
+}
+})
+put(parent, pending)
+}
-})
-delete(lvs.withBufferedChildren, root) // If not present, this is idempotent
-lvs.bufferedChunkSize -= put(root, pending)
}
+for _, c := range lvs.bufferedChunks {
+// Can't use put() because it's wrong to delete from lvs.bufferedChunks while iterating it.
+lvs.cs.Put(c)
+lvs.bufferedChunkSize -= uint64(len(c.Data()))
+}
+d.PanicIfFalse(lvs.bufferedChunkSize == 0)
+lvs.withBufferedChildren = map[hash.Hash]uint64{}
+lvs.bufferedChunks = map[hash.Hash]chunks.Chunk{}
}()
-lvs.bs.Flush()
+lvs.cs.Flush()
}
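The upshot for callers: WriteValue buffers, Flush drains everything; there's no longer a hash to pick. A sketch of the round trip from inside package types:

vs := NewTestValueStore()
r := vs.WriteValue(String("hello")) // buffered; visible to ReadValue, not yet durable
vs.Flush()                          // drains bufferedChunks into cs, then calls cs.Flush()
v := vs.ReadValue(r.TargetHash())   // v.Equals(String("hello")) is true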
-// Close closes the underlying BatchStore
+// Close closes the underlying ChunkStore
func (lvs *ValueStore) Close() error {
if lvs.opcStore != nil {
err := lvs.opcStore.destroy()
d.Chk.NoError(err, "Attempt to clean up opCacheStore failed, error: %s\n", err)
lvs.opcStore = nil
}
-return lvs.bs.Close()
+return lvs.cs.Close()
}
func (lvs *ValueStore) opCache() opCache {
-lvs.once.Do(func() {
+lvs.opcOnce.Do(func() {
lvs.opcStore = newLdbOpCacheStore(lvs)
})
return lvs.opcStore.opCache()

View File

@@ -19,7 +19,7 @@ func TestValueReadWriteRead(t *testing.T) {
vs := NewTestValueStore()
assert.Nil(vs.ReadValue(s.Hash())) // nil
h := vs.WriteValue(s).TargetHash()
-vs.Flush(h)
+vs.Flush()
v := vs.ReadValue(h) // non-nil
if assert.NotNil(v) {
assert.True(s.Equals(v), "%s != %s", EncodedValue(s), EncodedValue(v))
@@ -35,7 +35,7 @@ func TestValueReadMany(t *testing.T) {
for _, v := range vals {
h := vs.WriteValue(v).TargetHash()
hashes.Insert(h)
-vs.Flush(h)
+vs.Flush()
}
// Get one Value into vs's Value cache
@@ -74,39 +74,37 @@ func TestValueWriteFlush(t *testing.T) {
}
assert.NotZero(vs.bufferedChunkSize)
-for h := range hashes {
-vs.Flush(h)
-}
+vs.Flush()
assert.Zero(vs.bufferedChunkSize)
}
-type checkingBatchStore struct {
-BatchStore
+type checkingChunkStore struct {
+chunks.ChunkStore
a *assert.Assertions
expectedOrder hash.HashSlice
}
-func (cbs *checkingBatchStore) expect(rs ...Ref) {
+func (cbs *checkingChunkStore) expect(rs ...Ref) {
for _, r := range rs {
cbs.expectedOrder = append(cbs.expectedOrder, r.TargetHash())
}
}
-func (cbs *checkingBatchStore) SchedulePut(c chunks.Chunk) {
+func (cbs *checkingChunkStore) Put(c chunks.Chunk) {
if cbs.a.NotZero(len(cbs.expectedOrder), "Unexpected Put of %s", c.Hash()) {
cbs.a.Equal(cbs.expectedOrder[0], c.Hash())
cbs.expectedOrder = cbs.expectedOrder[1:]
}
-cbs.BatchStore.SchedulePut(c)
+cbs.ChunkStore.Put(c)
}
-func (cbs *checkingBatchStore) Flush() {
+func (cbs *checkingChunkStore) Flush() {
cbs.a.Empty(cbs.expectedOrder)
}
func TestFlushOrder(t *testing.T) {
assert := assert.New(t)
bs := &checkingBatchStore{NewBatchStoreAdaptor(chunks.NewTestStore()), assert, nil}
bs := &checkingChunkStore{chunks.NewTestStore(), assert, nil}
vs := NewValueStore(bs)
// Graph, which should be flushed grandchildren-first, bottom-up
// l
@@ -140,12 +138,12 @@ func TestFlushOrder(t *testing.T) {
r := vs.WriteValue(l)
bs.expect(r)
-vs.Flush(l.Hash())
+vs.Flush()
}
func TestFlushOverSize(t *testing.T) {
assert := assert.New(t)
bs := &checkingBatchStore{NewBatchStoreAdaptor(chunks.NewTestStore()), assert, nil}
bs := &checkingChunkStore{chunks.NewTestStore(), assert, nil}
vs := newValueStoreWithCacheAndPending(bs, 0, 10)
s := String("oy")
@@ -154,12 +152,12 @@ func TestFlushOverSize(t *testing.T) {
bs.expect(sr, NewRef(l))
vs.WriteValue(l)
-vs.Flush(l.Hash())
+vs.Flush()
}
func TestTolerateTopDown(t *testing.T) {
assert := assert.New(t)
bs := &checkingBatchStore{NewBatchStoreAdaptor(chunks.NewTestStore()), assert, nil}
bs := &checkingChunkStore{chunks.NewTestStore(), assert, nil}
vs := NewValueStore(bs)
// Once the L-ML-S portion of this graph is written once, it's legal to make a Struct ST that contains a ref directly to ML and write it. Then you can write S and ML and Flush ST, which constitutes top-down writing.
// L ST
@@ -179,29 +177,29 @@ func TestTolerateTopDown(t *testing.T) {
lr := vs.WriteValue(L)
bs.expect(lr)
-vs.Flush(L.Hash())
+vs.Flush()
assert.Zero(len(vs.bufferedChunks))
ST := NewStruct("", StructData{"r": mlr})
-str := vs.WriteValue(ST)
-vs.WriteValue(S) // S into bufferedChunks
-vs.WriteValue(ML) // ML into bufferedChunks AND pendingParents
-bs.expect(mlr, str)
-vs.Flush(ST.Hash())
+str := vs.WriteValue(ST) // ST into bufferedChunks
+vs.WriteValue(S) // S into bufferedChunks
+vs.WriteValue(ML) // ML into bufferedChunks AND withBufferedChildren
+bs.expect(sr)
+vs.WriteValue(L)
+// At this point, ValueStore believes ST is a standalone chunk, and that ML -> S
+// So, it'll look at ML, the one parent it knows about, first and write its child (S). Then, it'll write ML, and then it'll flush the remaining buffered chunks, which is just ST.
+bs.expect(sr, mlr, str)
+vs.Flush()
}
func TestPanicOnReadBadVersion(t *testing.T) {
cvs := newLocalValueStore(&badVersionStore{chunks.NewTestStore()})
cvs := NewValueStore(&badVersionStore{TestStore: chunks.NewTestStore()})
assert.Panics(t, func() { cvs.ReadValue(hash.Hash{}) })
}
func TestPanicOnWriteBadVersion(t *testing.T) {
cvs := newLocalValueStore(&badVersionStore{chunks.NewTestStore()})
assert.Panics(t, func() { r := cvs.WriteValue(NewEmptyBlob()); cvs.Flush(r.TargetHash()) })
cvs := NewValueStore(&badVersionStore{TestStore: chunks.NewTestStore()})
assert.Panics(t, func() { cvs.WriteValue(NewEmptyBlob()); cvs.Flush() })
}
type badVersionStore struct {

View File

@@ -29,7 +29,7 @@ type WalkAllTestSuite struct {
func (suite *WalkAllTestSuite) SetupTest() {
suite.ts = chunks.NewTestStore()
-suite.vs = NewValueStore(NewBatchStoreAdaptor(suite.ts))
+suite.vs = NewValueStore(suite.ts)
}
func (suite *WalkAllTestSuite) assertCallbackCount(v Value, expected int) {
@@ -244,7 +244,7 @@ type WalkTestSuite struct {
func (suite *WalkTestSuite) SetupTest() {
suite.ts = chunks.NewTestStore()
-suite.vs = NewValueStore(NewBatchStoreAdaptor(suite.ts))
+suite.vs = NewValueStore(suite.ts)
suite.shouldSeeItem = String("zzz")
suite.shouldSee = NewList(suite.shouldSeeItem)
suite.deadValue = Number(0xDEADBEEF)
@@ -263,7 +263,7 @@ func TestWalkDifferentStructsTestSuite(t *testing.T) {
func (suite *WalkDifferentStructsTestSuite) SetupTest() {
suite.ts = chunks.NewTestStore()
-suite.vs = NewValueStore(NewBatchStoreAdaptor(suite.ts))
+suite.vs = NewValueStore(suite.ts)
}
func (suite *WalkDifferentStructsTestSuite) AssertDiffs(last, current Value, expectAdded, expectRemoved []Value) {
@@ -338,11 +338,10 @@ func (suite *WalkDifferentStructsTestSuite) TestWalkStructsBasic() {
// Big, committed collections of structs
h1 := suite.vs.WriteValue(l1).TargetHash()
h2 := suite.vs.WriteValue(l2).TargetHash()
-suite.vs.Flush(h1)
-suite.vs.Flush(h2)
+suite.vs.Flush()
// Use a fresh value store to avoid cached chunks
-nvs := NewValueStore(NewBatchStoreAdaptor(suite.ts))
+nvs := NewValueStore(suite.ts)
l1 = nvs.ReadValue(h1).(List)
l2 = nvs.ReadValue(h2).(List)

View File

@@ -17,10 +17,6 @@ import (
"github.com/attic-labs/testify/suite"
)
func TestCSVImporter(t *testing.T) {
suite.Run(t, &testSuite{})
}
type TestObj struct {
Key int
Fname string