mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-29 10:41:05 -06:00
LevelDBStore
This commit is contained in:
@@ -46,6 +46,7 @@ func NewFlags() Flags {
|
||||
func NewFlagsWithPrefix(prefix string) Flags {
|
||||
return Flags{
|
||||
awsFlags(prefix),
|
||||
levelDBFlags(prefix),
|
||||
fileFlags(prefix),
|
||||
memoryFlags(prefix),
|
||||
nopFlags(prefix),
|
||||
@@ -55,6 +56,7 @@ func NewFlagsWithPrefix(prefix string) Flags {
|
||||
// Flags abstracts away definitions for and handling of command-line flags for all ChunkStore implementations.
|
||||
type Flags struct {
|
||||
aws awsStoreFlags
|
||||
ldb ldbStoreFlags
|
||||
file fileStoreFlags
|
||||
memory memoryStoreFlags
|
||||
nop nopStoreFlags
|
||||
@@ -63,6 +65,7 @@ type Flags struct {
|
||||
// CreateStore creates a ChunkStore implementation based on the values of command-line flags.
|
||||
func (f Flags) CreateStore() (cs ChunkStore) {
|
||||
if cs = f.aws.createStore(); cs != nil {
|
||||
} else if cs = f.ldb.createStore(); cs != nil {
|
||||
} else if cs = f.file.createStore(); cs != nil {
|
||||
} else if cs = f.memory.createStore(); cs != nil {
|
||||
} else if cs = f.nop.createStore(); cs != nil {
|
||||
|
||||
145
chunks/leveldb_store.go
Normal file
145
chunks/leveldb_store.go
Normal file
@@ -0,0 +1,145 @@
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"flag"
|
||||
"hash"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/d"
|
||||
"github.com/attic-labs/noms/ref"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/errors"
|
||||
"github.com/syndtr/goleveldb/leveldb/filter"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
)
|
||||
|
||||
// Keys in the LevelDB are namespaced: the store's root ref lives at rootKey,
// and each chunk is stored at chunkPrefix followed by its digest.
var rootKey = []byte("/root")
var chunkPrefix = []byte("/chunk/")
|
||||
|
||||
func toChunkKey(r ref.Ref) []byte {
|
||||
digest := r.Digest()
|
||||
return append(chunkPrefix, digest[:]...)
|
||||
}
|
||||
|
||||
// LevelDBStore is a ChunkStore backed by a LevelDB database on local disk.
type LevelDBStore struct {
	db *leveldb.DB
	mu *sync.Mutex // serializes the read-compare-write sequence in UpdateRoot
}
|
||||
|
||||
func NewLevelDBStore(dir string) LevelDBStore {
|
||||
d.Exp.NotEmpty(dir)
|
||||
d.Exp.NoError(os.MkdirAll(dir, 0700))
|
||||
db, err := leveldb.OpenFile(dir, &opt.Options{
|
||||
Compression: opt.NoCompression,
|
||||
Filter: filter.NewBloomFilter(10), // 10 bits/key
|
||||
WriteBuffer: 1 << 24, // 16MiB
|
||||
})
|
||||
d.Chk.NoError(err)
|
||||
return LevelDBStore{db, &sync.Mutex{}}
|
||||
}
|
||||
|
||||
func (l LevelDBStore) Root() ref.Ref {
|
||||
val, err := l.db.Get([]byte(rootKey), nil)
|
||||
if err == errors.ErrNotFound {
|
||||
return ref.Ref{}
|
||||
}
|
||||
d.Chk.NoError(err)
|
||||
|
||||
return ref.MustParse(string(val))
|
||||
}
|
||||
|
||||
func (l LevelDBStore) UpdateRoot(current, last ref.Ref) bool {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
if last != l.Root() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Sync: true write option should fsync memtable data to disk
|
||||
err := l.db.Put([]byte(rootKey), []byte(current.String()), &opt.WriteOptions{Sync: true})
|
||||
d.Chk.NoError(err)
|
||||
return true
|
||||
}
|
||||
|
||||
func (l LevelDBStore) Get(ref ref.Ref) (io.ReadCloser, error) {
|
||||
key := toChunkKey(ref)
|
||||
chunk, err := l.db.Get(key, nil)
|
||||
if err == errors.ErrNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
d.Chk.NoError(err)
|
||||
|
||||
return ioutil.NopCloser(bytes.NewReader(chunk)), nil
|
||||
}
|
||||
|
||||
func (l LevelDBStore) Put() ChunkWriter {
|
||||
b := &bytes.Buffer{}
|
||||
h := ref.NewHash()
|
||||
return &ldbChunkWriter{
|
||||
db: l.db,
|
||||
buffer: b,
|
||||
writer: io.MultiWriter(b, h),
|
||||
hash: h,
|
||||
}
|
||||
}
|
||||
|
||||
// ldbChunkWriter accumulates chunk bytes in buffer while simultaneously
// hashing them; Close persists the finished chunk under its hash-derived key.
type ldbChunkWriter struct {
	db     *leveldb.DB
	buffer *bytes.Buffer // set to nil by Close once the chunk has been written
	writer io.Writer     // tee that feeds both buffer and hash
	hash   hash.Hash
}
|
||||
|
||||
func (w *ldbChunkWriter) Write(data []byte) (int, error) {
|
||||
d.Chk.NotNil(w.buffer, "Write() cannot be called after Ref() or Close().")
|
||||
size, err := w.writer.Write(data)
|
||||
d.Chk.NoError(err)
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (w *ldbChunkWriter) Ref() (ref.Ref, error) {
|
||||
d.Chk.NoError(w.Close())
|
||||
return ref.FromHash(w.hash), nil
|
||||
}
|
||||
|
||||
func (w *ldbChunkWriter) Close() error {
|
||||
if w.buffer == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
key := toChunkKey(ref.FromHash(w.hash))
|
||||
|
||||
exists, err := w.db.Has(key, &opt.ReadOptions{DontFillCache: true}) // This isn't really a "read", so don't signal the cache to treat it as one.
|
||||
d.Chk.NoError(err)
|
||||
if exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
err = w.db.Put(key, w.buffer.Bytes(), nil)
|
||||
d.Chk.NoError(err)
|
||||
w.buffer = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// ldbStoreFlags holds the command-line flag values for a LevelDB-backed
// ChunkStore.
type ldbStoreFlags struct {
	dir *string // database directory; empty means the flag was not supplied
}
|
||||
|
||||
func levelDBFlags(prefix string) ldbStoreFlags {
|
||||
return ldbStoreFlags{
|
||||
flag.String(prefix+"db", "", "directory to use for a LevelDB-backed chunkstore"),
|
||||
}
|
||||
}
|
||||
|
||||
func (f ldbStoreFlags) createStore() ChunkStore {
|
||||
if *f.dir == "" {
|
||||
return nil
|
||||
} else {
|
||||
fs := NewLevelDBStore(*f.dir)
|
||||
return &fs
|
||||
}
|
||||
}
|
||||
121
chunks/leveldb_store_test.go
Normal file
121
chunks/leveldb_store_test.go
Normal file
@@ -0,0 +1,121 @@
|
||||
package chunks
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/ref"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
||||
func TestLevelDBStoreTestSuite(t *testing.T) {
|
||||
suite.Run(t, &LevelDBStoreTestSuite{})
|
||||
}
|
||||
|
||||
// LevelDBStoreTestSuite exercises LevelDBStore against a throwaway on-disk
// database created fresh for each test by SetupTest.
type LevelDBStoreTestSuite struct {
	suite.Suite
	dir   string // temp directory backing store for the current test
	store LevelDBStore
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) SetupTest() {
|
||||
var err error
|
||||
suite.dir, err = ioutil.TempDir(os.TempDir(), "")
|
||||
suite.NoError(err)
|
||||
suite.store = NewLevelDBStore(suite.dir)
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TearDownTest() {
|
||||
os.Remove(suite.dir)
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStorePut() {
|
||||
input := "abc"
|
||||
w := suite.store.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
suite.NoError(err)
|
||||
ref, err := w.Ref()
|
||||
suite.NoError(err)
|
||||
|
||||
// See http://www.di-mgt.com.au/sha_testvectors.html
|
||||
suite.Equal("sha1-a9993e364706816aba3e25717850c26c9cd0d89d", ref.String())
|
||||
|
||||
// And reading it via the API should work...
|
||||
assertInputInStore(input, ref, suite.store, suite.Assert())
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStoreWriteAfterCloseFails() {
|
||||
input := "abc"
|
||||
w := suite.store.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
suite.NoError(err)
|
||||
|
||||
suite.NoError(w.Close())
|
||||
suite.Panics(func() { w.Write([]byte(input)) }, "Write() after Close() should barf!")
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStoreWriteAfterRefFails() {
|
||||
input := "abc"
|
||||
w := suite.store.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
suite.NoError(err)
|
||||
|
||||
_, _ = w.Ref()
|
||||
suite.NoError(err)
|
||||
suite.Panics(func() { w.Write([]byte(input)) }, "Write() after Close() should barf!")
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStorePutWithRefAfterClose() {
|
||||
input := "abc"
|
||||
w := suite.store.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
suite.NoError(err)
|
||||
|
||||
suite.NoError(w.Close())
|
||||
ref, err := w.Ref() // Ref() after Close() should work...
|
||||
suite.NoError(err)
|
||||
|
||||
// And reading the data via the API should work...
|
||||
assertInputInStore(input, ref, suite.store, suite.Assert())
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStorePutWithMultipleRef() {
|
||||
input := "abc"
|
||||
w := suite.store.Put()
|
||||
_, err := w.Write([]byte(input))
|
||||
suite.NoError(err)
|
||||
|
||||
_, _ = w.Ref()
|
||||
suite.NoError(err)
|
||||
ref, err := w.Ref() // Multiple calls to Ref() should work...
|
||||
suite.NoError(err)
|
||||
|
||||
// And reading the data via the API should work...
|
||||
assertInputInStore(input, ref, suite.store, suite.Assert())
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStoreRoot() {
|
||||
oldRoot := suite.store.Root()
|
||||
suite.Equal(oldRoot, ref.Ref{})
|
||||
|
||||
bogusRoot, err := ref.Parse("sha1-81c870618113ba29b6f2b396ea3a69c6f1d626c5") // sha1("Bogus, Dude")
|
||||
suite.NoError(err)
|
||||
newRoot, err := ref.Parse("sha1-907d14fb3af2b0d4f18c2d46abe8aedce17367bd") // sha1("Hello, World")
|
||||
suite.NoError(err)
|
||||
|
||||
// Try to update root with bogus oldRoot
|
||||
result := suite.store.UpdateRoot(newRoot, bogusRoot)
|
||||
suite.False(result)
|
||||
|
||||
// Now do a valid root update
|
||||
result = suite.store.UpdateRoot(newRoot, oldRoot)
|
||||
suite.True(result)
|
||||
}
|
||||
|
||||
func (suite *LevelDBStoreTestSuite) TestLevelDBStoreGetNonExisting() {
|
||||
ref := ref.MustParse("sha1-1111111111111111111111111111111111111111")
|
||||
r, err := suite.store.Get(ref)
|
||||
suite.NoError(err)
|
||||
suite.Nil(r)
|
||||
}
|
||||
Reference in New Issue
Block a user