mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-25 03:34:05 -05:00
LocalDatabase vends separate validating BatchStore (#2624)
This patch modifies LocalDatabase so that it no longer swaps out its embedded ValueStore during Pull(). The reason it was doing this is that Pull() injects chunks directly into a Database, without doing any work on its own to ensure correctness. For LocalDatabase, WriteValue() performs de-facto validation as it goes, so it does not need this additional validation in the general case. To address the former without impacting the latter, we were making LocalDatabase swap out its ValueStore() during Pull(), replacing it with one that performs validation. This led to inconsistencies, seen in issue #2598. Collections read from the DB _before_ this swap could have weird behavior if they were modified and written after this swap. The new code just lazily creates a BatchStore for use during Pull(). Fixes #2598
This commit is contained in:
@@ -5,6 +5,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
@@ -45,17 +46,19 @@ func (s *nomsSyncTestSuite) TestSyncValidation() {
|
||||
}
|
||||
|
||||
func (s *nomsSyncTestSuite) TestSync() {
|
||||
ldb2dir := path.Join(s.TempDir, "ldb2")
|
||||
defer s.NoError(os.RemoveAll(ldb2dir))
|
||||
|
||||
sourceDB := datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false))
|
||||
source1 := sourceDB.GetDataset("src")
|
||||
source1, err := sourceDB.CommitValue(source1, types.Number(42))
|
||||
s.NoError(err)
|
||||
source2, err := sourceDB.CommitValue(source1, types.Number(43))
|
||||
source1HeadRef := source1.Head().Hash() // Remember first head, so we can sync to it.
|
||||
source1, err = sourceDB.CommitValue(source1, types.Number(43))
|
||||
s.NoError(err)
|
||||
source1HeadRef := source1.Head().Hash()
|
||||
source2.Database().Close() // Close Database backing both Datasets
|
||||
sourceDB.Close()
|
||||
|
||||
sourceSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "#"+source1HeadRef.String())
|
||||
ldb2dir := path.Join(s.TempDir, "ldb2")
|
||||
sinkDatasetSpec := spec.CreateValueSpecString("ldb", ldb2dir, "dest")
|
||||
sout, _ := s.MustRun(main, []string{"sync", sourceSpec, sinkDatasetSpec})
|
||||
|
||||
@@ -78,6 +81,49 @@ func (s *nomsSyncTestSuite) TestSync() {
|
||||
s.Regexp("up to date", sout)
|
||||
}
|
||||
|
||||
func (s *nomsSyncTestSuite) TestSync_Issue2598() {
|
||||
ldb2dir := path.Join(s.TempDir, "ldb2")
|
||||
defer s.NoError(os.RemoveAll(ldb2dir))
|
||||
|
||||
sourceDB := datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false))
|
||||
// Create dataset "src1", which has a lineage of two commits.
|
||||
source1 := sourceDB.GetDataset("src1")
|
||||
source1, err := sourceDB.CommitValue(source1, types.Number(42))
|
||||
s.NoError(err)
|
||||
source1, err = sourceDB.CommitValue(source1, types.Number(43))
|
||||
s.NoError(err)
|
||||
|
||||
// Create dataset "src2", with a lineage of one commit.
|
||||
source2 := sourceDB.GetDataset("src2")
|
||||
source2, err = sourceDB.CommitValue(source2, types.Number(1))
|
||||
s.NoError(err)
|
||||
|
||||
sourceDB.Close() // Close Database backing both Datasets
|
||||
|
||||
// Sync over "src1"
|
||||
sourceDataset := spec.CreateValueSpecString("ldb", s.LdbDir, "src1")
|
||||
sinkDatasetSpec := spec.CreateValueSpecString("ldb", ldb2dir, "dest")
|
||||
sout, _ := s.MustRun(main, []string{"sync", sourceDataset, sinkDatasetSpec})
|
||||
|
||||
db := datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false))
|
||||
dest := db.GetDataset("dest")
|
||||
s.True(types.Number(43).Equals(dest.HeadValue()))
|
||||
db.Close()
|
||||
|
||||
// Now, try syncing a second dataset. This crashed in issue #2598
|
||||
sourceDataset2 := spec.CreateValueSpecString("ldb", s.LdbDir, "src2")
|
||||
sinkDatasetSpec2 := spec.CreateValueSpecString("ldb", ldb2dir, "dest2")
|
||||
sout, _ = s.MustRun(main, []string{"sync", sourceDataset2, sinkDatasetSpec2})
|
||||
|
||||
db = datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false))
|
||||
dest = db.GetDataset("dest2")
|
||||
s.True(types.Number(1).Equals(dest.HeadValue()))
|
||||
db.Close()
|
||||
|
||||
sout, _ = s.MustRun(main, []string{"sync", sourceDataset, sinkDatasetSpec})
|
||||
s.Regexp("up to date", sout)
|
||||
}
|
||||
|
||||
func (s *nomsSyncTestSuite) TestRewind() {
|
||||
var err error
|
||||
sourceDB := datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false))
|
||||
|
||||
@@ -94,10 +94,6 @@ func makeHTTPClient(requestLimit int) *http.Client {
|
||||
return &http.Client{Transport: &t}
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) IsValidating() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) Flush() {
|
||||
bhcs.flushChan <- struct{}{}
|
||||
bhcs.requestWg.Wait()
|
||||
|
||||
@@ -35,10 +35,6 @@ func newLocalBatchStore(cs chunks.ChunkStore) *localBatchStore {
|
||||
}
|
||||
}
|
||||
|
||||
func (lbs *localBatchStore) IsValidating() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Get checks the internal Chunk cache, proxying to the backing ChunkStore if not present.
|
||||
func (lbs *localBatchStore) Get(h hash.Hash) chunks.Chunk {
|
||||
lbs.once.Do(lbs.expectVersion)
|
||||
@@ -122,9 +118,13 @@ func (lbs *localBatchStore) Flush() {
|
||||
lbs.hints = types.Hints{}
|
||||
}
|
||||
|
||||
// Close closes the underlying ChunkStore
|
||||
func (lbs *localBatchStore) Close() error {
|
||||
// FlushAndDestroyWithoutClose flushes lbs and destroys its cache of unwritten chunks. It's needed because LocalDatabase wraps a localBatchStore around a ChunkStore that's used by a separate BatchStore, so calling Close() on one is semantically incorrect while it still wants to use the other.
|
||||
func (lbs *localBatchStore) FlushAndDestroyWithoutClose() {
|
||||
lbs.Flush()
|
||||
lbs.unwrittenPuts.Destroy()
|
||||
return lbs.cs.Close()
|
||||
}
|
||||
|
||||
// Close is supposed to close the underlying ChunkStore, but the only place localBatchStore is currently used wants to keep the underlying ChunkStore open after it's done with lbs. Hence, the above method and the panic() here.
|
||||
func (lbs *localBatchStore) Close() error {
|
||||
panic("Unreached")
|
||||
}
|
||||
|
||||
+22
-17
@@ -6,14 +6,14 @@ package datas
|
||||
|
||||
import (
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
)
|
||||
|
||||
// Database provides versioned storage for noms values. Each Database instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new Database, representing a new moment in history.
|
||||
type LocalDatabase struct {
|
||||
databaseCommon
|
||||
cs chunks.ChunkStore
|
||||
cs chunks.ChunkStore
|
||||
vbs *localBatchStore
|
||||
}
|
||||
|
||||
func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
|
||||
@@ -21,6 +21,7 @@ func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
|
||||
return &LocalDatabase{
|
||||
newDatabaseCommon(newCachingChunkHaver(cs), types.NewValueStore(bs), bs),
|
||||
cs,
|
||||
nil,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,8 +30,10 @@ func (ldb *LocalDatabase) GetDataset(datasetID string) Dataset {
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) Commit(ds Dataset, v types.Value, opts CommitOptions) (Dataset, error) {
|
||||
err := ldb.doCommit(ds.ID(), buildNewCommit(ds, v, opts))
|
||||
return ldb.GetDataset(ds.ID()), err
|
||||
return ldb.doHeadUpdate(
|
||||
ds,
|
||||
func(ds Dataset) error { return ldb.doCommit(ds.ID(), buildNewCommit(ds, v, opts)) },
|
||||
)
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) CommitValue(ds Dataset, v types.Value) (Dataset, error) {
|
||||
@@ -38,27 +41,29 @@ func (ldb *LocalDatabase) CommitValue(ds Dataset, v types.Value) (Dataset, error
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) Delete(ds Dataset) (Dataset, error) {
|
||||
err := ldb.doDelete(ds.ID())
|
||||
return ldb.GetDataset(ds.ID()), err
|
||||
return ldb.doHeadUpdate(ds, func(ds Dataset) error { return ldb.doDelete(ds.ID()) })
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) SetHead(ds Dataset, newHeadRef types.Ref) (Dataset, error) {
|
||||
err := ldb.doSetHead(ds, newHeadRef)
|
||||
return ldb.GetDataset(ds.ID()), err
|
||||
return ldb.doHeadUpdate(ds, func(ds Dataset) error { return ldb.doSetHead(ds, newHeadRef) })
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) FastForward(ds Dataset, newHeadRef types.Ref) (Dataset, error) {
|
||||
err := ldb.doFastForward(ds, newHeadRef)
|
||||
return ldb.doHeadUpdate(ds, func(ds Dataset) error { return ldb.doFastForward(ds, newHeadRef) })
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) doHeadUpdate(ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) {
|
||||
if ldb.vbs != nil {
|
||||
ldb.vbs.FlushAndDestroyWithoutClose()
|
||||
ldb.vbs = nil
|
||||
}
|
||||
err := updateFunc(ds)
|
||||
return ldb.GetDataset(ds.ID()), err
|
||||
}
|
||||
|
||||
func (ldb *LocalDatabase) validatingBatchStore() (bs types.BatchStore) {
|
||||
bs = ldb.ValueStore.BatchStore()
|
||||
if !bs.IsValidating() {
|
||||
bs = newLocalBatchStore(ldb.cs)
|
||||
ldb.ValueStore = types.NewValueStore(bs)
|
||||
ldb.rt = bs
|
||||
func (ldb *LocalDatabase) validatingBatchStore() types.BatchStore {
|
||||
if ldb.vbs == nil {
|
||||
ldb.vbs = newLocalBatchStore(ldb.cs)
|
||||
}
|
||||
d.PanicIfFalse(bs.IsValidating())
|
||||
return bs
|
||||
return ldb.vbs
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
package datas
|
||||
|
||||
import (
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
)
|
||||
@@ -22,7 +21,6 @@ func NewRemoteDatabase(baseURL, auth string) *RemoteDatabaseClient {
|
||||
|
||||
func (rdb *RemoteDatabaseClient) validatingBatchStore() (bs types.BatchStore) {
|
||||
bs = rdb.ValueStore.BatchStore()
|
||||
d.PanicIfFalse(bs.IsValidating())
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -16,9 +16,6 @@ import (
|
||||
|
||||
// BatchStore provides an interface similar to chunks.ChunkStore, but batch-oriented. Instead of Put(), it provides SchedulePut(), which enqueues a Chunk to be sent at a possibly later time.
|
||||
type BatchStore interface {
|
||||
// IsValidating indicates whether this implementation can internally enforce chunk validity & completeness. If a BatchStore supports this, it must also support "staging" of writes -- that is, allowing chunks to be written which reference chunks which have yet to be written.
|
||||
IsValidating() bool
|
||||
|
||||
// Get returns from the store the Value Chunk by h. If h is absent from the store, chunks.EmptyChunk is returned.
|
||||
Get(h hash.Hash) chunks.Chunk
|
||||
|
||||
@@ -49,10 +46,6 @@ func NewBatchStoreAdaptor(cs chunks.ChunkStore) BatchStore {
|
||||
return &BatchStoreAdaptor{cs: cs}
|
||||
}
|
||||
|
||||
func (bsa *BatchStoreAdaptor) IsValidating() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Get simply proxies to the backing ChunkStore
|
||||
func (bsa *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk {
|
||||
bsa.once.Do(bsa.expectVersion)
|
||||
|
||||
Reference in New Issue
Block a user