mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-13 19:28:50 -06:00
refactored prolly.MutableMap to use skip list checkpoints
This commit is contained in:
@@ -19,7 +19,6 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/skip"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
@@ -32,18 +31,14 @@ const (
|
||||
// However, once ApplyPending() is called, those mutations are moved to the applied tier, and the pending tier is
|
||||
// cleared.
|
||||
type MutableMap struct {
|
||||
uncommitted *skip.List
|
||||
tuples orderedMap[val.Tuple, val.Tuple, val.TupleDesc]
|
||||
keyDesc val.TupleDesc
|
||||
valDesc val.TupleDesc
|
||||
tuples orderedMap[val.Tuple, val.Tuple, val.TupleDesc]
|
||||
keyDesc val.TupleDesc
|
||||
valDesc val.TupleDesc
|
||||
}
|
||||
|
||||
// newMutableMap returns a new MutableMap.
|
||||
func newMutableMap(m Map) MutableMap {
|
||||
return MutableMap{
|
||||
uncommitted: skip.NewSkipList(func(left, right []byte) int {
|
||||
return m.tuples.order.Compare(left, right)
|
||||
}),
|
||||
tuples: m.tuples.mutate(),
|
||||
keyDesc: m.keyDesc,
|
||||
valDesc: m.valDesc,
|
||||
@@ -76,65 +71,34 @@ func (mut MutableMap) Map(ctx context.Context) (Map, error) {
|
||||
|
||||
// Put adds the Tuple pair |key|, |value| to the MutableMap.
|
||||
func (mut MutableMap) Put(ctx context.Context, key, value val.Tuple) error {
|
||||
mut.uncommitted.Put(key, value)
|
||||
return nil
|
||||
return mut.tuples.put(ctx, key, value)
|
||||
}
|
||||
|
||||
// Delete deletes the pair keyed by |key| from the MutableMap.
|
||||
func (mut MutableMap) Delete(ctx context.Context, key val.Tuple) error {
|
||||
mut.uncommitted.Put(key, nil)
|
||||
return nil
|
||||
return mut.tuples.delete(ctx, key)
|
||||
}
|
||||
|
||||
// Get fetches the Tuple pair keyed by |key|, if it exists, and passes it to |cb|.
|
||||
// If the |key| is not present in the MutableMap, a nil Tuple pair is passed to |cb|.
|
||||
func (mut MutableMap) Get(ctx context.Context, key val.Tuple, cb KeyValueFn[val.Tuple, val.Tuple]) (err error) {
|
||||
value, ok := mut.uncommitted.Get(key)
|
||||
if ok {
|
||||
if value == nil {
|
||||
// there is a pending delete of |key| in |mut.uncommitted|.
|
||||
key = nil
|
||||
}
|
||||
return cb(key, value)
|
||||
}
|
||||
return mut.tuples.get(ctx, key, cb)
|
||||
}
|
||||
|
||||
// Has returns true if |key| is present in the MutableMap.
|
||||
func (mut MutableMap) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
|
||||
value, inUncommitted := mut.uncommitted.Get(key)
|
||||
if inUncommitted {
|
||||
ok = value != nil
|
||||
return
|
||||
}
|
||||
return mut.tuples.has(ctx, key)
|
||||
}
|
||||
|
||||
// ApplyPending moves all pending mutations to the underlying map.
|
||||
func (mut *MutableMap) ApplyPending(ctx context.Context) error {
|
||||
if mut.uncommitted.Count() == 0 {
|
||||
return nil
|
||||
}
|
||||
uncommittedIter := memIterFromRange(mut.uncommitted, Range{Start: nil, Stop: nil, Desc: mut.keyDesc})
|
||||
for true {
|
||||
k, v := uncommittedIter.current()
|
||||
if k == nil {
|
||||
break
|
||||
}
|
||||
if err := mut.tuples.put(ctx, k, v); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := uncommittedIter.iterate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
mut.uncommitted.Truncate()
|
||||
mut.tuples.edits.Checkpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DiscardPending removes all pending mutations.
|
||||
func (mut *MutableMap) DiscardPending(context.Context) {
|
||||
mut.uncommitted.Truncate()
|
||||
mut.tuples.edits.Revert()
|
||||
}
|
||||
|
||||
// IterAll returns a mutableMapIter that iterates over the entire MutableMap.
|
||||
@@ -150,19 +114,10 @@ func (mut MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var memoryIter rangeIter[val.Tuple, val.Tuple]
|
||||
if mut.uncommitted.Count() > 0 {
|
||||
memoryIter = &mmEditIter{
|
||||
committedIter: memIterFromRange(mut.tuples.edits, rng),
|
||||
uncommittedIter: memIterFromRange(mut.uncommitted, rng),
|
||||
rng: rng,
|
||||
}
|
||||
} else {
|
||||
memoryIter = memIterFromRange(mut.tuples.edits, rng)
|
||||
}
|
||||
memIter := memIterFromRange(mut.tuples.edits, rng)
|
||||
|
||||
return &mutableMapIter[val.Tuple, val.Tuple, val.TupleDesc]{
|
||||
memory: memoryIter,
|
||||
memory: memIter,
|
||||
prolly: treeIter,
|
||||
order: rng.Desc,
|
||||
}, nil
|
||||
@@ -171,63 +126,5 @@ func (mut MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error)
|
||||
// HasEdits returns true when the MutableMap has performed at least one Put or Delete operation. This does not indicate
|
||||
// whether the materialized map contains different values to the contained unedited map.
|
||||
func (mut MutableMap) HasEdits() bool {
|
||||
return mut.uncommitted.Count() > 0 || mut.tuples.edits.Count() > 0
|
||||
}
|
||||
|
||||
// mmEditIter handles iterating over the committed and uncommitted mutations. Returns all keys, including those
|
||||
// representing deletes (which is a non-nil key with a nil value).
|
||||
type mmEditIter struct {
|
||||
committedIter *memRangeIter
|
||||
uncommittedIter *memRangeIter
|
||||
rng Range
|
||||
}
|
||||
|
||||
var _ rangeIter[val.Tuple, val.Tuple] = &mmEditIter{}
|
||||
|
||||
// iterate implements rangeIter. Does not return io.EOF once the end of the range has been reached. Instead, check for
|
||||
// a nil key from current().
|
||||
func (it *mmEditIter) iterate(ctx context.Context) error {
|
||||
comKey, _ := it.committedIter.current()
|
||||
uncomKey, _ := it.uncommittedIter.current()
|
||||
if comKey == nil && uncomKey == nil {
|
||||
// range is exhausted
|
||||
return nil
|
||||
}
|
||||
|
||||
cmp := it.compareKeys(comKey, uncomKey)
|
||||
if cmp <= 0 {
|
||||
if err := it.committedIter.iterate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if cmp >= 0 {
|
||||
if err := it.uncommittedIter.iterate(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// current implements rangeIter. Returns a nil tuple pair once the end of the range has been reached.
|
||||
func (it *mmEditIter) current() (key, value val.Tuple) {
|
||||
comKey, comValue := it.committedIter.current()
|
||||
uncomKey, uncomValue := it.uncommittedIter.current()
|
||||
cmp := it.compareKeys(comKey, uncomKey)
|
||||
if cmp < 0 {
|
||||
return comKey, comValue
|
||||
} else /* cmp >= 0 */ {
|
||||
// |it.uncommittedIter| wins ties
|
||||
return uncomKey, uncomValue
|
||||
}
|
||||
}
|
||||
|
||||
// compareKeys compares the given keys. A nil key is treated as the lowest value. If both keys are nil, returns 1.
|
||||
func (it *mmEditIter) compareKeys(leftKey, rightKey val.Tuple) int {
|
||||
if leftKey == nil {
|
||||
return 1
|
||||
}
|
||||
if rightKey == nil {
|
||||
return -1
|
||||
}
|
||||
return it.rng.Desc.Compare(leftKey, rightKey)
|
||||
return mut.tuples.edits.Count() > 0
|
||||
}
|
||||
|
||||
@@ -55,16 +55,12 @@ type skipNode struct {
|
||||
height uint8
|
||||
}
|
||||
|
||||
func NewSkipList(cmp ValueCmp) (l *List) {
|
||||
l = &List{
|
||||
// todo(andy): buffer pool
|
||||
nodes: make([]skipNode, 1, 128),
|
||||
cmp: cmp,
|
||||
src: rand.NewSource(0),
|
||||
}
|
||||
func NewSkipList(cmp ValueCmp) *List {
|
||||
// todo(andy): buffer pool
|
||||
nodes := make([]skipNode, 1, 128)
|
||||
|
||||
// initialize sentinel node
|
||||
l.nodes[sentinelId] = skipNode{
|
||||
nodes[sentinelId] = skipNode{
|
||||
id: sentinelId,
|
||||
key: nil, val: nil,
|
||||
height: maxHeight,
|
||||
@@ -72,7 +68,12 @@ func NewSkipList(cmp ValueCmp) (l *List) {
|
||||
prev: sentinelId,
|
||||
}
|
||||
|
||||
return
|
||||
return &List{
|
||||
nodes: nodes,
|
||||
checkpoint: nodeId(1),
|
||||
cmp: cmp,
|
||||
src: rand.NewSource(0),
|
||||
}
|
||||
}
|
||||
|
||||
// Checkpoint records a checkpoint that can be reverted to.
|
||||
|
||||
@@ -63,7 +63,6 @@ func TestSkipListCheckpoints(t *testing.T) {
|
||||
testSkipListCheckpoints(t, bytes.Compare, vals...)
|
||||
})
|
||||
|
||||
t.Skip("todo(andy)")
|
||||
t.Run("test skip list of random bytes", func(t *testing.T) {
|
||||
vals := randomVals((src.Int63() % 10_000) + 100)
|
||||
testSkipListCheckpoints(t, bytes.Compare, vals...)
|
||||
@@ -331,6 +330,10 @@ func testSkipListCheckpoints(t *testing.T, compare ValueCmp, data ...[]byte) {
|
||||
inserts := data[k*2:]
|
||||
|
||||
list := NewSkipList(compare)
|
||||
|
||||
// test empty revert
|
||||
list.Revert()
|
||||
|
||||
for _, v := range init {
|
||||
list.Put(v, v)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user