new store panics

This commit is contained in:
Brian Hendriks
2019-07-01 16:05:19 -07:00
parent 3694a68a49
commit 3261ede245
18 changed files with 176 additions and 100 deletions
+1 -1
View File
@@ -127,7 +127,7 @@ func (fact AWSFactory) newChunkStore(ctx context.Context, urlObj *url.URL, param
}
sess := session.Must(session.NewSessionWithOptions(opts))
return nbs.NewAWSStore(ctx, parts[0], dbName, parts[1], s3.New(sess), dynamodb.New(sess), defaultMemTableSize), nil
return nbs.NewAWSStore(ctx, parts[0], dbName, parts[1], s3.New(sess), dynamodb.New(sess), defaultMemTableSize)
}
func validatePath(path string) (string, error) {
+6 -8
View File
@@ -3,7 +3,6 @@ package dbfactory
import (
"context"
"github.com/liquidata-inc/ld/dolt/go/libraries/utils/filesys"
"github.com/liquidata-inc/ld/dolt/go/libraries/utils/pantoerr"
"github.com/liquidata-inc/ld/dolt/go/store/datas"
"github.com/liquidata-inc/ld/dolt/go/store/nbs"
"net/url"
@@ -38,13 +37,12 @@ func (fact FileFactory) CreateDB(ctx context.Context, urlObj *url.URL, params ma
return nil, filesys.ErrIsFile
}
var db datas.Database
err = pantoerr.PanicToError("failed to create database", func() error {
nbs := nbs.NewLocalStore(ctx, path, defaultMemTableSize)
db = datas.NewDatabase(nbs)
st, err := nbs.NewLocalStore(ctx, path, defaultMemTableSize)
return nil
})
if err != nil {
return nil, err
}
return datas.NewDatabase(st), nil
return db, err
}
+10 -10
View File
@@ -3,7 +3,6 @@ package dbfactory
import (
"cloud.google.com/go/storage"
"context"
"github.com/liquidata-inc/ld/dolt/go/libraries/utils/pantoerr"
"github.com/liquidata-inc/ld/dolt/go/store/datas"
"github.com/liquidata-inc/ld/dolt/go/store/nbs"
"net/url"
@@ -16,18 +15,19 @@ type GSFactory struct {
// CreateDB creates an GCS backed database
func (fact GSFactory) CreateDB(ctx context.Context, urlObj *url.URL, params map[string]string) (datas.Database, error) {
var db datas.Database
err := pantoerr.PanicToError("failed to create database", func() error {
gcs, err := storage.NewClient(ctx)
gcs, err := storage.NewClient(ctx)
if err != nil {
return err
}
if err != nil {
return nil, err
}
gcsStore := nbs.NewGCSStore(ctx, urlObj.Host, urlObj.Path, gcs, defaultMemTableSize)
db = datas.NewDatabase(gcsStore)
gcsStore, err := nbs.NewGCSStore(ctx, urlObj.Host, urlObj.Path, gcs, defaultMemTableSize)
return nil
})
if err != nil {
return nil, err
}
db = datas.NewDatabase(gcsStore)
return db, err
}
+5 -3
View File
@@ -27,7 +27,8 @@ type nomsDsTestSuite struct {
func (s *nomsDsTestSuite) TestEmptyNomsDs() {
dir := s.DBDir
cs := nbs.NewLocalStore(context.Background(), dir, clienttest.DefaultMemTableSize)
cs, err := nbs.NewLocalStore(context.Background(), dir, clienttest.DefaultMemTableSize)
s.NoError(err)
ds := datas.NewDatabase(cs)
ds.Close()
@@ -40,12 +41,13 @@ func (s *nomsDsTestSuite) TestEmptyNomsDs() {
func (s *nomsDsTestSuite) TestNomsDs() {
dir := s.DBDir
cs := nbs.NewLocalStore(context.Background(), dir, clienttest.DefaultMemTableSize)
cs, err := nbs.NewLocalStore(context.Background(), dir, clienttest.DefaultMemTableSize)
s.NoError(err)
db := datas.NewDatabase(cs)
id := "testdataset"
set := db.GetDataset(context.Background(), id)
set, err := db.CommitValue(context.Background(), set, types.String("Commit Value"))
set, err = db.CommitValue(context.Background(), set, types.String("Commit Value"))
s.NoError(err)
id2 := "testdataset2"
+32 -15
View File
@@ -26,9 +26,11 @@ type nomsSyncTestSuite struct {
}
func (s *nomsSyncTestSuite) TestSyncValidation() {
sourceDB := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize))
cs, err := nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize)
s.NoError(err)
sourceDB := datas.NewDatabase(cs)
source1 := sourceDB.GetDataset(context.Background(), "src")
source1, err := sourceDB.CommitValue(context.Background(), source1, types.Float(42))
source1, err = sourceDB.CommitValue(context.Background(), source1, types.Float(42))
s.NoError(err)
source1HeadRef := source1.Head().Hash()
source1.Database().Close()
@@ -47,9 +49,11 @@ func (s *nomsSyncTestSuite) TestSyncValidation() {
func (s *nomsSyncTestSuite) TestSync() {
defer s.NoError(os.RemoveAll(s.DBDir2))
sourceDB := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize))
cs, err := nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize)
s.NoError(err)
sourceDB := datas.NewDatabase(cs)
source1 := sourceDB.GetDataset(context.Background(), "src")
source1, err := sourceDB.CommitValue(context.Background(), source1, types.Float(42))
source1, err = sourceDB.CommitValue(context.Background(), source1, types.Float(42))
s.NoError(err)
source1HeadRef := source1.Head().Hash() // Remember first head, so we can sync to it.
source1, err = sourceDB.CommitValue(context.Background(), source1, types.Float(43))
@@ -62,7 +66,9 @@ func (s *nomsSyncTestSuite) TestSync() {
sout, _ := s.MustRun(main, []string{"sync", sourceSpec, sinkDatasetSpec})
s.Regexp("Synced", sout)
db := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize)
s.NoError(err)
db := datas.NewDatabase(cs)
dest := db.GetDataset(context.Background(), "dest")
s.True(types.Float(42).Equals(dest.HeadValue()))
db.Close()
@@ -72,7 +78,9 @@ func (s *nomsSyncTestSuite) TestSync() {
sout, _ = s.MustRun(main, []string{"sync", sourceDataset, sinkDatasetSpec})
s.Regexp("Synced", sout)
db = datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize)
s.NoError(err)
db = datas.NewDatabase(cs)
dest = db.GetDataset(context.Background(), "dest")
s.True(types.Float(43).Equals(dest.HeadValue()))
db.Close()
@@ -86,7 +94,9 @@ func (s *nomsSyncTestSuite) TestSync() {
sout, _ = s.MustRun(main, []string{"sync", sourceDataset, sinkDatasetSpec})
s.Regexp("Created", sout)
db = datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize)
s.NoError(err)
db = datas.NewDatabase(cs)
dest = db.GetDataset(context.Background(), "dest2")
s.True(types.Float(43).Equals(dest.HeadValue()))
db.Close()
@@ -95,10 +105,12 @@ func (s *nomsSyncTestSuite) TestSync() {
func (s *nomsSyncTestSuite) TestSync_Issue2598() {
defer s.NoError(os.RemoveAll(s.DBDir2))
sourceDB := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize))
cs, err := nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize)
s.NoError(err)
sourceDB := datas.NewDatabase(cs)
// Create dataset "src1", which has a lineage of two commits.
source1 := sourceDB.GetDataset(context.Background(), "src1")
source1, err := sourceDB.CommitValue(context.Background(), source1, types.Float(42))
source1, err = sourceDB.CommitValue(context.Background(), source1, types.Float(42))
s.NoError(err)
source1, err = sourceDB.CommitValue(context.Background(), source1, types.Float(43))
s.NoError(err)
@@ -114,8 +126,8 @@ func (s *nomsSyncTestSuite) TestSync_Issue2598() {
sourceDataset := spec.CreateValueSpecString("nbs", s.DBDir, "src1")
sinkDatasetSpec := spec.CreateValueSpecString("nbs", s.DBDir2, "dest")
sout, _ := s.MustRun(main, []string{"sync", sourceDataset, sinkDatasetSpec})
db := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize)
db := datas.NewDatabase(cs)
dest := db.GetDataset(context.Background(), "dest")
s.True(types.Float(43).Equals(dest.HeadValue()))
db.Close()
@@ -124,8 +136,9 @@ func (s *nomsSyncTestSuite) TestSync_Issue2598() {
sourceDataset2 := spec.CreateValueSpecString("nbs", s.DBDir, "src2")
sinkDatasetSpec2 := spec.CreateValueSpecString("nbs", s.DBDir2, "dest2")
sout, _ = s.MustRun(main, []string{"sync", sourceDataset2, sinkDatasetSpec2})
db = datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir2, clienttest.DefaultMemTableSize)
s.NoError(err)
db = datas.NewDatabase(cs)
dest = db.GetDataset(context.Background(), "dest2")
s.True(types.Float(1).Equals(dest.HeadValue()))
db.Close()
@@ -136,7 +149,9 @@ func (s *nomsSyncTestSuite) TestSync_Issue2598() {
func (s *nomsSyncTestSuite) TestRewind() {
var err error
sourceDB := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize))
cs, err := nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize)
s.NoError(err)
sourceDB := datas.NewDatabase(cs)
src := sourceDB.GetDataset(context.Background(), "foo")
src, err = sourceDB.CommitValue(context.Background(), src, types.Float(42))
s.NoError(err)
@@ -149,7 +164,9 @@ func (s *nomsSyncTestSuite) TestRewind() {
sinkDatasetSpec := spec.CreateValueSpecString("nbs", s.DBDir, "foo")
s.MustRun(main, []string{"sync", sourceSpec, sinkDatasetSpec})
db := datas.NewDatabase(nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize))
cs, err = nbs.NewLocalStore(context.Background(), s.DBDir, clienttest.DefaultMemTableSize)
s.NoError(err)
db := datas.NewDatabase(cs)
dest := db.GetDataset(context.Background(), "foo")
s.True(types.Float(42).Equals(dest.HeadValue()))
db.Close()
@@ -15,10 +15,11 @@ import (
"github.com/stretchr/testify/assert"
)
type storeOpenFn func() chunks.ChunkStore
type storeOpenFn func() (chunks.ChunkStore, error)
func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.TestingT) bool {
store := refreshStore()
store, err := refreshStore()
assert.NoError(t, err)
writeToEmptyStore(store, src, t)
assert.NoError(t, store.Close())
return true
@@ -52,7 +53,8 @@ func goReadChunks(src *dataSource) <-chan *chunks.Chunk {
}
func benchmarkNoRefreshWrite(openStore storeOpenFn, src *dataSource, t assert.TestingT) {
store := openStore()
store, err := openStore()
assert.NoError(t, err)
chunx := goReadChunks(src)
for c := range chunx {
err := store.Put(context.Background(), *c)
@@ -68,7 +70,8 @@ func verifyChunk(h hash.Hash, c chunks.Chunk) {
}
func benchmarkRead(openStore storeOpenFn, hashes hashSlice, src *dataSource, t assert.TestingT) {
store := openStore()
store, err := openStore()
assert.NoError(t, err)
for _, h := range hashes {
c, err := store.Get(context.Background(), h)
assert.NoError(t, err)
@@ -97,7 +100,9 @@ func verifyChunks(hashes hash.HashSlice, foundChunks chan *chunks.Chunk) {
}
func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource, batchSize, concurrency int, t assert.TestingT) {
store := openStore()
store, err := openStore()
assert.NoError(t, err)
batch := make(hash.HashSlice, 0, batchSize)
wg := sync.WaitGroup{}
@@ -129,9 +134,7 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
if len(batch) > 0 {
chunkChan := make(chan *chunks.Chunk, len(batch))
err := store.GetMany(context.Background(), batch.HashSet(), chunkChan)
// TODO: fix panics
d.PanicIfError(err)
assert.NoError(t, err)
close(chunkChan)
@@ -145,7 +148,8 @@ func benchmarkReadMany(openStore storeOpenFn, hashes hashSlice, src *dataSource,
func ensureNovelWrite(wrote bool, openStore storeOpenFn, src *dataSource, t assert.TestingT) bool {
if !wrote {
store := openStore()
store, err := openStore()
assert.NoError(t, err)
defer store.Close()
writeToEmptyStore(store, src, t)
}
+2 -2
View File
@@ -20,8 +20,8 @@ type fileBlockStore struct {
w io.WriteCloser
}
func newFileBlockStore(w io.WriteCloser) chunks.ChunkStore {
return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w}
func newFileBlockStore(w io.WriteCloser) (chunks.ChunkStore, error) {
return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w}, nil
}
func (fb fileBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
+8 -8
View File
@@ -76,19 +76,19 @@ func main() {
open := newNullBlockStore
wrote := false
var writeDB func()
var refresh func() chunks.ChunkStore
var refresh func() (chunks.ChunkStore, error)
if *toNBS != "" || *toFile != "" || *toAWS != "" {
var reset func()
if *toNBS != "" {
dir := makeTempDir(*toNBS, pb)
defer os.RemoveAll(dir)
open = func() chunks.ChunkStore { return nbs.NewLocalStore(context.Background(), dir, bufSize) }
open = func() (chunks.ChunkStore, error) { return nbs.NewLocalStore(context.Background(), dir, bufSize) }
reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) }
} else if *toFile != "" {
dir := makeTempDir(*toFile, pb)
defer os.RemoveAll(dir)
open = func() chunks.ChunkStore {
open = func() (chunks.ChunkStore, error) {
f, err := ioutil.TempFile(dir, "")
d.Chk.NoError(err)
return newFileBlockStore(f)
@@ -97,7 +97,7 @@ func main() {
} else if *toAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
open = func() chunks.ChunkStore {
open = func() (chunks.ChunkStore, error) {
return nbs.NewAWSStore(context.Background(), dynamoTable, *toAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize)
}
reset = func() {
@@ -113,21 +113,21 @@ func main() {
}
writeDB = func() { wrote = ensureNovelWrite(wrote, open, src, pb) }
refresh = func() chunks.ChunkStore {
refresh = func() (chunks.ChunkStore, error) {
reset()
return open()
}
} else {
if *useNBS != "" {
open = func() chunks.ChunkStore { return nbs.NewLocalStore(context.Background(), *useNBS, bufSize) }
open = func() (chunks.ChunkStore, error) { return nbs.NewLocalStore(context.Background(), *useNBS, bufSize) }
} else if *useAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
open = func() chunks.ChunkStore {
open = func() (chunks.ChunkStore, error) {
return nbs.NewAWSStore(context.Background(), dynamoTable, *useAWS, s3Bucket, s3.New(sess), dynamodb.New(sess), bufSize)
}
}
writeDB = func() {}
refresh = func() chunks.ChunkStore { panic("WriteNovel unsupported with --useLDB and --useNBS") }
refresh = func() (chunks.ChunkStore, error) { panic("WriteNovel unsupported with --useLDB and --useNBS") }
}
benchmarks := []struct {
+2 -2
View File
@@ -13,8 +13,8 @@ import (
type nullBlockStore struct {
}
func newNullBlockStore() chunks.ChunkStore {
return nullBlockStore{}
func newNullBlockStore() (chunks.ChunkStore, error) {
return nullBlockStore{}, nil
}
func (nb nullBlockStore) Get(ctx context.Context, h hash.Hash) (chunks.Chunk, error) {
+20 -10
View File
@@ -40,7 +40,8 @@ func (suite *BlockStoreSuite) SetupTest() {
var err error
suite.dir, err = ioutil.TempDir("", "")
suite.NoError(err)
suite.store = NewLocalStore(context.Background(), suite.dir, testMemTableSize)
suite.store, err = NewLocalStore(context.Background(), suite.dir, testMemTableSize)
suite.NoError(err)
suite.putCountFn = func() int {
return int(suite.store.putCount)
}
@@ -57,14 +58,17 @@ func (suite *BlockStoreSuite) TearDownTest() {
func (suite *BlockStoreSuite) TestChunkStoreMissingDir() {
newDir := filepath.Join(suite.dir, "does-not-exist")
suite.Panics(func() { NewLocalStore(context.Background(), newDir, testMemTableSize) })
_, err := NewLocalStore(context.Background(), newDir, testMemTableSize)
suite.Error(err)
}
func (suite *BlockStoreSuite) TestChunkStoreNotDir() {
existingFile := filepath.Join(suite.dir, "path-exists-but-is-a-file")
_, err := os.Create(existingFile)
suite.NoError(err)
suite.Panics(func() { NewLocalStore(context.Background(), existingFile, testMemTableSize) })
_, err = NewLocalStore(context.Background(), existingFile, testMemTableSize)
suite.Error(err)
}
func (suite *BlockStoreSuite) TestChunkStorePut() {
@@ -267,7 +271,8 @@ func (suite *BlockStoreSuite) TestChunkStoreFlushOptimisticLockFail() {
root, err := suite.store.Root(context.Background())
suite.NoError(err)
interloper := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
interloper, err := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
suite.NoError(err)
err = interloper.Put(context.Background(), c1)
suite.NoError(err)
h, err := interloper.Root(context.Background())
@@ -312,8 +317,9 @@ func (suite *BlockStoreSuite) TestChunkStoreRebaseOnNoOpFlush() {
input1 := []byte("abc")
c1 := chunks.NewChunk(input1)
interloper := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
err := interloper.Put(context.Background(), c1)
interloper, err := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
suite.NoError(err)
err = interloper.Put(context.Background(), c1)
suite.NoError(err)
root, err := interloper.Root(context.Background())
suite.NoError(err)
@@ -347,7 +353,8 @@ func (suite *BlockStoreSuite) TestChunkStorePutWithRebase() {
root, err := suite.store.Root(context.Background())
suite.NoError(err)
interloper := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
interloper, err := NewLocalStore(context.Background(), suite.dir, testMemTableSize)
suite.NoError(err)
err = interloper.Put(context.Background(), c1)
suite.NoError(err)
h, err := interloper.Root(context.Background())
@@ -431,7 +438,8 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
p := newFakeTablePersister()
c := &fakeConjoiner{}
smallTableStore := newNomsBlockStore(context.Background(), mm, p, c, testMemTableSize)
smallTableStore, err := newNomsBlockStore(context.Background(), mm, p, c, testMemTableSize)
assert.NoError(t, err)
root, err := smallTableStore.Root(context.Background())
assert.NoError(t, err)
@@ -470,7 +478,8 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
[]cannedConjoin{makeCanned(upstream[:2], upstream[2:], p)},
}
smallTableStore := newNomsBlockStore(context.Background(), makeManifestManager(fm), p, c, testMemTableSize)
smallTableStore, err := newNomsBlockStore(context.Background(), makeManifestManager(fm), p, c, testMemTableSize)
assert.NoError(t, err)
root, err := smallTableStore.Root(context.Background())
assert.NoError(t, err)
@@ -499,7 +508,8 @@ func TestBlockStoreConjoinOnCommit(t *testing.T) {
},
}
smallTableStore := newNomsBlockStore(context.Background(), makeManifestManager(fm), p, c, testMemTableSize)
smallTableStore, err := newNomsBlockStore(context.Background(), makeManifestManager(fm), p, c, testMemTableSize)
assert.NoError(t, err)
root, err := smallTableStore.Root(context.Background())
assert.NoError(t, err)
+13 -5
View File
@@ -18,12 +18,20 @@ const (
defaultCacheMemTableSize uint64 = 1 << 27 // 128MiB
)
func NewCache(ctx context.Context) *NomsBlockCache {
func NewCache(ctx context.Context) (*NomsBlockCache, error) {
dir, err := ioutil.TempDir("", "")
d.PanicIfError(err)
store := NewLocalStore(ctx, dir, defaultCacheMemTableSize)
d.Chk.NoError(err, "opening put cache in %s", dir)
return &NomsBlockCache{store, dir}
if err != nil {
return nil, err
}
store, err := NewLocalStore(ctx, dir, defaultCacheMemTableSize)
if err != nil {
return nil, err
}
return &NomsBlockCache{store, dir}, nil
}
// NomsBlockCache holds Chunks, allowing them to be retrieved by hash or enumerated in hash order.
+9 -2
View File
@@ -7,6 +7,7 @@ package main
import (
"context"
"fmt"
"github.com/liquidata-inc/ld/dolt/go/store/d"
"log"
"os"
"sync"
@@ -49,11 +50,17 @@ func main() {
var store *nbs.NomsBlockStore
if *dir != "" {
store = nbs.NewLocalStore(context.Background(), *dir, memTableSize)
var err error
store, err = nbs.NewLocalStore(context.Background(), *dir, memTableSize)
d.PanicIfError(err)
*dbName = *dir
} else if *table != "" && *bucket != "" && *dbName != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
store = nbs.NewAWSStore(context.Background(), *table, *dbName, *bucket, s3.New(sess), dynamodb.New(sess), memTableSize)
var err error
store, err = nbs.NewAWSStore(context.Background(), *table, *dbName, *bucket, s3.New(sess), dynamodb.New(sess), memTableSize)
d.PanicIfError(err)
} else {
log.Fatalf("Must set either --dir or ALL of --table, --bucket and --db\n")
}
+17 -10
View File
@@ -143,7 +143,8 @@ func TestChunkStoreManifestFirstWriteByOtherProcess(t *testing.T) {
// Simulate another process writing a manifest behind store's back.
newRoot, chunks := interloperWrite(fm, p, []byte("new root"), []byte("hello2"), []byte("goodbye2"), []byte("badbye2"))
store := newNomsBlockStore(context.Background(), mm, p, inlineConjoiner{defaultMaxTables}, defaultMemTableSize)
store, err := newNomsBlockStore(context.Background(), mm, p, inlineConjoiner{defaultMaxTables}, defaultMemTableSize)
assert.NoError(err)
defer store.Close()
h, err := store.Root(context.Background())
@@ -178,15 +179,17 @@ func TestChunkStoreManifestPreemptiveOptimisticLockFail(t *testing.T) {
p := newFakeTablePersister()
c := inlineConjoiner{defaultMaxTables}
store := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
store, err := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
assert.NoError(err)
defer store.Close()
// Simulate another goroutine writing a manifest behind store's back.
interloper := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
interloper, err := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
assert.NoError(err)
defer interloper.Close()
chunk := chunks.NewChunk([]byte("hello"))
err := interloper.Put(context.Background(), chunk)
err = interloper.Put(context.Background(), chunk)
assert.NoError(err)
assert.True(interloper.Commit(context.Background(), chunk.Hash(), hash.Hash{}))
@@ -219,7 +222,8 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
p := newFakeTablePersister()
c := inlineConjoiner{defaultMaxTables}
store := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
store, err := newNomsBlockStore(context.Background(), mm, p, c, defaultMemTableSize)
assert.NoError(err)
defer store.Close()
// store.Commit() should lock out calls to mm.Fetch()
@@ -236,7 +240,7 @@ func TestChunkStoreCommitLocksOutFetch(t *testing.T) {
}
rootChunk := chunks.NewChunk([]byte("new root"))
err := store.Put(context.Background(), rootChunk)
err = store.Put(context.Background(), rootChunk)
assert.NoError(err)
h, err := store.Root(context.Background())
assert.NoError(err)
@@ -259,14 +263,15 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
p := newFakeTablePersister()
c := inlineConjoiner{defaultMaxTables}
store := newNomsBlockStore(context.Background(), manifestManager{upm, mc, l}, p, c, defaultMemTableSize)
store, err := newNomsBlockStore(context.Background(), manifestManager{upm, mc, l}, p, c, defaultMemTableSize)
assert.NoError(err)
defer store.Close()
storeChunk := chunks.NewChunk([]byte("store"))
interloperChunk := chunks.NewChunk([]byte("interloper"))
updateCount := 0
interloper := newNomsBlockStore(
interloper, err := newNomsBlockStore(
context.Background(),
manifestManager{
updatePreemptManifest{fm, func() { updateCount++ }}, mc, l,
@@ -274,6 +279,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
p,
c,
defaultMemTableSize)
assert.NoError(err)
defer interloper.Close()
wg := sync.WaitGroup{}
@@ -293,7 +299,7 @@ func TestChunkStoreSerializeCommits(t *testing.T) {
updateCount++
}
err := store.Put(context.Background(), storeChunk)
err = store.Put(context.Background(), storeChunk)
assert.NoError(err)
h, err := store.Root(context.Background())
assert.NoError(err)
@@ -311,7 +317,8 @@ func makeStoreWithFakes(t *testing.T) (fm *fakeManifest, p tablePersister, store
fm = &fakeManifest{}
mm := manifestManager{fm, newManifestCache(0), newManifestLocks()}
p = newFakeTablePersister()
store = newNomsBlockStore(context.Background(), mm, p, inlineConjoiner{defaultMaxTables}, 0)
store, err := newNomsBlockStore(context.Background(), mm, p, inlineConjoiner{defaultMaxTables}, 0)
assert.NoError(t, err)
return
}
+1 -1
View File
@@ -24,7 +24,7 @@ func TestStats(t *testing.T) {
dir, err := ioutil.TempDir("", "")
assert.NoError(err)
store := NewLocalStore(context.Background(), dir, testMemTableSize)
store, err := NewLocalStore(context.Background(), dir, testMemTableSize)
assert.EqualValues(1, stats(store).OpenLatency.Samples())
+13 -8
View File
@@ -193,7 +193,7 @@ func (nbs *NomsBlockStore) UpdateManifest(ctx context.Context, updates map[hash.
return updatedContents, nil
}
func NewAWSStore(ctx context.Context, table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize uint64) *NomsBlockStore {
func NewAWSStore(ctx context.Context, table, ns, bucket string, s3 s3svc, ddb ddbsvc, memTableSize uint64) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
readRateLimiter := make(chan struct{}, 32)
p := &awsTablePersister{
@@ -211,7 +211,7 @@ func NewAWSStore(ctx context.Context, table, ns, bucket string, s3 s3svc, ddb dd
}
// NewGCSStore returns an nbs implementation backed by a GCSBlobstore
func NewGCSStore(ctx context.Context, bucketName, path string, gcs *storage.Client, memTableSize uint64) *NomsBlockStore {
func NewGCSStore(ctx context.Context, bucketName, path string, gcs *storage.Client, memTableSize uint64) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
bucket := gcs.Bucket(bucketName)
@@ -222,9 +222,13 @@ func NewGCSStore(ctx context.Context, bucketName, path string, gcs *storage.Clie
return newNomsBlockStore(ctx, mm, p, inlineConjoiner{defaultMaxTables}, memTableSize)
}
func NewLocalStore(ctx context.Context, dir string, memTableSize uint64) *NomsBlockStore {
func NewLocalStore(ctx context.Context, dir string, memTableSize uint64) (*NomsBlockStore, error) {
cacheOnce.Do(makeGlobalCaches)
d.PanicIfError(checkDir(dir))
err := checkDir(dir)
if err != nil {
return nil, err
}
mm := makeManifestManager(fileManifest{dir})
p := newFSTablePersister(dir, globalFDCache, globalIndexCache)
@@ -242,7 +246,7 @@ func checkDir(dir string) error {
return nil
}
func newNomsBlockStore(ctx context.Context, mm manifestManager, p tablePersister, c conjoiner, memTableSize uint64) *NomsBlockStore {
func newNomsBlockStore(ctx context.Context, mm manifestManager, p tablePersister, c conjoiner, memTableSize uint64) (*NomsBlockStore, error) {
if memTableSize == 0 {
memTableSize = defaultMemTableSize
}
@@ -262,15 +266,16 @@ func newNomsBlockStore(ctx context.Context, mm manifestManager, p tablePersister
exists, contents, err := nbs.mm.Fetch(ctx, nbs.stats)
// TODO: fix panics
d.PanicIfError(err)
if err != nil {
return nil, err
}
if exists {
nbs.upstream = contents
nbs.tables = nbs.tables.Rebase(ctx, contents.specs, nbs.stats)
}
return nbs
return nbs, nil
}
/*func newNomsBlockStoreWithContents(ctx context.Context, mm manifestManager, mc manifestContents, p tablePersister, c conjoiner, memTableSize uint64) *NomsBlockStore {
+16 -4
View File
@@ -271,7 +271,9 @@ func (sp Spec) NewChunkStore(ctx context.Context) chunks.ChunkStore {
case "gs":
return parseGCSSpec(ctx, sp.Href(), sp.Options)
case "nbs":
return nbs.NewLocalStore(ctx, sp.DatabaseName, 1<<28)
cs, err := nbs.NewLocalStore(ctx, sp.DatabaseName, 1<<28)
d.PanicIfError(err)
return cs
case "mem":
storage := &chunks.MemoryStorage{}
return storage.NewView()
@@ -319,7 +321,11 @@ func parseAWSSpec(ctx context.Context, awsURL string, options SpecOptions) chunk
}
sess := session.Must(session.NewSession(awsConfig))
return nbs.NewAWSStore(ctx, parts[0], u.Path, parts[1], s3.New(sess), dynamodb.New(sess), 1<<28)
cs, err := nbs.NewAWSStore(ctx, parts[0], u.Path, parts[1], s3.New(sess), dynamodb.New(sess), 1<<28)
d.PanicIfError(err)
return cs
}
func parseGCSSpec(ctx context.Context, gcsURL string, options SpecOptions) chunks.ChunkStore {
@@ -337,7 +343,11 @@ func parseGCSSpec(ctx context.Context, gcsURL string, options SpecOptions) chunk
panic("Could not create GCSBlobstore")
}
return nbs.NewGCSStore(ctx, bucket, path, gcs, 1<<28)
cs, err := nbs.NewGCSStore(ctx, bucket, path, gcs, 1<<28)
d.PanicIfError(err)
return cs
}
// GetDataset returns the current Dataset instance for this Spec's Database.
@@ -421,7 +431,9 @@ func (sp Spec) createDatabase(ctx context.Context) datas.Database {
return datas.NewDatabase(parseGCSSpec(ctx, sp.Href(), sp.Options))
case "nbs":
os.Mkdir(sp.DatabaseName, 0777)
return datas.NewDatabase(nbs.NewLocalStore(ctx, sp.DatabaseName, 1<<28))
cs, err := nbs.NewLocalStore(ctx, sp.DatabaseName, 1<<28)
d.PanicIfError(err)
return datas.NewDatabase(cs)
case "mem":
storage := &chunks.MemoryStorage{}
return datas.NewDatabase(storage.NewView())
+3 -1
View File
@@ -114,7 +114,9 @@ func TestNBSDatabaseSpec(t *testing.T) {
store1 := filepath.Join(tmpDir, "store1")
os.Mkdir(store1, 0777)
func() {
db := datas.NewDatabase(nbs.NewLocalStore(context.Background(), store1, 8*(1<<20)))
cs, err := nbs.NewLocalStore(context.Background(), store1, 8*(1<<20))
assert.NoError(err)
db := datas.NewDatabase(cs)
defer db.Close()
r := db.WriteValue(context.Background(), s)
_, err = db.CommitValue(context.Background(), db.GetDataset(context.Background(), "datasetID"), r)
+5 -1
View File
@@ -45,7 +45,11 @@ func (cache *DBCache) Get(org, repo string) (*nbs.NomsBlockStore, error) {
return nil, err
}
newCS = nbs.NewLocalStore(context.TODO(), id, defaultMemTableSize)
newCS, err = nbs.NewLocalStore(context.TODO(), id, defaultMemTableSize)
if err != nil {
return nil, err
}
}
cache.dbs[id] = newCS