mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-05 18:58:58 -06:00
Rip out hinting, reverse-order hack; make validation lazy (#3340)
* Add HasMany() to the ChunkStore interface We'll need this as a part of #3180 * Rip out hinting The hinting mechanism used to assist in server-side validation of values served us well, but now it's in the way of building a more suitable validation strategy. Tear it out and go without validation for a hot minute until #3180 gets done. Fixes #3178 * Implement server-side lazy ref validation The server, when handling writeValue, now just keeps track of all the refs it sees in the novel chunks coming from the client. Once it's processed all the incoming chunks, it just does a big bulk HasMany to determine if any of them aren't present in the storage backend. Fixes #3180 * Remove chunk-write-order requirements With our old validation strategy, it was critical that chunk graphs be written bottom-up, during both novel value creation and sync. With the strategy implemented in #3180, this is no longer required, which lets us get rid of a bunch of machinery: 1) The reverse-order hack in httpBatchStore 2) the EnumerationOrder stuff in NomsBlockCache 3) the orderedPutCache in datas/ 4) the refHeight arg on SchedulePut() Fixes #2982
This commit is contained in:
@@ -49,9 +49,14 @@ type ChunkSource interface {
|
||||
// found. Any non-present chunks will silently be ignored.
|
||||
GetMany(hashes hash.HashSet, foundChunks chan *Chunk)
|
||||
|
||||
// Returns true iff the value at the address |h| is contained in the source
|
||||
// Returns true iff the value at the address |h| is contained in the
|
||||
// source
|
||||
Has(h hash.Hash) bool
|
||||
|
||||
// Returns a new HashSet containing any members of |hashes| that are
|
||||
// present in the source.
|
||||
HasMany(hashes hash.HashSet) (present hash.HashSet)
|
||||
|
||||
// Returns the NomsVersion with which this ChunkSource is compatible.
|
||||
Version() string
|
||||
}
|
||||
|
||||
@@ -137,6 +137,16 @@ func (s *DynamoStore) Has(h hash.Hash) bool {
|
||||
return <-ch
|
||||
}
|
||||
|
||||
func (s *DynamoStore) HasMany(hashes hash.HashSet) hash.HashSet {
|
||||
present := hash.HashSet{}
|
||||
for h := range hashes {
|
||||
if s.Has(h) {
|
||||
present.Insert(h)
|
||||
}
|
||||
}
|
||||
return present
|
||||
}
|
||||
|
||||
func (s *DynamoStore) PutMany(chunks []Chunk) {
|
||||
for _, c := range chunks {
|
||||
s.Put(c)
|
||||
|
||||
@@ -52,6 +52,16 @@ func (ms *MemoryStore) Has(r hash.Hash) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (ms *MemoryStore) HasMany(hashes hash.HashSet) hash.HashSet {
|
||||
present := hash.HashSet{}
|
||||
for h := range hashes {
|
||||
if ms.Has(h) {
|
||||
present.Insert(h)
|
||||
}
|
||||
}
|
||||
return present
|
||||
}
|
||||
|
||||
func (ms *MemoryStore) Version() string {
|
||||
return constants.NomsVersion
|
||||
}
|
||||
|
||||
@@ -46,6 +46,11 @@ func (s *TestStore) Has(h hash.Hash) bool {
|
||||
return s.MemoryStore.Has(h)
|
||||
}
|
||||
|
||||
func (s *TestStore) HasMany(hashes hash.HashSet) hash.HashSet {
|
||||
s.Hases += len(hashes)
|
||||
return s.MemoryStore.HasMany(hashes)
|
||||
}
|
||||
|
||||
func (s *TestStore) Put(c Chunk) {
|
||||
s.Writes++
|
||||
s.MemoryStore.Put(c)
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
)
|
||||
|
||||
// TODO: generate this from some central thing with go generate.
|
||||
const NomsVersion = "7.4"
|
||||
const NomsVersion = "7.5"
|
||||
const NOMS_VERSION_NEXT_ENV_NAME = "NOMS_VERSION_NEXT"
|
||||
const NOMS_VERSION_NEXT_ENV_VALUE = "1"
|
||||
|
||||
|
||||
@@ -31,6 +31,10 @@ func newDatabaseCommon(cch *cachingChunkHaver, vs *types.ValueStore, rt chunks.R
|
||||
return databaseCommon{ValueStore: vs, cch: cch, rt: rt, rootHash: rt.Root()}
|
||||
}
|
||||
|
||||
func (dbc *databaseCommon) validatingBatchStore() types.BatchStore {
|
||||
return dbc.BatchStore()
|
||||
}
|
||||
|
||||
func (dbc *databaseCommon) Datasets() types.Map {
|
||||
if dbc.datasets == nil {
|
||||
if dbc.rootHash.IsEmpty() {
|
||||
|
||||
@@ -98,8 +98,10 @@ func (suite *DatabaseSuite) TestReadWriteCachePersists() {
|
||||
suite.NoError(err)
|
||||
}
|
||||
|
||||
func (suite *DatabaseSuite) TestWriteRefToNonexistentValue() {
|
||||
suite.Panics(func() { suite.db.WriteValue(types.NewRef(types.Bool(true))) })
|
||||
func (suite *RemoteDatabaseSuite) TestWriteRefToNonexistentValue() {
|
||||
ds := suite.db.GetDataset("foo")
|
||||
r := types.NewRef(types.Bool(true))
|
||||
suite.Panics(func() { suite.db.CommitValue(ds, r) })
|
||||
}
|
||||
|
||||
func (suite *DatabaseSuite) TestTolerateUngettableRefs() {
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/nbs"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/noms/go/util/verbose"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
@@ -48,16 +47,13 @@ type httpBatchStore struct {
|
||||
auth string
|
||||
getQueue chan chunks.ReadRequest
|
||||
hasQueue chan chunks.ReadRequest
|
||||
writeQueue chan writeRequest
|
||||
finishedChan chan struct{}
|
||||
rateLimit chan struct{}
|
||||
requestWg *sync.WaitGroup
|
||||
workerWg *sync.WaitGroup
|
||||
flushOrder nbs.EnumerationOrder
|
||||
|
||||
cacheMu *sync.RWMutex
|
||||
unwrittenPuts *nbs.NomsBlockCache
|
||||
hints types.Hints
|
||||
}
|
||||
|
||||
func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore {
|
||||
@@ -73,15 +69,12 @@ func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore {
|
||||
auth: auth,
|
||||
getQueue: make(chan chunks.ReadRequest, readBufferSize),
|
||||
hasQueue: make(chan chunks.ReadRequest, readBufferSize),
|
||||
writeQueue: make(chan writeRequest, writeBufferSize),
|
||||
finishedChan: make(chan struct{}),
|
||||
rateLimit: make(chan struct{}, httpChunkSinkConcurrency),
|
||||
requestWg: &sync.WaitGroup{},
|
||||
workerWg: &sync.WaitGroup{},
|
||||
flushOrder: nbs.InsertOrder,
|
||||
cacheMu: &sync.RWMutex{},
|
||||
unwrittenPuts: nbs.NewCache(),
|
||||
hints: types.Hints{},
|
||||
}
|
||||
buffSink.batchGetRequests()
|
||||
buffSink.batchHasRequests()
|
||||
@@ -92,17 +85,6 @@ type httpDoer interface {
|
||||
Do(req *http.Request) (resp *http.Response, err error)
|
||||
}
|
||||
|
||||
type writeRequest struct {
|
||||
hints types.Hints
|
||||
justHints bool
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) SetReverseFlushOrder() {
|
||||
bhcs.cacheMu.Lock()
|
||||
defer bhcs.cacheMu.Unlock()
|
||||
bhcs.flushOrder = nbs.ReverseOrder
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) Flush() {
|
||||
bhcs.sendWriteRequests()
|
||||
bhcs.requestWg.Wait()
|
||||
@@ -116,7 +98,6 @@ func (bhcs *httpBatchStore) Close() (e error) {
|
||||
|
||||
close(bhcs.getQueue)
|
||||
close(bhcs.hasQueue)
|
||||
close(bhcs.writeQueue)
|
||||
close(bhcs.rateLimit)
|
||||
|
||||
bhcs.cacheMu.Lock()
|
||||
@@ -335,21 +316,10 @@ func resBodyReader(res *http.Response) (reader io.ReadCloser) {
|
||||
return
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk) {
|
||||
bhcs.cacheMu.RLock()
|
||||
defer bhcs.cacheMu.RUnlock()
|
||||
bhcs.unwrittenPuts.Insert(c)
|
||||
for hint := range hints {
|
||||
bhcs.hints[hint] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) AddHints(hints types.Hints) {
|
||||
bhcs.cacheMu.RLock()
|
||||
defer bhcs.cacheMu.RUnlock()
|
||||
for hint := range hints {
|
||||
bhcs.hints[hint] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) sendWriteRequests() {
|
||||
@@ -358,7 +328,6 @@ func (bhcs *httpBatchStore) sendWriteRequests() {
|
||||
|
||||
bhcs.cacheMu.Lock()
|
||||
defer func() {
|
||||
bhcs.flushOrder = nbs.InsertOrder // This needs to happen even if no chunks get written.
|
||||
bhcs.cacheMu.Unlock()
|
||||
}()
|
||||
|
||||
@@ -369,47 +338,29 @@ func (bhcs *httpBatchStore) sendWriteRequests() {
|
||||
defer func() {
|
||||
bhcs.unwrittenPuts.Destroy()
|
||||
bhcs.unwrittenPuts = nbs.NewCache()
|
||||
bhcs.hints = types.Hints{}
|
||||
}()
|
||||
|
||||
var res *http.Response
|
||||
var err error
|
||||
for tryAgain := true; tryAgain; {
|
||||
verbose.Log("Sending %d chunks", count)
|
||||
chunkChan := make(chan *chunks.Chunk, 1024)
|
||||
go func() {
|
||||
bhcs.unwrittenPuts.ExtractChunks(bhcs.flushOrder, chunkChan)
|
||||
close(chunkChan)
|
||||
}()
|
||||
verbose.Log("Sending %d chunks", count)
|
||||
chunkChan := make(chan *chunks.Chunk, 1024)
|
||||
go func() {
|
||||
bhcs.unwrittenPuts.ExtractChunks(chunkChan)
|
||||
close(chunkChan)
|
||||
}()
|
||||
|
||||
body := buildWriteValueRequest(chunkChan, bhcs.hints)
|
||||
url := *bhcs.host
|
||||
url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)
|
||||
// TODO: Make this accept snappy encoding
|
||||
req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
|
||||
"Accept-Encoding": {"gzip"},
|
||||
"Content-Encoding": {"x-snappy-framed"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
})
|
||||
body := buildWriteValueRequest(chunkChan)
|
||||
url := *bhcs.host
|
||||
url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)
|
||||
// TODO: Make this accept snappy encoding
|
||||
req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
|
||||
"Accept-Encoding": {"gzip"},
|
||||
"Content-Encoding": {"x-snappy-framed"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
})
|
||||
|
||||
res, err = bhcs.httpClient.Do(req)
|
||||
d.PanicIfError(err)
|
||||
expectVersion(res)
|
||||
defer closeResponse(res.Body)
|
||||
|
||||
if tryAgain = res.StatusCode == http.StatusTooManyRequests; tryAgain {
|
||||
reader := res.Body
|
||||
if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
|
||||
gr, err := gzip.NewReader(reader)
|
||||
d.PanicIfError(err)
|
||||
defer gr.Close()
|
||||
reader = gr
|
||||
}
|
||||
/*hashes :=*/ deserializeHashes(reader)
|
||||
// TODO: BUG 1259 Since the client must currently send all chunks in one batch, the only thing to do in response to backpressure is send EVERYTHING again. Once batching is again possible, this code should figure out how to resend the chunks indicated by hashes.
|
||||
verbose.Log("Retrying...")
|
||||
}
|
||||
}
|
||||
res, err := bhcs.httpClient.Do(req)
|
||||
d.PanicIfError(err)
|
||||
expectVersion(res)
|
||||
defer closeResponse(res.Body)
|
||||
|
||||
if http.StatusCreated != res.StatusCode {
|
||||
d.Panic("Unexpected response: %s", formatErrorResponse(res))
|
||||
|
||||
@@ -128,7 +128,7 @@ func (suite *HTTPBatchStoreSuite) TearDownTest() {
|
||||
|
||||
func (suite *HTTPBatchStoreSuite) TestPutChunk() {
|
||||
c := types.EncodeValue(types.String("abc"), nil)
|
||||
suite.store.SchedulePut(c, 1, types.Hints{})
|
||||
suite.store.SchedulePut(c)
|
||||
suite.store.Flush()
|
||||
|
||||
suite.Equal(1, suite.cs.Writes)
|
||||
@@ -141,43 +141,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() {
|
||||
}
|
||||
l := types.NewList()
|
||||
for _, val := range vals {
|
||||
suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{})
|
||||
suite.store.SchedulePut(types.EncodeValue(val, nil))
|
||||
l = l.Append(types.NewRef(val))
|
||||
}
|
||||
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
|
||||
suite.store.Flush()
|
||||
|
||||
suite.Equal(3, suite.cs.Writes)
|
||||
}
|
||||
|
||||
func (suite *HTTPBatchStoreSuite) TestPutChunksReverseOrder() {
|
||||
val := types.String("abc")
|
||||
l := types.NewList(types.NewRef(val))
|
||||
|
||||
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
|
||||
suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{})
|
||||
suite.store.SetReverseFlushOrder()
|
||||
suite.store.Flush()
|
||||
|
||||
suite.Equal(2, suite.cs.Writes)
|
||||
}
|
||||
|
||||
func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
|
||||
vals := []types.Value{
|
||||
types.String("abc"),
|
||||
types.String("def"),
|
||||
}
|
||||
chnx := []chunks.Chunk{
|
||||
types.EncodeValue(vals[0], nil),
|
||||
types.EncodeValue(vals[1], nil),
|
||||
}
|
||||
suite.cs.PutMany(chnx)
|
||||
l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1]))
|
||||
|
||||
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{
|
||||
chnx[0].Hash(): struct{}{},
|
||||
chnx[1].Hash(): struct{}{},
|
||||
})
|
||||
suite.store.SchedulePut(types.EncodeValue(l, nil))
|
||||
suite.store.Flush()
|
||||
|
||||
suite.Equal(3, suite.cs.Writes)
|
||||
@@ -251,8 +218,8 @@ func (suite *HTTPBatchStoreSuite) TestGetManyAllCached() {
|
||||
chunks.NewChunk([]byte("abc")),
|
||||
chunks.NewChunk([]byte("def")),
|
||||
}
|
||||
suite.store.SchedulePut(chnx[0], 1, types.Hints{})
|
||||
suite.store.SchedulePut(chnx[1], 1, types.Hints{})
|
||||
suite.store.SchedulePut(chnx[0])
|
||||
suite.store.SchedulePut(chnx[1])
|
||||
|
||||
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash())
|
||||
foundChunks := make(chan *chunks.Chunk)
|
||||
@@ -271,7 +238,7 @@ func (suite *HTTPBatchStoreSuite) TestGetManySomeCached() {
|
||||
}
|
||||
cached := chunks.NewChunk([]byte("ghi"))
|
||||
suite.cs.PutMany(chnx)
|
||||
suite.store.SchedulePut(cached, 1, types.Hints{})
|
||||
suite.store.SchedulePut(cached)
|
||||
|
||||
hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), cached.Hash())
|
||||
foundChunks := make(chan *chunks.Chunk)
|
||||
|
||||
@@ -11,31 +11,27 @@ import (
|
||||
"github.com/attic-labs/noms/go/constants"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/nbs"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
)
|
||||
|
||||
type localBatchStore struct {
|
||||
cs chunks.ChunkStore
|
||||
unwrittenPuts *orderedChunkCache
|
||||
unwrittenPuts *nbs.NomsBlockCache
|
||||
vbs *types.ValidatingBatchingSink
|
||||
hints types.Hints
|
||||
hashes hash.HashSet
|
||||
mu *sync.Mutex
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
func newLocalBatchStore(cs chunks.ChunkStore) *localBatchStore {
|
||||
return &localBatchStore{
|
||||
cs: cs,
|
||||
unwrittenPuts: newOrderedChunkCache(),
|
||||
vbs: types.NewValidatingBatchingSink(cs),
|
||||
hints: types.Hints{},
|
||||
hashes: hash.HashSet{},
|
||||
mu: &sync.Mutex{},
|
||||
unwrittenPuts: nbs.NewCache(),
|
||||
vbs: types.NewCompletenessCheckingBatchingSink(cs),
|
||||
}
|
||||
}
|
||||
|
||||
// Get checks the internal Chunk cache, proxying to the backing ChunkStore if not present.
|
||||
// Get checks the internal Chunk cache, proxying to the backing ChunkStore if
|
||||
// not present.
|
||||
func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
if pending := lbs.unwrittenPuts.Get(h); !pending.IsEmpty() {
|
||||
@@ -45,27 +41,23 @@ func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk {
|
||||
}
|
||||
|
||||
func (lbs *localBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
|
||||
lbs.cs.GetMany(hashes, foundChunks)
|
||||
}
|
||||
|
||||
// Has checks the internal Chunk cache, proxying to the backing ChunkStore if not present.
|
||||
func (lbs *localBatchStore) Has(h hash.Hash) bool {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
if lbs.unwrittenPuts.has(h) {
|
||||
return true
|
||||
remaining := make(hash.HashSet, len(hashes))
|
||||
for h := range hashes {
|
||||
remaining.Insert(h)
|
||||
}
|
||||
return lbs.cs.Has(h)
|
||||
localChunks := make(chan *chunks.Chunk)
|
||||
go func() { defer close(localChunks); lbs.unwrittenPuts.GetMany(hashes, localChunks) }()
|
||||
for c := range localChunks {
|
||||
remaining.Remove(c.Hash())
|
||||
foundChunks <- c
|
||||
}
|
||||
lbs.cs.GetMany(remaining, foundChunks)
|
||||
}
|
||||
|
||||
// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
|
||||
func (lbs *localBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
// SchedulePut simply calls Put on the underlying ChunkStore.
|
||||
func (lbs *localBatchStore) SchedulePut(c chunks.Chunk) {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
|
||||
lbs.unwrittenPuts.Insert(c, refHeight)
|
||||
lbs.mu.Lock()
|
||||
defer lbs.mu.Unlock()
|
||||
lbs.hashes.Insert(c.Hash())
|
||||
lbs.AddHints(hints)
|
||||
lbs.unwrittenPuts.Insert(c)
|
||||
}
|
||||
|
||||
func (lbs *localBatchStore) expectVersion() {
|
||||
@@ -80,53 +72,44 @@ func (lbs *localBatchStore) Root() hash.Hash {
|
||||
return lbs.cs.Root()
|
||||
}
|
||||
|
||||
// UpdateRoot flushes outstanding writes to the backing ChunkStore before updating its Root, because it's almost certainly the case that the caller wants to point that root at some recently-Put Chunk.
|
||||
// UpdateRoot flushes outstanding writes to the backing ChunkStore before
|
||||
// updating its Root, because it's almost certainly the case that the caller
|
||||
// wants to point that root at some recently-Put Chunk.
|
||||
func (lbs *localBatchStore) UpdateRoot(current, last hash.Hash) bool {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
lbs.Flush()
|
||||
return lbs.cs.UpdateRoot(current, last)
|
||||
}
|
||||
|
||||
func (lbs *localBatchStore) AddHints(hints types.Hints) {
|
||||
for h := range hints {
|
||||
lbs.hints[h] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (lbs *localBatchStore) Flush() {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
|
||||
chunkChan := make(chan *chunks.Chunk, 128)
|
||||
go func() {
|
||||
err := lbs.unwrittenPuts.ExtractChunks(lbs.hashes, chunkChan)
|
||||
d.PanicIfError(err)
|
||||
close(chunkChan)
|
||||
defer close(chunkChan)
|
||||
lbs.unwrittenPuts.ExtractChunks(chunkChan)
|
||||
}()
|
||||
|
||||
lbs.vbs.Prepare(lbs.hints)
|
||||
for c := range chunkChan {
|
||||
dc := lbs.vbs.DecodeUnqueued(c)
|
||||
lbs.vbs.Enqueue(*dc.Chunk, *dc.Value)
|
||||
lbs.vbs.Put(*dc.Chunk, *dc.Value)
|
||||
}
|
||||
lbs.vbs.PanicIfDangling()
|
||||
lbs.vbs.Flush()
|
||||
|
||||
lbs.unwrittenPuts.Clear(lbs.hashes)
|
||||
lbs.hashes = hash.HashSet{}
|
||||
lbs.hints = types.Hints{}
|
||||
}
|
||||
|
||||
// FlushAndDestroyWithoutClose flushes lbs and destroys its cache of unwritten chunks. It's needed because LocalDatabase wraps a localBatchStore around a ChunkStore that's used by a separate BatchStore, so calling Close() on one is semantically incorrect while it still wants to use the other.
|
||||
func (lbs *localBatchStore) FlushAndDestroyWithoutClose() {
|
||||
lbs.Flush()
|
||||
lbs.unwrittenPuts.Destroy()
|
||||
lbs.unwrittenPuts = nbs.NewCache()
|
||||
}
|
||||
|
||||
// Destroy blows away lbs' cache of unwritten chunks without flushing. Used when the owning Database is closing and it isn't semantically correct to flush.
|
||||
// Destroy blows away lbs' cache of unwritten chunks without flushing. Used
|
||||
// when the owning Database is closing and it isn't semantically correct to
|
||||
// flush.
|
||||
func (lbs *localBatchStore) Destroy() {
|
||||
lbs.unwrittenPuts.Destroy()
|
||||
}
|
||||
|
||||
// Close is supposed to close the underlying ChunkStore, but the only place localBatchStore is currently used wants to keep the underlying ChunkStore open after it's done with lbs. Hence, the above method and the panic() here.
|
||||
// Close closes the underlying ChunkStore.
|
||||
func (lbs *localBatchStore) Close() error {
|
||||
panic("Unreached")
|
||||
lbs.Flush()
|
||||
return lbs.cs.Close()
|
||||
}
|
||||
|
||||
@@ -12,16 +12,12 @@ import (
|
||||
// Database provides versioned storage for noms values. Each Database instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new Database, representing a new moment in history.
|
||||
type LocalDatabase struct {
|
||||
databaseCommon
|
||||
cs chunks.ChunkStore
|
||||
vbs *localBatchStore
|
||||
}
|
||||
|
||||
func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
|
||||
bs := types.NewBatchStoreAdaptor(cs)
|
||||
bs := newLocalBatchStore(cs)
|
||||
return &LocalDatabase{
|
||||
newDatabaseCommon(newCachingChunkHaver(cs), types.NewValueStore(bs), bs),
|
||||
cs,
|
||||
nil,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,25 +49,11 @@ func (ldb *LocalDatabase) FastForward(ds Dataset, newHeadRef types.Ref) (Dataset
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) doHeadUpdate(ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) {
|
||||
if ldb.vbs != nil {
|
||||
ldb.vbs.FlushAndDestroyWithoutClose()
|
||||
ldb.vbs = nil
|
||||
}
|
||||
err := updateFunc(ds)
|
||||
return ldb.GetDataset(ds.ID()), err
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) validatingBatchStore() types.BatchStore {
|
||||
if ldb.vbs == nil {
|
||||
ldb.vbs = newLocalBatchStore(ldb.cs)
|
||||
}
|
||||
return ldb.vbs
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) Close() error {
|
||||
if ldb.vbs != nil {
|
||||
ldb.vbs.Destroy()
|
||||
ldb.vbs = nil
|
||||
}
|
||||
ldb.BatchStore().(*localBatchStore).Destroy()
|
||||
return ldb.databaseCommon.Close()
|
||||
}
|
||||
|
||||
@@ -115,9 +115,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
|
||||
progressCh <- PullProgress{doneCount, knownCount + uint64(srcQ.Len()), approxBytesWritten}
|
||||
}
|
||||
|
||||
// hc and reachableChunks aren't goroutine-safe, so only write them here.
|
||||
hc := hintCache{}
|
||||
reachableChunks := hash.HashSet{}
|
||||
sampleSize := uint64(0)
|
||||
sampleCount := uint64(0)
|
||||
for !srcQ.Empty() {
|
||||
@@ -138,22 +135,17 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
|
||||
case res := <-srcResChan:
|
||||
for _, reachable := range res.reachables {
|
||||
srcQ.PushBack(reachable)
|
||||
reachableChunks.Insert(reachable.TargetHash())
|
||||
}
|
||||
if res.writeBytes > 0 {
|
||||
sampleSize += uint64(res.writeBytes)
|
||||
sampleCount += 1
|
||||
}
|
||||
if !res.readHash.IsEmpty() {
|
||||
reachableChunks.Remove(res.readHash)
|
||||
}
|
||||
srcWork--
|
||||
|
||||
updateProgress(1, 0, uint64(res.readBytes), sampleSize/uint64(math.Max(1, float64(sampleCount))))
|
||||
case res := <-sinkResChan:
|
||||
for _, reachable := range res.reachables {
|
||||
sinkQ.PushBack(reachable)
|
||||
hc[reachable.TargetHash()] = res.readHash
|
||||
}
|
||||
sinkWork--
|
||||
case res := <-comResChan:
|
||||
@@ -163,7 +155,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
|
||||
if !isHeadOfSink {
|
||||
srcQ.PushBack(reachable)
|
||||
}
|
||||
hc[reachable.TargetHash()] = res.readHash
|
||||
}
|
||||
comWork--
|
||||
updateProgress(1, 0, uint64(res.readBytes), 0)
|
||||
@@ -174,14 +165,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency
|
||||
sinkQ.Unique()
|
||||
srcQ.Unique()
|
||||
}
|
||||
|
||||
hints := types.Hints{}
|
||||
for hash := range reachableChunks {
|
||||
if hint, present := hc[hash]; present {
|
||||
hints[hint] = struct{}{}
|
||||
}
|
||||
}
|
||||
sinkDB.validatingBatchStore().AddHints(hints)
|
||||
}
|
||||
|
||||
type traverseResult struct {
|
||||
@@ -265,7 +248,7 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritt
|
||||
if v == nil {
|
||||
d.Panic("Expected decoded chunk to be non-nil.")
|
||||
}
|
||||
sinkDB.validatingBatchStore().SchedulePut(c, srcRef.Height(), types.Hints{})
|
||||
sinkDB.validatingBatchStore().SchedulePut(c)
|
||||
bytesWritten := 0
|
||||
if estimateBytesWritten {
|
||||
// TODO: Probably better to hide this behind the BatchStore abstraction since
|
||||
|
||||
@@ -1,152 +0,0 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/golang/snappy"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
func newOrderedChunkCache() *orderedChunkCache {
|
||||
dir, err := ioutil.TempDir("", "")
|
||||
d.PanicIfError(err)
|
||||
db, err := leveldb.OpenFile(dir, &opt.Options{
|
||||
Compression: opt.NoCompression,
|
||||
Filter: filter.NewBloomFilter(10), // 10 bits/key
|
||||
OpenFilesCacheCapacity: 24,
|
||||
NoSync: true, // We dont need this data to be durable. LDB is acting as sorting temporary storage that can be larger than main memory.
|
||||
WriteBuffer: 1 << 27, // 128MiB
|
||||
})
|
||||
d.Chk.NoError(err, "opening put cache in %s", dir)
|
||||
return &orderedChunkCache{
|
||||
orderedChunks: db,
|
||||
chunkIndex: map[hash.Hash][]byte{},
|
||||
dbDir: dir,
|
||||
mu: &sync.RWMutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// orderedChunkCache holds Chunks, allowing them to be retrieved by hash or enumerated in ref-height order.
|
||||
type orderedChunkCache struct {
|
||||
orderedChunks *leveldb.DB
|
||||
chunkIndex map[hash.Hash][]byte
|
||||
dbDir string
|
||||
mu *sync.RWMutex
|
||||
}
|
||||
|
||||
// Insert can be called from any goroutine to store c in the cache. If c is successfully added to the cache, Insert returns true. If c was already in the cache, Insert returns false.
|
||||
func (p *orderedChunkCache) Insert(c chunks.Chunk, refHeight uint64) bool {
|
||||
hash := c.Hash()
|
||||
dbKey, present := func() (dbKey []byte, present bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if _, present = p.chunkIndex[hash]; !present {
|
||||
dbKey = toDbKey(refHeight, c.Hash())
|
||||
p.chunkIndex[hash] = dbKey
|
||||
}
|
||||
return
|
||||
}()
|
||||
|
||||
if !present {
|
||||
compressed := snappy.Encode(nil, c.Data())
|
||||
d.Chk.NoError(p.orderedChunks.Put(dbKey, compressed, nil))
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (p *orderedChunkCache) has(hash hash.Hash) (has bool) {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
_, has = p.chunkIndex[hash]
|
||||
return
|
||||
}
|
||||
|
||||
// Get can be called from any goroutine to retrieve the chunk referenced by hash. If the chunk is not present, Get returns the empty Chunk.
|
||||
func (p *orderedChunkCache) Get(hash hash.Hash) chunks.Chunk {
|
||||
// Don't use defer p.mu.RUnlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety.
|
||||
p.mu.RLock()
|
||||
dbKey, ok := p.chunkIndex[hash]
|
||||
p.mu.RUnlock()
|
||||
|
||||
if !ok {
|
||||
return chunks.EmptyChunk
|
||||
}
|
||||
compressed, err := p.orderedChunks.Get(dbKey, nil)
|
||||
d.Chk.NoError(err)
|
||||
data, err := snappy.Decode(nil, compressed)
|
||||
d.Chk.NoError(err)
|
||||
return chunks.NewChunkWithHash(hash, data)
|
||||
}
|
||||
|
||||
// Clear can be called from any goroutine to remove chunks referenced by the given hashes from the cache.
|
||||
func (p *orderedChunkCache) Clear(hashes hash.HashSet) {
|
||||
deleteBatch := &leveldb.Batch{}
|
||||
p.mu.Lock()
|
||||
for hash := range hashes {
|
||||
deleteBatch.Delete(p.chunkIndex[hash])
|
||||
delete(p.chunkIndex, hash)
|
||||
}
|
||||
p.mu.Unlock()
|
||||
d.Chk.NoError(p.orderedChunks.Write(deleteBatch, nil))
|
||||
return
|
||||
}
|
||||
|
||||
var uint64Size = binary.Size(uint64(0))
|
||||
|
||||
// toDbKey takes a refHeight and a hash and returns a binary key suitable for use with LevelDB. The default sort order used by LevelDB ensures that these keys (and their associated values) will be iterated in ref-height order.
|
||||
func toDbKey(refHeight uint64, h hash.Hash) []byte {
|
||||
buf := bytes.NewBuffer(make([]byte, 0, uint64Size+hash.ByteLen))
|
||||
err := binary.Write(buf, binary.BigEndian, refHeight)
|
||||
d.Chk.NoError(err)
|
||||
err = binary.Write(buf, binary.BigEndian, h[:])
|
||||
d.Chk.NoError(err)
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func fromDbKey(key []byte) (uint64, hash.Hash) {
|
||||
refHeight := uint64(0)
|
||||
r := bytes.NewReader(key)
|
||||
err := binary.Read(r, binary.BigEndian, &refHeight)
|
||||
d.Chk.NoError(err)
|
||||
h := hash.Hash{}
|
||||
err = binary.Read(r, binary.BigEndian, &h)
|
||||
d.Chk.NoError(err)
|
||||
return refHeight, h
|
||||
}
|
||||
|
||||
// ExtractChunks can be called from any goroutine to write Chunks referenced by the given hashes to w. The chunks are ordered by ref-height. Chunks of the same height are written in an unspecified order, relative to one another.
|
||||
func (p *orderedChunkCache) ExtractChunks(hashes hash.HashSet, chunkChan chan *chunks.Chunk) error {
|
||||
iter := p.orderedChunks.NewIterator(nil, nil)
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
_, hash := fromDbKey(iter.Key())
|
||||
if !hashes.Has(hash) {
|
||||
continue
|
||||
}
|
||||
compressed := iter.Value()
|
||||
data, err := snappy.Decode(nil, compressed)
|
||||
d.Chk.NoError(err)
|
||||
c := chunks.NewChunkWithHash(hash, data)
|
||||
chunkChan <- &c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *orderedChunkCache) Destroy() error {
|
||||
d.Chk.NoError(p.orderedChunks.Close())
|
||||
return os.RemoveAll(p.dbDir)
|
||||
}
|
||||
@@ -1,172 +0,0 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/testify/suite"
|
||||
)
|
||||
|
||||
func TestLevelDBPutCacheSuite(t *testing.T) {
|
||||
suite.Run(t, &LevelDBPutCacheSuite{})
|
||||
}
|
||||
|
||||
type LevelDBPutCacheSuite struct {
|
||||
suite.Suite
|
||||
cache *orderedChunkCache
|
||||
values []types.Value
|
||||
chnx map[hash.Hash]chunks.Chunk
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) SetupTest() {
|
||||
suite.cache = newOrderedChunkCache()
|
||||
suite.values = []types.Value{
|
||||
types.String("abc"),
|
||||
types.String("def"),
|
||||
types.String("ghi"),
|
||||
types.String("jkl"),
|
||||
types.String("mno"),
|
||||
}
|
||||
suite.chnx = map[hash.Hash]chunks.Chunk{}
|
||||
for _, v := range suite.values {
|
||||
suite.chnx[v.Hash()] = types.EncodeValue(v, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TearDownTest() {
|
||||
suite.cache.Destroy()
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestAddTwice() {
|
||||
chunk := suite.chnx[suite.values[0].Hash()]
|
||||
suite.True(suite.cache.Insert(chunk, 1))
|
||||
suite.False(suite.cache.Insert(chunk, 1))
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestAddParallel() {
|
||||
hashes := make(chan hash.Hash)
|
||||
for _, chunk := range suite.chnx {
|
||||
go func(c chunks.Chunk) {
|
||||
suite.cache.Insert(c, 1)
|
||||
hashes <- c.Hash()
|
||||
}(chunk)
|
||||
}
|
||||
|
||||
for i := 0; i < len(suite.values); i++ {
|
||||
hash := <-hashes
|
||||
suite.True(suite.cache.has(hash))
|
||||
delete(suite.chnx, hash)
|
||||
}
|
||||
close(hashes)
|
||||
suite.Len(suite.chnx, 0)
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestGetParallel() {
|
||||
for _, c := range suite.chnx {
|
||||
suite.cache.Insert(c, 1)
|
||||
}
|
||||
|
||||
chunkChan := make(chan chunks.Chunk)
|
||||
for h := range suite.chnx {
|
||||
go func(h hash.Hash) {
|
||||
chunkChan <- suite.cache.Get(h)
|
||||
}(h)
|
||||
}
|
||||
|
||||
for i := 0; i < len(suite.values); i++ {
|
||||
c := <-chunkChan
|
||||
delete(suite.chnx, c.Hash())
|
||||
}
|
||||
close(chunkChan)
|
||||
suite.Len(suite.chnx, 0)
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestClearParallel() {
|
||||
keepIdx := 2
|
||||
toClear1, toClear2 := hash.HashSet{}, hash.HashSet{}
|
||||
for i, v := range suite.values {
|
||||
suite.cache.Insert(types.EncodeValue(v, nil), 1)
|
||||
if i < keepIdx {
|
||||
toClear1.Insert(v.Hash())
|
||||
} else if i > keepIdx {
|
||||
toClear2.Insert(v.Hash())
|
||||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
clear := func(hs hash.HashSet) {
|
||||
suite.cache.Clear(hs)
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
go clear(toClear1)
|
||||
go clear(toClear2)
|
||||
|
||||
wg.Wait()
|
||||
for i, v := range suite.values {
|
||||
if i == keepIdx {
|
||||
suite.True(suite.cache.has(v.Hash()))
|
||||
continue
|
||||
}
|
||||
suite.False(suite.cache.has(v.Hash()))
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestReaderSubset() {
|
||||
toExtract := hash.HashSet{}
|
||||
for hash, c := range suite.chnx {
|
||||
if len(toExtract) < 2 {
|
||||
toExtract.Insert(hash)
|
||||
}
|
||||
suite.cache.Insert(c, 1)
|
||||
}
|
||||
|
||||
// Only iterate over the first 2 elements in the DB
|
||||
chunkChan := suite.extractChunks(toExtract)
|
||||
count := 0
|
||||
for c := range chunkChan {
|
||||
if suite.Contains(toExtract, c.Hash()) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
suite.Equal(len(toExtract), count)
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() {
|
||||
maxHeight := len(suite.chnx)
|
||||
orderedHashes := make(hash.HashSlice, maxHeight)
|
||||
toExtract := hash.HashSet{}
|
||||
heights := rand.Perm(maxHeight)
|
||||
for hash, c := range suite.chnx {
|
||||
toExtract.Insert(hash)
|
||||
orderedHashes[heights[0]] = hash
|
||||
suite.cache.Insert(c, uint64(heights[0]))
|
||||
heights = heights[1:]
|
||||
}
|
||||
|
||||
chunkChan := suite.extractChunks(toExtract)
|
||||
for c := range chunkChan {
|
||||
suite.Equal(orderedHashes[0], c.Hash())
|
||||
orderedHashes = orderedHashes[1:]
|
||||
}
|
||||
suite.Len(orderedHashes, 0)
|
||||
}
|
||||
|
||||
func (suite *LevelDBPutCacheSuite) extractChunks(hashes hash.HashSet) <-chan *chunks.Chunk {
|
||||
chunkChan := make(chan *chunks.Chunk)
|
||||
go func() {
|
||||
err := suite.cache.ExtractChunks(hashes, chunkChan)
|
||||
suite.NoError(err)
|
||||
close(chunkChan)
|
||||
}()
|
||||
return chunkChan
|
||||
}
|
||||
@@ -19,13 +19,6 @@ func NewRemoteDatabase(baseURL, auth string) *RemoteDatabaseClient {
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(httpBS), types.NewValueStore(httpBS), httpBS)}
|
||||
}
|
||||
|
||||
func (rdb *RemoteDatabaseClient) validatingBatchStore() types.BatchStore {
|
||||
hbs := rdb.BatchStore().(*httpBatchStore)
|
||||
// TODO: Get rid of this (BUG 2982)
|
||||
hbs.SetReverseFlushOrder()
|
||||
return hbs
|
||||
}
|
||||
|
||||
func (rdb *RemoteDatabaseClient) GetDataset(datasetID string) Dataset {
|
||||
return getDataset(rdb, datasetID)
|
||||
}
|
||||
|
||||
@@ -137,7 +137,6 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
reader.Close()
|
||||
}()
|
||||
vbs := types.NewValidatingBatchingSink(cs)
|
||||
vbs.Prepare(deserializeHints(reader))
|
||||
|
||||
// Deserialize chunks from reader in background, recovering from errors
|
||||
errChan := make(chan error)
|
||||
@@ -168,7 +167,7 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
dc := <-ch
|
||||
if dc.Chunk != nil && dc.Value != nil {
|
||||
totalDataWritten += len(dc.Chunk.Data())
|
||||
vbs.Enqueue(*dc.Chunk, *dc.Value)
|
||||
vbs.Put(*dc.Chunk, *dc.Value)
|
||||
chunkCount++
|
||||
if chunkCount%100 == 0 {
|
||||
verbose.Log("Enqueued %d chunks", chunkCount)
|
||||
@@ -178,20 +177,21 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
|
||||
// If there was an error during chunk deserialization, raise so it can be logged and responded to.
|
||||
if err := <-errChan; err != nil {
|
||||
panic(d.Wrap(fmt.Errorf("Deserialization failure: %v", err)))
|
||||
d.Panic("Deserialization failure: %v", err)
|
||||
}
|
||||
|
||||
vbs.PanicIfDangling()
|
||||
vbs.Flush()
|
||||
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
}
|
||||
|
||||
// Contents of the returned io.Reader are snappy-compressed.
|
||||
func buildWriteValueRequest(chunkChan chan *chunks.Chunk, hints map[hash.Hash]struct{}) io.Reader {
|
||||
func buildWriteValueRequest(chunkChan chan *chunks.Chunk) io.Reader {
|
||||
body, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
gw := snappy.NewBufferedWriter(pw)
|
||||
serializeHints(gw, hints)
|
||||
for c := range chunkChan {
|
||||
chunks.Serialize(*c, gw)
|
||||
}
|
||||
|
||||
@@ -36,14 +36,12 @@ func TestHandleWriteValue(t *testing.T) {
|
||||
_, err := db.CommitValue(db.GetDataset("datasetID"), r)
|
||||
assert.NoError(err)
|
||||
|
||||
hint := l.Hash()
|
||||
newItem := types.NewEmptyBlob()
|
||||
itemChunk := types.EncodeValue(newItem, nil)
|
||||
l2 := l.Insert(1, types.NewRef(newItem))
|
||||
listChunk := types.EncodeValue(l2, nil)
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
serializeHints(body, map[hash.Hash]struct{}{hint: {}})
|
||||
chunks.Serialize(itemChunk, body)
|
||||
chunks.Serialize(listChunk, body)
|
||||
|
||||
@@ -64,7 +62,6 @@ func TestHandleWriteValuePanic(t *testing.T) {
|
||||
cs := chunks.NewTestStore()
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
serializeHints(body, types.Hints{})
|
||||
body.WriteString("Bogus")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
@@ -81,7 +78,6 @@ func TestHandleWriteValueDupChunks(t *testing.T) {
|
||||
itemChunk := types.EncodeValue(newItem, nil)
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
serializeHints(body, map[hash.Hash]struct{}{})
|
||||
// Write the same chunk to body enough times to be certain that at least one of the concurrent deserialize/decode passes completes before the last one can continue.
|
||||
for i := 0; i <= writeValueConcurrency; i++ {
|
||||
chunks.Serialize(itemChunk, body)
|
||||
@@ -112,21 +108,9 @@ func TestBuildWriteValueRequest(t *testing.T) {
|
||||
inChunkChan <- &chnx[1]
|
||||
close(inChunkChan)
|
||||
|
||||
hints := map[hash.Hash]struct{}{
|
||||
hash.Parse("00000000000000000000000000000002"): {},
|
||||
hash.Parse("00000000000000000000000000000003"): {},
|
||||
}
|
||||
compressed := buildWriteValueRequest(inChunkChan, hints)
|
||||
compressed := buildWriteValueRequest(inChunkChan)
|
||||
gr := snappy.NewReader(compressed)
|
||||
|
||||
count := 0
|
||||
for hint := range deserializeHints(gr) {
|
||||
count++
|
||||
_, present := hints[hint]
|
||||
assert.True(present)
|
||||
}
|
||||
assert.Equal(len(hints), count)
|
||||
|
||||
outChunkChan := make(chan *chunks.Chunk, len(chnx))
|
||||
chunks.Deserialize(gr, outChunkChan)
|
||||
close(outChunkChan)
|
||||
|
||||
@@ -1,69 +0,0 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
)
|
||||
|
||||
func serializeHints(w io.Writer, hints types.Hints) {
|
||||
err := binary.Write(w, binary.BigEndian, uint32(len(hints))) // 4 billion hints is probably absurd. Maybe this should be smaller?
|
||||
d.Chk.NoError(err)
|
||||
for r := range hints {
|
||||
serializeHash(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func serializeHashes(w io.Writer, hashes hash.HashSlice) {
|
||||
err := binary.Write(w, binary.BigEndian, uint32(len(hashes))) // 4 billion hashes is probably absurd. Maybe this should be smaller?
|
||||
d.Chk.NoError(err)
|
||||
for _, r := range hashes {
|
||||
serializeHash(w, r)
|
||||
}
|
||||
}
|
||||
|
||||
func serializeHash(w io.Writer, h hash.Hash) {
|
||||
n, err := io.Copy(w, bytes.NewReader(h[:]))
|
||||
d.Chk.NoError(err)
|
||||
d.PanicIfFalse(int64(hash.ByteLen) == n)
|
||||
}
|
||||
|
||||
func deserializeHints(reader io.Reader) types.Hints {
|
||||
numRefs := uint32(0)
|
||||
err := binary.Read(reader, binary.BigEndian, &numRefs)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
hints := make(types.Hints, numRefs)
|
||||
for i := uint32(0); i < numRefs; i++ {
|
||||
hints[deserializeHash(reader)] = struct{}{}
|
||||
}
|
||||
return hints
|
||||
}
|
||||
|
||||
func deserializeHashes(reader io.Reader) hash.HashSlice {
|
||||
numRefs := uint32(0)
|
||||
err := binary.Read(reader, binary.BigEndian, &numRefs)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
hashes := make(hash.HashSlice, numRefs)
|
||||
for i := range hashes {
|
||||
hashes[i] = deserializeHash(reader)
|
||||
}
|
||||
return hashes
|
||||
}
|
||||
|
||||
func deserializeHash(reader io.Reader) hash.Hash {
|
||||
h := hash.Hash{}
|
||||
n, err := io.ReadFull(reader, h[:])
|
||||
d.Chk.NoError(err)
|
||||
d.PanicIfFalse(int(hash.ByteLen) == n)
|
||||
return h
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/testify/assert"
|
||||
)
|
||||
|
||||
func TestHintRoundTrip(t *testing.T) {
|
||||
b := &bytes.Buffer{}
|
||||
input := map[hash.Hash]struct{}{
|
||||
hash.Parse("00000000000000000000000000000000"): {},
|
||||
hash.Parse("00000000000000000000000000000001"): {},
|
||||
hash.Parse("00000000000000000000000000000002"): {},
|
||||
hash.Parse("00000000000000000000000000000003"): {},
|
||||
}
|
||||
serializeHints(b, input)
|
||||
output := deserializeHints(b)
|
||||
assert.Len(t, output, len(input), "Output has different number of elements than input: %v, %v", output, input)
|
||||
for h := range output {
|
||||
_, present := input[h]
|
||||
assert.True(t, present, "%s is in output but not in input", h)
|
||||
}
|
||||
}
|
||||
@@ -29,10 +29,10 @@ func writeToEmptyStore(store types.BatchStore, src *dataSource, t assert.Testing
|
||||
|
||||
chunx := goReadChunks(src)
|
||||
for c := range chunx {
|
||||
store.SchedulePut(*c, 1, types.Hints{})
|
||||
store.SchedulePut(*c)
|
||||
}
|
||||
newRoot := chunks.NewChunk([]byte("root"))
|
||||
store.SchedulePut(newRoot, 1, types.Hints{})
|
||||
store.SchedulePut(newRoot)
|
||||
assert.True(t, store.UpdateRoot(newRoot.Hash(), root))
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ func benchmarkNoRefreshWrite(openStore storeOpenFn, src *dataSource, t assert.Te
|
||||
store := openStore()
|
||||
chunx := goReadChunks(src)
|
||||
for c := range chunx {
|
||||
store.SchedulePut(*c, 1, types.Hints{})
|
||||
store.SchedulePut(*c)
|
||||
}
|
||||
assert.NoError(t, store.Close())
|
||||
}
|
||||
|
||||
@@ -32,13 +32,10 @@ func (fb fileBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
func (fb fileBlockStore) SchedulePut(c chunks.Chunk) {
|
||||
io.Copy(fb.bw, bytes.NewReader(c.Data()))
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) AddHints(hints types.Hints) {
|
||||
}
|
||||
|
||||
func (fb fileBlockStore) Flush() {}
|
||||
|
||||
func (fb fileBlockStore) Close() error {
|
||||
|
||||
@@ -26,11 +26,7 @@ func (nb nullBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C
|
||||
panic("not impl")
|
||||
}
|
||||
|
||||
func (nb nullBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
}
|
||||
|
||||
func (nb nullBlockStore) AddHints(hints types.Hints) {
|
||||
}
|
||||
func (nb nullBlockStore) SchedulePut(c chunks.Chunk) {}
|
||||
|
||||
func (nb nullBlockStore) Flush() {}
|
||||
|
||||
|
||||
@@ -158,23 +158,13 @@ func (suite *BlockStoreSuite) TestChunkStoreExtractChunks() {
|
||||
suite.store.PutMany(chnx)
|
||||
|
||||
chunkChan := make(chan *chunks.Chunk)
|
||||
go func() { suite.store.extractChunks(InsertOrder, chunkChan); close(chunkChan) }()
|
||||
go func() { suite.store.extractChunks(chunkChan); close(chunkChan) }()
|
||||
i := 0
|
||||
for c := range chunkChan {
|
||||
suite.Equal(chnx[i].Data(), c.Data())
|
||||
suite.Equal(chnx[i].Hash(), c.Hash())
|
||||
i++
|
||||
}
|
||||
|
||||
chunkChan = make(chan *chunks.Chunk)
|
||||
go func() { suite.store.extractChunks(ReverseOrder, chunkChan); close(chunkChan) }()
|
||||
i = len(chnx) - 1
|
||||
for c := range chunkChan {
|
||||
suite.Equal(chnx[i].Data(), c.Data())
|
||||
suite.Equal(chnx[i].Hash(), c.Hash())
|
||||
i--
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
|
||||
|
||||
@@ -56,8 +56,8 @@ func (nbc *NomsBlockCache) GetMany(hashes hash.HashSet, foundChunks chan *chunks
|
||||
|
||||
// ExtractChunks writes the entire contents of the cache to chunkChan. The
|
||||
// chunks are extracted in insertion order.
|
||||
func (nbc *NomsBlockCache) ExtractChunks(order EnumerationOrder, chunkChan chan *chunks.Chunk) {
|
||||
nbc.chunks.extractChunks(order, chunkChan)
|
||||
func (nbc *NomsBlockCache) ExtractChunks(chunkChan chan *chunks.Chunk) {
|
||||
nbc.chunks.extractChunks(chunkChan)
|
||||
}
|
||||
|
||||
// Count returns the number of items in the cache.
|
||||
|
||||
@@ -99,10 +99,10 @@ func (ccs *compactingChunkSource) calcReads(reqs []getRecord, blockSize uint64)
|
||||
return ccs.cs.calcReads(reqs, blockSize)
|
||||
}
|
||||
|
||||
func (ccs *compactingChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
func (ccs *compactingChunkSource) extract(chunks chan<- extractRecord) {
|
||||
ccs.wg.Wait()
|
||||
d.Chk.True(ccs.cs != nil)
|
||||
ccs.cs.extract(order, chunks)
|
||||
ccs.cs.extract(chunks)
|
||||
}
|
||||
|
||||
type emptyChunkSource struct{}
|
||||
@@ -143,4 +143,4 @@ func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads
|
||||
return 0, true
|
||||
}
|
||||
|
||||
func (ecs emptyChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) {}
|
||||
func (ecs emptyChunkSource) extract(chunks chan<- extractRecord) {}
|
||||
|
||||
@@ -91,17 +91,11 @@ func (mt *memTable) getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg
|
||||
return
|
||||
}
|
||||
|
||||
func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
if order == InsertOrder {
|
||||
for _, hrec := range mt.order {
|
||||
chunks <- extractRecord{*hrec.a, mt.chunks[*hrec.a]}
|
||||
}
|
||||
return
|
||||
}
|
||||
for i := len(mt.order) - 1; i >= 0; i-- {
|
||||
hrec := mt.order[i]
|
||||
func (mt *memTable) extract(chunks chan<- extractRecord) {
|
||||
for _, hrec := range mt.order {
|
||||
chunks <- extractRecord{*hrec.a, mt.chunks[*hrec.a]}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) {
|
||||
|
||||
@@ -184,14 +184,8 @@ func (crg chunkReaderGroup) uncompressedLen() (data uint64) {
|
||||
return
|
||||
}
|
||||
|
||||
func (crg chunkReaderGroup) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
if order == InsertOrder {
|
||||
for _, haver := range crg {
|
||||
haver.extract(order, chunks)
|
||||
}
|
||||
return
|
||||
}
|
||||
for i := len(crg) - 1; i >= 0; i-- {
|
||||
crg[i].extract(order, chunks)
|
||||
func (crg chunkReaderGroup) extract(chunks chan<- extractRecord) {
|
||||
for _, haver := range crg {
|
||||
haver.extract(chunks)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,6 +166,6 @@ type panicingChunkSource struct {
|
||||
chunkSource
|
||||
}
|
||||
|
||||
func (pcs panicingChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
func (pcs panicingChunkSource) extract(chunks chan<- extractRecord) {
|
||||
panic("onoes")
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ import (
|
||||
"github.com/attic-labs/noms/go/constants"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/dynamodb"
|
||||
"github.com/aws/aws-sdk-go/service/s3"
|
||||
@@ -27,8 +26,6 @@ import (
|
||||
// names of the tables that hold all the chunks in the store. The number of
|
||||
// chunks in each table is also stored in the manifest.
|
||||
|
||||
type EnumerationOrder uint8
|
||||
|
||||
const (
|
||||
// StorageVersion is the version of the on-disk Noms Chunks Store data format.
|
||||
StorageVersion = "2"
|
||||
@@ -38,9 +35,6 @@ const (
|
||||
defaultMaxTables = 128
|
||||
|
||||
defaultIndexCacheSize = (1 << 20) * 8 // 8MB
|
||||
|
||||
InsertOrder EnumerationOrder = iota
|
||||
ReverseOrder
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -174,7 +168,7 @@ func (nbs *NomsBlockStore) Put(c chunks.Chunk) {
|
||||
nbs.putCount++
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
func (nbs *NomsBlockStore) SchedulePut(c chunks.Chunk) {
|
||||
nbs.Put(c)
|
||||
}
|
||||
|
||||
@@ -227,12 +221,9 @@ func (nbs *NomsBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks
|
||||
nbs.mu.RLock()
|
||||
defer nbs.mu.RUnlock()
|
||||
tables = nbs.tables
|
||||
|
||||
remaining = true
|
||||
if nbs.mt != nil {
|
||||
remaining = nbs.mt.getMany(reqs, foundChunks, &sync.WaitGroup{})
|
||||
wg.Wait()
|
||||
} else {
|
||||
remaining = true
|
||||
remaining = nbs.mt.getMany(reqs, foundChunks, nil)
|
||||
}
|
||||
|
||||
return
|
||||
@@ -275,24 +266,17 @@ func (nbs *NomsBlockStore) CalcReads(hashes hash.HashSet, blockSize uint64) (rea
|
||||
return
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) extractChunks(order EnumerationOrder, chunkChan chan<- *chunks.Chunk) {
|
||||
func (nbs *NomsBlockStore) extractChunks(chunkChan chan<- *chunks.Chunk) {
|
||||
ch := make(chan extractRecord, 1)
|
||||
go func() {
|
||||
defer close(ch)
|
||||
nbs.mu.RLock()
|
||||
defer nbs.mu.RUnlock()
|
||||
// Chunks in nbs.tables were inserted before those in nbs.mt, so extract chunks there _first_ if we're doing InsertOrder...
|
||||
if order == InsertOrder {
|
||||
nbs.tables.extract(order, ch)
|
||||
}
|
||||
// Chunks in nbs.tables were inserted before those in nbs.mt, so extract chunks there _first_
|
||||
nbs.tables.extract(ch)
|
||||
if nbs.mt != nil {
|
||||
nbs.mt.extract(order, ch)
|
||||
nbs.mt.extract(ch)
|
||||
}
|
||||
// ...and do them _second_ if we're doing ReverseOrder
|
||||
if order == ReverseOrder {
|
||||
nbs.tables.extract(order, ch)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}()
|
||||
for rec := range ch {
|
||||
c := chunks.NewChunkWithHash(hash.Hash(rec.a), rec.data)
|
||||
@@ -322,6 +306,55 @@ func (nbs *NomsBlockStore) Has(h hash.Hash) bool {
|
||||
return has || tables.has(a)
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) HasMany(hashes hash.HashSet) hash.HashSet {
|
||||
reqs := toHasRecords(hashes)
|
||||
|
||||
tables, remaining := func() (tables chunkReader, remaining bool) {
|
||||
nbs.mu.RLock()
|
||||
defer nbs.mu.RUnlock()
|
||||
tables = nbs.tables
|
||||
|
||||
remaining = true
|
||||
if nbs.mt != nil {
|
||||
remaining = nbs.mt.hasMany(reqs)
|
||||
}
|
||||
|
||||
return
|
||||
}()
|
||||
|
||||
if remaining {
|
||||
tables.hasMany(reqs)
|
||||
}
|
||||
return fromHasRecords(reqs)
|
||||
}
|
||||
|
||||
func toHasRecords(hashes hash.HashSet) []hasRecord {
|
||||
reqs := make([]hasRecord, len(hashes))
|
||||
idx := 0
|
||||
for h := range hashes {
|
||||
a := addr(h)
|
||||
reqs[idx] = hasRecord{
|
||||
a: &a,
|
||||
prefix: a.Prefix(),
|
||||
order: idx,
|
||||
}
|
||||
idx++
|
||||
}
|
||||
|
||||
sort.Sort(hasRecordByPrefix(reqs))
|
||||
return reqs
|
||||
}
|
||||
|
||||
func fromHasRecords(reqs []hasRecord) hash.HashSet {
|
||||
present := hash.HashSet{}
|
||||
for _, r := range reqs {
|
||||
if r.has {
|
||||
present.Insert(hash.New(r.a[:]))
|
||||
}
|
||||
}
|
||||
return present
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) Root() hash.Hash {
|
||||
nbs.mu.RLock()
|
||||
defer nbs.mu.RUnlock()
|
||||
@@ -373,11 +406,6 @@ func (nbs *NomsBlockStore) Close() (err error) {
|
||||
return nbs.tables.Close()
|
||||
}
|
||||
|
||||
// types.BatchStore
|
||||
func (nbs *NomsBlockStore) AddHints(hints types.Hints) {
|
||||
// noop
|
||||
}
|
||||
|
||||
func (nbs *NomsBlockStore) Flush() {
|
||||
b := &backoff.Backoff{
|
||||
Min: 128 * time.Microsecond,
|
||||
|
||||
@@ -206,7 +206,7 @@ type chunkReader interface {
|
||||
getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup) bool
|
||||
count() uint32
|
||||
uncompressedLen() uint64
|
||||
extract(order EnumerationOrder, chunks chan<- extractRecord)
|
||||
extract(chunks chan<- extractRecord)
|
||||
}
|
||||
|
||||
type chunkSource interface {
|
||||
|
||||
@@ -94,7 +94,7 @@ func compactSourcesToBuffer(sources chunkSources, rl chan struct{}) (name addr,
|
||||
}()
|
||||
rl <- struct{}{}
|
||||
|
||||
s.extract(InsertOrder, c)
|
||||
s.extract(c)
|
||||
}(src, chunks)
|
||||
chunkChans <- chunks
|
||||
}
|
||||
|
||||
@@ -412,7 +412,7 @@ func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int,
|
||||
return
|
||||
}
|
||||
|
||||
func (tr tableReader) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
func (tr tableReader) extract(chunks chan<- extractRecord) {
|
||||
// Build reverse lookup table from ordinal -> chunk hash
|
||||
hashes := make(addrSlice, len(tr.prefixes))
|
||||
for idx, prefix := range tr.prefixes {
|
||||
@@ -432,12 +432,6 @@ func (tr tableReader) extract(order EnumerationOrder, chunks chan<- extractRecor
|
||||
chunks <- extractRecord{hashes[i], tr.parseChunk(buff[localOffset : localOffset+uint64(tr.lengths[i])])}
|
||||
}
|
||||
|
||||
if order == ReverseOrder {
|
||||
for i := uint32(1); i <= tr.chunkCount; i++ {
|
||||
sendChunk(tr.chunkCount - i)
|
||||
}
|
||||
return
|
||||
}
|
||||
for i := uint32(0); i < tr.chunkCount; i++ {
|
||||
sendChunk(i)
|
||||
}
|
||||
|
||||
@@ -180,22 +180,13 @@ func (ts tableSet) Compact() (ns tableSet, compactees chunkSources) {
|
||||
return ns, toCompact
|
||||
}
|
||||
|
||||
func (ts tableSet) extract(order EnumerationOrder, chunks chan<- extractRecord) {
|
||||
// Since new tables are _prepended_ to a tableSet, extracting chunks in ReverseOrder requires iterating ts.novel, then ts.upstream, from front to back, while doing insertOrder requires iterating ts.upstream back to front, followed by ts.novel.
|
||||
if order == ReverseOrder {
|
||||
for _, cs := range ts.novel {
|
||||
cs.extract(order, chunks)
|
||||
}
|
||||
for _, cs := range ts.upstream {
|
||||
cs.extract(order, chunks)
|
||||
}
|
||||
return
|
||||
}
|
||||
func (ts tableSet) extract(chunks chan<- extractRecord) {
|
||||
// Since new tables are _prepended_ to a tableSet, extracting chunks in insertOrder requires iterating ts.upstream back to front, followed by ts.novel.
|
||||
for i := len(ts.upstream) - 1; i >= 0; i-- {
|
||||
ts.upstream[i].extract(order, chunks)
|
||||
ts.upstream[i].extract(chunks)
|
||||
}
|
||||
for i := len(ts.novel) - 1; i >= 0; i-- {
|
||||
ts.novel[i].extract(order, chunks)
|
||||
ts.novel[i].extract(chunks)
|
||||
}
|
||||
}

@@ -101,7 +101,7 @@ func TestTableSetExtract(t *testing.T) {
ts = ts.Prepend(mt)

chunkChan := make(chan extractRecord)
go func() { ts.extract(InsertOrder, chunkChan); close(chunkChan) }()
go func() { defer close(chunkChan); ts.extract(chunkChan) }()
i := 0
for rec := range chunkChan {
a := computeAddr(testChunks[i])
@@ -111,17 +111,6 @@ func TestTableSetExtract(t *testing.T) {
i++
}

chunkChan = make(chan extractRecord)
go func() { ts.extract(ReverseOrder, chunkChan); close(chunkChan) }()
i = len(testChunks) - 1
for rec := range chunkChan {
a := computeAddr(testChunks[i])
assert.NotNil(rec.data, "Nothing for", a)
assert.Equal(testChunks[i], rec.data, "Item %d: %s != %s", i, string(testChunks[i]), string(rec.data))
assert.Equal(a, rec.a)
i--
}

ts.Close()
}

@@ -233,7 +233,7 @@ func TestExtract(t *testing.T) {
addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])}

chunkChan := make(chan extractRecord)
go func() { tr.extract(InsertOrder, chunkChan); close(chunkChan) }()
go func() { tr.extract(chunkChan); close(chunkChan) }()
i := 0
for rec := range chunkChan {
assert.NotNil(rec.data, "Nothing for", addrs[i])
@@ -241,16 +241,6 @@ func TestExtract(t *testing.T) {
assert.Equal(chunks[i], rec.data)
i++
}

chunkChan = make(chan extractRecord)
go func() { tr.extract(ReverseOrder, chunkChan); close(chunkChan) }()
i = len(chunks) - 1
for rec := range chunkChan {
assert.NotNil(rec.data, "Nothing for", addrs[i])
assert.Equal(addrs[i], rec.a)
assert.Equal(chunks[i], rec.data)
i--
}
}

func Test65k(t *testing.T) {

@@ -14,7 +14,9 @@ import (
"github.com/attic-labs/noms/go/hash"
)

// BatchStore provides an interface similar to chunks.ChunkStore, but batch-oriented. Instead of Put(), it provides SchedulePut(), which enqueues a Chunk to be sent at a possibly later time.
// BatchStore provides an interface similar to chunks.ChunkStore, but batch-
// oriented. Instead of Put(), it provides SchedulePut(), which enqueues a
// Chunk to be sent at a possibly later time.
type BatchStore interface {
// Get returns the Chunk with the hash h from the store. If h is absent
// from the store, chunks.EmptyChunk is returned.
@@ -28,19 +30,9 @@ type BatchStore interface {
// SchedulePut enqueues a write for the Chunk c with the given refHeight.
// c must be visible to subsequent Get() calls upon return. Typically, the
// Value which was encoded to provide c can also be queried for its
// refHeight. The call may or may not block until c is persisted. The
// provided hints are used to assist in validation. Validation requires
// checking that all refs embedded in c are themselves valid, which could
// naively be done by resolving each one. Instead, hints provides a
// (smaller) set of refs that point to Chunks that themselves contain many
// of c's refs. Thus, by checking only the hinted Chunks, c can be
// validated with fewer read operations.
// c may or may not be persisted when SchedulePut() returns, but is
// refHeight. The call may or may not block until c is persisted, but c is
// guaranteed to be persistent after a call to Flush() or UpdateRoot().
SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints)

// AddHints allows additional hints, as used by SchedulePut, to be added for use in the current batch.
AddHints(hints Hints)
SchedulePut(c chunks.Chunk)

// Flush causes enqueued Puts to be persisted.
Flush()
@@ -48,16 +40,18 @@ type BatchStore interface {
io.Closer
}

// Hints are a set of hashes that should be used to speed up the validation of one or more Chunks.
type Hints hash.HashSet

// BatchStoreAdaptor provides a naive implementation of BatchStore should only be used with ChunkStores that can Put relatively quickly. It provides no actual batching or validation. Its intended use is for adapting a ChunkStore for use in something that requires a BatchStore.
// BatchStoreAdaptor provides a naive implementation of BatchStore should only
// be used with ChunkStores that can Put relatively quickly. It provides no
// actual batching or validation. Its intended use is for adapting a
// ChunkStore for use in something that requires a BatchStore.
type BatchStoreAdaptor struct {
cs chunks.ChunkStore
once sync.Once
}

// NewBatchStoreAdaptor returns a BatchStore instance backed by a ChunkStore. Takes ownership of cs and manages its lifetime; calling Close on the returned BatchStore will Close cs.
// NewBatchStoreAdaptor returns a BatchStore instance backed by a ChunkStore.
// Takes ownership of cs and manages its lifetime; calling Close on the
// returned BatchStore will Close cs.
func NewBatchStoreAdaptor(cs chunks.ChunkStore) BatchStore {
return &BatchStoreAdaptor{cs: cs}
}
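With hints and refHeight gone, the BatchStore contract reduces to "schedule now, durable after Flush or UpdateRoot", and validation no longer happens at SchedulePut time. A minimal caller-side sketch (illustrative only; writeTwo is a made-up name and the package's imports are assumed):

func writeTwo(bs BatchStore, c1, c2 chunks.Chunk) {
	// Chunks can be scheduled in any order; nothing about validation is checked here.
	bs.SchedulePut(c1)
	bs.SchedulePut(c2)
	bs.Flush() // both chunks are guaranteed to be persistent once Flush returns
}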
@@ -74,8 +68,8 @@ func (bsa *BatchStoreAdaptor) GetMany(hashes hash.HashSet, foundChunks chan *chu
bsa.cs.GetMany(hashes, foundChunks)
}

// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
// SchedulePut simply calls Put on the underlying ChunkStore.
func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk) {
bsa.once.Do(bsa.expectVersion)
bsa.cs.Put(c)
}
@@ -96,9 +90,6 @@ func (bsa *BatchStoreAdaptor) UpdateRoot(current, last hash.Hash) bool {
return bsa.cs.UpdateRoot(current, last)
}

// AddHints is a noop.
func (bsa *BatchStoreAdaptor) AddHints(hints Hints) {}

func (bsa *BatchStoreAdaptor) Flush() {
bsa.once.Do(bsa.expectVersion)
bsa.cs.Flush()

@@ -5,8 +5,6 @@
package types

import (
"sync"

"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
@@ -15,37 +13,28 @@ import (
const batchSize = 100

type ValidatingBatchingSink struct {
vs *ValueStore
cs chunks.ChunkStore
batch [batchSize]chunks.Chunk
count int
pool sync.Pool
vs *ValueStore
cs chunks.ChunkStore
validate bool
unresolved hash.HashSet
}

func NewValidatingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink {
return &ValidatingBatchingSink{
vs: newLocalValueStore(cs),
cs: cs,
vs: newLocalValueStore(cs),
cs: cs,
validate: true,
unresolved: hash.HashSet{},
}
}

// Prepare primes the type info cache used to validate Enqueued Chunks by reading the Chunks referenced by the provided hints.
func (vbs *ValidatingBatchingSink) Prepare(hints Hints) {
foundChunks := make(chan *chunks.Chunk, batchSize)
wg := sync.WaitGroup{}
for i := 0; i < batchSize; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for c := range foundChunks {
v := DecodeFromBytes(c.Data(), vbs.vs)
vbs.vs.setHintsForReadValue(v, c.Hash(), false)
}
}()
func NewCompletenessCheckingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink {
return &ValidatingBatchingSink{
vs: newLocalValueStore(cs),
cs: cs,
validate: false,
unresolved: hash.HashSet{},
}
vbs.cs.GetMany(hash.HashSet(hints), foundChunks)
close(foundChunks)
wg.Wait()
}

// DecodedChunk holds a pointer to a Chunk and the Value that results from
@@ -57,45 +46,50 @@ type DecodedChunk struct {

// DecodeUnqueued decodes c and checks that the hash of the resulting value
// matches c.Hash(). It returns a DecodedChunk holding both c and a pointer to
// the decoded Value. However, if c has already been Enqueued, DecodeUnqueued
// returns an empty DecodedChunk.
// the decoded Value.
func (vbs *ValidatingBatchingSink) DecodeUnqueued(c *chunks.Chunk) DecodedChunk {
h := c.Hash()
if vbs.vs.isPresent(h) {
return DecodedChunk{}
var v Value
if vbs.validate {
v = decodeFromBytesWithValidation(c.Data(), vbs.vs)
} else {
v = DecodeFromBytes(c.Data(), vbs.vs)
}
v := decodeFromBytesWithValidation(c.Data(), vbs.vs)

if getHash(v) != h {
d.Panic("Invalid hash found")
}
return DecodedChunk{c, &v}
}

// Enqueue adds c to the queue of Chunks waiting to be Put into vbs' backing
// ChunkStore. It is assumed that v is the Value decoded from c, and so v can
// be used to validate the ref-completeness of c. The instance keeps an
// internal buffer of Chunks, spilling to the ChunkStore when the buffer is
// full.
func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) {
// Put Puts c into vbs' backing ChunkStore. It is assumed that v is the Value
// decoded from c, and so v can be used to validate the ref-completeness of c.
func (vbs *ValidatingBatchingSink) Put(c chunks.Chunk, v Value) {
h := c.Hash()
vbs.vs.ensureChunksInCache(v)
vbs.vs.set(h, hintedChunk{TypeOf(v), h}, false)

vbs.batch[vbs.count] = c
vbs.count++

if vbs.count == batchSize {
vbs.cs.PutMany(vbs.batch[:vbs.count])
vbs.count = 0
}
vbs.unresolved.Remove(h)
v.WalkRefs(func(ref Ref) {
vbs.unresolved.Insert(ref.TargetHash())
})
vbs.cs.Put(c)
}

// Flush Puts any Chunks buffered by Enqueue calls into the backing
// ChunkStore.
// Flush makes durable all enqueued Chunks.
func (vbs *ValidatingBatchingSink) Flush() {
if vbs.count > 0 {
vbs.cs.PutMany(vbs.batch[:vbs.count])
}
vbs.cs.Flush()
vbs.count = 0
}

// PanicIfDangling does a Has check on all the references encountered
// while enqueuing novel chunks. It panics if any of these refs point
// to Chunks that don't exist in the backing ChunkStore.
func (vbs *ValidatingBatchingSink) PanicIfDangling() {
present := vbs.cs.HasMany(vbs.unresolved)
absent := hash.HashSlice{}
for h := range vbs.unresolved {
if !present.Has(h) {
absent = append(absent, h)
}
}
if len(absent) != 0 {
d.Panic("Found dangling references to %v", absent)
}
}
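Taken together, the sink records every outbound ref while novel chunks stream in (Put) and resolves them in a single bulk HasMany at the end (PanicIfDangling). A rough sketch of a handler driving it (hypothetical code, not taken from this commit; incoming stands in for the novel chunks received from a client, and handleWriteValue is a made-up name):

func handleWriteValue(cs chunks.ChunkStore, incoming []chunks.Chunk) {
	vbs := NewCompletenessCheckingBatchingSink(cs)
	for _, c := range incoming {
		dc := vbs.DecodeUnqueued(&c)
		vbs.Put(*dc.Chunk, *dc.Value) // records every ref of c as unresolved
	}
	vbs.Flush()
	vbs.PanicIfDangling() // one bulk HasMany over whatever refs are still unresolved
}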

@@ -11,28 +11,6 @@ import (
"github.com/attic-labs/testify/assert"
)

func TestValidatingBatchingSinkPrepare(t *testing.T) {
cs := chunks.NewTestStore()
hints := Hints{}
chnx := []chunks.Chunk{
EncodeValue(Number(42), nil),
EncodeValue(Number(-7), nil),
EncodeValue(String("oy"), nil),
EncodeValue(Bool(true), nil),
EncodeValue(NewBlob(), nil),
}
for _, c := range chnx {
cs.Put(c)
hints[c.Hash()] = struct{}{}
}

vbs := NewValidatingBatchingSink(cs)
vbs.Prepare(hints)
for h := range hints {
vbs.vs.isPresent(h)
}
}

func TestValidatingBatchingSinkDecode(t *testing.T) {
v := Number(42)
c := EncodeValue(v, nil)
@@ -42,17 +20,6 @@ func TestValidatingBatchingSinkDecode(t *testing.T) {
assert.True(t, v.Equals(*dc.Value))
}

func TestValidatingBatchingSinkDecodeAlreadyEnqueued(t *testing.T) {
v := Number(42)
c := EncodeValue(v, nil)
vbs := NewValidatingBatchingSink(chunks.NewTestStore())

vbs.Enqueue(c, v)
dc := vbs.DecodeUnqueued(&c)
assert.Nil(t, dc.Chunk)
assert.Nil(t, dc.Value)
}

func assertPanicsOnInvalidChunk(t *testing.T, data []interface{}) {
cs := chunks.NewTestStore()
vs := newLocalValueStore(cs)
@@ -109,20 +76,54 @@ func TestValidatingBatchingSinkEnqueueAndFlush(t *testing.T) {
cs := chunks.NewTestStore()
vbs := NewValidatingBatchingSink(cs)

vbs.Enqueue(c, v)
vbs.Put(c, v)
vbs.Flush()
assert.Equal(t, 1, cs.Writes)
}

func TestValidatingBatchingSinkEnqueueImplicitFlush(t *testing.T) {
cs := chunks.NewTestStore()
vbs := NewValidatingBatchingSink(cs)
func TestValidatingBatchingSinkPanicIfDangling(t *testing.T) {
b := Bool(true)
r := NewRef(b)

for i := 0; i <= batchSize; i++ {
v := Number(i)
vbs.Enqueue(EncodeValue(v, nil), v)
}
assert.Equal(t, batchSize, cs.Writes)
vbs.Flush()
assert.Equal(t, 1, cs.Writes-batchSize)
t.Run("Panic", func(t *testing.T) {
t.Run("PreFlush", func(t *testing.T) {
t.Parallel()
vbs := NewValidatingBatchingSink(chunks.NewTestStore())
vbs.Put(EncodeValue(r, nil), r)
assert.Panics(t, vbs.PanicIfDangling)
})
t.Run("PostFlush", func(t *testing.T) {
t.Parallel()
vbs := NewValidatingBatchingSink(chunks.NewTestStore())
vbs.Put(EncodeValue(r, nil), r)
vbs.Flush()
assert.Panics(t, vbs.PanicIfDangling)
})
})

t.Run("Success", func(t *testing.T) {
t.Run("BatchInOrder", func(t *testing.T) {
t.Parallel()
vbs := NewValidatingBatchingSink(chunks.NewTestStore())
vbs.Put(EncodeValue(b, nil), b)
vbs.Put(EncodeValue(r, nil), r)
assert.NotPanics(t, vbs.PanicIfDangling)
})
t.Run("BatchOutOfOrder", func(t *testing.T) {
t.Parallel()
vbs := NewValidatingBatchingSink(chunks.NewTestStore())
vbs.Put(EncodeValue(r, nil), r)
vbs.Put(EncodeValue(b, nil), b)
assert.NotPanics(t, vbs.PanicIfDangling)
})
t.Run("ExistingChunk", func(t *testing.T) {
t.Parallel()
cs := chunks.NewTestStore()
cs.Put(EncodeValue(b, nil))

vbs := NewValidatingBatchingSink(cs)
vbs.Put(EncodeValue(r, nil), r)
assert.NotPanics(t, vbs.PanicIfDangling)
})
})
}

@@ -38,19 +38,15 @@ type ValueReadWriter interface {
}

// ValueStore provides methods to read and write Noms Values to a BatchStore.
// It validates Values as they are written, but does not guarantee that these
// Values are persisted through the BatchStore until a subsequent Flush.
// It minimally validates Values as they're written, but does not guarantee
// that these Values are persisted through the BatchStore until a subsequent
// Flush.
// Currently, WriteValue validates the following properties of a Value v:
// - v can be correctly serialized and its Ref taken
// - all Refs in v point to a Value that can be read from this ValueStore
// - all Refs in v point to a Value of the correct Type
type ValueStore struct {
bs BatchStore
cacheMu sync.RWMutex
hintCache map[hash.Hash]chunkCacheEntry
pendingHints map[hash.Hash]chunkCacheEntry
pendingMu sync.RWMutex
pendingPuts map[hash.Hash]pendingChunk
pendingPuts map[hash.Hash]chunks.Chunk
pendingPutMax uint64
pendingPutSize uint64
pendingParents map[hash.Hash]uint64 // chunk Hash -> ref height
@@ -64,18 +60,6 @@ const (
defaultPendingPutMax = 1 << 28 // 256MB
)

type chunkCacheEntry interface {
Present() bool
Hint() hash.Hash
typeOf() *Type
}

type pendingChunk struct {
c chunks.Chunk
height uint64
hints Hints
}

// NewTestValueStore creates a simple struct that satisfies ValueReadWriter
// and is backed by a chunks.TestStore.
func NewTestValueStore() *ValueStore {
@@ -99,12 +83,10 @@ func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {

func newValueStoreWithCacheAndPending(bs BatchStore, cacheSize, pendingMax uint64) *ValueStore {
return &ValueStore{
bs: bs,
cacheMu: sync.RWMutex{},
hintCache: map[hash.Hash]chunkCacheEntry{},
pendingHints: map[hash.Hash]chunkCacheEntry{},
bs: bs,

pendingMu: sync.RWMutex{},
pendingPuts: map[hash.Hash]pendingChunk{},
pendingPuts: map[hash.Hash]chunks.Chunk{},
pendingPutMax: pendingMax,
pendingParents: map[hash.Hash]uint64{},

@@ -128,13 +110,11 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
return v.(Value)
}

stillPending := false
chunk := func() chunks.Chunk {
lvs.pendingMu.RLock()
defer lvs.pendingMu.RUnlock()
if pc, ok := lvs.pendingPuts[h]; ok {
stillPending = true
return pc.c
if pending, ok := lvs.pendingPuts[h]; ok {
return pending
}
return chunks.EmptyChunk
}()
@@ -148,23 +128,9 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {

v := DecodeValue(chunk, lvs)
lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
lvs.setHintsForReadValue(v, h, stillPending)
return v
}

func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash, toPending bool) {
var entry chunkCacheEntry = absentChunk{}
if v != nil {
lvs.setHintsForReachable(v, h, toPending)
// h is trivially a hint for v, so consider putting that in the hintCache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the hintCache. If we later read some other chunk that references v, setHintsForReachable will overwrite this with a hint pointing to that chunk.
// If we don't do this, top-level Values that get read but not written -- such as the existing Head of a Database upon a Commit -- can be erroneously left out during a pull.
entry = hintedChunk{TypeOf(v), h}
}
if cur := lvs.check(h); cur == nil || cur.Hint().IsEmpty() {
lvs.set(h, entry, toPending)
}
}

// ReadManyValues reads and decodes Values indicated by |hashes| from lvs. On
// return, |foundValues| will have been fully sent all Values which have been
// found. Any non-present Values will silently be ignored.
@@ -172,7 +138,6 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va
decode := func(h hash.Hash, chunk *chunks.Chunk, toPending bool) Value {
v := DecodeValue(*chunk, lvs)
lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
lvs.setHintsForReadValue(v, h, toPending)
return v
}

@@ -189,8 +154,8 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va
chunk := func() chunks.Chunk {
lvs.pendingMu.RLock()
defer lvs.pendingMu.RUnlock()
if pc, ok := lvs.pendingPuts[h]; ok {
return pc.c
if pending, ok := lvs.pendingPuts[h]; ok {
return pending
}
return chunks.EmptyChunk
}()
@@ -238,17 +203,12 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
h := c.Hash()
height := maxChunkHeight(v) + 1
r := constructRef(MakeRefType(TypeOf(v)), h, height)
if lvs.isPresent(h) {
if v, ok := lvs.valueCache.Get(h); ok && v != nil {
return r
}

// TODO: It _really_ feels like there should be some refactoring that allows us to only have to walk the refs of |v| once, but I'm hesitant to undertake that refactor right now.
hints := lvs.chunkHintsFromCache(v)
lvs.bufferChunk(v, c, height, hints)

lvs.setHintsForReachable(v, h, true)
lvs.set(h, (*presentChunk)(TypeOf(v)), false)
lvs.valueCache.Drop(h)
lvs.bufferChunk(v, c, height)
lvs.valueCache.Drop(h) // valueCache may have an entry saying h is not present. Clear that.
return r
}

@@ -262,22 +222,22 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
// flushed).
// 2. The total data occupied by buffered chunks does not exceed
// lvs.pendingPutMax
func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64, hints Hints) {
func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) {
lvs.pendingMu.Lock()
defer lvs.pendingMu.Unlock()
h := c.Hash()
d.Chk.NotZero(height)
lvs.pendingPuts[h] = pendingChunk{c, height, hints}
d.PanicIfTrue(height == 0)
lvs.pendingPuts[h] = c
lvs.pendingPutSize += uint64(len(c.Data()))

putChildren := func(parent hash.Hash) (dataPut int) {
pc, present := lvs.pendingPuts[parent]
d.Chk.True(present)
v := DecodeValue(pc.c, lvs)
pending, present := lvs.pendingPuts[parent]
d.PanicIfFalse(present)
v := DecodeValue(pending, lvs)
v.WalkRefs(func(grandchildRef Ref) {
if pc, present := lvs.pendingPuts[grandchildRef.TargetHash()]; present {
lvs.bs.SchedulePut(pc.c, pc.height, pc.hints)
dataPut += len(pc.c.Data())
if pending, present := lvs.pendingPuts[grandchildRef.TargetHash()]; present {
lvs.bs.SchedulePut(pending)
dataPut += len(pending.Data())
delete(lvs.pendingPuts, grandchildRef.TargetHash())
}
})
@@ -314,13 +274,13 @@ func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64, hints
}
}
if height == 0 { // This can happen if there are no pending parents
var pc pendingChunk
for tallest, pc = range lvs.pendingPuts {
var chunk chunks.Chunk
for tallest, chunk = range lvs.pendingPuts {
// Any pendingPut is as good as another in this case, so take the first one
break
}
lvs.bs.SchedulePut(pc.c, pc.height, pc.hints)
lvs.pendingPutSize -= uint64(len(pc.c.Data()))
lvs.bs.SchedulePut(chunk)
lvs.pendingPutSize -= uint64(len(chunk.Data()))
delete(lvs.pendingPuts, tallest)
continue
}
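The eviction loop above keeps the buffer under lvs.pendingPutMax by scheduling chunks that are already pending; since write order is no longer significant, any victim will do. A stripped-down sketch of that idea (illustrative only, with a plain map and size counter standing in for the ValueStore's fields; spillIfOverBudget is a made-up name):

func spillIfOverBudget(bs BatchStore, pending map[hash.Hash]chunks.Chunk, size *uint64, max uint64) {
	for h, c := range pending {
		if *size <= max {
			return // back under budget; stop evicting
		}
		bs.SchedulePut(c)
		*size -= uint64(len(c.Data()))
		delete(pending, h)
	}
}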
@@ -335,34 +295,24 @@ func (lvs *ValueStore) Flush(root hash.Hash) {
lvs.pendingMu.Lock()
defer lvs.pendingMu.Unlock()

pc, present := lvs.pendingPuts[root]
pending, present := lvs.pendingPuts[root]
if !present {
return
}

put := func(h hash.Hash, pc pendingChunk) uint64 {
lvs.bs.SchedulePut(pc.c, pc.height, pc.hints)
put := func(h hash.Hash, chunk chunks.Chunk) uint64 {
lvs.bs.SchedulePut(chunk)
delete(lvs.pendingPuts, h)
return uint64(len(pc.c.Data()))
return uint64(len(chunk.Data()))
}
v := DecodeValue(pc.c, lvs)
v := DecodeValue(pending, lvs)
v.WalkRefs(func(reachable Ref) {
if pc, present := lvs.pendingPuts[reachable.TargetHash()]; present {
lvs.pendingPutSize -= put(reachable.TargetHash(), pc)
if pending, present := lvs.pendingPuts[reachable.TargetHash()]; present {
lvs.pendingPutSize -= put(reachable.TargetHash(), pending)
}
})
delete(lvs.pendingParents, root) // If not present, this is idempotent
lvs.pendingPutSize -= put(root, pc)

// Merge in pending hints
lvs.cacheMu.Lock()
defer lvs.cacheMu.Unlock()
for h, entry := range lvs.pendingHints {
if _, present := lvs.pendingPuts[h]; !present {
lvs.hintCache[h] = entry
delete(lvs.pendingHints, h)
}
}
lvs.pendingPutSize -= put(root, pending)
}()
lvs.bs.Flush()
}
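Put together, the typical client-side write path under the new scheme looks like the following usage sketch (not code from this commit; persist is a made-up helper): WriteValue buffers the encoded chunk, and Flush(root) schedules any still-pending children reachable from root, then root itself, then flushes the BatchStore.

func persist(vs *ValueStore, v Value) hash.Hash {
	r := vs.WriteValue(v) // encodes v and buffers its chunk in pendingPuts
	h := r.TargetHash()
	vs.Flush(h) // schedules pending children of h, then h, then flushes the BatchStore
	return h
}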
@@ -377,47 +327,6 @@ func (lvs *ValueStore) Close() error {
return lvs.bs.Close()
}

// setHintsForReachable looks at the Chunks reachable from v and, for each one checks if there's a hint in the hintCache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
func (lvs *ValueStore) setHintsForReachable(v Value, r hash.Hash, toPending bool) {
v.WalkRefs(func(reachable Ref) {
hash := reachable.TargetHash()
if cur := lvs.check(hash); cur == nil || cur.Hint().IsEmpty() || cur.Hint() == hash {
lvs.set(hash, hintedChunk{getTargetType(reachable), r}, toPending)
}
})
}

func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) {
if entry := lvs.check(r); entry != nil && entry.Present() {
present = true
}
return
}

func (lvs *ValueStore) check(r hash.Hash) chunkCacheEntry {
lvs.cacheMu.RLock()
defer lvs.cacheMu.RUnlock()
return lvs.hintCache[r]
}

func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) {
lvs.cacheMu.Lock()
defer lvs.cacheMu.Unlock()
if toPending {
lvs.pendingHints[r] = entry
} else {
lvs.hintCache[r] = entry
}
}

func (lvs *ValueStore) chunkHintsFromCache(v Value) Hints {
return lvs.checkChunksInCache(v, false)
}

func (lvs *ValueStore) ensureChunksInCache(v Value) {
lvs.checkChunksInCache(v, true)
}

func (lvs *ValueStore) opCache() opCache {
lvs.once.Do(func() {
lvs.opcStore = newLdbOpCacheStore(lvs)
@@ -425,98 +334,8 @@ func (lvs *ValueStore) opCache() opCache {
return lvs.opcStore.opCache()
}

func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
hints := map[hash.Hash]struct{}{}
collectHints := func(reachable Ref) {
// First, check the hintCache to see if reachable is already known to be valid.
targetHash := reachable.TargetHash()
entry := lvs.check(targetHash)

// If it's not already in the hintCache, attempt to read the value directly, which will put it and its chunks into the hintCache.
var reachableV Value
if entry == nil || !entry.Present() {
if readValues {
// TODO: log or report that we needed to ReadValue here BUG 1762
reachableV = lvs.ReadValue(targetHash)
entry = lvs.check(targetHash)
}
if reachableV == nil {
d.Chk.Fail("Attempted to write Value containing Ref to non-existent object.", "%s\n, contains ref %s, which points to a non-existent Value.", v.Hash(), reachable.TargetHash())
}
}
if hint := entry.Hint(); !hint.IsEmpty() {
hints[hint] = struct{}{}
}

targetType := getTargetType(reachable)
if entry.typeOf().TargetKind() == ValueKind && targetType.TargetKind() != ValueKind {
// We've seen targetHash before, but only in a Ref<Value>, and reachable has a more specific type than that. Deref reachable to check the real type on the chunk it points to, and cache the result if everything checks out.
if reachableV == nil {
reachableV = lvs.ReadValue(targetHash)
}
entry = hintedChunk{TypeOf(reachableV), entry.Hint()}
lvs.set(targetHash, entry, false)
}
// At this point, entry should have the most specific type info possible. Unless it matches targetType, or targetType is 'Value', bail.
if !IsSubtype(targetType, entry.typeOf()) {
// TODO: This should really be!
// if !(targetType.TargetKind() == ValueKind || entry.Type().Equals(targetType)) {
// https://github.com/attic-labs/noms/issues/3325

d.Panic("Value to write contains ref %s, which points to a value of type %s which is not a subtype of %s", reachable.TargetHash(), entry.typeOf().Describe(), targetType.Describe())
}
}
v.WalkRefs(collectHints)
return hints
}

func getTargetType(refBase Ref) *Type {
refType := TypeOf(refBase)
d.PanicIfFalse(RefKind == refType.TargetKind())
return refType.Desc.(CompoundDesc).ElemTypes[0]
}

type hintedChunk struct {
t *Type
hint hash.Hash
}

func (h hintedChunk) Present() bool {
return true
}

func (h hintedChunk) Hint() (r hash.Hash) {
return h.hint
}

func (h hintedChunk) typeOf() *Type {
return h.t
}

type presentChunk Type

func (p *presentChunk) Present() bool {
return true
}

func (p *presentChunk) Hint() (h hash.Hash) {
return
}

func (p *presentChunk) typeOf() *Type {
return (*Type)(p)
}

type absentChunk struct{}

func (a absentChunk) Present() bool {
return false
}

func (a absentChunk) Hint() (r hash.Hash) {
return
}

func (a absentChunk) typeOf() *Type {
panic("Not reached. Should never call typeOf() on an absentChunk.")
}

@@ -21,7 +21,9 @@ func TestValueReadWriteRead(t *testing.T) {
h := vs.WriteValue(s).TargetHash()
vs.Flush(h)
v := vs.ReadValue(h) // non-nil
assert.True(s.Equals(v))
if assert.NotNil(v) {
assert.True(s.Equals(v), "%s != %s", EncodedValue(s), EncodedValue(v))
}
}

func TestValueReadMany(t *testing.T) {
@@ -78,105 +80,6 @@ func TestValueWriteFlush(t *testing.T) {
assert.Zero(vs.pendingPutSize)
}

func TestCheckChunksInCache(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

b := NewEmptyBlob()
cs.Put(EncodeValue(b, nil))
cvs.set(b.Hash(), hintedChunk{TypeOf(b), b.Hash()}, false)

bref := NewRef(b)
assert.NotPanics(func() { cvs.chunkHintsFromCache(bref) })
}

func TestCheckChunksInCachePostCommit(t *testing.T) {
assert := assert.New(t)
vs := NewTestValueStore()

l := NewList()
r := NewRef(l)
i := 0
for r.Height() == 1 {
l = l.Append(Number(i))
r = NewRef(l)
i++
}

h := vs.WriteValue(l).TargetHash()
// Hints for leaf sequences should be absent prior to Flush...
l.WalkRefs(func(ref Ref) {
assert.True(vs.check(ref.TargetHash()).Hint().IsEmpty())
})
vs.Flush(h)
// ...And present afterwards
l.WalkRefs(func(ref Ref) {
assert.True(vs.check(ref.TargetHash()).Hint() == l.Hash())
})
}

func TestCheckChunksNotInCacheAfterReadingNovelValue(t *testing.T) {
assert := assert.New(t)
vs := NewTestValueStore()

l := NewList()
r := NewRef(l)
i := 0
for r.Height() == 1 {
l = l.Append(Number(i))
r = NewRef(l)
i++
}

h := vs.WriteValue(l).TargetHash()
// Hints for leaf sequences should be absent prior to ReadValue...
l.WalkRefs(func(ref Ref) {
assert.True(vs.check(ref.TargetHash()).Hint().IsEmpty())
})
vs.ReadValue(h)
// ...And remain absent!
l.WalkRefs(func(ref Ref) {
assert.False(vs.check(ref.TargetHash()).Hint() == l.Hash())
})
}

func TestCheckChunksInCacheRefValue(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

l := NewList()
r := NewRef(l)
i := 0
for r.Height() == 1 {
l = l.Append(Number(i))
r = NewRef(l)
i++
}

r = cvs.WriteValue(l)
rr := cvs.WriteValue(ToRefOfValue(r))
cvs.Flush(rr.TargetHash())

cvs2 := newLocalValueStore(cs)
rv := cvs2.ReadValue(rr.TargetHash()).(Ref)
assert.True(ValueType.Equals(getTargetType(rv)))
assert.NotPanics(func() { cvs.chunkHintsFromCache(r) })
}

func TestCheckChunksNotInCache(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

b := NewEmptyBlob()
cs.Put(EncodeValue(b, nil))

bref := NewRef(b)
assert.Panics(func() { cvs.chunkHintsFromCache(bref) })
}

type checkingBatchStore struct {
BatchStore
a *assert.Assertions
@@ -189,11 +92,12 @@ func (cbs *checkingBatchStore) expect(rs ...Ref) {
}
}

func (cbs *checkingBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
func (cbs *checkingBatchStore) SchedulePut(c chunks.Chunk) {
if cbs.a.NotZero(len(cbs.expectedOrder)) {
cbs.a.Equal(cbs.expectedOrder[0], c.Hash())
cbs.expectedOrder = cbs.expectedOrder[1:]
}
cbs.BatchStore.SchedulePut(c)
}

func (cbs *checkingBatchStore) Flush() {
@@ -253,92 +157,6 @@ func TestFlushOverSize(t *testing.T) {
vs.Flush(l.Hash())
}

func TestEnsureChunksInCache(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

b := NewEmptyBlob()
s := String("oy")
bref := NewRef(b)
sref := NewRef(s)
l := NewList(bref, sref)

cs.Put(EncodeValue(b, nil))
cs.Put(EncodeValue(s, nil))
cs.Put(EncodeValue(l, nil))

assert.NotPanics(func() { cvs.ensureChunksInCache(bref) })
assert.NotPanics(func() { cvs.ensureChunksInCache(l) })
}

func TestEnsureChunksFails(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

b := NewEmptyBlob()
bref := NewRef(b)
assert.Panics(func() { cvs.ensureChunksInCache(bref) })

s := String("oy")
cs.Put(EncodeValue(b, nil))
cs.Put(EncodeValue(s, nil))

badRef := constructRef(MakeRefType(MakePrimitiveType(BoolKind)), s.Hash(), 1)
l := NewList(bref, badRef)

cs.Put(EncodeValue(l, nil))
assert.Panics(func() { cvs.ensureChunksInCache(l) })
}

func TestCacheOnReadValue(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
cvs := newLocalValueStore(cs)

b := NewEmptyBlob()
bref := cvs.WriteValue(b)
r := cvs.WriteValue(bref)
cvs.Flush(r.TargetHash())

cvs2 := newLocalValueStore(cs)
v := cvs2.ReadValue(r.TargetHash())
assert.True(bref.Equals(v))
assert.True(cvs2.isPresent(b.Hash()))
assert.True(cvs2.isPresent(bref.Hash()))
}

func TestHintsOnCache(t *testing.T) {
assert := assert.New(t)
cvs := newLocalValueStore(chunks.NewTestStore())

cr1 := cvs.WriteValue(Number(1))
cr2 := cvs.WriteValue(Number(2))
s1 := NewStruct("", StructData{
"a": cr1,
"b": cr2,
})
r := cvs.WriteValue(s1)
cvs.Flush(r.TargetHash())
v := cvs.ReadValue(r.TargetHash())

if assert.True(v.Equals(s1)) {
cr3 := cvs.WriteValue(Number(3))
s2 := NewStruct("", StructData{
"a": cr1,
"b": cr2,
"c": cr3,
})

hints := cvs.chunkHintsFromCache(s2)
if assert.Len(hints, 1) {
_, present := hints[r.TargetHash()]
assert.True(present)
}
}
}

func TestPanicOnReadBadVersion(t *testing.T) {
cvs := newLocalValueStore(&badVersionStore{chunks.NewTestStore()})
assert.Panics(t, func() { cvs.ReadValue(hash.Hash{}) })

@@ -1 +1 @@
2:7.4:rio54mdafn882tk79i32ggkg8q118fdb:o9iif7sno01q3qef8qubatd1kmlsmj9t:2:ogs2jpuuc89u7hpliadrjfqcpc749sel:2
2:7.5:rio54mdafn882tk79i32ggkg8q118fdb:ogs2jpuuc89u7hpliadrjfqcpc749sel:2:o9iif7sno01q3qef8qubatd1kmlsmj9t:2