diff --git a/go/chunks/chunk_store.go b/go/chunks/chunk_store.go index b9bd53385f..3351eda4ec 100644 --- a/go/chunks/chunk_store.go +++ b/go/chunks/chunk_store.go @@ -49,9 +49,14 @@ type ChunkSource interface { // found. Any non-present chunks will silently be ignored. GetMany(hashes hash.HashSet, foundChunks chan *Chunk) - // Returns true iff the value at the address |h| is contained in the source + // Returns true iff the value at the address |h| is contained in the + // source Has(h hash.Hash) bool + // Returns a new HashSet containing any members of |hashes| that are + // present in the source. + HasMany(hashes hash.HashSet) (present hash.HashSet) + // Returns the NomsVersion with which this ChunkSource is compatible. Version() string } diff --git a/go/chunks/dynamo_store.go b/go/chunks/dynamo_store.go index 653764a4ae..227282e4ce 100644 --- a/go/chunks/dynamo_store.go +++ b/go/chunks/dynamo_store.go @@ -137,6 +137,16 @@ func (s *DynamoStore) Has(h hash.Hash) bool { return <-ch } +func (s *DynamoStore) HasMany(hashes hash.HashSet) hash.HashSet { + present := hash.HashSet{} + for h := range hashes { + if s.Has(h) { + present.Insert(h) + } + } + return present +} + func (s *DynamoStore) PutMany(chunks []Chunk) { for _, c := range chunks { s.Put(c) diff --git a/go/chunks/memory_store.go b/go/chunks/memory_store.go index b25c62b686..3158c8f518 100644 --- a/go/chunks/memory_store.go +++ b/go/chunks/memory_store.go @@ -52,6 +52,16 @@ func (ms *MemoryStore) Has(r hash.Hash) bool { return ok } +func (ms *MemoryStore) HasMany(hashes hash.HashSet) hash.HashSet { + present := hash.HashSet{} + for h := range hashes { + if ms.Has(h) { + present.Insert(h) + } + } + return present +} + func (ms *MemoryStore) Version() string { return constants.NomsVersion } diff --git a/go/chunks/test_utils.go b/go/chunks/test_utils.go index f1a108a58c..c3b20a8ed9 100644 --- a/go/chunks/test_utils.go +++ b/go/chunks/test_utils.go @@ -46,6 +46,11 @@ func (s *TestStore) Has(h hash.Hash) bool { return s.MemoryStore.Has(h) } +func (s *TestStore) HasMany(hashes hash.HashSet) hash.HashSet { + s.Hases += len(hashes) + return s.MemoryStore.HasMany(hashes) +} + func (s *TestStore) Put(c Chunk) { s.Writes++ s.MemoryStore.Put(c) diff --git a/go/constants/version.go b/go/constants/version.go index 186946fc6c..ce62072e08 100644 --- a/go/constants/version.go +++ b/go/constants/version.go @@ -11,7 +11,7 @@ import ( ) // TODO: generate this from some central thing with go generate. 
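Note: DynamoStore and MemoryStore above both answer the new ChunkSource.HasMany by probing each hash individually with Has; only NomsBlockStore, later in this patch, resolves the whole set in one pass over its tables. A sketch of that shared per-hash fallback follows; hasManyViaHas is an illustrative name, not something the patch adds.

    package chunks

    import "github.com/attic-labs/noms/go/hash"

    // hasManyViaHas answers HasMany for a source that only exposes Has, by
    // probing each member of |hashes| individually and collecting the hits.
    func hasManyViaHas(has func(hash.Hash) bool, hashes hash.HashSet) hash.HashSet {
        present := hash.HashSet{}
        for h := range hashes {
            if has(h) {
                present.Insert(h)
            }
        }
        return present
    }

Either store body above could delegate to it as return hasManyViaHas(s.Has, hashes), at the cost of one round trip per hash.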
-const NomsVersion = "7.4" +const NomsVersion = "7.5" const NOMS_VERSION_NEXT_ENV_NAME = "NOMS_VERSION_NEXT" const NOMS_VERSION_NEXT_ENV_VALUE = "1" diff --git a/go/datas/database_common.go b/go/datas/database_common.go index 3a9659dd10..0ef47fc9da 100644 --- a/go/datas/database_common.go +++ b/go/datas/database_common.go @@ -31,6 +31,10 @@ func newDatabaseCommon(cch *cachingChunkHaver, vs *types.ValueStore, rt chunks.R return databaseCommon{ValueStore: vs, cch: cch, rt: rt, rootHash: rt.Root()} } +func (dbc *databaseCommon) validatingBatchStore() types.BatchStore { + return dbc.BatchStore() +} + func (dbc *databaseCommon) Datasets() types.Map { if dbc.datasets == nil { if dbc.rootHash.IsEmpty() { diff --git a/go/datas/database_test.go b/go/datas/database_test.go index f80331cf09..64c5d89079 100644 --- a/go/datas/database_test.go +++ b/go/datas/database_test.go @@ -98,8 +98,10 @@ func (suite *DatabaseSuite) TestReadWriteCachePersists() { suite.NoError(err) } -func (suite *DatabaseSuite) TestWriteRefToNonexistentValue() { - suite.Panics(func() { suite.db.WriteValue(types.NewRef(types.Bool(true))) }) +func (suite *RemoteDatabaseSuite) TestWriteRefToNonexistentValue() { + ds := suite.db.GetDataset("foo") + r := types.NewRef(types.Bool(true)) + suite.Panics(func() { suite.db.CommitValue(ds, r) }) } func (suite *DatabaseSuite) TestTolerateUngettableRefs() { diff --git a/go/datas/http_batch_store.go b/go/datas/http_batch_store.go index 1635a54cc9..1be65f3454 100644 --- a/go/datas/http_batch_store.go +++ b/go/datas/http_batch_store.go @@ -22,7 +22,6 @@ import ( "github.com/attic-labs/noms/go/d" "github.com/attic-labs/noms/go/hash" "github.com/attic-labs/noms/go/nbs" - "github.com/attic-labs/noms/go/types" "github.com/attic-labs/noms/go/util/verbose" "github.com/golang/snappy" "github.com/julienschmidt/httprouter" @@ -48,16 +47,13 @@ type httpBatchStore struct { auth string getQueue chan chunks.ReadRequest hasQueue chan chunks.ReadRequest - writeQueue chan writeRequest finishedChan chan struct{} rateLimit chan struct{} requestWg *sync.WaitGroup workerWg *sync.WaitGroup - flushOrder nbs.EnumerationOrder cacheMu *sync.RWMutex unwrittenPuts *nbs.NomsBlockCache - hints types.Hints } func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore { @@ -73,15 +69,12 @@ func NewHTTPBatchStore(baseURL, auth string) *httpBatchStore { auth: auth, getQueue: make(chan chunks.ReadRequest, readBufferSize), hasQueue: make(chan chunks.ReadRequest, readBufferSize), - writeQueue: make(chan writeRequest, writeBufferSize), finishedChan: make(chan struct{}), rateLimit: make(chan struct{}, httpChunkSinkConcurrency), requestWg: &sync.WaitGroup{}, workerWg: &sync.WaitGroup{}, - flushOrder: nbs.InsertOrder, cacheMu: &sync.RWMutex{}, unwrittenPuts: nbs.NewCache(), - hints: types.Hints{}, } buffSink.batchGetRequests() buffSink.batchHasRequests() @@ -92,17 +85,6 @@ type httpDoer interface { Do(req *http.Request) (resp *http.Response, err error) } -type writeRequest struct { - hints types.Hints - justHints bool -} - -func (bhcs *httpBatchStore) SetReverseFlushOrder() { - bhcs.cacheMu.Lock() - defer bhcs.cacheMu.Unlock() - bhcs.flushOrder = nbs.ReverseOrder -} - func (bhcs *httpBatchStore) Flush() { bhcs.sendWriteRequests() bhcs.requestWg.Wait() @@ -116,7 +98,6 @@ func (bhcs *httpBatchStore) Close() (e error) { close(bhcs.getQueue) close(bhcs.hasQueue) - close(bhcs.writeQueue) close(bhcs.rateLimit) bhcs.cacheMu.Lock() @@ -335,21 +316,10 @@ func resBodyReader(res *http.Response) (reader io.ReadCloser) { return } -func (bhcs 
*httpBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) { +func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk) { bhcs.cacheMu.RLock() defer bhcs.cacheMu.RUnlock() bhcs.unwrittenPuts.Insert(c) - for hint := range hints { - bhcs.hints[hint] = struct{}{} - } -} - -func (bhcs *httpBatchStore) AddHints(hints types.Hints) { - bhcs.cacheMu.RLock() - defer bhcs.cacheMu.RUnlock() - for hint := range hints { - bhcs.hints[hint] = struct{}{} - } } func (bhcs *httpBatchStore) sendWriteRequests() { @@ -358,7 +328,6 @@ func (bhcs *httpBatchStore) sendWriteRequests() { bhcs.cacheMu.Lock() defer func() { - bhcs.flushOrder = nbs.InsertOrder // This needs to happen even if no chunks get written. bhcs.cacheMu.Unlock() }() @@ -369,47 +338,29 @@ func (bhcs *httpBatchStore) sendWriteRequests() { defer func() { bhcs.unwrittenPuts.Destroy() bhcs.unwrittenPuts = nbs.NewCache() - bhcs.hints = types.Hints{} }() - var res *http.Response - var err error - for tryAgain := true; tryAgain; { - verbose.Log("Sending %d chunks", count) - chunkChan := make(chan *chunks.Chunk, 1024) - go func() { - bhcs.unwrittenPuts.ExtractChunks(bhcs.flushOrder, chunkChan) - close(chunkChan) - }() + verbose.Log("Sending %d chunks", count) + chunkChan := make(chan *chunks.Chunk, 1024) + go func() { + bhcs.unwrittenPuts.ExtractChunks(chunkChan) + close(chunkChan) + }() - body := buildWriteValueRequest(chunkChan, bhcs.hints) - url := *bhcs.host - url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath) - // TODO: Make this accept snappy encoding - req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{ - "Accept-Encoding": {"gzip"}, - "Content-Encoding": {"x-snappy-framed"}, - "Content-Type": {"application/octet-stream"}, - }) + body := buildWriteValueRequest(chunkChan) + url := *bhcs.host + url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath) + // TODO: Make this accept snappy encoding + req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{ + "Accept-Encoding": {"gzip"}, + "Content-Encoding": {"x-snappy-framed"}, + "Content-Type": {"application/octet-stream"}, + }) - res, err = bhcs.httpClient.Do(req) - d.PanicIfError(err) - expectVersion(res) - defer closeResponse(res.Body) - - if tryAgain = res.StatusCode == http.StatusTooManyRequests; tryAgain { - reader := res.Body - if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") { - gr, err := gzip.NewReader(reader) - d.PanicIfError(err) - defer gr.Close() - reader = gr - } - /*hashes :=*/ deserializeHashes(reader) - // TODO: BUG 1259 Since the client must currently send all chunks in one batch, the only thing to do in response to backpressure is send EVERYTHING again. Once batching is again possible, this code should figure out how to resend the chunks indicated by hashes. 
- verbose.Log("Retrying...") - } - } + res, err := bhcs.httpClient.Do(req) + d.PanicIfError(err) + expectVersion(res) + defer closeResponse(res.Body) if http.StatusCreated != res.StatusCode { d.Panic("Unexpected response: %s", formatErrorResponse(res)) diff --git a/go/datas/http_batch_store_test.go b/go/datas/http_batch_store_test.go index 8e96830d47..fbdaa0f4b9 100644 --- a/go/datas/http_batch_store_test.go +++ b/go/datas/http_batch_store_test.go @@ -128,7 +128,7 @@ func (suite *HTTPBatchStoreSuite) TearDownTest() { func (suite *HTTPBatchStoreSuite) TestPutChunk() { c := types.EncodeValue(types.String("abc"), nil) - suite.store.SchedulePut(c, 1, types.Hints{}) + suite.store.SchedulePut(c) suite.store.Flush() suite.Equal(1, suite.cs.Writes) @@ -141,43 +141,10 @@ func (suite *HTTPBatchStoreSuite) TestPutChunksInOrder() { } l := types.NewList() for _, val := range vals { - suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{}) + suite.store.SchedulePut(types.EncodeValue(val, nil)) l = l.Append(types.NewRef(val)) } - suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{}) - suite.store.Flush() - - suite.Equal(3, suite.cs.Writes) -} - -func (suite *HTTPBatchStoreSuite) TestPutChunksReverseOrder() { - val := types.String("abc") - l := types.NewList(types.NewRef(val)) - - suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{}) - suite.store.SchedulePut(types.EncodeValue(val, nil), 1, types.Hints{}) - suite.store.SetReverseFlushOrder() - suite.store.Flush() - - suite.Equal(2, suite.cs.Writes) -} - -func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() { - vals := []types.Value{ - types.String("abc"), - types.String("def"), - } - chnx := []chunks.Chunk{ - types.EncodeValue(vals[0], nil), - types.EncodeValue(vals[1], nil), - } - suite.cs.PutMany(chnx) - l := types.NewList(types.NewRef(vals[0]), types.NewRef(vals[1])) - - suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{ - chnx[0].Hash(): struct{}{}, - chnx[1].Hash(): struct{}{}, - }) + suite.store.SchedulePut(types.EncodeValue(l, nil)) suite.store.Flush() suite.Equal(3, suite.cs.Writes) @@ -251,8 +218,8 @@ func (suite *HTTPBatchStoreSuite) TestGetManyAllCached() { chunks.NewChunk([]byte("abc")), chunks.NewChunk([]byte("def")), } - suite.store.SchedulePut(chnx[0], 1, types.Hints{}) - suite.store.SchedulePut(chnx[1], 1, types.Hints{}) + suite.store.SchedulePut(chnx[0]) + suite.store.SchedulePut(chnx[1]) hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash()) foundChunks := make(chan *chunks.Chunk) @@ -271,7 +238,7 @@ func (suite *HTTPBatchStoreSuite) TestGetManySomeCached() { } cached := chunks.NewChunk([]byte("ghi")) suite.cs.PutMany(chnx) - suite.store.SchedulePut(cached, 1, types.Hints{}) + suite.store.SchedulePut(cached) hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), cached.Hash()) foundChunks := make(chan *chunks.Chunk) diff --git a/go/datas/local_batch_store.go b/go/datas/local_batch_store.go index 946cd0ee31..c72ced3c79 100644 --- a/go/datas/local_batch_store.go +++ b/go/datas/local_batch_store.go @@ -11,31 +11,27 @@ import ( "github.com/attic-labs/noms/go/constants" "github.com/attic-labs/noms/go/d" "github.com/attic-labs/noms/go/hash" + "github.com/attic-labs/noms/go/nbs" "github.com/attic-labs/noms/go/types" ) type localBatchStore struct { cs chunks.ChunkStore - unwrittenPuts *orderedChunkCache + unwrittenPuts *nbs.NomsBlockCache vbs *types.ValidatingBatchingSink - hints types.Hints - hashes hash.HashSet - mu *sync.Mutex once sync.Once } func newLocalBatchStore(cs 
chunks.ChunkStore) *localBatchStore { return &localBatchStore{ cs: cs, - unwrittenPuts: newOrderedChunkCache(), - vbs: types.NewValidatingBatchingSink(cs), - hints: types.Hints{}, - hashes: hash.HashSet{}, - mu: &sync.Mutex{}, + unwrittenPuts: nbs.NewCache(), + vbs: types.NewCompletenessCheckingBatchingSink(cs), } } -// Get checks the internal Chunk cache, proxying to the backing ChunkStore if not present. +// Get checks the internal Chunk cache, proxying to the backing ChunkStore if +// not present. func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk { lbs.once.Do(lbs.expectVersion) if pending := lbs.unwrittenPuts.Get(h); !pending.IsEmpty() { @@ -45,27 +41,23 @@ func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk { } func (lbs *localBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) { - lbs.cs.GetMany(hashes, foundChunks) -} - -// Has checks the internal Chunk cache, proxying to the backing ChunkStore if not present. -func (lbs *localBatchStore) Has(h hash.Hash) bool { - lbs.once.Do(lbs.expectVersion) - if lbs.unwrittenPuts.has(h) { - return true + remaining := make(hash.HashSet, len(hashes)) + for h := range hashes { + remaining.Insert(h) } - return lbs.cs.Has(h) + localChunks := make(chan *chunks.Chunk) + go func() { defer close(localChunks); lbs.unwrittenPuts.GetMany(hashes, localChunks) }() + for c := range localChunks { + remaining.Remove(c.Hash()) + foundChunks <- c + } + lbs.cs.GetMany(remaining, foundChunks) } -// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints. -func (lbs *localBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) { +// SchedulePut simply calls Put on the underlying ChunkStore. +func (lbs *localBatchStore) SchedulePut(c chunks.Chunk) { lbs.once.Do(lbs.expectVersion) - - lbs.unwrittenPuts.Insert(c, refHeight) - lbs.mu.Lock() - defer lbs.mu.Unlock() - lbs.hashes.Insert(c.Hash()) - lbs.AddHints(hints) + lbs.unwrittenPuts.Insert(c) } func (lbs *localBatchStore) expectVersion() { @@ -80,53 +72,44 @@ func (lbs *localBatchStore) Root() hash.Hash { return lbs.cs.Root() } -// UpdateRoot flushes outstanding writes to the backing ChunkStore before updating its Root, because it's almost certainly the case that the caller wants to point that root at some recently-Put Chunk. +// UpdateRoot flushes outstanding writes to the backing ChunkStore before +// updating its Root, because it's almost certainly the case that the caller +// wants to point that root at some recently-Put Chunk. func (lbs *localBatchStore) UpdateRoot(current, last hash.Hash) bool { lbs.once.Do(lbs.expectVersion) lbs.Flush() return lbs.cs.UpdateRoot(current, last) } -func (lbs *localBatchStore) AddHints(hints types.Hints) { - for h := range hints { - lbs.hints[h] = struct{}{} - } -} - func (lbs *localBatchStore) Flush() { lbs.once.Do(lbs.expectVersion) chunkChan := make(chan *chunks.Chunk, 128) go func() { - err := lbs.unwrittenPuts.ExtractChunks(lbs.hashes, chunkChan) - d.PanicIfError(err) - close(chunkChan) + defer close(chunkChan) + lbs.unwrittenPuts.ExtractChunks(chunkChan) }() - lbs.vbs.Prepare(lbs.hints) for c := range chunkChan { dc := lbs.vbs.DecodeUnqueued(c) - lbs.vbs.Enqueue(*dc.Chunk, *dc.Value) + lbs.vbs.Put(*dc.Chunk, *dc.Value) } + lbs.vbs.PanicIfDangling() lbs.vbs.Flush() - lbs.unwrittenPuts.Clear(lbs.hashes) - lbs.hashes = hash.HashSet{} - lbs.hints = types.Hints{} -} - -// FlushAndDestroyWithoutClose flushes lbs and destroys its cache of unwritten chunks. 
It's needed because LocalDatabase wraps a localBatchStore around a ChunkStore that's used by a separate BatchStore, so calling Close() on one is semantically incorrect while it still wants to use the other. -func (lbs *localBatchStore) FlushAndDestroyWithoutClose() { - lbs.Flush() lbs.unwrittenPuts.Destroy() + lbs.unwrittenPuts = nbs.NewCache() } -// Destroy blows away lbs' cache of unwritten chunks without flushing. Used when the owning Database is closing and it isn't semantically correct to flush. +// Destroy blows away lbs' cache of unwritten chunks without flushing. Used +// when the owning Database is closing and it isn't semantically correct to +// flush. func (lbs *localBatchStore) Destroy() { lbs.unwrittenPuts.Destroy() } -// Close is supposed to close the underlying ChunkStore, but the only place localBatchStore is currently used wants to keep the underlying ChunkStore open after it's done with lbs. Hence, the above method and the panic() here. +// Close closes the underlying ChunkStore. func (lbs *localBatchStore) Close() error { - panic("Unreached") + lbs.Flush() + return lbs.cs.Close() } diff --git a/go/datas/local_database.go b/go/datas/local_database.go index 723da3f54c..141c94f5d6 100644 --- a/go/datas/local_database.go +++ b/go/datas/local_database.go @@ -12,16 +12,12 @@ import ( // Database provides versioned storage for noms values. Each Database instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new Database, representing a new moment in history. type LocalDatabase struct { databaseCommon - cs chunks.ChunkStore - vbs *localBatchStore } func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase { - bs := types.NewBatchStoreAdaptor(cs) + bs := newLocalBatchStore(cs) return &LocalDatabase{ newDatabaseCommon(newCachingChunkHaver(cs), types.NewValueStore(bs), bs), - cs, - nil, } } @@ -53,25 +49,11 @@ func (ldb *LocalDatabase) FastForward(ds Dataset, newHeadRef types.Ref) (Dataset } func (ldb *LocalDatabase) doHeadUpdate(ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) { - if ldb.vbs != nil { - ldb.vbs.FlushAndDestroyWithoutClose() - ldb.vbs = nil - } err := updateFunc(ds) return ldb.GetDataset(ds.ID()), err } -func (ldb *LocalDatabase) validatingBatchStore() types.BatchStore { - if ldb.vbs == nil { - ldb.vbs = newLocalBatchStore(ldb.cs) - } - return ldb.vbs -} - func (ldb *LocalDatabase) Close() error { - if ldb.vbs != nil { - ldb.vbs.Destroy() - ldb.vbs = nil - } + ldb.BatchStore().(*localBatchStore).Destroy() return ldb.databaseCommon.Close() } diff --git a/go/datas/pull.go b/go/datas/pull.go index 29aa017008..b8b7bdddbb 100644 --- a/go/datas/pull.go +++ b/go/datas/pull.go @@ -115,9 +115,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency progressCh <- PullProgress{doneCount, knownCount + uint64(srcQ.Len()), approxBytesWritten} } - // hc and reachableChunks aren't goroutine-safe, so only write them here. 
- hc := hintCache{} - reachableChunks := hash.HashSet{} sampleSize := uint64(0) sampleCount := uint64(0) for !srcQ.Empty() { @@ -138,22 +135,17 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency case res := <-srcResChan: for _, reachable := range res.reachables { srcQ.PushBack(reachable) - reachableChunks.Insert(reachable.TargetHash()) } if res.writeBytes > 0 { sampleSize += uint64(res.writeBytes) sampleCount += 1 } - if !res.readHash.IsEmpty() { - reachableChunks.Remove(res.readHash) - } srcWork-- updateProgress(1, 0, uint64(res.readBytes), sampleSize/uint64(math.Max(1, float64(sampleCount)))) case res := <-sinkResChan: for _, reachable := range res.reachables { sinkQ.PushBack(reachable) - hc[reachable.TargetHash()] = res.readHash } sinkWork-- case res := <-comResChan: @@ -163,7 +155,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency if !isHeadOfSink { srcQ.PushBack(reachable) } - hc[reachable.TargetHash()] = res.readHash } comWork-- updateProgress(1, 0, uint64(res.readBytes), 0) @@ -174,14 +165,6 @@ func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency sinkQ.Unique() srcQ.Unique() } - - hints := types.Hints{} - for hash := range reachableChunks { - if hint, present := hc[hash]; present { - hints[hint] = struct{}{} - } - } - sinkDB.validatingBatchStore().AddHints(hints) } type traverseResult struct { @@ -265,7 +248,7 @@ func traverseSource(srcRef types.Ref, srcDB, sinkDB Database, estimateBytesWritt if v == nil { d.Panic("Expected decoded chunk to be non-nil.") } - sinkDB.validatingBatchStore().SchedulePut(c, srcRef.Height(), types.Hints{}) + sinkDB.validatingBatchStore().SchedulePut(c) bytesWritten := 0 if estimateBytesWritten { // TODO: Probably better to hide this behind the BatchStore abstraction since diff --git a/go/datas/put_cache.go b/go/datas/put_cache.go deleted file mode 100644 index 9a21dc0a2d..0000000000 --- a/go/datas/put_cache.go +++ /dev/null @@ -1,152 +0,0 @@ -// Copyright 2016 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package datas - -import ( - "bytes" - "encoding/binary" - "io/ioutil" - "os" - "sync" - - "github.com/attic-labs/noms/go/chunks" - "github.com/attic-labs/noms/go/d" - "github.com/attic-labs/noms/go/hash" - "github.com/golang/snappy" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/filter" - "github.com/syndtr/goleveldb/leveldb/opt" -) - -func newOrderedChunkCache() *orderedChunkCache { - dir, err := ioutil.TempDir("", "") - d.PanicIfError(err) - db, err := leveldb.OpenFile(dir, &opt.Options{ - Compression: opt.NoCompression, - Filter: filter.NewBloomFilter(10), // 10 bits/key - OpenFilesCacheCapacity: 24, - NoSync: true, // We dont need this data to be durable. LDB is acting as sorting temporary storage that can be larger than main memory. - WriteBuffer: 1 << 27, // 128MiB - }) - d.Chk.NoError(err, "opening put cache in %s", dir) - return &orderedChunkCache{ - orderedChunks: db, - chunkIndex: map[hash.Hash][]byte{}, - dbDir: dir, - mu: &sync.RWMutex{}, - } -} - -// orderedChunkCache holds Chunks, allowing them to be retrieved by hash or enumerated in ref-height order. -type orderedChunkCache struct { - orderedChunks *leveldb.DB - chunkIndex map[hash.Hash][]byte - dbDir string - mu *sync.RWMutex -} - -// Insert can be called from any goroutine to store c in the cache. If c is successfully added to the cache, Insert returns true. 
If c was already in the cache, Insert returns false. -func (p *orderedChunkCache) Insert(c chunks.Chunk, refHeight uint64) bool { - hash := c.Hash() - dbKey, present := func() (dbKey []byte, present bool) { - p.mu.Lock() - defer p.mu.Unlock() - if _, present = p.chunkIndex[hash]; !present { - dbKey = toDbKey(refHeight, c.Hash()) - p.chunkIndex[hash] = dbKey - } - return - }() - - if !present { - compressed := snappy.Encode(nil, c.Data()) - d.Chk.NoError(p.orderedChunks.Put(dbKey, compressed, nil)) - return true - } - return false -} - -func (p *orderedChunkCache) has(hash hash.Hash) (has bool) { - p.mu.RLock() - defer p.mu.RUnlock() - _, has = p.chunkIndex[hash] - return -} - -// Get can be called from any goroutine to retrieve the chunk referenced by hash. If the chunk is not present, Get returns the empty Chunk. -func (p *orderedChunkCache) Get(hash hash.Hash) chunks.Chunk { - // Don't use defer p.mu.RUnlock() here, because I want reading from orderedChunks NOT to be guarded by the lock. LevelDB handles its own goroutine-safety. - p.mu.RLock() - dbKey, ok := p.chunkIndex[hash] - p.mu.RUnlock() - - if !ok { - return chunks.EmptyChunk - } - compressed, err := p.orderedChunks.Get(dbKey, nil) - d.Chk.NoError(err) - data, err := snappy.Decode(nil, compressed) - d.Chk.NoError(err) - return chunks.NewChunkWithHash(hash, data) -} - -// Clear can be called from any goroutine to remove chunks referenced by the given hashes from the cache. -func (p *orderedChunkCache) Clear(hashes hash.HashSet) { - deleteBatch := &leveldb.Batch{} - p.mu.Lock() - for hash := range hashes { - deleteBatch.Delete(p.chunkIndex[hash]) - delete(p.chunkIndex, hash) - } - p.mu.Unlock() - d.Chk.NoError(p.orderedChunks.Write(deleteBatch, nil)) - return -} - -var uint64Size = binary.Size(uint64(0)) - -// toDbKey takes a refHeight and a hash and returns a binary key suitable for use with LevelDB. The default sort order used by LevelDB ensures that these keys (and their associated values) will be iterated in ref-height order. -func toDbKey(refHeight uint64, h hash.Hash) []byte { - buf := bytes.NewBuffer(make([]byte, 0, uint64Size+hash.ByteLen)) - err := binary.Write(buf, binary.BigEndian, refHeight) - d.Chk.NoError(err) - err = binary.Write(buf, binary.BigEndian, h[:]) - d.Chk.NoError(err) - return buf.Bytes() -} - -func fromDbKey(key []byte) (uint64, hash.Hash) { - refHeight := uint64(0) - r := bytes.NewReader(key) - err := binary.Read(r, binary.BigEndian, &refHeight) - d.Chk.NoError(err) - h := hash.Hash{} - err = binary.Read(r, binary.BigEndian, &h) - d.Chk.NoError(err) - return refHeight, h -} - -// ExtractChunks can be called from any goroutine to write Chunks referenced by the given hashes to w. The chunks are ordered by ref-height. Chunks of the same height are written in an unspecified order, relative to one another. 
-func (p *orderedChunkCache) ExtractChunks(hashes hash.HashSet, chunkChan chan *chunks.Chunk) error { - iter := p.orderedChunks.NewIterator(nil, nil) - defer iter.Release() - for iter.Next() { - _, hash := fromDbKey(iter.Key()) - if !hashes.Has(hash) { - continue - } - compressed := iter.Value() - data, err := snappy.Decode(nil, compressed) - d.Chk.NoError(err) - c := chunks.NewChunkWithHash(hash, data) - chunkChan <- &c - } - return nil -} - -func (p *orderedChunkCache) Destroy() error { - d.Chk.NoError(p.orderedChunks.Close()) - return os.RemoveAll(p.dbDir) -} diff --git a/go/datas/put_cache_test.go b/go/datas/put_cache_test.go deleted file mode 100644 index e4580a1c47..0000000000 --- a/go/datas/put_cache_test.go +++ /dev/null @@ -1,172 +0,0 @@ -// Copyright 2016 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package datas - -import ( - "math/rand" - "sync" - "testing" - - "github.com/attic-labs/noms/go/chunks" - "github.com/attic-labs/noms/go/hash" - "github.com/attic-labs/noms/go/types" - "github.com/attic-labs/testify/suite" -) - -func TestLevelDBPutCacheSuite(t *testing.T) { - suite.Run(t, &LevelDBPutCacheSuite{}) -} - -type LevelDBPutCacheSuite struct { - suite.Suite - cache *orderedChunkCache - values []types.Value - chnx map[hash.Hash]chunks.Chunk -} - -func (suite *LevelDBPutCacheSuite) SetupTest() { - suite.cache = newOrderedChunkCache() - suite.values = []types.Value{ - types.String("abc"), - types.String("def"), - types.String("ghi"), - types.String("jkl"), - types.String("mno"), - } - suite.chnx = map[hash.Hash]chunks.Chunk{} - for _, v := range suite.values { - suite.chnx[v.Hash()] = types.EncodeValue(v, nil) - } -} - -func (suite *LevelDBPutCacheSuite) TearDownTest() { - suite.cache.Destroy() -} - -func (suite *LevelDBPutCacheSuite) TestAddTwice() { - chunk := suite.chnx[suite.values[0].Hash()] - suite.True(suite.cache.Insert(chunk, 1)) - suite.False(suite.cache.Insert(chunk, 1)) -} - -func (suite *LevelDBPutCacheSuite) TestAddParallel() { - hashes := make(chan hash.Hash) - for _, chunk := range suite.chnx { - go func(c chunks.Chunk) { - suite.cache.Insert(c, 1) - hashes <- c.Hash() - }(chunk) - } - - for i := 0; i < len(suite.values); i++ { - hash := <-hashes - suite.True(suite.cache.has(hash)) - delete(suite.chnx, hash) - } - close(hashes) - suite.Len(suite.chnx, 0) -} - -func (suite *LevelDBPutCacheSuite) TestGetParallel() { - for _, c := range suite.chnx { - suite.cache.Insert(c, 1) - } - - chunkChan := make(chan chunks.Chunk) - for h := range suite.chnx { - go func(h hash.Hash) { - chunkChan <- suite.cache.Get(h) - }(h) - } - - for i := 0; i < len(suite.values); i++ { - c := <-chunkChan - delete(suite.chnx, c.Hash()) - } - close(chunkChan) - suite.Len(suite.chnx, 0) -} - -func (suite *LevelDBPutCacheSuite) TestClearParallel() { - keepIdx := 2 - toClear1, toClear2 := hash.HashSet{}, hash.HashSet{} - for i, v := range suite.values { - suite.cache.Insert(types.EncodeValue(v, nil), 1) - if i < keepIdx { - toClear1.Insert(v.Hash()) - } else if i > keepIdx { - toClear2.Insert(v.Hash()) - } - } - - wg := &sync.WaitGroup{} - wg.Add(2) - clear := func(hs hash.HashSet) { - suite.cache.Clear(hs) - wg.Done() - } - - go clear(toClear1) - go clear(toClear2) - - wg.Wait() - for i, v := range suite.values { - if i == keepIdx { - suite.True(suite.cache.has(v.Hash())) - continue - } - suite.False(suite.cache.has(v.Hash())) - } -} - -func (suite *LevelDBPutCacheSuite) TestReaderSubset() { - 
toExtract := hash.HashSet{} - for hash, c := range suite.chnx { - if len(toExtract) < 2 { - toExtract.Insert(hash) - } - suite.cache.Insert(c, 1) - } - - // Only iterate over the first 2 elements in the DB - chunkChan := suite.extractChunks(toExtract) - count := 0 - for c := range chunkChan { - if suite.Contains(toExtract, c.Hash()) { - count++ - } - } - suite.Equal(len(toExtract), count) -} - -func (suite *LevelDBPutCacheSuite) TestExtractChunksOrder() { - maxHeight := len(suite.chnx) - orderedHashes := make(hash.HashSlice, maxHeight) - toExtract := hash.HashSet{} - heights := rand.Perm(maxHeight) - for hash, c := range suite.chnx { - toExtract.Insert(hash) - orderedHashes[heights[0]] = hash - suite.cache.Insert(c, uint64(heights[0])) - heights = heights[1:] - } - - chunkChan := suite.extractChunks(toExtract) - for c := range chunkChan { - suite.Equal(orderedHashes[0], c.Hash()) - orderedHashes = orderedHashes[1:] - } - suite.Len(orderedHashes, 0) -} - -func (suite *LevelDBPutCacheSuite) extractChunks(hashes hash.HashSet) <-chan *chunks.Chunk { - chunkChan := make(chan *chunks.Chunk) - go func() { - err := suite.cache.ExtractChunks(hashes, chunkChan) - suite.NoError(err) - close(chunkChan) - }() - return chunkChan -} diff --git a/go/datas/remote_database_client.go b/go/datas/remote_database_client.go index 2206836e34..970bbcebf2 100644 --- a/go/datas/remote_database_client.go +++ b/go/datas/remote_database_client.go @@ -19,13 +19,6 @@ func NewRemoteDatabase(baseURL, auth string) *RemoteDatabaseClient { return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(httpBS), types.NewValueStore(httpBS), httpBS)} } -func (rdb *RemoteDatabaseClient) validatingBatchStore() types.BatchStore { - hbs := rdb.BatchStore().(*httpBatchStore) - // TODO: Get rid of this (BUG 2982) - hbs.SetReverseFlushOrder() - return hbs -} - func (rdb *RemoteDatabaseClient) GetDataset(datasetID string) Dataset { return getDataset(rdb, datasetID) } diff --git a/go/datas/remote_database_handlers.go b/go/datas/remote_database_handlers.go index a92e182ee8..969ea89fdd 100644 --- a/go/datas/remote_database_handlers.go +++ b/go/datas/remote_database_handlers.go @@ -137,7 +137,6 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs reader.Close() }() vbs := types.NewValidatingBatchingSink(cs) - vbs.Prepare(deserializeHints(reader)) // Deserialize chunks from reader in background, recovering from errors errChan := make(chan error) @@ -168,7 +167,7 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs dc := <-ch if dc.Chunk != nil && dc.Value != nil { totalDataWritten += len(dc.Chunk.Data()) - vbs.Enqueue(*dc.Chunk, *dc.Value) + vbs.Put(*dc.Chunk, *dc.Value) chunkCount++ if chunkCount%100 == 0 { verbose.Log("Enqueued %d chunks", chunkCount) @@ -178,20 +177,21 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs // If there was an error during chunk deserialization, raise so it can be logged and responded to. if err := <-errChan; err != nil { - panic(d.Wrap(fmt.Errorf("Deserialization failure: %v", err))) + d.Panic("Deserialization failure: %v", err) } + vbs.PanicIfDangling() vbs.Flush() + w.WriteHeader(http.StatusCreated) } // Contents of the returned io.Reader are snappy-compressed. 
-func buildWriteValueRequest(chunkChan chan *chunks.Chunk, hints map[hash.Hash]struct{}) io.Reader { +func buildWriteValueRequest(chunkChan chan *chunks.Chunk) io.Reader { body, pw := io.Pipe() go func() { gw := snappy.NewBufferedWriter(pw) - serializeHints(gw, hints) for c := range chunkChan { chunks.Serialize(*c, gw) } diff --git a/go/datas/remote_database_handlers_test.go b/go/datas/remote_database_handlers_test.go index 24986b5245..572982505a 100644 --- a/go/datas/remote_database_handlers_test.go +++ b/go/datas/remote_database_handlers_test.go @@ -36,14 +36,12 @@ func TestHandleWriteValue(t *testing.T) { _, err := db.CommitValue(db.GetDataset("datasetID"), r) assert.NoError(err) - hint := l.Hash() newItem := types.NewEmptyBlob() itemChunk := types.EncodeValue(newItem, nil) l2 := l.Insert(1, types.NewRef(newItem)) listChunk := types.EncodeValue(l2, nil) body := &bytes.Buffer{} - serializeHints(body, map[hash.Hash]struct{}{hint: {}}) chunks.Serialize(itemChunk, body) chunks.Serialize(listChunk, body) @@ -64,7 +62,6 @@ func TestHandleWriteValuePanic(t *testing.T) { cs := chunks.NewTestStore() body := &bytes.Buffer{} - serializeHints(body, types.Hints{}) body.WriteString("Bogus") w := httptest.NewRecorder() @@ -81,7 +78,6 @@ func TestHandleWriteValueDupChunks(t *testing.T) { itemChunk := types.EncodeValue(newItem, nil) body := &bytes.Buffer{} - serializeHints(body, map[hash.Hash]struct{}{}) // Write the same chunk to body enough times to be certain that at least one of the concurrent deserialize/decode passes completes before the last one can continue. for i := 0; i <= writeValueConcurrency; i++ { chunks.Serialize(itemChunk, body) @@ -112,21 +108,9 @@ func TestBuildWriteValueRequest(t *testing.T) { inChunkChan <- &chnx[1] close(inChunkChan) - hints := map[hash.Hash]struct{}{ - hash.Parse("00000000000000000000000000000002"): {}, - hash.Parse("00000000000000000000000000000003"): {}, - } - compressed := buildWriteValueRequest(inChunkChan, hints) + compressed := buildWriteValueRequest(inChunkChan) gr := snappy.NewReader(compressed) - count := 0 - for hint := range deserializeHints(gr) { - count++ - _, present := hints[hint] - assert.True(present) - } - assert.Equal(len(hints), count) - outChunkChan := make(chan *chunks.Chunk, len(chnx)) chunks.Deserialize(gr, outChunkChan) close(outChunkChan) diff --git a/go/datas/serialize_hints.go b/go/datas/serialize_hints.go deleted file mode 100644 index 013ae14b4a..0000000000 --- a/go/datas/serialize_hints.go +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2016 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package datas - -import ( - "bytes" - "encoding/binary" - "io" - - "github.com/attic-labs/noms/go/d" - "github.com/attic-labs/noms/go/hash" - "github.com/attic-labs/noms/go/types" -) - -func serializeHints(w io.Writer, hints types.Hints) { - err := binary.Write(w, binary.BigEndian, uint32(len(hints))) // 4 billion hints is probably absurd. Maybe this should be smaller? - d.Chk.NoError(err) - for r := range hints { - serializeHash(w, r) - } -} - -func serializeHashes(w io.Writer, hashes hash.HashSlice) { - err := binary.Write(w, binary.BigEndian, uint32(len(hashes))) // 4 billion hashes is probably absurd. Maybe this should be smaller? 
- d.Chk.NoError(err) - for _, r := range hashes { - serializeHash(w, r) - } -} - -func serializeHash(w io.Writer, h hash.Hash) { - n, err := io.Copy(w, bytes.NewReader(h[:])) - d.Chk.NoError(err) - d.PanicIfFalse(int64(hash.ByteLen) == n) -} - -func deserializeHints(reader io.Reader) types.Hints { - numRefs := uint32(0) - err := binary.Read(reader, binary.BigEndian, &numRefs) - d.Chk.NoError(err) - - hints := make(types.Hints, numRefs) - for i := uint32(0); i < numRefs; i++ { - hints[deserializeHash(reader)] = struct{}{} - } - return hints -} - -func deserializeHashes(reader io.Reader) hash.HashSlice { - numRefs := uint32(0) - err := binary.Read(reader, binary.BigEndian, &numRefs) - d.Chk.NoError(err) - - hashes := make(hash.HashSlice, numRefs) - for i := range hashes { - hashes[i] = deserializeHash(reader) - } - return hashes -} - -func deserializeHash(reader io.Reader) hash.Hash { - h := hash.Hash{} - n, err := io.ReadFull(reader, h[:]) - d.Chk.NoError(err) - d.PanicIfFalse(int(hash.ByteLen) == n) - return h -} diff --git a/go/datas/serialize_hints_test.go b/go/datas/serialize_hints_test.go deleted file mode 100644 index 627bad4703..0000000000 --- a/go/datas/serialize_hints_test.go +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2016 Attic Labs, Inc. All rights reserved. -// Licensed under the Apache License, version 2.0: -// http://www.apache.org/licenses/LICENSE-2.0 - -package datas - -import ( - "bytes" - "testing" - - "github.com/attic-labs/noms/go/hash" - "github.com/attic-labs/testify/assert" -) - -func TestHintRoundTrip(t *testing.T) { - b := &bytes.Buffer{} - input := map[hash.Hash]struct{}{ - hash.Parse("00000000000000000000000000000000"): {}, - hash.Parse("00000000000000000000000000000001"): {}, - hash.Parse("00000000000000000000000000000002"): {}, - hash.Parse("00000000000000000000000000000003"): {}, - } - serializeHints(b, input) - output := deserializeHints(b) - assert.Len(t, output, len(input), "Output has different number of elements than input: %v, %v", output, input) - for h := range output { - _, present := input[h] - assert.True(t, present, "%s is in output but not in input", h) - } -} diff --git a/go/nbs/benchmarks/block_store_benchmarks.go b/go/nbs/benchmarks/block_store_benchmarks.go index 0d253c968e..43736ee7d0 100644 --- a/go/nbs/benchmarks/block_store_benchmarks.go +++ b/go/nbs/benchmarks/block_store_benchmarks.go @@ -29,10 +29,10 @@ func writeToEmptyStore(store types.BatchStore, src *dataSource, t assert.Testing chunx := goReadChunks(src) for c := range chunx { - store.SchedulePut(*c, 1, types.Hints{}) + store.SchedulePut(*c) } newRoot := chunks.NewChunk([]byte("root")) - store.SchedulePut(newRoot, 1, types.Hints{}) + store.SchedulePut(newRoot) assert.True(t, store.UpdateRoot(newRoot.Hash(), root)) } @@ -49,7 +49,7 @@ func benchmarkNoRefreshWrite(openStore storeOpenFn, src *dataSource, t assert.Te store := openStore() chunx := goReadChunks(src) for c := range chunx { - store.SchedulePut(*c, 1, types.Hints{}) + store.SchedulePut(*c) } assert.NoError(t, store.Close()) } diff --git a/go/nbs/benchmarks/file_block_store.go b/go/nbs/benchmarks/file_block_store.go index 05c6587368..2809a4f431 100644 --- a/go/nbs/benchmarks/file_block_store.go +++ b/go/nbs/benchmarks/file_block_store.go @@ -32,13 +32,10 @@ func (fb fileBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C panic("not impl") } -func (fb fileBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) { +func (fb fileBlockStore) SchedulePut(c chunks.Chunk) { io.Copy(fb.bw, 
bytes.NewReader(c.Data())) } -func (fb fileBlockStore) AddHints(hints types.Hints) { -} - func (fb fileBlockStore) Flush() {} func (fb fileBlockStore) Close() error { diff --git a/go/nbs/benchmarks/null_block_store.go b/go/nbs/benchmarks/null_block_store.go index f711565a0f..f30f043393 100644 --- a/go/nbs/benchmarks/null_block_store.go +++ b/go/nbs/benchmarks/null_block_store.go @@ -26,11 +26,7 @@ func (nb nullBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.C panic("not impl") } -func (nb nullBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) { -} - -func (nb nullBlockStore) AddHints(hints types.Hints) { -} +func (nb nullBlockStore) SchedulePut(c chunks.Chunk) {} func (nb nullBlockStore) Flush() {} diff --git a/go/nbs/block_store_test.go b/go/nbs/block_store_test.go index d8ecb235c4..c7a512af40 100644 --- a/go/nbs/block_store_test.go +++ b/go/nbs/block_store_test.go @@ -158,23 +158,13 @@ func (suite *BlockStoreSuite) TestChunkStoreExtractChunks() { suite.store.PutMany(chnx) chunkChan := make(chan *chunks.Chunk) - go func() { suite.store.extractChunks(InsertOrder, chunkChan); close(chunkChan) }() + go func() { suite.store.extractChunks(chunkChan); close(chunkChan) }() i := 0 for c := range chunkChan { suite.Equal(chnx[i].Data(), c.Data()) suite.Equal(chnx[i].Hash(), c.Hash()) i++ } - - chunkChan = make(chan *chunks.Chunk) - go func() { suite.store.extractChunks(ReverseOrder, chunkChan); close(chunkChan) }() - i = len(chnx) - 1 - for c := range chunkChan { - suite.Equal(chnx[i].Data(), c.Data()) - suite.Equal(chnx[i].Hash(), c.Hash()) - i-- - } - } func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() { diff --git a/go/nbs/cache.go b/go/nbs/cache.go index f280c4270e..102cfe1117 100644 --- a/go/nbs/cache.go +++ b/go/nbs/cache.go @@ -56,8 +56,8 @@ func (nbc *NomsBlockCache) GetMany(hashes hash.HashSet, foundChunks chan *chunks // ExtractChunks writes the entire contents of the cache to chunkChan. The // chunks are extracted in insertion order. -func (nbc *NomsBlockCache) ExtractChunks(order EnumerationOrder, chunkChan chan *chunks.Chunk) { - nbc.chunks.extractChunks(order, chunkChan) +func (nbc *NomsBlockCache) ExtractChunks(chunkChan chan *chunks.Chunk) { + nbc.chunks.extractChunks(chunkChan) } // Count returns the number of items in the cache. 
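Note: with EnumerationOrder gone, every ExtractChunks caller in this patch reduces to the same drain pattern used by sendWriteRequests and localBatchStore.Flush: extract into a channel from a goroutine, close the channel, and range over the results in insertion order. A compressed sketch of that pattern; drainCache and send are illustrative names, not part of the change.

    package datas

    import (
        "github.com/attic-labs/noms/go/chunks"
        "github.com/attic-labs/noms/go/nbs"
    )

    // drainCache streams the entire contents of a NomsBlockCache to send, in
    // insertion order, closing the channel once extraction is complete.
    func drainCache(cache *nbs.NomsBlockCache, send func(*chunks.Chunk)) {
        chunkChan := make(chan *chunks.Chunk, 128)
        go func() {
            defer close(chunkChan)
            cache.ExtractChunks(chunkChan)
        }()
        for c := range chunkChan {
            send(c)
        }
    }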
diff --git a/go/nbs/compacting_chunk_source.go b/go/nbs/compacting_chunk_source.go index e3307fe4e3..d793ac4fda 100644 --- a/go/nbs/compacting_chunk_source.go +++ b/go/nbs/compacting_chunk_source.go @@ -99,10 +99,10 @@ func (ccs *compactingChunkSource) calcReads(reqs []getRecord, blockSize uint64) return ccs.cs.calcReads(reqs, blockSize) } -func (ccs *compactingChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) { +func (ccs *compactingChunkSource) extract(chunks chan<- extractRecord) { ccs.wg.Wait() d.Chk.True(ccs.cs != nil) - ccs.cs.extract(order, chunks) + ccs.cs.extract(chunks) } type emptyChunkSource struct{} @@ -143,4 +143,4 @@ func (ecs emptyChunkSource) calcReads(reqs []getRecord, blockSize uint64) (reads return 0, true } -func (ecs emptyChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) {} +func (ecs emptyChunkSource) extract(chunks chan<- extractRecord) {} diff --git a/go/nbs/mem_table.go b/go/nbs/mem_table.go index a700e53123..1aa580fad1 100644 --- a/go/nbs/mem_table.go +++ b/go/nbs/mem_table.go @@ -91,17 +91,11 @@ func (mt *memTable) getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg return } -func (mt *memTable) extract(order EnumerationOrder, chunks chan<- extractRecord) { - if order == InsertOrder { - for _, hrec := range mt.order { - chunks <- extractRecord{*hrec.a, mt.chunks[*hrec.a]} - } - return - } - for i := len(mt.order) - 1; i >= 0; i-- { - hrec := mt.order[i] +func (mt *memTable) extract(chunks chan<- extractRecord) { + for _, hrec := range mt.order { chunks <- extractRecord{*hrec.a, mt.chunks[*hrec.a]} } + return } func (mt *memTable) write(haver chunkReader) (name addr, data []byte, count uint32) { diff --git a/go/nbs/mem_table_test.go b/go/nbs/mem_table_test.go index 5c15b49aef..6400d9daa6 100644 --- a/go/nbs/mem_table_test.go +++ b/go/nbs/mem_table_test.go @@ -184,14 +184,8 @@ func (crg chunkReaderGroup) uncompressedLen() (data uint64) { return } -func (crg chunkReaderGroup) extract(order EnumerationOrder, chunks chan<- extractRecord) { - if order == InsertOrder { - for _, haver := range crg { - haver.extract(order, chunks) - } - return - } - for i := len(crg) - 1; i >= 0; i-- { - crg[i].extract(order, chunks) +func (crg chunkReaderGroup) extract(chunks chan<- extractRecord) { + for _, haver := range crg { + haver.extract(chunks) } } diff --git a/go/nbs/s3_table_persister_test.go b/go/nbs/s3_table_persister_test.go index 64bf7fa3c5..e2f2d9e3d1 100644 --- a/go/nbs/s3_table_persister_test.go +++ b/go/nbs/s3_table_persister_test.go @@ -166,6 +166,6 @@ type panicingChunkSource struct { chunkSource } -func (pcs panicingChunkSource) extract(order EnumerationOrder, chunks chan<- extractRecord) { +func (pcs panicingChunkSource) extract(chunks chan<- extractRecord) { panic("onoes") } diff --git a/go/nbs/store.go b/go/nbs/store.go index 0a0a772643..61885b4219 100644 --- a/go/nbs/store.go +++ b/go/nbs/store.go @@ -16,7 +16,6 @@ import ( "github.com/attic-labs/noms/go/constants" "github.com/attic-labs/noms/go/d" "github.com/attic-labs/noms/go/hash" - "github.com/attic-labs/noms/go/types" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/s3" @@ -27,8 +26,6 @@ import ( // names of the tables that hold all the chunks in the store. The number of // chunks in each table is also stored in the manifest. -type EnumerationOrder uint8 - const ( // StorageVersion is the version of the on-disk Noms Chunks Store data format. 
StorageVersion = "2" @@ -38,9 +35,6 @@ const ( defaultMaxTables = 128 defaultIndexCacheSize = (1 << 20) * 8 // 8MB - - InsertOrder EnumerationOrder = iota - ReverseOrder ) var ( @@ -174,7 +168,7 @@ func (nbs *NomsBlockStore) Put(c chunks.Chunk) { nbs.putCount++ } -func (nbs *NomsBlockStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) { +func (nbs *NomsBlockStore) SchedulePut(c chunks.Chunk) { nbs.Put(c) } @@ -227,12 +221,9 @@ func (nbs *NomsBlockStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks nbs.mu.RLock() defer nbs.mu.RUnlock() tables = nbs.tables - + remaining = true if nbs.mt != nil { - remaining = nbs.mt.getMany(reqs, foundChunks, &sync.WaitGroup{}) - wg.Wait() - } else { - remaining = true + remaining = nbs.mt.getMany(reqs, foundChunks, nil) } return @@ -275,24 +266,17 @@ func (nbs *NomsBlockStore) CalcReads(hashes hash.HashSet, blockSize uint64) (rea return } -func (nbs *NomsBlockStore) extractChunks(order EnumerationOrder, chunkChan chan<- *chunks.Chunk) { +func (nbs *NomsBlockStore) extractChunks(chunkChan chan<- *chunks.Chunk) { ch := make(chan extractRecord, 1) go func() { + defer close(ch) nbs.mu.RLock() defer nbs.mu.RUnlock() - // Chunks in nbs.tables were inserted before those in nbs.mt, so extract chunks there _first_ if we're doing InsertOrder... - if order == InsertOrder { - nbs.tables.extract(order, ch) - } + // Chunks in nbs.tables were inserted before those in nbs.mt, so extract chunks there _first_ + nbs.tables.extract(ch) if nbs.mt != nil { - nbs.mt.extract(order, ch) + nbs.mt.extract(ch) } - // ...and do them _second_ if we're doing ReverseOrder - if order == ReverseOrder { - nbs.tables.extract(order, ch) - } - - close(ch) }() for rec := range ch { c := chunks.NewChunkWithHash(hash.Hash(rec.a), rec.data) @@ -322,6 +306,55 @@ func (nbs *NomsBlockStore) Has(h hash.Hash) bool { return has || tables.has(a) } +func (nbs *NomsBlockStore) HasMany(hashes hash.HashSet) hash.HashSet { + reqs := toHasRecords(hashes) + + tables, remaining := func() (tables chunkReader, remaining bool) { + nbs.mu.RLock() + defer nbs.mu.RUnlock() + tables = nbs.tables + + remaining = true + if nbs.mt != nil { + remaining = nbs.mt.hasMany(reqs) + } + + return + }() + + if remaining { + tables.hasMany(reqs) + } + return fromHasRecords(reqs) +} + +func toHasRecords(hashes hash.HashSet) []hasRecord { + reqs := make([]hasRecord, len(hashes)) + idx := 0 + for h := range hashes { + a := addr(h) + reqs[idx] = hasRecord{ + a: &a, + prefix: a.Prefix(), + order: idx, + } + idx++ + } + + sort.Sort(hasRecordByPrefix(reqs)) + return reqs +} + +func fromHasRecords(reqs []hasRecord) hash.HashSet { + present := hash.HashSet{} + for _, r := range reqs { + if r.has { + present.Insert(hash.New(r.a[:])) + } + } + return present +} + func (nbs *NomsBlockStore) Root() hash.Hash { nbs.mu.RLock() defer nbs.mu.RUnlock() @@ -373,11 +406,6 @@ func (nbs *NomsBlockStore) Close() (err error) { return nbs.tables.Close() } -// types.BatchStore -func (nbs *NomsBlockStore) AddHints(hints types.Hints) { - // noop -} - func (nbs *NomsBlockStore) Flush() { b := &backoff.Backoff{ Min: 128 * time.Microsecond, diff --git a/go/nbs/table.go b/go/nbs/table.go index 0f0031d67e..6b37f2f34a 100644 --- a/go/nbs/table.go +++ b/go/nbs/table.go @@ -206,7 +206,7 @@ type chunkReader interface { getMany(reqs []getRecord, foundChunks chan *chunks.Chunk, wg *sync.WaitGroup) bool count() uint32 uncompressedLen() uint64 - extract(order EnumerationOrder, chunks chan<- extractRecord) + extract(chunks chan<- extractRecord) 
} type chunkSource interface { diff --git a/go/nbs/table_persister.go b/go/nbs/table_persister.go index dcedca5167..40a723ac1f 100644 --- a/go/nbs/table_persister.go +++ b/go/nbs/table_persister.go @@ -94,7 +94,7 @@ func compactSourcesToBuffer(sources chunkSources, rl chan struct{}) (name addr, }() rl <- struct{}{} - s.extract(InsertOrder, c) + s.extract(c) }(src, chunks) chunkChans <- chunks } diff --git a/go/nbs/table_reader.go b/go/nbs/table_reader.go index 7839e4ff84..c05a7d4fab 100644 --- a/go/nbs/table_reader.go +++ b/go/nbs/table_reader.go @@ -412,7 +412,7 @@ func (tr tableReader) calcReads(reqs []getRecord, blockSize uint64) (reads int, return } -func (tr tableReader) extract(order EnumerationOrder, chunks chan<- extractRecord) { +func (tr tableReader) extract(chunks chan<- extractRecord) { // Build reverse lookup table from ordinal -> chunk hash hashes := make(addrSlice, len(tr.prefixes)) for idx, prefix := range tr.prefixes { @@ -432,12 +432,6 @@ func (tr tableReader) extract(order EnumerationOrder, chunks chan<- extractRecor chunks <- extractRecord{hashes[i], tr.parseChunk(buff[localOffset : localOffset+uint64(tr.lengths[i])])} } - if order == ReverseOrder { - for i := uint32(1); i <= tr.chunkCount; i++ { - sendChunk(tr.chunkCount - i) - } - return - } for i := uint32(0); i < tr.chunkCount; i++ { sendChunk(i) } diff --git a/go/nbs/table_set.go b/go/nbs/table_set.go index 78a110aefe..141c46f9dc 100644 --- a/go/nbs/table_set.go +++ b/go/nbs/table_set.go @@ -180,22 +180,13 @@ func (ts tableSet) Compact() (ns tableSet, compactees chunkSources) { return ns, toCompact } -func (ts tableSet) extract(order EnumerationOrder, chunks chan<- extractRecord) { - // Since new tables are _prepended_ to a tableSet, extracting chunks in ReverseOrder requires iterating ts.novel, then ts.upstream, from front to back, while doing insertOrder requires iterating ts.upstream back to front, followed by ts.novel. - if order == ReverseOrder { - for _, cs := range ts.novel { - cs.extract(order, chunks) - } - for _, cs := range ts.upstream { - cs.extract(order, chunks) - } - return - } +func (ts tableSet) extract(chunks chan<- extractRecord) { + // Since new tables are _prepended_ to a tableSet, extracting chunks in insertOrder requires iterating ts.upstream back to front, followed by ts.novel. 
for i := len(ts.upstream) - 1; i >= 0; i-- { - ts.upstream[i].extract(order, chunks) + ts.upstream[i].extract(chunks) } for i := len(ts.novel) - 1; i >= 0; i-- { - ts.novel[i].extract(order, chunks) + ts.novel[i].extract(chunks) } } diff --git a/go/nbs/table_set_test.go b/go/nbs/table_set_test.go index 1acaed913b..fe27cd6283 100644 --- a/go/nbs/table_set_test.go +++ b/go/nbs/table_set_test.go @@ -101,7 +101,7 @@ func TestTableSetExtract(t *testing.T) { ts = ts.Prepend(mt) chunkChan := make(chan extractRecord) - go func() { ts.extract(InsertOrder, chunkChan); close(chunkChan) }() + go func() { defer close(chunkChan); ts.extract(chunkChan) }() i := 0 for rec := range chunkChan { a := computeAddr(testChunks[i]) @@ -111,17 +111,6 @@ func TestTableSetExtract(t *testing.T) { i++ } - chunkChan = make(chan extractRecord) - go func() { ts.extract(ReverseOrder, chunkChan); close(chunkChan) }() - i = len(testChunks) - 1 - for rec := range chunkChan { - a := computeAddr(testChunks[i]) - assert.NotNil(rec.data, "Nothing for", a) - assert.Equal(testChunks[i], rec.data, "Item %d: %s != %s", i, string(testChunks[i]), string(rec.data)) - assert.Equal(a, rec.a) - i-- - } - ts.Close() } diff --git a/go/nbs/table_test.go b/go/nbs/table_test.go index 6b9773266a..cd0e448a3d 100644 --- a/go/nbs/table_test.go +++ b/go/nbs/table_test.go @@ -233,7 +233,7 @@ func TestExtract(t *testing.T) { addrs := addrSlice{computeAddr(chunks[0]), computeAddr(chunks[1]), computeAddr(chunks[2])} chunkChan := make(chan extractRecord) - go func() { tr.extract(InsertOrder, chunkChan); close(chunkChan) }() + go func() { tr.extract(chunkChan); close(chunkChan) }() i := 0 for rec := range chunkChan { assert.NotNil(rec.data, "Nothing for", addrs[i]) @@ -241,16 +241,6 @@ func TestExtract(t *testing.T) { assert.Equal(chunks[i], rec.data) i++ } - - chunkChan = make(chan extractRecord) - go func() { tr.extract(ReverseOrder, chunkChan); close(chunkChan) }() - i = len(chunks) - 1 - for rec := range chunkChan { - assert.NotNil(rec.data, "Nothing for", addrs[i]) - assert.Equal(addrs[i], rec.a) - assert.Equal(chunks[i], rec.data) - i-- - } } func Test65k(t *testing.T) { diff --git a/go/types/batch_store.go b/go/types/batch_store.go index eddf0e501e..0c90d38e50 100644 --- a/go/types/batch_store.go +++ b/go/types/batch_store.go @@ -14,7 +14,9 @@ import ( "github.com/attic-labs/noms/go/hash" ) -// BatchStore provides an interface similar to chunks.ChunkStore, but batch-oriented. Instead of Put(), it provides SchedulePut(), which enqueues a Chunk to be sent at a possibly later time. +// BatchStore provides an interface similar to chunks.ChunkStore, but batch- +// oriented. Instead of Put(), it provides SchedulePut(), which enqueues a +// Chunk to be sent at a possibly later time. type BatchStore interface { // Get returns the Chunk with the hash h from the store. If h is absent // from the store, chunks.EmptyChunk is returned. @@ -28,19 +30,9 @@ type BatchStore interface { // SchedulePut enqueues a write for the Chunk c with the given refHeight. // c must be visible to subsequent Get() calls upon return. Typically, the // Value which was encoded to provide c can also be queried for its - // refHeight. The call may or may not block until c is persisted. The - // provided hints are used to assist in validation. Validation requires - // checking that all refs embedded in c are themselves valid, which could - // naively be done by resolving each one. 
Instead, hints provides a - // (smaller) set of refs that point to Chunks that themselves contain many - // of c's refs. Thus, by checking only the hinted Chunks, c can be - // validated with fewer read operations. - // c may or may not be persisted when SchedulePut() returns, but is + // refHeight. The call may or may not block until c is persisted, but c is // guaranteed to be persistent after a call to Flush() or UpdateRoot(). - SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) - - // AddHints allows additional hints, as used by SchedulePut, to be added for use in the current batch. - AddHints(hints Hints) + SchedulePut(c chunks.Chunk) // Flush causes enqueued Puts to be persisted. Flush() @@ -48,16 +40,18 @@ type BatchStore interface { io.Closer } -// Hints are a set of hashes that should be used to speed up the validation of one or more Chunks. -type Hints hash.HashSet - -// BatchStoreAdaptor provides a naive implementation of BatchStore should only be used with ChunkStores that can Put relatively quickly. It provides no actual batching or validation. Its intended use is for adapting a ChunkStore for use in something that requires a BatchStore. +// BatchStoreAdaptor provides a naive implementation of BatchStore should only +// be used with ChunkStores that can Put relatively quickly. It provides no +// actual batching or validation. Its intended use is for adapting a +// ChunkStore for use in something that requires a BatchStore. type BatchStoreAdaptor struct { cs chunks.ChunkStore once sync.Once } -// NewBatchStoreAdaptor returns a BatchStore instance backed by a ChunkStore. Takes ownership of cs and manages its lifetime; calling Close on the returned BatchStore will Close cs. +// NewBatchStoreAdaptor returns a BatchStore instance backed by a ChunkStore. +// Takes ownership of cs and manages its lifetime; calling Close on the +// returned BatchStore will Close cs. func NewBatchStoreAdaptor(cs chunks.ChunkStore) BatchStore { return &BatchStoreAdaptor{cs: cs} } @@ -74,8 +68,8 @@ func (bsa *BatchStoreAdaptor) GetMany(hashes hash.HashSet, foundChunks chan *chu bsa.cs.GetMany(hashes, foundChunks) } -// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints. -func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) { +// SchedulePut simply calls Put on the underlying ChunkStore. +func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk) { bsa.once.Do(bsa.expectVersion) bsa.cs.Put(c) } @@ -96,9 +90,6 @@ func (bsa *BatchStoreAdaptor) UpdateRoot(current, last hash.Hash) bool { return bsa.cs.UpdateRoot(current, last) } -// AddHints is a noop. 
-func (bsa *BatchStoreAdaptor) AddHints(hints Hints) {} - func (bsa *BatchStoreAdaptor) Flush() { bsa.once.Do(bsa.expectVersion) bsa.cs.Flush() diff --git a/go/types/validating_batching_sink.go b/go/types/validating_batching_sink.go index 4dfa496ba1..4d6fb5e826 100644 --- a/go/types/validating_batching_sink.go +++ b/go/types/validating_batching_sink.go @@ -5,8 +5,6 @@ package types import ( - "sync" - "github.com/attic-labs/noms/go/chunks" "github.com/attic-labs/noms/go/d" "github.com/attic-labs/noms/go/hash" @@ -15,37 +13,28 @@ import ( const batchSize = 100 type ValidatingBatchingSink struct { - vs *ValueStore - cs chunks.ChunkStore - batch [batchSize]chunks.Chunk - count int - pool sync.Pool + vs *ValueStore + cs chunks.ChunkStore + validate bool + unresolved hash.HashSet } func NewValidatingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink { return &ValidatingBatchingSink{ - vs: newLocalValueStore(cs), - cs: cs, + vs: newLocalValueStore(cs), + cs: cs, + validate: true, + unresolved: hash.HashSet{}, } } -// Prepare primes the type info cache used to validate Enqueued Chunks by reading the Chunks referenced by the provided hints. -func (vbs *ValidatingBatchingSink) Prepare(hints Hints) { - foundChunks := make(chan *chunks.Chunk, batchSize) - wg := sync.WaitGroup{} - for i := 0; i < batchSize; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for c := range foundChunks { - v := DecodeFromBytes(c.Data(), vbs.vs) - vbs.vs.setHintsForReadValue(v, c.Hash(), false) - } - }() +func NewCompletenessCheckingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink { + return &ValidatingBatchingSink{ + vs: newLocalValueStore(cs), + cs: cs, + validate: false, + unresolved: hash.HashSet{}, } - vbs.cs.GetMany(hash.HashSet(hints), foundChunks) - close(foundChunks) - wg.Wait() } // DecodedChunk holds a pointer to a Chunk and the Value that results from @@ -57,45 +46,50 @@ type DecodedChunk struct { // DecodeUnqueued decodes c and checks that the hash of the resulting value // matches c.Hash(). It returns a DecodedChunk holding both c and a pointer to -// the decoded Value. However, if c has already been Enqueued, DecodeUnqueued -// returns an empty DecodedChunk. +// the decoded Value. func (vbs *ValidatingBatchingSink) DecodeUnqueued(c *chunks.Chunk) DecodedChunk { h := c.Hash() - if vbs.vs.isPresent(h) { - return DecodedChunk{} + var v Value + if vbs.validate { + v = decodeFromBytesWithValidation(c.Data(), vbs.vs) + } else { + v = DecodeFromBytes(c.Data(), vbs.vs) } - v := decodeFromBytesWithValidation(c.Data(), vbs.vs) + if getHash(v) != h { d.Panic("Invalid hash found") } return DecodedChunk{c, &v} } -// Enqueue adds c to the queue of Chunks waiting to be Put into vbs' backing -// ChunkStore. It is assumed that v is the Value decoded from c, and so v can -// be used to validate the ref-completeness of c. The instance keeps an -// internal buffer of Chunks, spilling to the ChunkStore when the buffer is -// full. -func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) { +// Put Puts c into vbs' backing ChunkStore. It is assumed that v is the Value +// decoded from c, and so v can be used to validate the ref-completeness of c. 
+func (vbs *ValidatingBatchingSink) Put(c chunks.Chunk, v Value) { h := c.Hash() - vbs.vs.ensureChunksInCache(v) - vbs.vs.set(h, hintedChunk{TypeOf(v), h}, false) - - vbs.batch[vbs.count] = c - vbs.count++ - - if vbs.count == batchSize { - vbs.cs.PutMany(vbs.batch[:vbs.count]) - vbs.count = 0 - } + vbs.unresolved.Remove(h) + v.WalkRefs(func(ref Ref) { + vbs.unresolved.Insert(ref.TargetHash()) + }) + vbs.cs.Put(c) } -// Flush Puts any Chunks buffered by Enqueue calls into the backing -// ChunkStore. +// Flush makes durable all enqueued Chunks. func (vbs *ValidatingBatchingSink) Flush() { - if vbs.count > 0 { - vbs.cs.PutMany(vbs.batch[:vbs.count]) - } vbs.cs.Flush() - vbs.count = 0 +} + +// PanicIfDangling does a Has check on all the references encountered +// while enqueuing novel chunks. It panics if any of these refs point +// to Chunks that don't exist in the backing ChunkStore. +func (vbs *ValidatingBatchingSink) PanicIfDangling() { + present := vbs.cs.HasMany(vbs.unresolved) + absent := hash.HashSlice{} + for h := range vbs.unresolved { + if !present.Has(h) { + absent = append(absent, h) + } + } + if len(absent) != 0 { + d.Panic("Found dangling references to %v", absent) + } } diff --git a/go/types/validating_batching_sink_test.go b/go/types/validating_batching_sink_test.go index a1549d5ab0..aad5752997 100644 --- a/go/types/validating_batching_sink_test.go +++ b/go/types/validating_batching_sink_test.go @@ -11,28 +11,6 @@ import ( "github.com/attic-labs/testify/assert" ) -func TestValidatingBatchingSinkPrepare(t *testing.T) { - cs := chunks.NewTestStore() - hints := Hints{} - chnx := []chunks.Chunk{ - EncodeValue(Number(42), nil), - EncodeValue(Number(-7), nil), - EncodeValue(String("oy"), nil), - EncodeValue(Bool(true), nil), - EncodeValue(NewBlob(), nil), - } - for _, c := range chnx { - cs.Put(c) - hints[c.Hash()] = struct{}{} - } - - vbs := NewValidatingBatchingSink(cs) - vbs.Prepare(hints) - for h := range hints { - vbs.vs.isPresent(h) - } -} - func TestValidatingBatchingSinkDecode(t *testing.T) { v := Number(42) c := EncodeValue(v, nil) @@ -42,17 +20,6 @@ func TestValidatingBatchingSinkDecode(t *testing.T) { assert.True(t, v.Equals(*dc.Value)) } -func TestValidatingBatchingSinkDecodeAlreadyEnqueued(t *testing.T) { - v := Number(42) - c := EncodeValue(v, nil) - vbs := NewValidatingBatchingSink(chunks.NewTestStore()) - - vbs.Enqueue(c, v) - dc := vbs.DecodeUnqueued(&c) - assert.Nil(t, dc.Chunk) - assert.Nil(t, dc.Value) -} - func assertPanicsOnInvalidChunk(t *testing.T, data []interface{}) { cs := chunks.NewTestStore() vs := newLocalValueStore(cs) @@ -109,20 +76,54 @@ func TestValidatingBatchingSinkEnqueueAndFlush(t *testing.T) { cs := chunks.NewTestStore() vbs := NewValidatingBatchingSink(cs) - vbs.Enqueue(c, v) + vbs.Put(c, v) vbs.Flush() assert.Equal(t, 1, cs.Writes) } -func TestValidatingBatchingSinkEnqueueImplicitFlush(t *testing.T) { - cs := chunks.NewTestStore() - vbs := NewValidatingBatchingSink(cs) +func TestValidatingBatchingSinkPanicIfDangling(t *testing.T) { + b := Bool(true) + r := NewRef(b) - for i := 0; i <= batchSize; i++ { - v := Number(i) - vbs.Enqueue(EncodeValue(v, nil), v) - } - assert.Equal(t, batchSize, cs.Writes) - vbs.Flush() - assert.Equal(t, 1, cs.Writes-batchSize) + t.Run("Panic", func(t *testing.T) { + t.Run("PreFlush", func(t *testing.T) { + t.Parallel() + vbs := NewValidatingBatchingSink(chunks.NewTestStore()) + vbs.Put(EncodeValue(r, nil), r) + assert.Panics(t, vbs.PanicIfDangling) + }) + t.Run("PostFlush", func(t *testing.T) { + t.Parallel() + vbs 
:= NewValidatingBatchingSink(chunks.NewTestStore()) + vbs.Put(EncodeValue(r, nil), r) + vbs.Flush() + assert.Panics(t, vbs.PanicIfDangling) + }) + }) + + t.Run("Success", func(t *testing.T) { + t.Run("BatchInOrder", func(t *testing.T) { + t.Parallel() + vbs := NewValidatingBatchingSink(chunks.NewTestStore()) + vbs.Put(EncodeValue(b, nil), b) + vbs.Put(EncodeValue(r, nil), r) + assert.NotPanics(t, vbs.PanicIfDangling) + }) + t.Run("BatchOutOfOrder", func(t *testing.T) { + t.Parallel() + vbs := NewValidatingBatchingSink(chunks.NewTestStore()) + vbs.Put(EncodeValue(r, nil), r) + vbs.Put(EncodeValue(b, nil), b) + assert.NotPanics(t, vbs.PanicIfDangling) + }) + t.Run("ExistingChunk", func(t *testing.T) { + t.Parallel() + cs := chunks.NewTestStore() + cs.Put(EncodeValue(b, nil)) + + vbs := NewValidatingBatchingSink(cs) + vbs.Put(EncodeValue(r, nil), r) + assert.NotPanics(t, vbs.PanicIfDangling) + }) + }) } diff --git a/go/types/value_store.go b/go/types/value_store.go index b815760a56..701f71857b 100644 --- a/go/types/value_store.go +++ b/go/types/value_store.go @@ -38,19 +38,15 @@ type ValueReadWriter interface { } // ValueStore provides methods to read and write Noms Values to a BatchStore. -// It validates Values as they are written, but does not guarantee that these -// Values are persisted through the BatchStore until a subsequent Flush. +// It minimally validates Values as they're written, but does not guarantee +// that these Values are persisted through the BatchStore until a subsequent +// Flush. // Currently, WriteValue validates the following properties of a Value v: // - v can be correctly serialized and its Ref taken -// - all Refs in v point to a Value that can be read from this ValueStore -// - all Refs in v point to a Value of the correct Type type ValueStore struct { bs BatchStore - cacheMu sync.RWMutex - hintCache map[hash.Hash]chunkCacheEntry - pendingHints map[hash.Hash]chunkCacheEntry pendingMu sync.RWMutex - pendingPuts map[hash.Hash]pendingChunk + pendingPuts map[hash.Hash]chunks.Chunk pendingPutMax uint64 pendingPutSize uint64 pendingParents map[hash.Hash]uint64 // chunk Hash -> ref height @@ -64,18 +60,6 @@ const ( defaultPendingPutMax = 1 << 28 // 256MB ) -type chunkCacheEntry interface { - Present() bool - Hint() hash.Hash - typeOf() *Type -} - -type pendingChunk struct { - c chunks.Chunk - height uint64 - hints Hints -} - // NewTestValueStore creates a simple struct that satisfies ValueReadWriter // and is backed by a chunks.TestStore. 
func NewTestValueStore() *ValueStore { @@ -99,12 +83,10 @@ func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore { func newValueStoreWithCacheAndPending(bs BatchStore, cacheSize, pendingMax uint64) *ValueStore { return &ValueStore{ - bs: bs, - cacheMu: sync.RWMutex{}, - hintCache: map[hash.Hash]chunkCacheEntry{}, - pendingHints: map[hash.Hash]chunkCacheEntry{}, + bs: bs, + pendingMu: sync.RWMutex{}, - pendingPuts: map[hash.Hash]pendingChunk{}, + pendingPuts: map[hash.Hash]chunks.Chunk{}, pendingPutMax: pendingMax, pendingParents: map[hash.Hash]uint64{}, @@ -128,13 +110,11 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value { return v.(Value) } - stillPending := false chunk := func() chunks.Chunk { lvs.pendingMu.RLock() defer lvs.pendingMu.RUnlock() - if pc, ok := lvs.pendingPuts[h]; ok { - stillPending = true - return pc.c + if pending, ok := lvs.pendingPuts[h]; ok { + return pending } return chunks.EmptyChunk }() @@ -148,23 +128,9 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value { v := DecodeValue(chunk, lvs) lvs.valueCache.Add(h, uint64(len(chunk.Data())), v) - lvs.setHintsForReadValue(v, h, stillPending) return v } -func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash, toPending bool) { - var entry chunkCacheEntry = absentChunk{} - if v != nil { - lvs.setHintsForReachable(v, h, toPending) - // h is trivially a hint for v, so consider putting that in the hintCache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the hintCache. If we later read some other chunk that references v, setHintsForReachable will overwrite this with a hint pointing to that chunk. - // If we don't do this, top-level Values that get read but not written -- such as the existing Head of a Database upon a Commit -- can be erroneously left out during a pull. - entry = hintedChunk{TypeOf(v), h} - } - if cur := lvs.check(h); cur == nil || cur.Hint().IsEmpty() { - lvs.set(h, entry, toPending) - } -} - // ReadManyValues reads and decodes Values indicated by |hashes| from lvs. On // return, |foundValues| will have been fully sent all Values which have been // found. Any non-present Values will silently be ignored. @@ -172,7 +138,6 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va decode := func(h hash.Hash, chunk *chunks.Chunk, toPending bool) Value { v := DecodeValue(*chunk, lvs) lvs.valueCache.Add(h, uint64(len(chunk.Data())), v) - lvs.setHintsForReadValue(v, h, toPending) return v } @@ -189,8 +154,8 @@ func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Va chunk := func() chunks.Chunk { lvs.pendingMu.RLock() defer lvs.pendingMu.RUnlock() - if pc, ok := lvs.pendingPuts[h]; ok { - return pc.c + if pending, ok := lvs.pendingPuts[h]; ok { + return pending } return chunks.EmptyChunk }() @@ -238,17 +203,12 @@ func (lvs *ValueStore) WriteValue(v Value) Ref { h := c.Hash() height := maxChunkHeight(v) + 1 r := constructRef(MakeRefType(TypeOf(v)), h, height) - if lvs.isPresent(h) { + if v, ok := lvs.valueCache.Get(h); ok && v != nil { return r } - // TODO: It _really_ feels like there should be some refactoring that allows us to only have to walk the refs of |v| once, but I'm hesitant to undertake that refactor right now. 
- hints := lvs.chunkHintsFromCache(v) - lvs.bufferChunk(v, c, height, hints) - - lvs.setHintsForReachable(v, h, true) - lvs.set(h, (*presentChunk)(TypeOf(v)), false) - lvs.valueCache.Drop(h) + lvs.bufferChunk(v, c, height) + lvs.valueCache.Drop(h) // valueCache may have an entry saying h is not present. Clear that. return r } @@ -262,22 +222,22 @@ func (lvs *ValueStore) WriteValue(v Value) Ref { // flushed). // 2. The total data occupied by buffered chunks does not exceed // lvs.pendingPutMax -func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64, hints Hints) { +func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64) { lvs.pendingMu.Lock() defer lvs.pendingMu.Unlock() h := c.Hash() - d.Chk.NotZero(height) - lvs.pendingPuts[h] = pendingChunk{c, height, hints} + d.PanicIfTrue(height == 0) + lvs.pendingPuts[h] = c lvs.pendingPutSize += uint64(len(c.Data())) putChildren := func(parent hash.Hash) (dataPut int) { - pc, present := lvs.pendingPuts[parent] - d.Chk.True(present) - v := DecodeValue(pc.c, lvs) + pending, present := lvs.pendingPuts[parent] + d.PanicIfFalse(present) + v := DecodeValue(pending, lvs) v.WalkRefs(func(grandchildRef Ref) { - if pc, present := lvs.pendingPuts[grandchildRef.TargetHash()]; present { - lvs.bs.SchedulePut(pc.c, pc.height, pc.hints) - dataPut += len(pc.c.Data()) + if pending, present := lvs.pendingPuts[grandchildRef.TargetHash()]; present { + lvs.bs.SchedulePut(pending) + dataPut += len(pending.Data()) delete(lvs.pendingPuts, grandchildRef.TargetHash()) } }) @@ -314,13 +274,13 @@ func (lvs *ValueStore) bufferChunk(v Value, c chunks.Chunk, height uint64, hints } } if height == 0 { // This can happen if there are no pending parents - var pc pendingChunk - for tallest, pc = range lvs.pendingPuts { + var chunk chunks.Chunk + for tallest, chunk = range lvs.pendingPuts { // Any pendingPut is as good as another in this case, so take the first one break } - lvs.bs.SchedulePut(pc.c, pc.height, pc.hints) - lvs.pendingPutSize -= uint64(len(pc.c.Data())) + lvs.bs.SchedulePut(chunk) + lvs.pendingPutSize -= uint64(len(chunk.Data())) delete(lvs.pendingPuts, tallest) continue } @@ -335,34 +295,24 @@ func (lvs *ValueStore) Flush(root hash.Hash) { lvs.pendingMu.Lock() defer lvs.pendingMu.Unlock() - pc, present := lvs.pendingPuts[root] + pending, present := lvs.pendingPuts[root] if !present { return } - put := func(h hash.Hash, pc pendingChunk) uint64 { - lvs.bs.SchedulePut(pc.c, pc.height, pc.hints) + put := func(h hash.Hash, chunk chunks.Chunk) uint64 { + lvs.bs.SchedulePut(chunk) delete(lvs.pendingPuts, h) - return uint64(len(pc.c.Data())) + return uint64(len(chunk.Data())) } - v := DecodeValue(pc.c, lvs) + v := DecodeValue(pending, lvs) v.WalkRefs(func(reachable Ref) { - if pc, present := lvs.pendingPuts[reachable.TargetHash()]; present { - lvs.pendingPutSize -= put(reachable.TargetHash(), pc) + if pending, present := lvs.pendingPuts[reachable.TargetHash()]; present { + lvs.pendingPutSize -= put(reachable.TargetHash(), pending) } }) delete(lvs.pendingParents, root) // If not present, this is idempotent - lvs.pendingPutSize -= put(root, pc) - - // Merge in pending hints - lvs.cacheMu.Lock() - defer lvs.cacheMu.Unlock() - for h, entry := range lvs.pendingHints { - if _, present := lvs.pendingPuts[h]; !present { - lvs.hintCache[h] = entry - delete(lvs.pendingHints, h) - } - } + lvs.pendingPutSize -= put(root, pending) }() lvs.bs.Flush() } @@ -377,47 +327,6 @@ func (lvs *ValueStore) Close() error { return lvs.bs.Close() } -// 
setHintsForReachable looks at the Chunks reachable from v and, for each one checks if there's a hint in the hintCache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint. -func (lvs *ValueStore) setHintsForReachable(v Value, r hash.Hash, toPending bool) { - v.WalkRefs(func(reachable Ref) { - hash := reachable.TargetHash() - if cur := lvs.check(hash); cur == nil || cur.Hint().IsEmpty() || cur.Hint() == hash { - lvs.set(hash, hintedChunk{getTargetType(reachable), r}, toPending) - } - }) -} - -func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) { - if entry := lvs.check(r); entry != nil && entry.Present() { - present = true - } - return -} - -func (lvs *ValueStore) check(r hash.Hash) chunkCacheEntry { - lvs.cacheMu.RLock() - defer lvs.cacheMu.RUnlock() - return lvs.hintCache[r] -} - -func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) { - lvs.cacheMu.Lock() - defer lvs.cacheMu.Unlock() - if toPending { - lvs.pendingHints[r] = entry - } else { - lvs.hintCache[r] = entry - } -} - -func (lvs *ValueStore) chunkHintsFromCache(v Value) Hints { - return lvs.checkChunksInCache(v, false) -} - -func (lvs *ValueStore) ensureChunksInCache(v Value) { - lvs.checkChunksInCache(v, true) -} - func (lvs *ValueStore) opCache() opCache { lvs.once.Do(func() { lvs.opcStore = newLdbOpCacheStore(lvs) @@ -425,98 +334,8 @@ func (lvs *ValueStore) opCache() opCache { return lvs.opcStore.opCache() } -func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints { - hints := map[hash.Hash]struct{}{} - collectHints := func(reachable Ref) { - // First, check the hintCache to see if reachable is already known to be valid. - targetHash := reachable.TargetHash() - entry := lvs.check(targetHash) - - // If it's not already in the hintCache, attempt to read the value directly, which will put it and its chunks into the hintCache. - var reachableV Value - if entry == nil || !entry.Present() { - if readValues { - // TODO: log or report that we needed to ReadValue here BUG 1762 - reachableV = lvs.ReadValue(targetHash) - entry = lvs.check(targetHash) - } - if reachableV == nil { - d.Chk.Fail("Attempted to write Value containing Ref to non-existent object.", "%s\n, contains ref %s, which points to a non-existent Value.", v.Hash(), reachable.TargetHash()) - } - } - if hint := entry.Hint(); !hint.IsEmpty() { - hints[hint] = struct{}{} - } - - targetType := getTargetType(reachable) - if entry.typeOf().TargetKind() == ValueKind && targetType.TargetKind() != ValueKind { - // We've seen targetHash before, but only in a Ref, and reachable has a more specific type than that. Deref reachable to check the real type on the chunk it points to, and cache the result if everything checks out. - if reachableV == nil { - reachableV = lvs.ReadValue(targetHash) - } - entry = hintedChunk{TypeOf(reachableV), entry.Hint()} - lvs.set(targetHash, entry, false) - } - // At this point, entry should have the most specific type info possible. Unless it matches targetType, or targetType is 'Value', bail. - if !IsSubtype(targetType, entry.typeOf()) { - // TODO: This should really be! 
- // if !(targetType.TargetKind() == ValueKind || entry.Type().Equals(targetType)) { - // https://github.com/attic-labs/noms/issues/3325 - - d.Panic("Value to write contains ref %s, which points to a value of type %s which is not a subtype of %s", reachable.TargetHash(), entry.typeOf().Describe(), targetType.Describe()) - } - } - v.WalkRefs(collectHints) - return hints -} - func getTargetType(refBase Ref) *Type { refType := TypeOf(refBase) d.PanicIfFalse(RefKind == refType.TargetKind()) return refType.Desc.(CompoundDesc).ElemTypes[0] } - -type hintedChunk struct { - t *Type - hint hash.Hash -} - -func (h hintedChunk) Present() bool { - return true -} - -func (h hintedChunk) Hint() (r hash.Hash) { - return h.hint -} - -func (h hintedChunk) typeOf() *Type { - return h.t -} - -type presentChunk Type - -func (p *presentChunk) Present() bool { - return true -} - -func (p *presentChunk) Hint() (h hash.Hash) { - return -} - -func (p *presentChunk) typeOf() *Type { - return (*Type)(p) -} - -type absentChunk struct{} - -func (a absentChunk) Present() bool { - return false -} - -func (a absentChunk) Hint() (r hash.Hash) { - return -} - -func (a absentChunk) typeOf() *Type { - panic("Not reached. Should never call typeOf() on an absentChunk.") -} diff --git a/go/types/value_store_test.go b/go/types/value_store_test.go index 79cd02ffc0..f95011a59e 100644 --- a/go/types/value_store_test.go +++ b/go/types/value_store_test.go @@ -21,7 +21,9 @@ func TestValueReadWriteRead(t *testing.T) { h := vs.WriteValue(s).TargetHash() vs.Flush(h) v := vs.ReadValue(h) // non-nil - assert.True(s.Equals(v)) + if assert.NotNil(v) { + assert.True(s.Equals(v), "%s != %s", EncodedValue(s), EncodedValue(v)) + } } func TestValueReadMany(t *testing.T) { @@ -78,105 +80,6 @@ func TestValueWriteFlush(t *testing.T) { assert.Zero(vs.pendingPutSize) } -func TestCheckChunksInCache(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - b := NewEmptyBlob() - cs.Put(EncodeValue(b, nil)) - cvs.set(b.Hash(), hintedChunk{TypeOf(b), b.Hash()}, false) - - bref := NewRef(b) - assert.NotPanics(func() { cvs.chunkHintsFromCache(bref) }) -} - -func TestCheckChunksInCachePostCommit(t *testing.T) { - assert := assert.New(t) - vs := NewTestValueStore() - - l := NewList() - r := NewRef(l) - i := 0 - for r.Height() == 1 { - l = l.Append(Number(i)) - r = NewRef(l) - i++ - } - - h := vs.WriteValue(l).TargetHash() - // Hints for leaf sequences should be absent prior to Flush... - l.WalkRefs(func(ref Ref) { - assert.True(vs.check(ref.TargetHash()).Hint().IsEmpty()) - }) - vs.Flush(h) - // ...And present afterwards - l.WalkRefs(func(ref Ref) { - assert.True(vs.check(ref.TargetHash()).Hint() == l.Hash()) - }) -} - -func TestCheckChunksNotInCacheAfterReadingNovelValue(t *testing.T) { - assert := assert.New(t) - vs := NewTestValueStore() - - l := NewList() - r := NewRef(l) - i := 0 - for r.Height() == 1 { - l = l.Append(Number(i)) - r = NewRef(l) - i++ - } - - h := vs.WriteValue(l).TargetHash() - // Hints for leaf sequences should be absent prior to ReadValue... - l.WalkRefs(func(ref Ref) { - assert.True(vs.check(ref.TargetHash()).Hint().IsEmpty()) - }) - vs.ReadValue(h) - // ...And remain absent! 
- l.WalkRefs(func(ref Ref) { - assert.False(vs.check(ref.TargetHash()).Hint() == l.Hash()) - }) -} - -func TestCheckChunksInCacheRefValue(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - l := NewList() - r := NewRef(l) - i := 0 - for r.Height() == 1 { - l = l.Append(Number(i)) - r = NewRef(l) - i++ - } - - r = cvs.WriteValue(l) - rr := cvs.WriteValue(ToRefOfValue(r)) - cvs.Flush(rr.TargetHash()) - - cvs2 := newLocalValueStore(cs) - rv := cvs2.ReadValue(rr.TargetHash()).(Ref) - assert.True(ValueType.Equals(getTargetType(rv))) - assert.NotPanics(func() { cvs.chunkHintsFromCache(r) }) -} - -func TestCheckChunksNotInCache(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - b := NewEmptyBlob() - cs.Put(EncodeValue(b, nil)) - - bref := NewRef(b) - assert.Panics(func() { cvs.chunkHintsFromCache(bref) }) -} - type checkingBatchStore struct { BatchStore a *assert.Assertions @@ -189,11 +92,12 @@ func (cbs *checkingBatchStore) expect(rs ...Ref) { } } -func (cbs *checkingBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) { +func (cbs *checkingBatchStore) SchedulePut(c chunks.Chunk) { if cbs.a.NotZero(len(cbs.expectedOrder)) { cbs.a.Equal(cbs.expectedOrder[0], c.Hash()) cbs.expectedOrder = cbs.expectedOrder[1:] } + cbs.BatchStore.SchedulePut(c) } func (cbs *checkingBatchStore) Flush() { @@ -253,92 +157,6 @@ func TestFlushOverSize(t *testing.T) { vs.Flush(l.Hash()) } -func TestEnsureChunksInCache(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - b := NewEmptyBlob() - s := String("oy") - bref := NewRef(b) - sref := NewRef(s) - l := NewList(bref, sref) - - cs.Put(EncodeValue(b, nil)) - cs.Put(EncodeValue(s, nil)) - cs.Put(EncodeValue(l, nil)) - - assert.NotPanics(func() { cvs.ensureChunksInCache(bref) }) - assert.NotPanics(func() { cvs.ensureChunksInCache(l) }) -} - -func TestEnsureChunksFails(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - b := NewEmptyBlob() - bref := NewRef(b) - assert.Panics(func() { cvs.ensureChunksInCache(bref) }) - - s := String("oy") - cs.Put(EncodeValue(b, nil)) - cs.Put(EncodeValue(s, nil)) - - badRef := constructRef(MakeRefType(MakePrimitiveType(BoolKind)), s.Hash(), 1) - l := NewList(bref, badRef) - - cs.Put(EncodeValue(l, nil)) - assert.Panics(func() { cvs.ensureChunksInCache(l) }) -} - -func TestCacheOnReadValue(t *testing.T) { - assert := assert.New(t) - cs := chunks.NewTestStore() - cvs := newLocalValueStore(cs) - - b := NewEmptyBlob() - bref := cvs.WriteValue(b) - r := cvs.WriteValue(bref) - cvs.Flush(r.TargetHash()) - - cvs2 := newLocalValueStore(cs) - v := cvs2.ReadValue(r.TargetHash()) - assert.True(bref.Equals(v)) - assert.True(cvs2.isPresent(b.Hash())) - assert.True(cvs2.isPresent(bref.Hash())) -} - -func TestHintsOnCache(t *testing.T) { - assert := assert.New(t) - cvs := newLocalValueStore(chunks.NewTestStore()) - - cr1 := cvs.WriteValue(Number(1)) - cr2 := cvs.WriteValue(Number(2)) - s1 := NewStruct("", StructData{ - "a": cr1, - "b": cr2, - }) - r := cvs.WriteValue(s1) - cvs.Flush(r.TargetHash()) - v := cvs.ReadValue(r.TargetHash()) - - if assert.True(v.Equals(s1)) { - cr3 := cvs.WriteValue(Number(3)) - s2 := NewStruct("", StructData{ - "a": cr1, - "b": cr2, - "c": cr3, - }) - - hints := cvs.chunkHintsFromCache(s2) - if assert.Len(hints, 1) { - _, present := hints[r.TargetHash()] - assert.True(present) - } - } -} - 
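// Illustrative sketch only, not part of this patch: the slimmed-down ValueStore
// above buffers plain chunks.Chunk values in pendingPuts and schedules them via
// SchedulePut(c) with no hints. Flush(root) hands the chunk registered under
// |root|, plus any pending chunks it directly references, to the BatchStore;
// other pending writes stay buffered until their own root is flushed or the
// pending-size limits force a spill. Every identifier used here appears in this
// diff; the wrapper function itself is hypothetical.
func exampleFlushByRoot() {
	vs := NewTestValueStore()
	a := vs.WriteValue(String("a")) // buffered in pendingPuts, not yet durable
	b := vs.WriteValue(String("b")) // buffered as well
	vs.Flush(a.TargetHash())        // schedules only "a"'s chunk; "b" remains pending
	_ = b
}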
func TestPanicOnReadBadVersion(t *testing.T) { cvs := newLocalValueStore(&badVersionStore{chunks.NewTestStore()}) assert.Panics(t, func() { cvs.ReadValue(hash.Hash{}) }) diff --git a/samples/go/hr/test-data/manifest b/samples/go/hr/test-data/manifest index 23a7ce8764..29bfd3529a 100644 --- a/samples/go/hr/test-data/manifest +++ b/samples/go/hr/test-data/manifest @@ -1 +1 @@ -2:7.4:rio54mdafn882tk79i32ggkg8q118fdb:o9iif7sno01q3qef8qubatd1kmlsmj9t:2:ogs2jpuuc89u7hpliadrjfqcpc749sel:2 \ No newline at end of file +2:7.5:rio54mdafn882tk79i32ggkg8q118fdb:ogs2jpuuc89u7hpliadrjfqcpc749sel:2:o9iif7sno01q3qef8qubatd1kmlsmj9t:2 \ No newline at end of file
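Taken together, these changes replace per-chunk hint bookkeeping with a bulk HasMany-based completeness check. The sketch below is illustrative only and not part of the patch: the package main wrapper, variable names, and printed output are assumptions, while chunks.NewTestStore, hash.HashSet, types.NewValidatingBatchingSink, Put, Flush, PanicIfDangling, and HasMany are the identifiers this diff adds or keeps.

package main

import (
	"fmt"

	"github.com/attic-labs/noms/go/chunks"
	"github.com/attic-labs/noms/go/hash"
	"github.com/attic-labs/noms/go/types"
)

func main() {
	cs := chunks.NewTestStore()
	vbs := types.NewValidatingBatchingSink(cs)

	b := types.Bool(true)
	r := types.NewRef(b)

	// Put writes each chunk to the backing store and records the refs it
	// contains; order does not matter, because outstanding refs are resolved
	// in bulk afterwards.
	vbs.Put(types.EncodeValue(r, nil), r)
	vbs.Put(types.EncodeValue(b, nil), b)
	vbs.Flush()

	// PanicIfDangling issues a single HasMany against the backing store and
	// panics if any referenced chunk is still absent; here everything is
	// present, so it returns normally.
	vbs.PanicIfDangling()

	// HasMany can also be queried directly to ask which hashes a store holds.
	query := hash.HashSet{}
	query.Insert(b.Hash())
	fmt.Println(cs.HasMany(query).Has(b.Hash())) // true
}

This mirrors the BatchOutOfOrder and ExistingChunk cases in the new test above: completeness is established by one HasMany call per batch rather than by reading hinted chunks up front.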