diff --git a/chunks/chunk_store.go b/chunks/chunk_store.go index 31dc567967..7329c91fa8 100644 --- a/chunks/chunk_store.go +++ b/chunks/chunk_store.go @@ -46,6 +46,7 @@ func NewFlags() Flags { func NewFlagsWithPrefix(prefix string) Flags { return Flags{ awsFlags(prefix), + levelDBFlags(prefix), fileFlags(prefix), memoryFlags(prefix), nopFlags(prefix), @@ -55,6 +56,7 @@ func NewFlagsWithPrefix(prefix string) Flags { // Flags abstracts away definitions for and handling of command-line flags for all ChunkStore implementations. type Flags struct { aws awsStoreFlags + ldb ldbStoreFlags file fileStoreFlags memory memoryStoreFlags nop nopStoreFlags @@ -63,6 +65,7 @@ type Flags struct { // CreateStore creates a ChunkStore implementation based on the values of command-line flags. func (f Flags) CreateStore() (cs ChunkStore) { if cs = f.aws.createStore(); cs != nil { + } else if cs = f.ldb.createStore(); cs != nil { } else if cs = f.file.createStore(); cs != nil { } else if cs = f.memory.createStore(); cs != nil { } else if cs = f.nop.createStore(); cs != nil { diff --git a/chunks/leveldb_store.go b/chunks/leveldb_store.go new file mode 100644 index 0000000000..19abc760f7 --- /dev/null +++ b/chunks/leveldb_store.go @@ -0,0 +1,145 @@ +package chunks + +import ( + "bytes" + "flag" + "hash" + "io" + "io/ioutil" + "os" + "sync" + + "github.com/attic-labs/noms/d" + "github.com/attic-labs/noms/ref" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/filter" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +var rootKey = []byte("/root") +var chunkPrefix = []byte("/chunk/") + +func toChunkKey(r ref.Ref) []byte { + digest := r.Digest() + return append(chunkPrefix, digest[:]...) 
+} + +type LevelDBStore struct { + db *leveldb.DB + mu *sync.Mutex +} + +func NewLevelDBStore(dir string) LevelDBStore { + d.Exp.NotEmpty(dir) + d.Exp.NoError(os.MkdirAll(dir, 0700)) + db, err := leveldb.OpenFile(dir, &opt.Options{ + Compression: opt.NoCompression, + Filter: filter.NewBloomFilter(10), // 10 bits/key + WriteBuffer: 1 << 24, // 16MiB + }) + d.Chk.NoError(err) + return LevelDBStore{db, &sync.Mutex{}} +} + +func (l LevelDBStore) Root() ref.Ref { + val, err := l.db.Get([]byte(rootKey), nil) + if err == errors.ErrNotFound { + return ref.Ref{} + } + d.Chk.NoError(err) + + return ref.MustParse(string(val)) +} + +func (l LevelDBStore) UpdateRoot(current, last ref.Ref) bool { + l.mu.Lock() + defer l.mu.Unlock() + if last != l.Root() { + return false + } + + // Sync: true write option should fsync memtable data to disk + err := l.db.Put([]byte(rootKey), []byte(current.String()), &opt.WriteOptions{Sync: true}) + d.Chk.NoError(err) + return true +} + +func (l LevelDBStore) Get(ref ref.Ref) (io.ReadCloser, error) { + key := toChunkKey(ref) + chunk, err := l.db.Get(key, nil) + if err == errors.ErrNotFound { + return nil, nil + } + d.Chk.NoError(err) + + return ioutil.NopCloser(bytes.NewReader(chunk)), nil +} + +func (l LevelDBStore) Put() ChunkWriter { + b := &bytes.Buffer{} + h := ref.NewHash() + return &ldbChunkWriter{ + db: l.db, + buffer: b, + writer: io.MultiWriter(b, h), + hash: h, + } +} + +type ldbChunkWriter struct { + db *leveldb.DB + buffer *bytes.Buffer + writer io.Writer + hash hash.Hash +} + +func (w *ldbChunkWriter) Write(data []byte) (int, error) { + d.Chk.NotNil(w.buffer, "Write() cannot be called after Ref() or Close().") + size, err := w.writer.Write(data) + d.Chk.NoError(err) + return size, nil +} + +func (w *ldbChunkWriter) Ref() (ref.Ref, error) { + d.Chk.NoError(w.Close()) + return ref.FromHash(w.hash), nil +} + +func (w *ldbChunkWriter) Close() error { + if w.buffer == nil { + return nil + } + + key := toChunkKey(ref.FromHash(w.hash)) + + 
exists, err := w.db.Has(key, &opt.ReadOptions{DontFillCache: true}) // This isn't really a "read", so don't signal the cache to treat it as one. + d.Chk.NoError(err) + if exists { + return nil + } + + err = w.db.Put(key, w.buffer.Bytes(), nil) + d.Chk.NoError(err) + w.buffer = nil + return nil +} + +type ldbStoreFlags struct { + dir *string +} + +func levelDBFlags(prefix string) ldbStoreFlags { + return ldbStoreFlags{ + flag.String(prefix+"db", "", "directory to use for a LevelDB-backed chunkstore"), + } +} + +func (f ldbStoreFlags) createStore() ChunkStore { + if *f.dir == "" { + return nil + } else { + fs := NewLevelDBStore(*f.dir) + return &fs + } +} diff --git a/chunks/leveldb_store_test.go b/chunks/leveldb_store_test.go new file mode 100644 index 0000000000..e7e02f9874 --- /dev/null +++ b/chunks/leveldb_store_test.go @@ -0,0 +1,121 @@ +package chunks + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/attic-labs/noms/ref" + "github.com/stretchr/testify/suite" +) + +func TestLevelDBStoreTestSuite(t *testing.T) { + suite.Run(t, &LevelDBStoreTestSuite{}) +} + +type LevelDBStoreTestSuite struct { + suite.Suite + dir string + store LevelDBStore +} + +func (suite *LevelDBStoreTestSuite) SetupTest() { + var err error + suite.dir, err = ioutil.TempDir(os.TempDir(), "") + suite.NoError(err) + suite.store = NewLevelDBStore(suite.dir) +} + +func (suite *LevelDBStoreTestSuite) TearDownTest() { + os.RemoveAll(suite.dir) +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStorePut() { + input := "abc" + w := suite.store.Put() + _, err := w.Write([]byte(input)) + suite.NoError(err) + ref, err := w.Ref() + suite.NoError(err) + + // See http://www.di-mgt.com.au/sha_testvectors.html + suite.Equal("sha1-a9993e364706816aba3e25717850c26c9cd0d89d", ref.String()) + + // And reading it via the API should work... 
+ assertInputInStore(input, ref, suite.store, suite.Assert()) +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStoreWriteAfterCloseFails() { + input := "abc" + w := suite.store.Put() + _, err := w.Write([]byte(input)) + suite.NoError(err) + + suite.NoError(w.Close()) + suite.Panics(func() { w.Write([]byte(input)) }, "Write() after Close() should barf!") +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStoreWriteAfterRefFails() { + input := "abc" + w := suite.store.Put() + _, err := w.Write([]byte(input)) + suite.NoError(err) + + _, err = w.Ref() + suite.NoError(err) + suite.Panics(func() { w.Write([]byte(input)) }, "Write() after Close() should barf!") +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStorePutWithRefAfterClose() { + input := "abc" + w := suite.store.Put() + _, err := w.Write([]byte(input)) + suite.NoError(err) + + suite.NoError(w.Close()) + ref, err := w.Ref() // Ref() after Close() should work... + suite.NoError(err) + + // And reading the data via the API should work... + assertInputInStore(input, ref, suite.store, suite.Assert()) +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStorePutWithMultipleRef() { + input := "abc" + w := suite.store.Put() + _, err := w.Write([]byte(input)) + suite.NoError(err) + + _, err = w.Ref() + suite.NoError(err) + ref, err := w.Ref() // Multiple calls to Ref() should work... + suite.NoError(err) + + // And reading the data via the API should work... 
+ assertInputInStore(input, ref, suite.store, suite.Assert()) +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStoreRoot() { + oldRoot := suite.store.Root() + suite.Equal(oldRoot, ref.Ref{}) + + bogusRoot, err := ref.Parse("sha1-81c870618113ba29b6f2b396ea3a69c6f1d626c5") // sha1("Bogus, Dude") + suite.NoError(err) + newRoot, err := ref.Parse("sha1-907d14fb3af2b0d4f18c2d46abe8aedce17367bd") // sha1("Hello, World") + suite.NoError(err) + + // Try to update root with bogus oldRoot + result := suite.store.UpdateRoot(newRoot, bogusRoot) + suite.False(result) + + // Now do a valid root update + result = suite.store.UpdateRoot(newRoot, oldRoot) + suite.True(result) +} + +func (suite *LevelDBStoreTestSuite) TestLevelDBStoreGetNonExisting() { + ref := ref.MustParse("sha1-1111111111111111111111111111111111111111") + r, err := suite.store.Get(ref) + suite.NoError(err) + suite.Nil(r) +}