Add interfaces that allow for multiple implementations of prolly.Map, prolly.MutableMap, and tree.Map

This commit is contained in:
Nick Tobey
2024-11-12 13:30:17 -08:00
parent 884cb4dfd0
commit 5026f4f68e
17 changed files with 297 additions and 104 deletions
@@ -46,7 +46,7 @@ func RefFromArtifactIndex(ctx context.Context, vrw types.ValueReadWriter, idx Ar
panic("TODO")
case types.Format_DOLT:
b := shim.ValueFromArtifactMap(idx.(prollyArtifactIndex).index)
b := shim.ValueFromMap(idx.(prollyArtifactIndex).index)
return refFromNomsValue(ctx, vrw, b)
default:
+24 -3
View File
@@ -87,7 +87,7 @@ func RefFromIndex(ctx context.Context, vrw types.ValueReadWriter, idx Index) (ty
return refFromNomsValue(ctx, vrw, idx.(nomsIndex).index)
case types.Format_DOLT:
b := shim.ValueFromMap(idx.(prollyIndex).index)
b := shim.ValueFromMap(MapFromIndex(idx))
return refFromNomsValue(ctx, vrw, b)
default:
@@ -112,11 +112,11 @@ func indexFromAddr(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeS
return IndexFromNomsMap(v.(types.Map), vrw, ns), nil
case types.Format_DOLT:
pm, err := shim.MapFromValue(v, sch, ns, isKeylessTable)
m, err := shim.MapInterfaceFromValue(v, sch, ns, isKeylessTable)
if err != nil {
return nil, err
}
return IndexFromProllyMap(pm), nil
return IndexFromMapInterface(m), nil
default:
return nil, errNbfUnknown
@@ -238,11 +238,32 @@ func ProllyMapFromIndex(i Index) prolly.Map {
return i.(prollyIndex).index
}
// MapFromIndex unwraps the Index and returns the underlying prolly.Map or prolly.ProximityMap.
func MapFromIndex(i Index) prolly.MapInterfaceWithMutable {
switch indexType := i.(type) {
case prollyIndex:
return indexType.index
}
return i.(prollyIndex).index
}
// IndexFromProllyMap wraps a prolly.Map and returns it as an Index.
func IndexFromProllyMap(m prolly.Map) Index {
return prollyIndex{index: m}
}
// IndexFromMapInterface wraps a prolly.MapInterface and returns it as an Index.
func IndexFromMapInterface(m prolly.MapInterface) Index {
switch m := m.(type) {
case prolly.Map:
return IndexFromProllyMap(m)
case prolly.ProximityMap:
return IndexFromProximityMap(m)
default:
panic("unknown map type")
}
}
var _ Index = prollyIndex{}
// HashOf implements Index.
@@ -850,11 +850,11 @@ func (t doltDevTable) GetTableRows(ctx context.Context) (Index, error) {
if err != nil {
return nil, err
}
m, err := shim.MapFromValue(types.SerialMessage(rowbytes), sch, t.ns, false)
m, err := shim.MapInterfaceFromValue(types.SerialMessage(rowbytes), sch, t.ns, false)
if err != nil {
return nil, err
}
return IndexFromProllyMap(m), nil
return IndexFromMapInterface(m), nil
}
func (t doltDevTable) GetTableRowsWithDescriptors(ctx context.Context, kd, vd val.TupleDesc) (Index, error) {
@@ -863,7 +863,7 @@ func (t doltDevTable) GetTableRowsWithDescriptors(ctx context.Context, kd, vd va
if err != nil {
return nil, err
}
return IndexFromProllyMap(m), nil
return IndexFromMapInterface(m), nil
}
func (t doltDevTable) SetTableRows(ctx context.Context, rows Index) (Table, error) {
@@ -67,8 +67,8 @@ var validationStages = []validator{
// validateChunkReferences checks for dangling chunks.
func validateChunkReferences(ctx context.Context, db sqle.Database) error {
validateIndex := func(ctx context.Context, idx durable.Index) error {
pm := durable.ProllyMapFromIndex(idx)
return pm.WalkNodes(ctx, func(ctx context.Context, nd tree.Node) error {
m := durable.MapFromIndex(idx)
return m.WalkNodes(ctx, func(ctx context.Context, nd tree.Node) error {
if nd.Size() <= 0 {
return fmt.Errorf("encountered nil tree.Node")
}
@@ -113,7 +113,7 @@ func validateSecondaryIndexes(ctx context.Context, db sqle.Database) error {
if err != nil {
return false, err
}
primary := durable.ProllyMapFromIndex(rows)
primary := durable.MapFromIndex(rows)
for _, def := range sch.Indexes().AllIndexes() {
set, err := t.GetIndexSet(ctx)
@@ -124,7 +124,7 @@ func validateSecondaryIndexes(ctx context.Context, db sqle.Database) error {
if err != nil {
return true, err
}
secondary := durable.ProllyMapFromIndex(idx)
secondary := durable.MapFromIndex(idx)
err = validateIndexConsistency(ctx, sch, def, primary, secondary)
if err != nil {
@@ -140,7 +140,7 @@ func validateIndexConsistency(
ctx context.Context,
sch schema.Schema,
def schema.Index,
primary, secondary prolly.Map,
primary, secondary prolly.MapInterface,
) error {
if schema.IsKeyless(sch) {
return validateKeylessIndex(ctx, sch, def, primary, secondary)
@@ -151,7 +151,7 @@ func validateIndexConsistency(
// printIndexContents prints the contents of |prollyMap| to stdout. Intended for use debugging
// index consistency issues.
func printIndexContents(ctx context.Context, prollyMap prolly.Map) {
func printIndexContents(ctx context.Context, prollyMap prolly.MapInterface) {
fmt.Printf("Secondary index contents:\n")
kd := prollyMap.KeyDesc()
iterAll, _ := prollyMap.IterAll(ctx)
@@ -164,7 +164,7 @@ func printIndexContents(ctx context.Context, prollyMap prolly.Map) {
}
}
func validateKeylessIndex(ctx context.Context, sch schema.Schema, def schema.Index, primary, secondary prolly.Map) error {
func validateKeylessIndex(ctx context.Context, sch schema.Schema, def schema.Index, primary, secondary prolly.MapInterface) error {
// Full-Text indexes do not make use of their internal map, so we may safely skip this check
if def.IsFullText() {
return nil
@@ -239,7 +239,7 @@ func validateKeylessIndex(ctx context.Context, sch schema.Schema, def schema.Ind
}
}
func validatePkIndex(ctx context.Context, sch schema.Schema, def schema.Index, primary, secondary prolly.Map) error {
func validatePkIndex(ctx context.Context, sch schema.Schema, def schema.Index, primary, secondary prolly.MapInterface) error {
// Full-Text indexes do not make use of their internal map, so we may safely skip this check
if def.IsFullText() {
return nil
@@ -1095,7 +1095,7 @@ var sharePool = pool.NewBuffPool()
func maybeGetKeyBuilder(idx durable.Index) *val.TupleBuilder {
if types.IsFormat_DOLT(idx.Format()) {
kd, _ := durable.ProllyMapFromIndex(idx).Descriptors()
kd, _ := durable.MapFromIndex(idx).Descriptors()
return val.NewTupleBuilder(kd)
}
return nil
@@ -68,7 +68,7 @@ func getPrimaryKeylessProllyWriter(ctx context.Context, t *doltdb.Table, schStat
type indexWriter interface {
Name() string
Map(ctx context.Context) (prolly.Map, error)
Map(ctx context.Context) (prolly.MapInterface, error)
ValidateKeyViolations(ctx context.Context, sqlRow sql.Row) error
Insert(ctx context.Context, sqlRow sql.Row) error
Delete(ctx context.Context, sqlRow sql.Row) error
@@ -101,7 +101,7 @@ func (m prollyIndexWriter) Name() string {
return ""
}
func (m prollyIndexWriter) Map(ctx context.Context) (prolly.Map, error) {
func (m prollyIndexWriter) Map(ctx context.Context) (prolly.MapInterface, error) {
return m.mut.Map(ctx)
}
@@ -247,7 +247,7 @@ func (m prollyIndexWriter) uniqueKeyError(ctx context.Context, keyStr string, ke
type prollySecondaryIndexWriter struct {
name string
mut *prolly.MutableMap
mut prolly.MutableMapInterface
unique bool
prefixLengths []uint16
@@ -273,8 +273,8 @@ func (m prollySecondaryIndexWriter) Name() string {
return m.name
}
func (m prollySecondaryIndexWriter) Map(ctx context.Context) (prolly.Map, error) {
return m.mut.Map(ctx)
func (m prollySecondaryIndexWriter) Map(ctx context.Context) (prolly.MapInterface, error) {
return m.mut.MapInterface(ctx)
}
func (m prollySecondaryIndexWriter) ValidateKeyViolations(ctx context.Context, sqlRow sql.Row) error {
@@ -41,7 +41,7 @@ func (k prollyKeylessWriter) Name() string {
return k.name
}
func (k prollyKeylessWriter) Map(ctx context.Context) (prolly.Map, error) {
func (k prollyKeylessWriter) Map(ctx context.Context) (prolly.MapInterface, error) {
return k.mut.Map(ctx)
}
@@ -202,7 +202,7 @@ func (writer prollyKeylessSecondaryWriter) Name() string {
}
// Map implements the interface indexWriter.
func (writer prollyKeylessSecondaryWriter) Map(ctx context.Context) (prolly.Map, error) {
func (writer prollyKeylessSecondaryWriter) Map(ctx context.Context) (prolly.MapInterface, error) {
return writer.mut.Map(ctx)
}
@@ -78,14 +78,14 @@ func getSecondaryProllyIndexWriters(ctx context.Context, t *doltdb.Table, schSta
if err != nil {
return nil, err
}
idxMap := durable.ProllyMapFromIndex(idxRows)
idxMap := durable.MapFromIndex(idxRows)
keyDesc, _ := idxMap.Descriptors()
// mapping from secondary index key to primary key
writers[defName] = prollySecondaryIndexWriter{
name: defName,
mut: idxMap.Mutate(),
mut: idxMap.MutateInterface(),
unique: def.IsUnique,
prefixLengths: def.PrefixLengths,
idxCols: def.Count,
@@ -337,7 +337,7 @@ func (w *prollyTableWriter) table(ctx context.Context) (t *doltdb.Table, err err
return nil, err
}
t, err = w.tbl.UpdateRows(ctx, durable.IndexFromProllyMap(pm))
t, err = w.tbl.UpdateRows(ctx, durable.IndexFromMapInterface(pm))
if err != nil {
return nil, err
}
@@ -353,7 +353,7 @@ func (w *prollyTableWriter) table(ctx context.Context) (t *doltdb.Table, err err
if err != nil {
return nil, err
}
idx := durable.IndexFromProllyMap(sm)
idx := durable.IndexFromMapInterface(sm)
s, err = s.PutIndex(ctx, wrSecondary.Name(), idx)
if err != nil {
+1 -1
View File
@@ -134,7 +134,7 @@ func (c AddressMap) Editor() AddressMapEditor {
}
type AddressMapEditor struct {
addresses tree.MutableMap[stringSlice, address, lexicographic]
addresses tree.MutableMap[stringSlice, address, lexicographic, tree.StaticMap[stringSlice, address, lexicographic]]
}
func (wr AddressMapEditor) Add(ctx context.Context, name string, addr hash.Hash) error {
+24 -5
View File
@@ -57,6 +57,20 @@ type ArtifactMap struct {
valDesc val.TupleDesc
}
func (m ArtifactMap) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
return m.tuples.Has(ctx, key)
}
func (m ArtifactMap) ValDesc() val.TupleDesc {
return m.valDesc
}
func (m ArtifactMap) KeyDesc() val.TupleDesc {
return m.keyDesc
}
var _ MapInterface = (*ArtifactMap)(nil)
// NewArtifactMap creates an artifact map based on |srcKeyDesc| which is the key descriptor for
// the corresponding row map.
func NewArtifactMap(node tree.Node, ns tree.NodeStore, srcKeyDesc val.TupleDesc) ArtifactMap {
@@ -169,8 +183,13 @@ func (m ArtifactMap) Editor() *ArtifactsEditor {
}
}
// IterAll returns an iterator for all artifacts.
func (m ArtifactMap) IterAll(ctx context.Context) (ArtifactIter, error) {
// IterAll returns an MapIter for all artifacts.
func (m ArtifactMap) IterAll(ctx context.Context) (MapIter, error) {
return m.tuples.IterAll(ctx)
}
// IterAllArtifacts returns an iterator for all artifacts.
func (m ArtifactMap) IterAllArtifacts(ctx context.Context) (ArtifactIter, error) {
numPks := m.srcKeyDesc.Count()
tb := val.NewTupleBuilder(m.srcKeyDesc)
itr, err := m.tuples.IterAll(ctx)
@@ -279,7 +298,7 @@ func (m ArtifactMap) CountOfTypes(ctx context.Context, artTypes ...ArtifactType)
}
func (m ArtifactMap) iterAllOfType(ctx context.Context, artType ArtifactType) (artifactTypeIter, error) {
itr, err := m.IterAll(ctx)
itr, err := m.IterAllArtifacts(ctx)
if err != nil {
return artifactTypeIter{}, err
}
@@ -287,7 +306,7 @@ func (m ArtifactMap) iterAllOfType(ctx context.Context, artType ArtifactType) (a
}
func (m ArtifactMap) iterAllOfTypes(ctx context.Context, artTypes ...ArtifactType) (multiArtifactTypeItr, error) {
itr, err := m.IterAll(ctx)
itr, err := m.IterAllArtifacts(ctx)
if err != nil {
return multiArtifactTypeItr{}, err
}
@@ -429,7 +448,7 @@ func (wr *ArtifactsEditor) Flush(ctx context.Context) (ArtifactMap, error) {
}
return ArtifactMap{
tuples: m.tuples,
tuples: m,
srcKeyDesc: wr.srcKeyDesc,
keyDesc: wr.mut.keyDesc,
valDesc: wr.mut.valDesc,
+1 -1
View File
@@ -148,7 +148,7 @@ func (c CommitClosure) AsHashSet(ctx context.Context) (hash.HashSet, error) {
}
type CommitClosureEditor struct {
closure tree.MutableMap[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]
closure tree.MutableMap[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering, tree.StaticMap[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]]
}
type CommitClosureKey []byte
+54
View File
@@ -0,0 +1,54 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prolly
import (
"context"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/val"
)
// MapInterface is a common interface for prolly-tree based maps.
type MapInterface interface {
Node() tree.Node
NodeStore() tree.NodeStore
Count() (int, error)
HashOf() hash.Hash
WalkNodes(ctx context.Context, cb tree.NodeCb) error
Descriptors() (val.TupleDesc, val.TupleDesc)
IterAll(ctx context.Context) (MapIter, error)
Pool() pool.BuffPool
Has(ctx context.Context, key val.Tuple) (ok bool, err error)
ValDesc() val.TupleDesc
KeyDesc() val.TupleDesc
}
type MapInterfaceWithMutable interface {
MapInterface
MutateInterface() MutableMapInterface
}
type MutableMapInterface interface {
NodeStore() tree.NodeStore
Put(ctx context.Context, key, value val.Tuple) error
Delete(ctx context.Context, key val.Tuple) error
Checkpoint(ctx context.Context) error
Revert(ctx context.Context)
HasEdits() bool
IterRange(ctx context.Context, rng Range) (MapIter, error)
MapInterface(ctx context.Context) (MapInterface, error)
}
+34 -9
View File
@@ -15,27 +15,29 @@
package shim
import (
"fmt"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
"github.com/dolthub/go-mysql-server/sql/expression/function/vector"
)
func NodeFromValue(v types.Value) (tree.Node, error) {
return tree.NodeFromBytes(v.(types.SerialMessage))
}
func ValueFromMap(m prolly.Map) types.Value {
return tree.ValueFromNode(m.Node())
}
func ValueFromArtifactMap(m prolly.ArtifactMap) types.Value {
func ValueFromMap(m prolly.MapInterface) types.Value {
return tree.ValueFromNode(m.Node())
}
func MapFromValue(v types.Value, sch schema.Schema, ns tree.NodeStore, isKeylessSecondary bool) (prolly.Map, error) {
root, err := NodeFromValue(v)
root, fileId, err := NodeFromValue(v)
if fileId == serial.VectorIndexNodeFileID {
return prolly.Map{}, fmt.Errorf("can't make a prolly.Map from a vector index node")
}
if err != nil {
return prolly.Map{}, err
}
@@ -47,10 +49,33 @@ func MapFromValue(v types.Value, sch schema.Schema, ns tree.NodeStore, isKeyless
return prolly.NewMap(root, ns, kd, vd), nil
}
func MapFromValueWithDescriptors(v types.Value, kd, vd val.TupleDesc, ns tree.NodeStore) (prolly.Map, error) {
root, err := NodeFromValue(v)
func MapInterfaceFromValue(v types.Value, sch schema.Schema, ns tree.NodeStore, isKeylessSecondary bool) (prolly.MapInterface, error) {
root, fileId, err := NodeFromValue(v)
if err != nil {
return nil, err
}
kd := sch.GetKeyDescriptor()
if isKeylessSecondary {
kd = prolly.AddHashToSchema(kd)
}
vd := sch.GetValueDescriptor()
switch fileId {
case serial.VectorIndexNodeFileID:
return prolly.NewProximityMap(nil, ns, root, kd, vd, vector.DistanceL2Squared{}), nil
default:
return prolly.NewMap(root, ns, kd, vd), nil
}
}
func MapFromValueWithDescriptors(v types.Value, kd, vd val.TupleDesc, ns tree.NodeStore) (prolly.MapInterface, error) {
root, fileId, err := NodeFromValue(v)
if err != nil {
return prolly.Map{}, err
}
return prolly.NewMap(root, ns, kd, vd), nil
switch fileId {
case serial.VectorIndexNodeFileID:
return prolly.NewProximityMap(nil, ns, root, kd, vd, vector.DistanceL2Squared{}), nil
default:
return prolly.NewMap(root, ns, kd, vd), nil
}
}
+20 -2
View File
@@ -37,6 +37,16 @@ type StaticMap[K, V ~[]byte, O Ordering[K]] struct {
Order O
}
type MapInterface[K, V ~[]byte, O Ordering[K]] interface {
Get(ctx context.Context, query K, cb KeyValueFn[K, V]) (err error)
GetPrefix(ctx context.Context, query K, prefixOrder O, cb KeyValueFn[K, V]) (err error)
Has(ctx context.Context, query K) (ok bool, err error)
HasPrefix(ctx context.Context, query K, prefixOrder O) (ok bool, err error)
GetRoot() Node
GetNodeStore() NodeStore
IterKeyRange(ctx context.Context, start, stop K) (*OrderedTreeIter[K, V], error)
}
// DiffOrderedTrees invokes `cb` for each difference between `from` and `to. If `considerAllRowsModified`
// is true, then a key that exists in both trees will be considered a modification even if the bytes are the same.
// This is used when `from` and `to` have different schemas.
@@ -193,6 +203,14 @@ func VisitMapLevelOrder[K, V ~[]byte, O Ordering[K]](
return err
}
func (t StaticMap[K, V, O]) GetRoot() Node {
return t.Root
}
func (t StaticMap[K, V, O]) GetNodeStore() NodeStore {
return t.NodeStore
}
func (t StaticMap[K, V, O]) Count() (int, error) {
return t.Root.TreeCount()
}
@@ -205,8 +223,8 @@ func (t StaticMap[K, V, O]) HashOf() hash.Hash {
return t.Root.HashOf()
}
func (t StaticMap[K, V, O]) Mutate() MutableMap[K, V, O] {
return MutableMap[K, V, O]{
func (t StaticMap[K, V, O]) Mutate() MutableMap[K, V, O, StaticMap[K, V, O]] {
return MutableMap[K, V, O, StaticMap[K, V, O]]{
Edits: skip.NewSkipList(func(left, right []byte) int {
return t.Order.Compare(left, right)
}),
+11 -12
View File
@@ -16,27 +16,26 @@ package tree
import (
"context"
"github.com/dolthub/dolt/go/store/skip"
)
// MutableMap is a mutable prolly Static with ordered elements.
type MutableMap[K, V ~[]byte, O Ordering[K]] struct {
type MutableMap[K, V ~[]byte, O Ordering[K], M MapInterface[K, V, O]] struct {
Edits *skip.List
Static StaticMap[K, V, O]
Static M
}
func (m MutableMap[K, V, O]) Put(_ context.Context, key K, value V) error {
func (m MutableMap[K, V, O, M]) Put(_ context.Context, key K, value V) error {
m.Edits.Put(key, value)
return nil
}
func (m MutableMap[K, V, O]) Delete(_ context.Context, key K) error {
func (m MutableMap[K, V, O, M]) Delete(_ context.Context, key K) error {
m.Edits.Put(key, nil)
return nil
}
func (m MutableMap[K, V, O]) Get(ctx context.Context, key K, cb KeyValueFn[K, V]) (err error) {
func (m MutableMap[K, V, O, M]) Get(ctx context.Context, key K, cb KeyValueFn[K, V]) (err error) {
value, ok := m.Edits.Get(key)
if ok {
if value == nil {
@@ -48,7 +47,7 @@ func (m MutableMap[K, V, O]) Get(ctx context.Context, key K, cb KeyValueFn[K, V]
return m.Static.Get(ctx, key, cb)
}
func (m MutableMap[K, V, O]) GetPrefix(ctx context.Context, key K, prefixOrder O, cb KeyValueFn[K, V]) (err error) {
func (m MutableMap[K, V, O, M]) GetPrefix(ctx context.Context, key K, prefixOrder O, cb KeyValueFn[K, V]) (err error) {
iter := m.Edits.GetIterFromSeekFn(func(k []byte) (advance bool) {
if k != nil { // seek until |k| >= |key|
advance = prefixOrder.Compare(k, key) < 0
@@ -65,7 +64,7 @@ func (m MutableMap[K, V, O]) GetPrefix(ctx context.Context, key K, prefixOrder O
return m.Static.GetPrefix(ctx, key, prefixOrder, cb)
}
func (m MutableMap[K, V, O]) Has(ctx context.Context, key K) (present bool, err error) {
func (m MutableMap[K, V, O, M]) Has(ctx context.Context, key K) (present bool, err error) {
value, ok := m.Edits.Get(key)
if ok {
present = value != nil
@@ -74,7 +73,7 @@ func (m MutableMap[K, V, O]) Has(ctx context.Context, key K) (present bool, err
return m.Static.Has(ctx, key)
}
func (m MutableMap[K, V, O]) HasPrefix(ctx context.Context, key K, prefixOrder O) (present bool, err error) {
func (m MutableMap[K, V, O, M]) HasPrefix(ctx context.Context, key K, prefixOrder O) (present bool, err error) {
iter := m.Edits.GetIterFromSeekFn(func(k []byte) (advance bool) {
if k != nil { // seek until |k| >= |key|
advance = prefixOrder.Compare(k, key) < 0
@@ -89,14 +88,14 @@ func (m MutableMap[K, V, O]) HasPrefix(ctx context.Context, key K, prefixOrder O
return m.Static.HasPrefix(ctx, key, prefixOrder)
}
func (m MutableMap[K, V, O]) Copy() MutableMap[K, V, O] {
return MutableMap[K, V, O]{
func (m MutableMap[K, V, O, M]) Copy() MutableMap[K, V, O, M] {
return MutableMap[K, V, O, M]{
Edits: m.Edits.Copy(),
Static: m.Static,
}
}
func (m MutableMap[K, V, O]) Mutations() MutationIter {
func (m MutableMap[K, V, O, M]) Mutations() MutationIter {
return orderedListIter[K, V]{iter: m.Edits.IterAtStart()}
}
+5
View File
@@ -213,6 +213,11 @@ func (m Map) Mutate() *MutableMap {
return newMutableMap(m)
}
// MutateInterface makes a MutableMap from a Map.
func (m Map) MutateInterface() MutableMapInterface {
return newMutableMap(m)
}
// Rewriter returns a mutator that intends to rewrite this map with the key and value descriptors provided.
func (m Map) Rewriter(kd, vd val.TupleDesc) *MutableMap {
return newMutableMapWithDescriptors(m, kd, vd)
+99 -47
View File
@@ -32,22 +32,42 @@ const (
// MutableMap is an ordered collection of val.Tuple backed by a Prolly Tree.
// Writes to the map are queued in a skip.List and periodically flushed when
// the maximum number of pending writes is exceeded.
type MutableMap struct {
type GenericMutableMap[MapType MapInterface, TreeMap tree.MapInterface[val.Tuple, val.Tuple, val.TupleDesc]] struct {
// tuples contains the primary Prolly Tree and skip.List for this map.
tuples tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc]
tuples tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc, TreeMap]
// stash, if not nil, contains a previous checkpoint of this map.
// stashes are created when a MutableMap has been check-pointed, but
// the number of in-memory pending writes exceeds, maxPending.
// In this case we stash a copy MutableMap containing the checkpoint,
// flush the pending writes and continue accumulating
stash *tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc]
stash *tree.MutableMap[val.Tuple, val.Tuple, val.TupleDesc, TreeMap]
// keyDesc and valDesc are tuples descriptors for the map.
keyDesc, valDesc val.TupleDesc
// buffer size
maxPending int
flusher MutableMapFlusher[MapType, TreeMap]
}
type MutableMap = GenericMutableMap[Map, tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]]
// MapInterface materializes all pending and applied mutations in the GenericMutableMap, producing the resulting MapInterface.
func (mut *GenericMutableMap[M, T]) MapInterface(ctx context.Context) (MapInterface, error) {
return mut.Map(ctx)
}
// TreeMap materializes all pending and applied mutations in the GenericMutableMap, producing the resulting tree.MapInterface.
func (mut *GenericMutableMap[M, T]) TreeMap(ctx context.Context) (T, error) {
return mut.flusher.ApplyMutations(ctx, mut.NodeStore(), mut.tuples.Static.GetRoot(), mut.keyDesc, mut.tuples.Mutations())
}
// Map materializes all pending and applied mutations in the GenericMutableMap, producing the specific MapInterface implementation
// that the struct has been specialized with.
func (mut *GenericMutableMap[M, T]) Map(ctx context.Context) (M, error) {
return mut.flusher.Map(ctx, mut)
}
// newMutableMap returns a new MutableMap.
@@ -57,6 +77,7 @@ func newMutableMap(m Map) *MutableMap {
keyDesc: m.keyDesc,
valDesc: m.valDesc,
maxPending: defaultMaxPending,
flusher: ProllyFlusher{},
}
}
@@ -68,49 +89,28 @@ func newMutableMapWithDescriptors(m Map, kd, vd val.TupleDesc) *MutableMap {
keyDesc: kd,
valDesc: vd,
maxPending: defaultMaxPending,
flusher: ProllyFlusher{},
}
}
// Map materializes all pending and applied mutations in the MutableMap.
func (mut *MutableMap) Map(ctx context.Context) (Map, error) {
s := message.NewProllyMapSerializer(mut.valDesc, mut.NodeStore().Pool())
return mut.flushWithSerializer(ctx, s)
}
func (mut *MutableMap) flushWithSerializer(ctx context.Context, s message.Serializer) (Map, error) {
sm := mut.tuples.Static
fn := tree.ApplyMutations[val.Tuple, val.TupleDesc, message.Serializer]
root, err := fn(ctx, sm.NodeStore, sm.Root, mut.keyDesc, s, mut.tuples.Mutations())
if err != nil {
return Map{}, err
}
return Map{
tuples: tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]{
Root: root,
NodeStore: sm.NodeStore,
Order: sm.Order,
},
keyDesc: mut.keyDesc,
valDesc: mut.valDesc,
}, nil
func (mut *GenericMutableMap[M, T]) flushWithSerializer(ctx context.Context, s message.Serializer) (T, error) {
return mut.flusher.ApplyMutationsWithSerializer(ctx, mut.NodeStore(), mut.tuples.Static.GetRoot(), mut.keyDesc, s, mut.tuples.Mutations())
}
// WithMaxPending returns a MutableMap with a new pending buffer size.
func (mut *MutableMap) WithMaxPending(max int) *MutableMap {
func (mut *GenericMutableMap[M, T]) WithMaxPending(max int) *GenericMutableMap[M, T] {
ret := *mut
ret.maxPending = max
return &ret
}
// NodeStore returns the map's NodeStore
func (mut *MutableMap) NodeStore() tree.NodeStore {
return mut.tuples.Static.NodeStore
func (mut *GenericMutableMap[M, T]) NodeStore() tree.NodeStore {
return mut.tuples.Static.GetNodeStore()
}
// Put adds the Tuple pair |key|, |value| to the MutableMap.
func (mut *MutableMap) Put(ctx context.Context, key, value val.Tuple) error {
func (mut *GenericMutableMap[M, T]) Put(ctx context.Context, key, value val.Tuple) error {
if err := mut.tuples.Put(ctx, key, value); err != nil {
return err
}
@@ -121,32 +121,32 @@ func (mut *MutableMap) Put(ctx context.Context, key, value val.Tuple) error {
}
// Delete deletes the pair keyed by |key| from the MutableMap.
func (mut *MutableMap) Delete(ctx context.Context, key val.Tuple) error {
func (mut *GenericMutableMap[M, T]) Delete(ctx context.Context, key val.Tuple) error {
return mut.tuples.Delete(ctx, key)
}
// Get fetches the Tuple pair keyed by |key|, if it exists, and passes it to |cb|.
// If the |key| is not present in the MutableMap, a nil Tuple pair is passed to |cb|.
func (mut *MutableMap) Get(ctx context.Context, key val.Tuple, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
func (mut *GenericMutableMap[M, T]) Get(ctx context.Context, key val.Tuple, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
return mut.tuples.Get(ctx, key, cb)
}
func (mut *MutableMap) GetPrefix(ctx context.Context, key val.Tuple, prefixDesc val.TupleDesc, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
func (mut *GenericMutableMap[M, T]) GetPrefix(ctx context.Context, key val.Tuple, prefixDesc val.TupleDesc, cb tree.KeyValueFn[val.Tuple, val.Tuple]) (err error) {
return mut.tuples.GetPrefix(ctx, key, prefixDesc, cb)
}
// Has returns true if |key| is present in the MutableMap.
func (mut *MutableMap) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
func (mut *GenericMutableMap[M, T]) Has(ctx context.Context, key val.Tuple) (ok bool, err error) {
return mut.tuples.Has(ctx, key)
}
// HasPrefix returns true if a key with a matching prefix to |key| is present in the MutableMap.
func (mut *MutableMap) HasPrefix(ctx context.Context, key val.Tuple, prefixDesc val.TupleDesc) (ok bool, err error) {
func (mut *GenericMutableMap[M, T]) HasPrefix(ctx context.Context, key val.Tuple, prefixDesc val.TupleDesc) (ok bool, err error) {
return mut.tuples.HasPrefix(ctx, key, prefixDesc)
}
// Checkpoint records a checkpoint that can be reverted to.
func (mut *MutableMap) Checkpoint(context.Context) error {
func (mut *GenericMutableMap[M, T]) Checkpoint(context.Context) error {
// discard previous stash, if one exists
mut.stash = nil
mut.tuples.Edits.Checkpoint()
@@ -154,7 +154,7 @@ func (mut *MutableMap) Checkpoint(context.Context) error {
}
// Revert discards writes made since the last checkpoint.
func (mut *MutableMap) Revert(context.Context) {
func (mut *GenericMutableMap[M, T]) Revert(context.Context) {
// if we've accumulated a large number of writes
// since we check-pointed, our last checkpoint
// may be stashed in a separate tree.MutableMap
@@ -165,7 +165,7 @@ func (mut *MutableMap) Revert(context.Context) {
mut.tuples.Edits.Revert()
}
func (mut *MutableMap) flushPending(ctx context.Context) error {
func (mut *GenericMutableMap[M, T]) flushPending(ctx context.Context) error {
stash := mut.stash
// if our in-memory edit set contains a checkpoint, we
// must stash a copy of |mut.tuples| we can revert to.
@@ -174,18 +174,18 @@ func (mut *MutableMap) flushPending(ctx context.Context) error {
cp.Edits.Revert()
stash = &cp
}
sm, err := mut.Map(ctx)
sm, err := mut.flusher.ApplyMutations(ctx, mut.NodeStore(), mut.tuples.Static.GetRoot(), mut.keyDesc, mut.tuples.Mutations())
if err != nil {
return err
}
mut.tuples.Static = sm.tuples
mut.tuples.Static = sm
mut.tuples.Edits.Truncate() // reuse skip list
mut.stash = stash
return nil
}
// IterAll returns a mutableMapIter that iterates over the entire MutableMap.
func (mut *MutableMap) IterAll(ctx context.Context) (MapIter, error) {
func (mut *GenericMutableMap[M, T]) IterAll(ctx context.Context) (MapIter, error) {
rng := Range{Fields: nil, Desc: mut.keyDesc}
return mut.IterRange(ctx, rng)
}
@@ -193,13 +193,13 @@ func (mut *MutableMap) IterAll(ctx context.Context) (MapIter, error) {
// IterKeyRange iterates over a physical key range defined by |start| and
// |stop|. If |start| and/or |stop| is nil, the range will be open
// towards that end.
func (mut *MutableMap) IterKeyRange(ctx context.Context, start, stop val.Tuple) (MapIter, error) {
func (mut *GenericMutableMap[M, T]) IterKeyRange(ctx context.Context, start, stop val.Tuple) (MapIter, error) {
return mut.tuples.Static.IterKeyRange(ctx, start, stop)
}
// IterRange returns a MapIter that iterates over a Range.
func (mut *MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error) {
treeIter, err := treeIterFromRange(ctx, mut.tuples.Static.Root, mut.tuples.Static.NodeStore, rng)
func (mut *GenericMutableMap[M, T]) IterRange(ctx context.Context, rng Range) (MapIter, error) {
treeIter, err := treeIterFromRange(ctx, mut.tuples.Static.GetRoot(), mut.tuples.Static.GetNodeStore(), rng)
if err != nil {
return nil, err
}
@@ -216,12 +216,12 @@ func (mut *MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error
// HasEdits returns true when the MutableMap has performed at least one Put or Delete operation. This does not indicate
// whether the materialized map contains different values to the contained unedited map.
func (mut *MutableMap) HasEdits() bool {
func (mut *GenericMutableMap[M, T]) HasEdits() bool {
return mut.tuples.Edits.Count() > 0
}
// Descriptors returns the key and value val.TupleDesc.
func (mut *MutableMap) Descriptors() (val.TupleDesc, val.TupleDesc) {
func (mut *GenericMutableMap[M, T]) Descriptors() (val.TupleDesc, val.TupleDesc) {
return mut.keyDesc, mut.valDesc
}
@@ -308,3 +308,55 @@ func debugFormat(ctx context.Context, m *MutableMap) (string, error) {
sb.WriteString("\t}\n}\n")
return sb.String(), nil
}
type ProllyFlusher struct{}
func (f ProllyFlusher) Map(ctx context.Context, mut *GenericMutableMap[Map, tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]]) (Map, error) {
treeMap, err := f.TreeMap(ctx, mut)
if err != nil {
return Map{}, err
}
return Map{
tuples: treeMap,
keyDesc: mut.keyDesc,
valDesc: mut.valDesc,
}, nil
}
// TreeMap materializes all pending and applied mutations in the MutableMap.
func (f ProllyFlusher) TreeMap(ctx context.Context, mut *MutableMap) (tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc], error) {
s := message.NewProllyMapSerializer(mut.valDesc, mut.NodeStore().Pool())
return mut.flushWithSerializer(ctx, s)
}
var _ MutableMapFlusher[Map, tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]] = ProllyFlusher{}
func (f ProllyFlusher) ApplyMutations(
ctx context.Context,
ns tree.NodeStore,
root tree.Node,
order val.TupleDesc,
edits tree.MutationIter,
) (tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc], error) {
serializer := message.NewVectorIndexSerializer(ns.Pool())
return f.ApplyMutationsWithSerializer(ctx, ns, root, order, serializer, edits)
}
func (f ProllyFlusher) ApplyMutationsWithSerializer(
ctx context.Context,
ns tree.NodeStore,
root tree.Node,
order val.TupleDesc,
serializer message.Serializer,
edits tree.MutationIter,
) (tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc], error) {
newRoot, err := tree.ApplyMutations(ctx, ns, root, order, serializer, edits)
if err != nil {
return tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]{}, err
}
return tree.StaticMap[val.Tuple, val.Tuple, val.TupleDesc]{
Root: newRoot,
NodeStore: ns,
Order: order,
}, nil
}