mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-04 19:41:26 -05:00
Merge pull request #4314 from dolthub/andy/edit-flushing
go/store/prolly: Bound maximum pending writes to prolly.MutableMap
This commit is contained in:
@@ -48,7 +48,7 @@ func getMutableSecondaryIdxs(ctx context.Context, sch schema.Schema, indexes dur
|
||||
// used to modify the index based on a modification to corresponding primary row.
|
||||
type MutableSecondaryIdx struct {
|
||||
Name string
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
keyMap val.OrdinalMapping
|
||||
pkLen int
|
||||
keyBld *val.TupleBuilder
|
||||
|
||||
@@ -108,48 +108,55 @@ func TestSingleQuery(t *testing.T) {
|
||||
|
||||
// Convenience test for debugging a single query. Unskip and set to the desired query.
|
||||
func TestSingleScript(t *testing.T) {
|
||||
t.Skip()
|
||||
var scripts = []queries.ScriptTest{
|
||||
{
|
||||
Name: "Nautobot FOREIGN KEY panic repro",
|
||||
Name: "Insert throws unique key violations",
|
||||
SetUpScript: []string{
|
||||
"CREATE TABLE `auth_user` (" +
|
||||
" `password` varchar(128) NOT NULL," +
|
||||
" `last_login` datetime," +
|
||||
" `is_superuser` tinyint NOT NULL," +
|
||||
" `username` varchar(150) NOT NULL," +
|
||||
" `first_name` varchar(150) NOT NULL," +
|
||||
" `last_name` varchar(150) NOT NULL," +
|
||||
" `email` varchar(254) NOT NULL," +
|
||||
" `is_staff` tinyint NOT NULL," +
|
||||
" `is_active` tinyint NOT NULL," +
|
||||
" `date_joined` datetime NOT NULL," +
|
||||
" `id` char(32) NOT NULL," +
|
||||
" `config_data` json NOT NULL," +
|
||||
" PRIMARY KEY (`id`)," +
|
||||
" UNIQUE KEY `username` (`username`)" +
|
||||
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin",
|
||||
"CREATE TABLE `users_token` (" +
|
||||
" `id` char(32) NOT NULL," +
|
||||
" `created` datetime NOT NULL," +
|
||||
" `expires` datetime," +
|
||||
" `key` varchar(40) NOT NULL," +
|
||||
" `write_enabled` tinyint NOT NULL," +
|
||||
" `description` varchar(200) NOT NULL," +
|
||||
" `user_id` char(32) NOT NULL," +
|
||||
" PRIMARY KEY (`id`)," +
|
||||
" UNIQUE KEY `key` (`key`)," +
|
||||
" KEY `users_token_user_id_af964690` (`user_id`)," +
|
||||
" CONSTRAINT `users_token_user_id_af964690_fk_auth_user_id` FOREIGN KEY (`user_id`) REFERENCES `auth_user` (`id`)" +
|
||||
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin;",
|
||||
"INSERT INTO `auth_user` (`password`,`last_login`,`is_superuser`,`username`,`first_name`,`last_name`,`email`,`is_staff`,`is_active`,`date_joined`,`id`,`config_data`)" +
|
||||
"VALUES ('pbkdf2_sha256$216000$KRpZeDPgwc5E$vl/2hwrmtnckaBT0A8pf63Ph+oYuCHYI7qozMTZihTo=',NULL,1,'admin','','','admin@example.com',1,1,'2022-08-30 18:27:21.810049','1056443cc03446c592fa4c06bb06a1a6','{}');",
|
||||
"CREATE TABLE t (pk int PRIMARY key, col1 int UNIQUE);",
|
||||
"CREATE TABLE t2 (pk int PRIMARY key, col1 int, col2 int, UNIQUE KEY (col1, col2));",
|
||||
"INSERT into t VALUES (1, 1);",
|
||||
"INSERT into t2 VALUES (1, 1, 1);",
|
||||
},
|
||||
Assertions: []queries.ScriptTestAssertion{
|
||||
{
|
||||
Query: "INSERT INTO `users_token` (`id`, `user_id`, `created`, `expires`, `key`, `write_enabled`, `description`) " +
|
||||
"VALUES ('acc2e157db2845a79221cc654b1dcecc', '1056443cc03446c592fa4c06bb06a1a6', '2022-08-30 18:27:21.948487', NULL, '0123456789abcdef0123456789abcdef01234567', 1, '');",
|
||||
Expected: []sql.Row{{sql.OkResult{RowsAffected: 0x1, InsertID: 0x0}}},
|
||||
Query: "INSERT INTO t VALUES (2, 2), (3, 1), (4, 4);",
|
||||
ExpectedErr: sql.ErrUniqueKeyViolation,
|
||||
},
|
||||
{
|
||||
Query: "SELECT * from t;",
|
||||
Expected: []sql.Row{{1, 1}},
|
||||
},
|
||||
{
|
||||
Query: "INSERT INTO t2 VALUES (2, 2, 2), (3, 1, 1), (4, 4, 4);",
|
||||
ExpectedErr: sql.ErrUniqueKeyViolation,
|
||||
},
|
||||
{
|
||||
Query: "SELECT * from t2;",
|
||||
Expected: []sql.Row{{1, 1, 1}},
|
||||
},
|
||||
{
|
||||
Query: "INSERT INTO t VALUES (5, 2), (6, 2);",
|
||||
ExpectedErr: sql.ErrUniqueKeyViolation,
|
||||
},
|
||||
{
|
||||
Query: "SELECT * from t;",
|
||||
Expected: []sql.Row{{1, 1}},
|
||||
},
|
||||
{
|
||||
Query: "INSERT INTO t2 VALUES (5, 2, 2), (6, 2, 2);",
|
||||
ExpectedErr: sql.ErrUniqueKeyViolation,
|
||||
},
|
||||
{
|
||||
Query: "SELECT * from t2;",
|
||||
Expected: []sql.Row{{1, 1, 1}},
|
||||
},
|
||||
{
|
||||
Query: "INSERT into t2 VALUES (5, NULL, 1), (6, NULL, 1), (7, 1, NULL), (8, 1, NULL), (9, NULL, NULL), (10, NULL, NULL)",
|
||||
Expected: []sql.Row{{sql.NewOkResult(6)}},
|
||||
},
|
||||
{
|
||||
Query: "SELECT * from t2;",
|
||||
Expected: []sql.Row{{1, 1, 1}, {5, nil, 1}, {6, nil, 1}, {7, 1, nil}, {8, 1, nil}, {9, nil, nil}, {10, nil, nil}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
@@ -85,7 +85,7 @@ type primaryIndexErrBuilder interface {
|
||||
}
|
||||
|
||||
type prollyIndexWriter struct {
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
|
||||
keyBld *val.TupleBuilder
|
||||
keyMap val.OrdinalMapping
|
||||
@@ -189,11 +189,11 @@ func (m prollyIndexWriter) Update(ctx context.Context, oldRow sql.Row, newRow sq
|
||||
}
|
||||
|
||||
func (m prollyIndexWriter) Commit(ctx context.Context) error {
|
||||
return m.mut.ApplyPending(ctx)
|
||||
return m.mut.Checkpoint(ctx)
|
||||
}
|
||||
|
||||
func (m prollyIndexWriter) Discard(ctx context.Context) error {
|
||||
m.mut.DiscardPending(ctx)
|
||||
m.mut.Revert(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -238,7 +238,7 @@ func (m prollyIndexWriter) uniqueKeyError(ctx context.Context, keyStr string, ke
|
||||
|
||||
type prollySecondaryIndexWriter struct {
|
||||
name string
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
unique bool
|
||||
|
||||
keyBld *val.TupleBuilder
|
||||
@@ -364,11 +364,11 @@ func (m prollySecondaryIndexWriter) Update(ctx context.Context, oldRow sql.Row,
|
||||
}
|
||||
|
||||
func (m prollySecondaryIndexWriter) Commit(ctx context.Context) error {
|
||||
return m.mut.ApplyPending(ctx)
|
||||
return m.mut.Checkpoint(ctx)
|
||||
}
|
||||
|
||||
func (m prollySecondaryIndexWriter) Discard(ctx context.Context) error {
|
||||
m.mut.DiscardPending(ctx)
|
||||
m.mut.Revert(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
|
||||
type prollyKeylessWriter struct {
|
||||
name string
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
|
||||
keyBld *val.TupleBuilder
|
||||
valBld *val.TupleBuilder
|
||||
@@ -108,11 +108,11 @@ func (k prollyKeylessWriter) Update(ctx context.Context, oldRow sql.Row, newRow
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) Commit(ctx context.Context) error {
|
||||
return k.mut.ApplyPending(ctx)
|
||||
return k.mut.Checkpoint(ctx)
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) Discard(ctx context.Context) error {
|
||||
k.mut.DiscardPending(ctx)
|
||||
k.mut.Revert(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -142,10 +142,6 @@ func (k prollyKeylessWriter) tuplesFromRow(ctx context.Context, sqlRow sql.Row)
|
||||
return
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) getMut() prolly.MutableMap {
|
||||
return k.mut
|
||||
}
|
||||
|
||||
func (k prollyKeylessWriter) errForSecondaryUniqueKeyError(ctx context.Context, err secondaryUniqueKeyError) error {
|
||||
return k.uniqueKeyError(ctx, err.keyStr, err.existingKey, false)
|
||||
}
|
||||
@@ -180,7 +176,7 @@ func (e secondaryUniqueKeyError) Error() string {
|
||||
|
||||
type prollyKeylessSecondaryWriter struct {
|
||||
name string
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
primary prollyKeylessWriter
|
||||
unique bool
|
||||
|
||||
@@ -308,12 +304,12 @@ func (writer prollyKeylessSecondaryWriter) Update(ctx context.Context, oldRow sq
|
||||
|
||||
// Commit implements the interface indexWriter.
|
||||
func (writer prollyKeylessSecondaryWriter) Commit(ctx context.Context) error {
|
||||
return writer.mut.ApplyPending(ctx)
|
||||
return writer.mut.Checkpoint(ctx)
|
||||
}
|
||||
|
||||
// Discard implements the interface indexWriter.
|
||||
func (writer prollyKeylessSecondaryWriter) Discard(ctx context.Context) error {
|
||||
writer.mut.DiscardPending(ctx)
|
||||
writer.mut.Revert(ctx)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -126,7 +126,7 @@ func (wr *bboltWriter) Flush() error {
|
||||
}
|
||||
|
||||
type doltWriter struct {
|
||||
mut prolly.MutableMap
|
||||
mut *prolly.MutableMap
|
||||
cs *nbs.NomsBlockStore
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
// Copyright 2021 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package prolly
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func TestMutableMapCheckpoints(t *testing.T) {
|
||||
scales := []int{
|
||||
10,
|
||||
100,
|
||||
1000,
|
||||
10_000,
|
||||
}
|
||||
|
||||
for _, s := range scales {
|
||||
name := "test mutable map at scale " + strconv.Itoa(s)
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Run("stash", func(t *testing.T) {
|
||||
testCheckpoint(t, s)
|
||||
})
|
||||
t.Run("revert pre-flush", func(t *testing.T) {
|
||||
testRevertBeforeFlush(t, s)
|
||||
})
|
||||
t.Run("revert post-flush", func(t *testing.T) {
|
||||
testRevertAfterFlush(t, s)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testCheckpoint(t *testing.T, scale int) {
|
||||
// create map with |s| even int64s
|
||||
ctx := context.Background()
|
||||
m := ascendingIntMapWithStep(t, scale, 2)
|
||||
mut := m.Mutate()
|
||||
|
||||
edits := ascendingTuplesWithStepAndStart(scale/10, 2, 1)
|
||||
|
||||
for i, ed := range edits {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.False(t, ok)
|
||||
|
||||
err = mut.Put(ctx, ed[0], ed[1])
|
||||
require.NoError(t, err)
|
||||
ok, err = mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
|
||||
err = mut.Checkpoint(ctx)
|
||||
assert.NoError(t, err)
|
||||
|
||||
for j := 0; j < i; j++ {
|
||||
ok, err = mut.Has(ctx, edits[j][0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testRevertBeforeFlush(t *testing.T, scale int) {
|
||||
// create map with |s| even int64s
|
||||
ctx := context.Background()
|
||||
m := ascendingIntMapWithStep(t, scale, 2)
|
||||
mut := m.Mutate()
|
||||
|
||||
// create 2 edit sets: pre- and post- checkpoint
|
||||
edits := ascendingTuplesWithStepAndStart(scale/5, 2, 1)
|
||||
pre, post := edits[:scale/10], edits[scale/10:]
|
||||
|
||||
for _, ed := range pre {
|
||||
err := mut.Put(ctx, ed[0], ed[1])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err := mut.Checkpoint(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, ed := range post {
|
||||
err = mut.Put(ctx, ed[0], ed[1])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
for _, ed := range edits {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
|
||||
mut.Revert(ctx)
|
||||
|
||||
for _, ed := range pre {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
for _, ed := range post {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.False(t, ok)
|
||||
}
|
||||
}
|
||||
|
||||
func testRevertAfterFlush(t *testing.T, scale int) {
|
||||
// create map with |s| even int64s
|
||||
ctx := context.Background()
|
||||
m := ascendingIntMapWithStep(t, scale, 2)
|
||||
mut := m.Mutate()
|
||||
|
||||
// create 2 edit sets: pre- and post- checkpoint
|
||||
edits := ascendingTuplesWithStepAndStart(scale/5, 2, 1)
|
||||
pre, post := edits[:scale/10], edits[scale/10:]
|
||||
|
||||
for _, ed := range pre {
|
||||
err := mut.Put(ctx, ed[0], ed[1])
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
err := mut.Checkpoint(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i, ed := range post {
|
||||
err = mut.Put(ctx, ed[0], ed[1])
|
||||
require.NoError(t, err)
|
||||
|
||||
// flush post-checkpoint edits halfway through
|
||||
// this creates a stashed tree in |mut|
|
||||
if i == len(post)/2 {
|
||||
err = mut.flushPending(ctx)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, ed := range edits {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
|
||||
mut.Revert(ctx)
|
||||
|
||||
for _, ed := range pre {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.True(t, ok)
|
||||
}
|
||||
for _, ed := range post {
|
||||
ok, err := mut.Has(ctx, ed[0])
|
||||
require.NoError(t, err)
|
||||
assert.False(t, ok)
|
||||
}
|
||||
}
|
||||
@@ -61,7 +61,7 @@ func TestMutableMapReads(t *testing.T) {
|
||||
testIterPrefixRange(t, mutableIndex, idxTuples)
|
||||
})
|
||||
|
||||
mutableMap2, tuples2, deletes := deleteFromMutableMap(mutableMap.(MutableMap), tuples)
|
||||
mutableMap2, tuples2, deletes := deleteFromMutableMap(mutableMap.(*MutableMap), tuples)
|
||||
t.Run("get item from map with deletes", func(t *testing.T) {
|
||||
testMutableMapGetAndHas(t, mutableMap2, tuples2, deletes)
|
||||
})
|
||||
@@ -75,7 +75,7 @@ func TestMutableMapReads(t *testing.T) {
|
||||
t.Skip("todo(andy)")
|
||||
})
|
||||
|
||||
mutableIndex2, idxTuples2, _ := deleteFromMutableMap(mutableIndex.(MutableMap), idxTuples)
|
||||
mutableIndex2, idxTuples2, _ := deleteFromMutableMap(mutableIndex.(*MutableMap), idxTuples)
|
||||
t.Run("iter prefix range", func(t *testing.T) {
|
||||
testIterPrefixRange(t, mutableIndex, idxTuples2)
|
||||
})
|
||||
@@ -107,7 +107,7 @@ func TestMutableMapReads(t *testing.T) {
|
||||
func TestMutableMapFormat(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mutableMap, _ := makeMutableMap(t, 100)
|
||||
s, err := debugFormat(ctx, mutableMap.(MutableMap))
|
||||
s, err := debugFormat(ctx, mutableMap.(*MutableMap))
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, s)
|
||||
}
|
||||
@@ -162,7 +162,7 @@ func makeMutableSecondaryIndex(t *testing.T, count int) (testMap, [][2]val.Tuple
|
||||
return newMutableMap(m.(Map)), tuples
|
||||
}
|
||||
|
||||
func deleteFromMutableMap(mut MutableMap, tt [][2]val.Tuple) (MutableMap, [][2]val.Tuple, [][2]val.Tuple) {
|
||||
func deleteFromMutableMap(mut *MutableMap, tt [][2]val.Tuple) (*MutableMap, [][2]val.Tuple, [][2]val.Tuple) {
|
||||
count := len(tt)
|
||||
testRand.Shuffle(count, func(i, j int) {
|
||||
tt[i], tt[j] = tt[j], tt[i]
|
||||
@@ -186,7 +186,7 @@ func deleteFromMutableMap(mut MutableMap, tt [][2]val.Tuple) (MutableMap, [][2]v
|
||||
return mut, remaining, deletes
|
||||
}
|
||||
|
||||
func testMutableMapGetAndHas(t *testing.T, mut MutableMap, tuples, deletes [][2]val.Tuple) {
|
||||
func testMutableMapGetAndHas(t *testing.T, mut *MutableMap, tuples, deletes [][2]val.Tuple) {
|
||||
ctx := context.Background()
|
||||
for _, kv := range tuples {
|
||||
ok, err := mut.Has(ctx, kv[0])
|
||||
|
||||
@@ -508,15 +508,20 @@ func ascendingIntMap(t *testing.T, count int) Map {
|
||||
}
|
||||
|
||||
func ascendingIntMapWithStep(t *testing.T, count, step int) Map {
|
||||
tuples := make([][2]val.Tuple, count)
|
||||
for i := range tuples {
|
||||
v := int64(i * step)
|
||||
tuples[i][0], tuples[i][1] = makePut(v, v)
|
||||
}
|
||||
tuples := ascendingTuplesWithStepAndStart(count, step, 0)
|
||||
pm := mustProllyMapFromTuples(t, mutKeyDesc, mutValDesc, tuples)
|
||||
return pm
|
||||
}
|
||||
|
||||
func ascendingTuplesWithStepAndStart(count, step, start int) [][2]val.Tuple {
|
||||
tuples := make([][2]val.Tuple, count)
|
||||
for i := range tuples {
|
||||
v := int64((i * step) + start)
|
||||
tuples[i][0], tuples[i][1] = makePut(v, v)
|
||||
}
|
||||
return tuples
|
||||
}
|
||||
|
||||
var mutKeyDesc = val.NewTupleDescriptor(
|
||||
val.Type{Enc: val.Int64Enc, Nullable: false},
|
||||
)
|
||||
@@ -542,11 +547,11 @@ func makeDelete(k int64) (key val.Tuple) {
|
||||
}
|
||||
|
||||
// validates edit provider and materializes map
|
||||
func materializeMap(t *testing.T, mut MutableMap) Map {
|
||||
func materializeMap(t *testing.T, mut *MutableMap) Map {
|
||||
ctx := context.Background()
|
||||
|
||||
// ensure Edits are provided in Order
|
||||
err := mut.ApplyPending(ctx)
|
||||
err := mut.Checkpoint(ctx)
|
||||
require.NoError(t, err)
|
||||
iter := mut.tuples.Mutations()
|
||||
prev, _ := iter.NextMutation(ctx)
|
||||
|
||||
@@ -58,6 +58,13 @@ func (m MutableMap[K, V, O]) Has(ctx context.Context, key K) (present bool, err
|
||||
return m.StaticMap.Has(ctx, key)
|
||||
}
|
||||
|
||||
func (m MutableMap[K, V, O]) Copy() MutableMap[K, V, O] {
|
||||
return MutableMap[K, V, O]{
|
||||
Edits: m.Edits.Copy(),
|
||||
StaticMap: m.StaticMap,
|
||||
}
|
||||
}
|
||||
|
||||
func (m MutableMap[K, V, O]) Mutations() MutationIter {
|
||||
return orderedListIter[K, V]{iter: m.Edits.IterAtStart()}
|
||||
}
|
||||
|
||||
@@ -193,7 +193,7 @@ func (m Map) NodeStore() tree.NodeStore {
|
||||
}
|
||||
|
||||
// Mutate makes a MutableMap from a Map.
|
||||
func (m Map) Mutate() MutableMap {
|
||||
func (m Map) Mutate() *MutableMap {
|
||||
return newMutableMap(m)
|
||||
}
|
||||
|
||||
|
||||
@@ -29,19 +29,27 @@ const (
|
||||
maxPending = 64 * 1024
|
||||
)
|
||||
|
||||
// MutableMap represents a Map that is able to store mutations in-memory. A MutableMap has two tiers of in-memory storage:
|
||||
// pending and applied. All mutations are first written to the pending tier, which may be discarded at any time.
|
||||
// However, once ApplyPending() is called, those mutations are moved to the applied tier, and the pending tier is
|
||||
// cleared.
|
||||
// MutableMap is an ordered collection of val.Tuple backed by a Prolly Tree.
|
||||
// Writes to the map are queued in a skip.List and periodically flushed when
|
||||
// the maximum number of pending writes is exceeded.
|
||||
type MutableMap struct {
|
||||
tuples tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc]
|
||||
keyDesc val.TupleDesc
|
||||
valDesc val.TupleDesc
|
||||
// tuples contains the primary Prolly Tree and skip.List for this map.
|
||||
tuples tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc]
|
||||
|
||||
// stash, if not nil, contains a previous checkpoint of this map.
|
||||
// stashes are created when a MutableMap has been check-pointed, but
|
||||
// the number of in-memory pending writes exceeds, maxPending.
|
||||
// In this case we stash a copy MutableMap containing the checkpoint,
|
||||
// flush the pending writes and continue accumulating
|
||||
stash *tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc]
|
||||
|
||||
// keyDesc and valDesc are tuples descriptors for the map.
|
||||
keyDesc, valDesc val.TupleDesc
|
||||
}
|
||||
|
||||
// newMutableMap returns a new MutableMap.
|
||||
func newMutableMap(m Map) MutableMap {
|
||||
return MutableMap{
|
||||
func newMutableMap(m Map) *MutableMap {
|
||||
return &MutableMap{
|
||||
tuples: m.tuples.Mutate(),
|
||||
keyDesc: m.keyDesc,
|
||||
valDesc: m.valDesc,
|
||||
@@ -49,13 +57,13 @@ func newMutableMap(m Map) MutableMap {
|
||||
}
|
||||
|
||||
// Map materializes all pending and applied mutations in the MutableMap.
|
||||
func (mut MutableMap) Map(ctx context.Context) (Map, error) {
|
||||
func (mut *MutableMap) Map(ctx context.Context) (Map, error) {
|
||||
s := message.NewProllyMapSerializer(mut.valDesc, mut.NodeStore().Pool())
|
||||
return mut.flushWithSerializer(ctx, s)
|
||||
}
|
||||
|
||||
func (mut MutableMap) flushWithSerializer(ctx context.Context, s message.Serializer) (Map, error) {
|
||||
if err := mut.ApplyPending(ctx); err != nil {
|
||||
func (mut *MutableMap) flushWithSerializer(ctx context.Context, s message.Serializer) (Map, error) {
|
||||
if err := mut.Checkpoint(ctx); err != nil {
|
||||
return Map{}, err
|
||||
}
|
||||
|
||||
@@ -79,50 +87,83 @@ func (mut MutableMap) flushWithSerializer(ctx context.Context, s message.Seriali
|
||||
}
|
||||
|
||||
// NodeStore returns the map's NodeStore
|
||||
func (mut MutableMap) NodeStore() tree.NodeStore {
|
||||
func (mut *MutableMap) NodeStore() tree.NodeStore {
|
||||
return mut.tuples.StaticMap.NodeStore
|
||||
}
|
||||
|
||||
// Put adds the Tuple pair |key|, |value| to the MutableMap.
|
||||
func (mut MutableMap) Put(ctx context.Context, key, value val.Tuple) error {
|
||||
return mut.tuples.Put(ctx, key, value)
|
||||
func (mut *MutableMap) Put(ctx context.Context, key, value val.Tuple) error {
|
||||
if err := mut.tuples.Put(ctx, key, value); err != nil {
|
||||
return err
|
||||
}
|
||||
if mut.tuples.Edits.Count() > maxPending {
|
||||
return mut.flushPending(ctx)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes the pair keyed by |key| from the MutableMap.
|
||||
func (mut MutableMap) Delete(ctx context.Context, key val.Tuple) error {
|
||||
func (mut *MutableMap) Delete(ctx context.Context, key val.Tuple) error {
|
||||
return mut.tuples.Delete(ctx, key)
|
||||
}
|
||||
|
||||
// Get fetches the Tuple pair keyed by |key|, if it exists, and passes it to |cb|.
|
||||
// If the |key| is not present in the MutableMap, a nil Tuple pair is passed to |cb|.
|
||||
func (mut MutableMap) Get(ctx context.Context, key val.Tuple, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
|
||||
func (mut *MutableMap) Get(ctx context.Context, key val.Tuple, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
|
||||
return mut.tuples.Get(ctx, key, cb)
|
||||
}
|
||||
|
||||
// Has returns true if |key| is present in the MutableMap.
|
||||
func (mut MutableMap) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
|
||||
func (mut *MutableMap) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
|
||||
return mut.tuples.Has(ctx, key)
|
||||
}
|
||||
|
||||
// ApplyPending moves all pending mutations to the underlying map.
|
||||
func (mut *MutableMap) ApplyPending(ctx context.Context) error {
|
||||
// Checkpoint records a checkpoint that can be reverted to.
|
||||
func (mut *MutableMap) Checkpoint(context.Context) error {
|
||||
// discard previous stash, if one exists
|
||||
mut.stash = nil
|
||||
mut.tuples.Edits.Checkpoint()
|
||||
return nil
|
||||
}
|
||||
|
||||
// DiscardPending removes all pending mutations.
|
||||
func (mut *MutableMap) DiscardPending(context.Context) {
|
||||
// Revert discards writes made since the last checkpoint.
|
||||
func (mut *MutableMap) Revert(context.Context) {
|
||||
// if we've accumulated a large number of writes
|
||||
// since we check-pointed, our stash may
|
||||
// be stashed in a separate tree.MutableMap
|
||||
if mut.stash != nil {
|
||||
mut.tuples = *mut.stash
|
||||
return
|
||||
}
|
||||
mut.tuples.Edits.Revert()
|
||||
}
|
||||
|
||||
func (mut *MutableMap) flushPending(ctx context.Context) error {
|
||||
stash := mut.stash
|
||||
// if our in-memory edit set contains a stash, we
|
||||
// must stash a copy of |mut.tuples| we can revert to.
|
||||
if mut.tuples.Edits.HasCheckpoint() {
|
||||
cp := mut.tuples.Copy()
|
||||
cp.Edits.Revert()
|
||||
stash = &cp
|
||||
}
|
||||
sm, err := mut.Map(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mut.tuples = sm.Mutate().tuples
|
||||
mut.stash = stash
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterAll returns a mutableMapIter that iterates over the entire MutableMap.
|
||||
func (mut MutableMap) IterAll(ctx context.Context) (MapIter, error) {
|
||||
func (mut *MutableMap) IterAll(ctx context.Context) (MapIter, error) {
|
||||
rng := Range{Fields: nil, Desc: mut.keyDesc}
|
||||
return mut.IterRange(ctx, rng)
|
||||
}
|
||||
|
||||
// IterRange returns a MapIter that iterates over a Range.
|
||||
func (mut MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error) {
|
||||
func (mut *MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error) {
|
||||
treeIter, err := treeIterFromRange(ctx, mut.tuples.StaticMap.Root, mut.tuples.StaticMap.NodeStore, rng)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -140,68 +181,15 @@ func (mut MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error)
|
||||
|
||||
// HasEdits returns true when the MutableMap has performed at least one Put or Delete operation. This does not indicate
|
||||
// whether the materialized map contains different values to the contained unedited map.
|
||||
func (mut MutableMap) HasEdits() bool {
|
||||
func (mut *MutableMap) HasEdits() bool {
|
||||
return mut.tuples.Edits.Count() > 0
|
||||
}
|
||||
|
||||
// Descriptors returns the key and value val.TupleDesc.
|
||||
func (mut MutableMap) Descriptors() (val.TupleDesc, val.TupleDesc) {
|
||||
func (mut *MutableMap) Descriptors() (val.TupleDesc, val.TupleDesc) {
|
||||
return mut.keyDesc, mut.valDesc
|
||||
}
|
||||
|
||||
func debugFormat(ctx context.Context, m MutableMap) (string, error) {
|
||||
kd, vd := m.keyDesc, m.valDesc
|
||||
|
||||
editIter := m.tuples.Edits.IterAtStart()
|
||||
tupleIter, err := m.tuples.StaticMap.IterAll(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString("Mutable Map {\n")
|
||||
|
||||
c := strconv.Itoa(m.tuples.Edits.Count())
|
||||
sb.WriteString("\tedits (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v := editIter.Current()
|
||||
if k == nil {
|
||||
break
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
editIter.Advance()
|
||||
}
|
||||
sb.WriteString("\t},\n")
|
||||
|
||||
ci, err := m.tuples.StaticMap.Count()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
c = strconv.Itoa(ci)
|
||||
sb.WriteString("\tTree (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v, err := tupleIter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
}
|
||||
sb.WriteString("\t}\n}\n")
|
||||
return sb.String(), nil
|
||||
}
|
||||
|
||||
type tupleIter struct {
|
||||
tuples []val.Tuple
|
||||
}
|
||||
@@ -232,3 +220,56 @@ func (m mutationIter) NextMutation(ctx context.Context) (key, value tree.Item) {
|
||||
func (m mutationIter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func debugFormat(ctx context.Context, m *MutableMap) (string, error) {
|
||||
kd, vd := m.keyDesc, m.valDesc
|
||||
|
||||
editIter := m.tuples.Edits.IterAtStart()
|
||||
iter, err := m.tuples.StaticMap.IterAll(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString("Mutable Map {\n")
|
||||
|
||||
c := strconv.Itoa(m.tuples.Edits.Count())
|
||||
sb.WriteString("\tedits (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v := editIter.Current()
|
||||
if k == nil {
|
||||
break
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
editIter.Advance()
|
||||
}
|
||||
sb.WriteString("\t},\n")
|
||||
|
||||
ci, err := m.tuples.StaticMap.Count()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
c = strconv.Itoa(ci)
|
||||
sb.WriteString("\tTree (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v, err := iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
}
|
||||
sb.WriteString("\t}\n}\n")
|
||||
return sb.String(), nil
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ func (it mutableMapIter[K, V, O]) compareKeys(memKey, proKey K) int {
|
||||
|
||||
func memIterFromRange(list *skip.List, rng Range) *memRangeIter {
|
||||
// use the lower bound of |rng| to construct a skip.ListIter
|
||||
iter := list.GetIterFromSearchFn(skipSearchFromRange(rng))
|
||||
iter := list.GetIterFromSeekFn(skipSearchFromRange(rng))
|
||||
|
||||
// enforce range start
|
||||
var key val.Tuple
|
||||
|
||||
@@ -37,7 +37,7 @@ type testMap interface {
|
||||
}
|
||||
|
||||
var _ testMap = Map{}
|
||||
var _ testMap = MutableMap{}
|
||||
var _ testMap = &MutableMap{}
|
||||
|
||||
func countOrderedMap(t *testing.T, om testMap) (cnt int) {
|
||||
iter, err := om.IterAll(context.Background())
|
||||
@@ -57,7 +57,7 @@ func keyDescFromMap(om testMap) val.TupleDesc {
|
||||
switch m := om.(type) {
|
||||
case Map:
|
||||
return m.keyDesc
|
||||
case MutableMap:
|
||||
case *MutableMap:
|
||||
return m.keyDesc
|
||||
default:
|
||||
panic("unknown ordered map")
|
||||
|
||||
+22
-7
@@ -23,6 +23,7 @@ const (
|
||||
maxHeight = 9
|
||||
maxCount = math.MaxUint32 - 1
|
||||
sentinelId = nodeId(0)
|
||||
initSize = 8
|
||||
)
|
||||
|
||||
// A KeyOrder determines the ordering of two keys |l| and |r|.
|
||||
@@ -71,14 +72,12 @@ type skipNode struct {
|
||||
|
||||
// NewSkipList returns a new skip.List.
|
||||
func NewSkipList(order KeyOrder) *List {
|
||||
nodes := make([]skipNode, 0, 8)
|
||||
nodes := make([]skipNode, 0, initSize)
|
||||
|
||||
// initialize sentinel node
|
||||
nodes = append(nodes, skipNode{
|
||||
id: sentinelId,
|
||||
key: nil, val: nil,
|
||||
id: sentinelId,
|
||||
height: maxHeight,
|
||||
next: tower{},
|
||||
prev: sentinelId,
|
||||
})
|
||||
|
||||
@@ -95,6 +94,10 @@ func (l *List) Checkpoint() {
|
||||
l.checkpoint = l.nextNodeId()
|
||||
}
|
||||
|
||||
func (l *List) HasCheckpoint() bool {
|
||||
return l.checkpoint > nodeId(1)
|
||||
}
|
||||
|
||||
// Revert reverts to the last recorded checkpoint.
|
||||
func (l *List) Revert() {
|
||||
cp := l.checkpoint
|
||||
@@ -166,6 +169,18 @@ func (l *List) Put(key, val []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
func (l *List) Copy() *List {
|
||||
copies := make([]skipNode, len(l.nodes))
|
||||
copy(copies, l.nodes)
|
||||
return &List{
|
||||
nodes: copies,
|
||||
count: l.count,
|
||||
checkpoint: l.checkpoint,
|
||||
keyOrder: l.keyOrder,
|
||||
seed: l.seed,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *List) pathToKey(key []byte) (path tower) {
|
||||
next := l.headPointer()
|
||||
prev := sentinelId
|
||||
@@ -278,13 +293,13 @@ func (it *ListIter) Retreat() {
|
||||
// GetIterAt creates an iterator starting at the first item
|
||||
// of the list whose key is greater than or equal to |key|.
|
||||
func (l *List) GetIterAt(key []byte) (it *ListIter) {
|
||||
return l.GetIterFromSearchFn(func(nodeKey []byte) bool {
|
||||
return l.GetIterFromSeekFn(func(nodeKey []byte) bool {
|
||||
return l.compareKeys(key, nodeKey) > 0
|
||||
})
|
||||
}
|
||||
|
||||
// GetIterFromSearchFn creates an iterator using a SeekFn.
|
||||
func (l *List) GetIterFromSearchFn(fn SeekFn) (it *ListIter) {
|
||||
// GetIterFromSeekFn creates an iterator using a SeekFn.
|
||||
func (l *List) GetIterFromSeekFn(fn SeekFn) (it *ListIter) {
|
||||
it = &ListIter{
|
||||
curr: l.seekWithFn(fn),
|
||||
list: l,
|
||||
|
||||
Reference in New Issue
Block a user