fixed bolt store, refactored

This commit is contained in:
Andy Arthur
2021-08-30 10:05:15 -07:00
parent 87d0fdd008
commit 07015db776
6 changed files with 89 additions and 48 deletions
+42 -29
View File
@@ -25,9 +25,15 @@ import (
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/stretchr/testify/require"
)
const (
makeProfile = false
)
// usage: `go test -bench BenchmarkMemoryStore`
func BenchmarkMemoryStore(b *testing.B) {
benchmarkKVStore(b, newMemStore())
@@ -46,18 +52,20 @@ func BenchmarkBoltStore(b *testing.B) {
func benchmarkKVStore(b *testing.B, store keyValStore) {
keys := loadStore(b, store)
f := makePprofFile(b)
err := pprof.StartCPUProfile(f)
if err != nil {
b.Fatal(err)
}
defer func() {
pprof.StopCPUProfile()
if err = f.Close(); err != nil {
if makeProfile {
f := makePprofFile(b)
err := pprof.StartCPUProfile(f)
if err != nil {
b.Fatal(err)
}
fmt.Printf("\twriting CPU profile for %s: %s\n", b.Name(), f.Name())
}()
defer func() {
pprof.StopCPUProfile()
if err = f.Close(); err != nil {
b.Fatal(err)
}
fmt.Printf("\twriting CPU profile for %s: %s\n", b.Name(), f.Name())
}()
}
b.Run("point reads", func(b *testing.B) {
runBenchmark(b, store, keys)
@@ -66,7 +74,7 @@ func benchmarkKVStore(b *testing.B, store keyValStore) {
func loadStore(b *testing.B, store keyValStore) (keys [][]byte) {
return loadStoreWithParams(b, store, loadParams{
cardinality: 1_000_000,
cardinality: 100_000,
keySize: 16,
valSize: 128,
})
@@ -79,7 +87,7 @@ type loadParams struct {
}
func loadStoreWithParams(b *testing.B, store keyValStore, p loadParams) (keys [][]byte) {
keys = make([][]byte, p.cardinality)
keys = make([][]byte, 0, p.cardinality)
// generate 10K rows at a time
const batchSize = uint32(10_000)
@@ -89,40 +97,45 @@ func loadStoreWithParams(b *testing.B, store keyValStore, p loadParams) (keys []
bufSize := pairSize * batchSize
buf := make([]byte, bufSize)
k := 0
for i := uint32(0); i < numBatches; i++ {
_, err := rand.Read(buf)
require.NoError(b, err)
kk := make([][]byte, batchSize)
vv := make([][]byte, batchSize)
for j := uint32(0); j < batchSize; j++ {
offset := j * pairSize
key := buf[offset : offset+p.keySize]
val := buf[offset+p.keySize : offset+pairSize]
store.load(key, val)
keys[k] = key
k++
kk[j] = buf[offset : offset+p.keySize]
vv[j] = buf[offset+p.keySize : offset+pairSize]
}
}
if fs, ok := store.(flushingKeyValStore); ok {
fs.flush()
store.putMany(kk, vv)
keys = append(keys, kk...)
}
return
}
func runBenchmark(b *testing.B, store keyValStore, keys [][]byte) {
runBenchmarkWithParams(b, store, keys, benchParams{
numReads: 10_000,
})
runBenchmarkWithParams(b, store, keys, benchParams{})
}
type benchParams struct {
numReads uint32
}
type benchParams struct{}
func runBenchmarkWithParams(b *testing.B, store keyValStore, keys [][]byte, p benchParams) {
for _, k := range keys[:p.numReads] {
if bs, ok := store.(boltStore); ok {
err := bs.DB.View(func(tx *bolt.Tx) (err error) {
bk := tx.Bucket([]byte(bucketName))
err = bk.ForEach(func(k, v []byte) error {
return nil
})
require.NoError(b, err)
return nil
})
require.NoError(b, err)
}
for _, k := range keys {
_, ok := store.get(k)
require.True(b, ok)
}
+1 -1
View File
@@ -35,6 +35,6 @@ func (nbs BitcaskStore) delete(key []byte) {
panic("unimplemented")
}
func (nbs BitcaskStore) load(key, val []byte) {
func (nbs BitcaskStore) putMany(keys, vals [][]byte) {
panic("unimplemented")
}
+18 -6
View File
@@ -52,10 +52,10 @@ func (bs boltStore) get(key []byte) (val []byte, ok bool) {
err := bs.DB.View(func(tx *bolt.Tx) (err error) {
b := tx.Bucket([]byte(bucketName))
v := b.Get(key)
ok = val != nil
ok = v != nil
if ok {
val = make([]byte, len(v))
copy(v, val)
copy(val, v)
}
return
})
@@ -75,6 +75,22 @@ func (bs boltStore) put(key, val []byte) {
}
}
func (bs boltStore) putMany(keys, vals [][]byte) {
err := bs.DB.Update(func(tx *bolt.Tx) (err error) {
b := tx.Bucket([]byte(bucketName))
for i := range keys {
err = b.Put(keys[i], vals[i])
if err != nil {
break
}
}
return
})
if err != nil {
panic(err)
}
}
func (bs boltStore) delete(key []byte) {
err := bs.DB.Update(func(tx *bolt.Tx) (err error) {
b := tx.Bucket([]byte(bucketName))
@@ -84,7 +100,3 @@ func (bs boltStore) delete(key []byte) {
panic(err)
}
}
func (bs boltStore) load(key, val []byte) {
bs.put(key, val)
}
+11 -8
View File
@@ -19,10 +19,8 @@ import "sync"
type keyValStore interface {
get(key []byte) (val []byte, ok bool)
put(key, val []byte)
putMany(keys, vals [][]byte)
delete(key []byte)
// non-atomic put
load(key, val []byte)
}
type flushingKeyValStore interface {
@@ -61,7 +59,16 @@ func (m memStore) put(key, val []byte) {
m.mu.Lock()
defer m.mu.Unlock()
m.load(key, val)
m.store[string(key)] = val
}
func (m memStore) putMany(keys, vals [][]byte) {
m.mu.Lock()
defer m.mu.Unlock()
for i := range keys {
m.store[string(keys[i])] = vals[i]
}
}
func (m memStore) delete(key []byte) {
@@ -70,7 +77,3 @@ func (m memStore) delete(key []byte) {
delete(m.store, string(key))
}
func (m memStore) load(key, val []byte) {
m.store[string(key)] = val
}
+1 -1
View File
@@ -35,6 +35,6 @@ func (nbs NBSStore) delete(key []byte) {
panic("unimplemented")
}
func (nbs NBSStore) load(key, val []byte) {
func (nbs NBSStore) putMany(keys, vals [][]byte) {
panic("unimplemented")
}
+16 -3
View File
@@ -64,18 +64,19 @@ func (m *prollyStore) put(key, val []byte) {
m.mu.Lock()
defer m.mu.Unlock()
m.load(key, val)
m.set(key, val)
m.flush()
}
func (m *prollyStore) delete(key []byte) {
m.mu.Lock()
defer m.mu.Unlock()
m.load(key, nil)
m.set(key, nil)
m.flush()
}
func (m *prollyStore) load(key, val []byte) {
func (m *prollyStore) set(key, val []byte) {
k := types.String(key)
v := types.Value(nil)
if val != nil {
@@ -84,6 +85,18 @@ func (m *prollyStore) load(key, val []byte) {
m.editor.Set(k, v)
}
func (m *prollyStore) putMany(keys, vals [][]byte) {
m.mu.Lock()
defer m.mu.Unlock()
for i := range keys {
k := types.String(keys[i])
v := types.String(vals[i])
m.editor.Set(k, v)
}
m.flush()
}
func (m *prollyStore) flush() {
if m.editor.NumEdits() == 0 {
return