keyless secondary index (#1886)

* Initial keyless secondary index

* Cleanup code

* Add bats test

remove comments

* Remove confusing diff

* Remove feature gate

* Adds index insert, delete on row, should work with update. Not concurrency safe, might need to use ActionExecutor

* Fix the cardinality issues, card stored in index value tuple, indexKey returned multiple times in lookup range iterator

* Add index unit tests

* Refactor cardCounter, add describe bats

* Remove the extraneous iea map

* Fix inRange bug

* Remove types.Tuple{} usage

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* Delete commented line

* small tuple changes

* Fix noerror mistake

* Keyless indexEd undo in its own closure

* Fix broken foreign key reference cascades

* QOL edits re: PR feedback

* Better comment

* Delete extraneous comment

* Delete extraneous comment

* Fix noms row ReduceToIndexKeys comment

* Add basic keyless FK cascade checks

* Daylon's PR comments

* Add keyless fk bats starter

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* Skip extra bats

* Replace PK reference with keyless index, skip 2 extra tests

* Fix bats test names

* bats test name change typo

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
Maximilian Hoffman
2021-07-26 14:52:29 -07:00
committed by GitHub
parent 02aacf5f8d
commit 909fdf4117
19 changed files with 3179 additions and 153 deletions
+49
View File
@@ -264,3 +264,52 @@ func (r keylessRow) TaggedValues() (TaggedValues, error) {
func (r keylessRow) Format() *types.NomsBinFormat {
return r.val.Format()
}
// ReduceToIndexKeys creates a full key, a partial key, and a cardinality value
// tuple from the given row (first tuple being the full key). Please refer to
// the note in the index editor for more information regarding partial keys.
func (r keylessRow) ReduceToIndexKeys(idx schema.Index) (types.Tuple, types.Tuple, types.Tuple, error) {
	// Reserve two extra slots for the trailing (hashTag, hashVal) pair that
	// is appended after the loop, so the appends below never reallocate.
	vals := make([]types.Value, 0, len(idx.AllTags())*2+2)
	for _, tag := range idx.AllTags() {
		val, ok := r.GetColVal(tag)
		if !ok {
			// Columns absent from the row are indexed as NULL.
			val = types.NullValue
		}
		vals = append(vals, types.Uint(tag), val)
	}

	// Keyless rows are keyed by a generated row-id: tag at position 0 of the
	// key tuple, hash value at position 1.
	hashTag, err := r.key.Get(0)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}
	hashVal, err := r.key.Get(1)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}

	// The cardinality tag/value pair lives at positions 0 and 1 of the row's
	// value tuple; it becomes the index entry's value tuple.
	cardTag, err := r.val.Get(0)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}
	cardVal, err := r.val.Get(1)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}
	keyValue, err := types.NewTuple(r.Format(), cardTag, cardVal)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}

	// Full key = indexed columns plus the row-id pair; no reallocation here
	// thanks to the +2 capacity reserved above.
	vals = append(vals, hashTag, hashVal)
	fullKey, err := types.NewTuple(r.Format(), vals...)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}
	// Partial key covers only the first idx.Count() indexed columns.
	partialKey, err := types.NewTuple(r.Format(), vals[:idx.Count()*2]...)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}
	return fullKey, partialKey, keyValue, nil
}
+31
View File
@@ -44,6 +44,14 @@ func pkRowFromNoms(sch schema.Schema, nomsKey, nomsVal types.Tuple) (Row, error)
allCols := sch.GetAllCols()
err = IterPkTuple(keySl, func(tag uint64, val types.Value) (stop bool, err error) {
// The IsKeyless check in FromNoms misses keyless index schemas, even though
// the output tuple is a keyless index that contains a KeylessRowIdTag.
// NomsRangeReader breaks without this.
// A longer term fix could separate record vs index parsing, each of
// which is different for keyless vs keyed tables.
if tag == schema.KeylessRowIdTag {
return false, nil
}
col, ok := allCols.GetByTag(tag)
if !ok {
@@ -248,6 +256,29 @@ func (nr nomsRow) NomsMapValue(sch schema.Schema) types.Valuable {
return nr.value.NomsTupleForNonPKCols(nr.nbf, sch.GetNonPKCols())
}
// ReduceToIndexKeys creates a full key, partial key, and value tuple from the given row (first tuple being the full key). Please
// refer to the note in the index editor for more information regarding partial keys. NomsRows map always
// keys to an empty value tuple.
func (nr nomsRow) ReduceToIndexKeys(idx schema.Index) (types.Tuple, types.Tuple, types.Tuple, error) {
	allTags := idx.AllTags()
	tagVals := make([]types.Value, 0, 2*len(allTags))
	for _, tag := range allTags {
		colVal, found := nr.GetColVal(tag)
		if !found {
			// Columns absent from the row are indexed as NULL.
			colVal = types.NullValue
		}
		tagVals = append(tagVals, types.Uint(tag), colVal)
	}

	fullKey, err := types.NewTuple(nr.Format(), tagVals...)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}

	// The partial key keeps only the first idx.Count() indexed columns.
	partialKey, err := types.NewTuple(nr.Format(), tagVals[:2*idx.Count()]...)
	if err != nil {
		return types.Tuple{}, types.Tuple{}, types.Tuple{}, err
	}

	return fullKey, partialKey, types.EmptyTuple(nr.Format()), nil
}
func IterPkTuple(tvs types.TupleValueSlice, cb func(tag uint64, val types.Value) (stop bool, err error)) error {
if len(tvs)%2 != 0 {
return fmt.Errorf("expected len(TupleValueSlice) to be even, got %d", len(tvs))
+3 -22
View File
@@ -56,6 +56,9 @@ type Row interface {
// TaggedValues returns the row as TaggedValues.
TaggedValues() (TaggedValues, error)
// ReduceToIndexKeys returns full and partial index keys
ReduceToIndexKeys(idx schema.Index) (full types.Tuple, partial types.Tuple, value types.Tuple, err error)
}
func New(nbf *types.NomsBinFormat, sch schema.Schema, colVals TaggedValues) (Row, error) {
@@ -114,28 +117,6 @@ func GetFieldByNameWithDefault(colName string, defVal types.Value, r Row, sch sc
}
}
// ReduceToIndexKeys creates a full key and a partial key from the given row (first tuple being the full key). Please
// refer to the note in the index editor for more information regarding partial keys.
func ReduceToIndexKeys(idx schema.Index, r Row) (types.Tuple, types.Tuple, error) {
// Collect a (tag, value) pair for every column covered by the index.
vals := make([]types.Value, 0, len(idx.AllTags())*2)
for _, tag := range idx.AllTags() {
val, ok := r.GetColVal(tag)
if !ok {
// Columns absent from the row are indexed as NULL.
val = types.NullValue
}
vals = append(vals, types.Uint(tag), val)
}
fullKey, err := types.NewTuple(r.Format(), vals...)
if err != nil {
return types.Tuple{}, types.Tuple{}, err
}
// The partial key covers only the first idx.Count() pairs; the remaining
// AllTags entries are excluded from it.
partialKey, err := types.NewTuple(r.Format(), vals[:idx.Count()*2]...)
if err != nil {
return types.Tuple{}, types.Tuple{}, err
}
return fullKey, partialKey, nil
}
// ReduceToIndexKeysFromTagMap creates a full key and a partial key from the given map of tags (first tuple being the
// full key). Please refer to the note in the index editor for more information regarding partial keys.
func ReduceToIndexKeysFromTagMap(nbf *types.NomsBinFormat, idx schema.Index, tagToVal map[uint64]types.Value) (types.Tuple, types.Tuple, error) {
+2 -1
View File
@@ -162,7 +162,8 @@ func (conv *KVToSqlRowConverter) processTuple(cols []interface{}, valsToFill int
}
tag64 := primReader.ReadUint()
if tag64 > maxTag {
if tag64 > maxTag && tag64 != schema.KeylessRowCardinalityTag && tag64 != schema.KeylessRowIdTag {
break
}
+24 -19
View File
@@ -38,12 +38,12 @@ var resultBufferPool = &sync.Pool{
}
type indexLookupRowIterAdapter struct {
idx DoltIndex
keyIter nomsKeyIter
pkTags map[uint64]int
conv *KVToSqlRowConverter
ctx *sql.Context
cancelF func()
idx DoltIndex
keyIter nomsKeyIter
lookupTags map[uint64]int
conv *KVToSqlRowConverter
ctx *sql.Context
cancelF func()
read uint64
count uint64
@@ -53,9 +53,14 @@ type indexLookupRowIterAdapter struct {
// NewIndexLookupRowIterAdapter returns a new indexLookupRowIterAdapter.
func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsKeyIter) *indexLookupRowIterAdapter {
pkTags := make(map[uint64]int)
lookupTags := make(map[uint64]int)
for i, tag := range idx.Schema().GetPKCols().Tags {
pkTags[tag] = i
lookupTags[tag] = i
}
// handle keyless case, where no columns are pk's and rowIdTag is the only lookup tag
if len(lookupTags) == 0 {
lookupTags[schema.KeylessRowIdTag] = 0
}
cols := idx.Schema().GetAllCols().GetColumns()
@@ -66,13 +71,13 @@ func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsK
queueCtx, cancelF := context.WithCancel(ctx)
iter := &indexLookupRowIterAdapter{
idx: idx,
keyIter: keyIter,
conv: conv,
pkTags: pkTags,
ctx: ctx,
cancelF: cancelF,
resultBuf: resBuf,
idx: idx,
keyIter: keyIter,
conv: conv,
lookupTags: lookupTags,
ctx: ctx,
cancelF: cancelF,
resultBuf: resBuf,
}
go iter.queueRows(queueCtx, epoch)
@@ -157,7 +162,7 @@ func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat,
return types.Tuple{}, err
}
resVals := make([]types.Value, len(i.pkTags)*2)
resVals := make([]types.Value, len(i.lookupTags)*2)
for {
_, tag, err := tplItr.NextUint64()
@@ -169,9 +174,9 @@ func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat,
return types.Tuple{}, err
}
idx, inPK := i.pkTags[tag]
idx, inKey := i.lookupTags[tag]
if inPK {
if inKey {
_, valVal, err := tplItr.Next()
if err != nil {
@@ -195,8 +200,8 @@ func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat,
// processKey is called within queueRows and processes each key, sending the resulting row to the row channel.
func (i *indexLookupRowIterAdapter) processKey(indexKey types.Tuple) (sql.Row, error) {
tableData := i.idx.TableData()
pkTupleVal, err := i.indexKeyToTableKey(tableData.Format(), indexKey)
pkTupleVal, err := i.indexKeyToTableKey(tableData.Format(), indexKey)
if err != nil {
return nil, err
}
@@ -104,20 +104,16 @@ func (te *sqlTableEditor) Insert(ctx *sql.Context, sqlRow sql.Row) error {
return te.tableEditor.InsertKeyVal(ctx, k, v, tagToVal, te.duplicateKeyErrFunc)
}
dRow, err := sqlutil.SqlRowToDoltRow(ctx, te.vrw, sqlRow, te.sch)
if err != nil {
return err
}
return te.tableEditor.InsertRow(ctx, dRow, te.duplicateKeyErrFunc)
}
func (te *sqlTableEditor) Delete(ctx *sql.Context, sqlRow sql.Row) error {
if !schema.IsKeyless(te.sch) {
k, tagToVal, err := sqlutil.DoltKeyAndMappingFromSqlRow(ctx, te.vrw, sqlRow, te.sch)
if err != nil {
return err
}
@@ -128,7 +124,6 @@ func (te *sqlTableEditor) Delete(ctx *sql.Context, sqlRow sql.Row) error {
if err != nil {
return err
}
return te.tableEditor.DeleteRow(ctx, dRow)
}
}
@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"reflect"
"sort"
"strings"
"testing"
@@ -730,7 +731,172 @@ func assertTableEditorRows(t *testing.T, root *doltdb.RootValue, expected []sql.
sqlRows = append(sqlRows, sqlRow)
return nil
})
if !reflect.DeepEqual(expectedIndexRows, sqlRows) {
sort.Slice(sqlRows, func(leftIndex, rightIndex int) bool {
a := sqlRows[leftIndex]
b := sqlRows[rightIndex]
for i := range a {
aVal, aNotNil := a[i].(int64)
bVal, bNotNil := b[i].(int64)
if !aNotNil {
aVal = math.MaxInt64
}
if !bNotNil {
bVal = math.MaxInt64
}
if aVal < bVal {
return true
}
}
return false
})
}
assert.Equal(t, expectedIndexRows, sqlRows)
}
}
}
// setupEditorKeylessFkTest creates a fresh test environment containing the
// keyless tables used by the keyless foreign-key cascade tests (one, two,
// three, parent, child) and returns the environment together with the root
// value produced after the tables have been created.
func setupEditorKeylessFkTest(t *testing.T) (*env.DoltEnv, *doltdb.RootValue) {
	testEnv := dtestutils.CreateTestEnv()

	workingRoot, err := testEnv.WorkingRoot(context.Background())
	if err != nil {
		panic(err)
	}

	setupRoot, err := ExecuteSql(t, testEnv, workingRoot, `
CREATE TABLE one (
pk BIGINT,
v1 BIGINT,
v2 BIGINT,
INDEX secondary (v1)
);
CREATE TABLE two (
pk BIGINT,
v1 BIGINT,
v2 BIGINT,
INDEX secondary (v1, v2)
);
CREATE TABLE three (
pk BIGINT,
v1 BIGINT,
v2 BIGINT,
INDEX v1 (v1),
INDEX v2 (v2)
);
CREATE TABLE parent (
id BIGINT,
v1 BIGINT,
v2 BIGINT,
INDEX v1 (v1),
INDEX v2 (v2)
);
CREATE TABLE child (
id BIGINT,
v1 BIGINT,
v2 BIGINT
);
`)
	require.NoError(t, err)

	return testEnv, setupRoot
}
// TestTableEditorKeylessFKCascade exercises ON DELETE / ON UPDATE CASCADE
// foreign keys between keyless tables: `two` references one(v1) and `three`
// references two(v1, v2). Each case runs a batch of SQL statements against a
// fresh environment and then compares the final contents of tables one, two,
// and three.
func TestTableEditorKeylessFKCascade(t *testing.T) {
tests := []struct {
name string
sqlStatement string
// Expected final rows of tables one, two, and three respectively.
expectedOne []sql.Row
expectedTwo []sql.Row
expectedThree []sql.Row
}{
{
"cascade updates",
`INSERT INTO one VALUES (1, 1, 4), (2, 2, 5), (3, 3, 6), (4, 4, 5);
INSERT INTO two VALUES (2, 1, 1), (3, 2, 2), (4, 3, 3), (5, 4, 4);
INSERT INTO three VALUES (3, 1, 1), (4, 2, 2), (5, 3, 3), (6, 4, 4);
UPDATE one SET v1 = v1 + v2;
UPDATE two SET v2 = v1 - 2;`,
[]sql.Row{{1, 5, 4}, {4, 9, 5}, {2, 7, 5}, {3, 9, 6}},
[]sql.Row{{3, 7, 5}, {2, 5, 3}, {5, 9, 7}, {4, 9, 7}},
[]sql.Row{{5, 9, 7}, {6, 9, 7}, {4, 7, 5}, {3, 5, 3}},
},
{
"cascade updates and deletes",
`INSERT INTO one VALUES (1, 1, 4), (2, 2, 5), (3, 3, 6), (4, 4, 5);
INSERT INTO two VALUES (2, 1, 1), (3, 2, 2), (4, 3, 3), (5, 4, 4);
INSERT INTO three VALUES (3, 1, 1), (4, 2, 2), (5, 3, 3), (6, 4, 4);
UPDATE one SET v1 = v1 + v2;
DELETE FROM one WHERE pk = 3;
UPDATE two SET v2 = v1 - 2;`,
[]sql.Row{{1, 5, 4}, {4, 9, 5}, {2, 7, 5}},
[]sql.Row{{3, 7, 5}, {2, 5, 3}},
[]sql.Row{{4, 7, 5}, {3, 5, 3}},
},
{
"cascade insertions",
`INSERT INTO three VALUES (1, NULL, NULL), (2, NULL, 2), (3, 3, NULL);
INSERT INTO two VALUES (1, NULL, 1);`,
[]sql.Row{},
[]sql.Row{{1, nil, 1}},
[]sql.Row{{3, 3, nil}, {2, nil, 2}, {1, nil, nil}},
},
{
"cascade updates and deletes after table and column renames",
`INSERT INTO one VALUES (1, 1, 4), (2, 2, 5), (3, 3, 6), (4, 4, 5);
INSERT INTO two VALUES (2, 1, 1), (3, 2, 2), (4, 3, 3), (5, 4, 4);
INSERT INTO three VALUES (3, 1, 1), (4, 2, 2), (5, 3, 3), (6, 4, 4);
RENAME TABLE one TO new;
ALTER TABLE new RENAME COLUMN v1 TO vnew;
UPDATE new SET vnew = vnew + v2;
DELETE FROM new WHERE pk = 3;
UPDATE two SET v2 = v1 - 2;
RENAME TABLE new TO one;`,
[]sql.Row{{1, 5, 4}, {4, 9, 5}, {2, 7, 5}},
[]sql.Row{{3, 7, 5}, {2, 5, 3}},
[]sql.Row{{4, 7, 5}, {3, 5, 3}},
},
{
"cascade inserts and deletes",
`INSERT INTO one VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
INSERT INTO two VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3);
DELETE FROM one;`,
[]sql.Row{},
[]sql.Row{},
[]sql.Row{},
},
{
"cascade inserts and deletes (ep. 2)",
`INSERT INTO one VALUES (1, NULL, 1);
INSERT INTO two VALUES (1, NULL, 1), (2, NULL, 2);
INSERT INTO three VALUES (1, NULL, 1), (2, NULL, 2);
DELETE FROM one;
DELETE FROM two WHERE pk = 2`,
[]sql.Row{},
[]sql.Row{{1, nil, 1}},
[]sql.Row{{2, nil, 2}, {1, nil, 1}},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Each case gets a fresh environment with the cascading FKs installed.
dEnv, initialRoot := setupEditorKeylessFkTest(t)
testRoot, err := ExecuteSql(t, dEnv, initialRoot, `
ALTER TABLE two ADD FOREIGN KEY (v1) REFERENCES one(v1) ON DELETE CASCADE ON UPDATE CASCADE;
ALTER TABLE three ADD FOREIGN KEY (v1, v2) REFERENCES two(v1, v2) ON DELETE CASCADE ON UPDATE CASCADE;
`)
require.NoError(t, err)
root := testRoot
// Statements run one at a time; each resulting root feeds the next.
for _, sqlStatement := range strings.Split(test.sqlStatement, ";") {
var err error
root, err = executeModify(t, context.Background(), dEnv, root, sqlStatement)
require.NoError(t, err)
}
assertTableEditorRows(t, root, test.expectedOne, "one")
assertTableEditorRows(t, root, test.expectedTwo, "two")
assertTableEditorRows(t, root, test.expectedThree, "three")
})
}
}
-3
View File
@@ -1012,9 +1012,6 @@ func (t *AlterableDoltTable) CreateIndex(
columns []sql.IndexColumn,
comment string,
) error {
if schema.IsKeyless(t.sch) {
return fmt.Errorf("indexes on keyless tables are not supported")
}
table, err := t.doltTable(ctx)
if err != nil {
@@ -16,6 +16,7 @@ package editor
import (
"context"
"fmt"
"io"
"sync"
@@ -105,8 +106,9 @@ type indexEditAccumulator struct {
// hashedTuple is a tuple accompanied by a hash. The representing value of the hash is dependent on the function
// it is obtained from.
type hashedTuple struct {
types.Tuple
hash.Hash
key types.Tuple
value types.Tuple
hash hash.Hash
}
// createInitialIndexEditAcc creates the initial indexEditAccumulator. All future ieas should use the method
@@ -189,16 +191,21 @@ func (iea *indexEditAccumulator) HasPartial(
defer mapIter.Close(ctx)
var r row.Row
for r, err = mapIter.ReadRow(ctx); err == nil; r, err = mapIter.ReadRow(ctx) {
tplVal, err := r.NomsMapKey(idxSch).Value(ctx)
tplKeyVal, err := r.NomsMapKey(idxSch).Value(ctx)
if err != nil {
return nil, err
}
tpl := tplVal.(types.Tuple)
tplHash, err := tpl.Hash(tpl.Format())
key := tplKeyVal.(types.Tuple)
tplValVal, err := r.NomsMapValue(idxSch).Value(ctx)
if err != nil {
return nil, err
}
matches = append(matches, hashedTuple{tpl, tplHash})
val := tplValVal.(types.Tuple)
keyHash, err := key.Hash(key.Format())
if err != nil {
return nil, err
}
matches = append(matches, hashedTuple{key, val, keyHash})
}
if err != io.EOF {
return nil, err
@@ -207,13 +214,13 @@ func (iea *indexEditAccumulator) HasPartial(
for i := len(matches) - 1; i >= 0; i-- {
// If we've removed a key that's present here, remove it from the slice
if _, ok := iea.removedKeys[matches[i].Hash]; ok {
if _, ok := iea.removedKeys[matches[i].hash]; ok {
matches[i] = matches[len(matches)-1]
matches = matches[:len(matches)-1]
}
}
for addedHash, addedTpl := range iea.addedPartialKeys[partialKeyHash] {
matches = append(matches, hashedTuple{addedTpl, addedHash})
matches = append(matches, hashedTuple{addedTpl, types.EmptyTuple(addedTpl.Format()), addedHash})
}
return matches, nil
}
@@ -236,7 +243,7 @@ func NewIndexEditor(ctx context.Context, index schema.Index, indexData types.Map
// InsertRow adds the given row to the index. If the row already exists and the index is unique, then an error is returned.
// Otherwise, it is a no-op.
func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tuple) error {
func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tuple, value types.Tuple) error {
defer ie.autoFlush()
ie.flushMutex.RLock()
defer ie.flushMutex.RUnlock()
@@ -257,18 +264,18 @@ func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tupl
if matches, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey); err != nil {
return err
} else if len(matches) > 0 {
tableTuple, err := ie.idx.ToTableTuple(ctx, matches[0].Tuple, ie.nbf)
tableTuple, err := ie.idx.ToTableTuple(ctx, matches[0].key, ie.nbf)
if err != nil {
return err
}
// For a UNIQUE key violation, there should only be 1 at max. We still do an "over 0" check for safety though.
return &uniqueKeyErr{tableTuple, matches[0].Tuple, ie.idx.Name()}
return &uniqueKeyErr{tableTuple, matches[0].key, ie.idx.Name()}
}
} else {
if rowExists, err := ie.iea.Has(ctx, keyHash, key); err != nil {
return err
} else if rowExists {
ie.stack.Push(true, types.EmptyTuple(key.Format()), types.EmptyTuple(key.Format()))
} else if rowExists && value.Empty() {
ie.stack.Push(true, types.EmptyTuple(key.Format()), types.EmptyTuple(key.Format()), types.EmptyTuple(value.Format()))
return nil
}
}
@@ -276,7 +283,7 @@ func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tupl
if _, ok := ie.iea.removedKeys[keyHash]; ok {
delete(ie.iea.removedKeys, keyHash)
} else {
ie.iea.addedKeys[keyHash] = hashedTuple{key, partialKeyHash}
ie.iea.addedKeys[keyHash] = hashedTuple{key, value, partialKeyHash}
if matchingMap, ok := ie.iea.addedPartialKeys[partialKeyHash]; ok {
matchingMap[keyHash] = key
} else {
@@ -285,12 +292,12 @@ func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tupl
}
ie.iea.opCount++
ie.stack.Push(true, key, partialKey)
ie.stack.Push(true, key, partialKey, value)
return nil
}
// DeleteRow removes the given row from the index.
func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey types.Tuple) error {
func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey, value types.Tuple) error {
defer ie.autoFlush()
ie.flushMutex.RLock()
defer ie.flushMutex.RUnlock()
@@ -311,11 +318,11 @@ func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey types.Tupl
delete(ie.iea.addedKeys, keyHash)
delete(ie.iea.addedPartialKeys[partialKeyHash], keyHash)
} else {
ie.iea.removedKeys[keyHash] = hashedTuple{key, partialKeyHash}
ie.iea.removedKeys[keyHash] = hashedTuple{key, value, partialKeyHash}
}
ie.iea.opCount++
ie.stack.Push(false, key, partialKey)
ie.stack.Push(false, key, partialKey, value)
return nil
}
@@ -356,14 +363,14 @@ func (ie *IndexEditor) Undo(ctx context.Context) {
return
}
if indexOp.isInsert {
err := ie.DeleteRow(ctx, indexOp.fullKey, indexOp.partialKey)
err := ie.DeleteRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
if err != nil {
panic(fmt.Sprintf("index '%s' is in an invalid and unrecoverable state: "+
"attempted to undo previous insertion but encountered the following error: %v",
ie.idx.Name(), err))
}
} else {
err := ie.InsertRow(ctx, indexOp.fullKey, indexOp.partialKey)
err := ie.InsertRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
if err != nil {
panic(fmt.Sprintf("index '%s' is in an invalid and unrecoverable state: "+
"attempted to undo previous deletion but encountered the following error: %v",
@@ -414,7 +421,7 @@ func (ie *IndexEditor) StatementFinished(ctx context.Context, errored bool) erro
for keyHash, hTpl := range ie.iea.removedKeys {
if _, ok := targetIea.addedKeys[keyHash]; ok {
delete(targetIea.addedKeys, keyHash)
delete(targetIea.addedPartialKeys[hTpl.Hash], keyHash)
delete(targetIea.addedPartialKeys[hTpl.hash], keyHash)
} else {
targetIea.removedKeys[keyHash] = hTpl
}
@@ -424,10 +431,10 @@ func (ie *IndexEditor) StatementFinished(ctx context.Context, errored bool) erro
delete(targetIea.removedKeys, keyHash)
} else {
targetIea.addedKeys[keyHash] = hTpl
if matchingMap, ok := targetIea.addedPartialKeys[hTpl.Hash]; ok {
matchingMap[keyHash] = hTpl.Tuple
if matchingMap, ok := targetIea.addedPartialKeys[hTpl.hash]; ok {
matchingMap[keyHash] = hTpl.key
} else {
targetIea.addedPartialKeys[hTpl.Hash] = map[hash.Hash]types.Tuple{keyHash: hTpl.Tuple}
targetIea.addedPartialKeys[hTpl.hash] = map[hash.Hash]types.Tuple{keyHash: hTpl.key}
}
}
}
@@ -517,10 +524,10 @@ func processIndexEditAccumulatorChain(ctx context.Context, futureIea *indexEditA
ed := types.CreateEditAccForMapEdits(iea.nbf)
defer ed.Close()
for _, hTpl := range iea.removedKeys {
ed.AddEdit(hTpl.Tuple, nil)
ed.AddEdit(hTpl.key, nil)
}
for _, hTpl := range iea.addedKeys {
ed.AddEdit(hTpl.Tuple, types.EmptyTuple(hTpl.Tuple.Format()))
ed.AddEdit(hTpl.key, hTpl.value)
}
// If we encounter an error and return, then we need to remove this iea from the chain and update the next's rowData
@@ -646,14 +653,17 @@ func rebuildIndexRowData(ctx context.Context, vrw types.ValueReadWriter, sch sch
if err != nil {
return err
}
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, keyVal, err := dRow.ReduceToIndexKeys(index)
if err != nil {
return err
}
err = indexEditor.InsertRow(ctx, fullKey, partialKey)
err = indexEditor.InsertRow(ctx, fullKey, partialKey, keyVal)
if err != nil {
return err
}
return nil
})
if err != nil {
@@ -82,9 +82,9 @@ func TestIndexEditorConcurrency(t *testing.T) {
1: types.Int(val),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
wg.Done()
}(j)
}
@@ -103,12 +103,12 @@ func TestIndexEditorConcurrency(t *testing.T) {
1: types.Int(val + 1),
})
require.NoError(t, err)
oldFullKey, oldPartialKey, err := row.ReduceToIndexKeys(index, dOldRow)
oldFullKey, oldPartialKey, _, err := dOldRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.DeleteRow(context.Background(), oldFullKey, oldPartialKey))
newFullKey, newPartialKey, err := row.ReduceToIndexKeys(index, dNewRow)
require.NoError(t, indexEditor.DeleteRow(context.Background(), oldFullKey, oldPartialKey, types.EmptyTuple(format)))
newFullKey, newPartialKey, newValue, err := dNewRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), newFullKey, newPartialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), newFullKey, newPartialKey, newValue))
wg.Done()
}(j)
}
@@ -122,9 +122,9 @@ func TestIndexEditorConcurrency(t *testing.T) {
1: types.Int(val),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, _, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey, types.EmptyTuple(format)))
wg.Done()
}(j)
}
@@ -173,9 +173,9 @@ func TestIndexEditorConcurrencyPostInsert(t *testing.T) {
1: types.Int(i),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
}
indexData, err := indexEditor.Map(context.Background())
require.NoError(t, err)
@@ -197,12 +197,12 @@ func TestIndexEditorConcurrencyPostInsert(t *testing.T) {
1: types.Int(val + 1),
})
require.NoError(t, err)
oldFullKey, oldPartialKey, err := row.ReduceToIndexKeys(index, dOldRow)
oldFullKey, oldPartialKey, _, err := dOldRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.DeleteRow(context.Background(), oldFullKey, oldPartialKey))
newFullKey, newPartialKey, err := row.ReduceToIndexKeys(index, dNewRow)
require.NoError(t, indexEditor.DeleteRow(context.Background(), oldFullKey, oldPartialKey, types.EmptyTuple(format)))
newFullKey, newPartialKey, value, err := dNewRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), newFullKey, newPartialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), newFullKey, newPartialKey, value))
wg.Done()
}(j)
}
@@ -215,9 +215,9 @@ func TestIndexEditorConcurrencyPostInsert(t *testing.T) {
1: types.Int(val),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, _, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey, types.EmptyTuple(format)))
wg.Done()
}(j)
}
@@ -265,9 +265,9 @@ func TestIndexEditorUniqueMultipleNil(t *testing.T) {
1: types.Int(i),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
}
newIndexData, err := indexEditor.Map(context.Background())
require.NoError(t, err)
@@ -312,9 +312,9 @@ func TestIndexEditorWriteAfterFlush(t *testing.T) {
1: types.Int(i),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
}
_, err = indexEditor.Map(context.Background())
@@ -326,9 +326,9 @@ func TestIndexEditorWriteAfterFlush(t *testing.T) {
1: types.Int(i),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, _, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.DeleteRow(context.Background(), fullKey, partialKey, types.EmptyTuple(format)))
}
newIndexData, err := indexEditor.Map(context.Background())
@@ -375,25 +375,25 @@ func TestIndexEditorUniqueErrorDoesntPersist(t *testing.T) {
1: types.Int(1),
})
require.NoError(t, err)
fullKey, partialKey, err := row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
dRow, err = row.New(format, indexSch, row.TaggedValues{
0: types.Int(2),
1: types.Int(1),
})
require.NoError(t, err)
fullKey, partialKey, err = row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err = dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.Error(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.Error(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
dRow, err = row.New(format, indexSch, row.TaggedValues{
0: types.Int(2),
1: types.Int(2),
})
require.NoError(t, err)
fullKey, partialKey, err = row.ReduceToIndexKeys(index, dRow)
fullKey, partialKey, value, err = dRow.ReduceToIndexKeys(index)
require.NoError(t, err)
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey))
require.NoError(t, indexEditor.InsertRow(context.Background(), fullKey, partialKey, value))
}
func TestIndexRebuildingWithZeroIndexes(t *testing.T) {
@@ -34,13 +34,15 @@ type indexOperation struct {
isInsert bool
fullKey types.Tuple
partialKey types.Tuple
value types.Tuple
}
// Push adds the given keys to the top of the stack.
func (ios *indexOperationStack) Push(isInsert bool, fullKey, partialKey types.Tuple) {
func (ios *indexOperationStack) Push(isInsert bool, fullKey, partialKey, value types.Tuple) {
ios.entries[ios.currentIndex].isInsert = isInsert
ios.entries[ios.currentIndex].fullKey = fullKey
ios.entries[ios.currentIndex].partialKey = partialKey
ios.entries[ios.currentIndex].value = value
ios.currentIndex = (ios.currentIndex + 1) % uint64(len(ios.entries))
ios.numOfItems++
if ios.numOfItems > uint64(len(ios.entries)) {
@@ -26,30 +26,32 @@ func TestIndexOperationStack(t *testing.T) {
ios := &indexOperationStack{}
require.True(t, len(ios.entries) >= 2) // Entries should always at least have a length of 2
ios.Push(true, iosTuple(t, 100, 100), iosTuple(t, 100))
ios.Push(true, iosTuple(t, 100, 100), iosTuple(t, 100), iosTuple(t, 0))
entry, ok := ios.Pop()
require.True(t, ok)
iosTupleComp(t, entry.fullKey, 100, 100)
iosTupleComp(t, entry.partialKey, 100)
iosTupleComp(t, entry.value, 0)
require.True(t, entry.isInsert)
_, ok = ios.Pop()
require.False(t, ok)
for i := 0; i < len(ios.entries); i++ {
ios.Push(false, iosTuple(t, i, i), iosTuple(t, i))
ios.Push(false, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2))
}
for i := len(ios.entries) - 1; i >= 0; i-- {
entry, ok = ios.Pop()
require.True(t, ok)
iosTupleComp(t, entry.fullKey, i, i)
iosTupleComp(t, entry.partialKey, i)
iosTupleComp(t, entry.partialKey, i*2)
require.False(t, entry.isInsert)
}
_, ok = ios.Pop()
require.False(t, ok)
for i := 0; i < (len(ios.entries)*2)+1; i++ {
ios.Push(true, iosTuple(t, i, i), iosTuple(t, i))
ios.Push(true, iosTuple(t, i, i), iosTuple(t, i), iosTuple(t, i*2))
}
for i := len(ios.entries) - 1; i >= 0; i-- {
entry, ok = ios.Pop()
@@ -57,6 +59,7 @@ func TestIndexOperationStack(t *testing.T) {
val := ((len(ios.entries) * 2) + 1) - i
iosTupleComp(t, entry.fullKey, val, val)
iosTupleComp(t, entry.partialKey, val)
iosTupleComp(t, entry.value, val*2)
require.True(t, entry.isInsert)
}
_, ok = ios.Pop()
@@ -37,6 +37,7 @@ type keylessTableEditor struct {
name string
acc keylessEditAcc
indexEds []*IndexEditor
cvEditor *types.MapEditor
eg *errgroup.Group
@@ -123,14 +124,22 @@ func newKeylessTableEditor(ctx context.Context, tbl *doltdb.Table, sch schema.Sc
eg, _ := errgroup.WithContext(ctx)
te := &keylessTableEditor{
tbl: tbl,
sch: sch,
name: name,
acc: acc,
eg: eg,
mu: &sync.Mutex{},
tbl: tbl,
sch: sch,
name: name,
acc: acc,
indexEds: make([]*IndexEditor, sch.Indexes().Count()),
eg: eg,
mu: &sync.Mutex{},
}
for i, index := range sch.Indexes().AllIndexes() {
indexData, err := tbl.GetIndexRowData(ctx, index.Name())
if err != nil {
return nil, err
}
te.indexEds[i] = NewIndexEditor(ctx, index, indexData, sch)
}
return te, nil
}
@@ -331,14 +340,14 @@ func (kte *keylessTableEditor) flush(ctx context.Context) error {
}
kte.eg.Go(func() (err error) {
kte.tbl, err = applyEdits(ctx, tbl, acc)
kte.tbl, err = applyEdits(ctx, tbl, acc, kte.indexEds, nil)
return err
})
return nil
}
func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*doltdb.Table, error) {
func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc, indexEds []*IndexEditor, errFunc PKDuplicateErrFunc) (_ *doltdb.Table, retErr error) {
rowData, err := tbl.GetRowData(ctx)
if err != nil {
return nil, err
@@ -359,12 +368,12 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do
ed := rowData.Edit()
iter := table.NewMapPointReader(rowData, keys...)
var ok bool
for {
k, v, err := iter.NextTuple(ctx)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
@@ -374,22 +383,79 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do
return nil, err
}
var ok bool
oldv := v
if v.Empty() {
// row does not yet exist
v, ok, err = initializeCardinality(delta.val, delta.delta)
} else {
v, ok, err = modifyCardinalityWithDelta(v, delta.delta)
}
if err != nil {
return nil, err
}
func(k, v types.Tuple) (*doltdb.Table, error) {
indexOpsToUndo := make([]int, len(indexEds))
defer func() {
if retErr != nil {
for i, opsToUndo := range indexOpsToUndo {
for undone := 0; undone < opsToUndo; undone++ {
indexEds[i].Undo(ctx)
}
}
}
}()
for i, indexEd := range indexEds {
var r row.Row
if v.Empty() {
r, _, err = row.KeylessRowsFromTuples(k, oldv)
} else {
r, _, err = row.KeylessRowsFromTuples(k, v)
}
if err != nil {
return nil, err
}
fullKey, partialKey, value, err := r.ReduceToIndexKeys(indexEd.Index())
if err != nil {
return nil, err
}
if delta.delta < 1 {
err = indexEd.DeleteRow(ctx, fullKey, partialKey, value)
if err != nil {
return nil, err
}
} else {
err = indexEd.InsertRow(ctx, fullKey, partialKey, value)
if err != nil {
return nil, err
}
}
indexOpsToUndo[i]++
}
return nil, nil
}(k, v)
if ok {
ed.Set(k, v)
} else {
ed.Remove(k)
}
}
for i := 0; i < len(indexEds); i++ {
indexMap, idxErr := indexEds[i].Map(ctx)
if idxErr != nil {
return nil, err
}
tbl, idxErr = tbl.SetIndexRowData(ctx, indexEds[i].Index().Name(), indexMap)
if idxErr != nil {
return nil, err
}
}
rowData, err = ed.Map(ctx)
@@ -0,0 +1,658 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package editor
import (
"context"
"errors"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/encoding"
"github.com/dolthub/dolt/go/store/types"
)
// TestKeylessTableEditorConcurrency hammers a keylessTableEditor with
// concurrent inserts, then concurrent updates and deletes, and verifies the
// surviving rows are exactly the updated half.
// NOTE(review): require.NoError inside the goroutines calls t.FailNow from a
// non-test goroutine, which the testing package forbids — confirm/replace
// with t.Error-style reporting.
func TestKeylessTableEditorConcurrency(t *testing.T) {
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(context.Background(), format, nil, nil)
	require.NoError(t, err)
	// Three int columns, no primary key: this schema is keyless.
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(context.Background(), db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(context.Background(), db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(context.Background(), db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	for i := 0; i < tableEditorConcurrencyIterations; i++ {
		tableEditor, err := newKeylessTableEditor(context.Background(), table, tableSch, tableName)
		require.NoError(t, err)
		wg := &sync.WaitGroup{}
		// Phase 1: insert rows (val, val, val) for val in [0, 2*final) concurrently.
		for j := 0; j < tableEditorConcurrencyFinalCount*2; j++ {
			wg.Add(1)
			go func(val int) {
				dRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val),
					2: types.Int(val),
				})
				require.NoError(t, err)
				require.NoError(t, tableEditor.InsertRow(context.Background(), dRow, nil))
				wg.Done()
			}(j)
		}
		wg.Wait()
		// Phase 2a: update the lower half to (val, val+1, val+1).
		for j := 0; j < tableEditorConcurrencyFinalCount; j++ {
			wg.Add(1)
			go func(val int) {
				dOldRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val),
					2: types.Int(val),
				})
				require.NoError(t, err)
				dNewRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val + 1),
					2: types.Int(val + 1),
				})
				require.NoError(t, err)
				require.NoError(t, tableEditor.UpdateRow(context.Background(), dOldRow, dNewRow, nil))
				wg.Done()
			}(j)
		}
		// We let the Updates and Deletes execute at the same time
		// Phase 2b: delete the upper half.
		for j := tableEditorConcurrencyFinalCount; j < tableEditorConcurrencyFinalCount*2; j++ {
			wg.Add(1)
			go func(val int) {
				dRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val),
					2: types.Int(val),
				})
				require.NoError(t, err)
				require.NoError(t, tableEditor.DeleteRow(context.Background(), dRow))
				wg.Done()
			}(j)
		}
		wg.Wait()
		newTable, err := tableEditor.Table(context.Background())
		require.NoError(t, err)
		newTableData, err := newTable.GetRowData(context.Background())
		require.NoError(t, err)
		// NOTE(review): hard-coded 100 presumably equals
		// tableEditorConcurrencyFinalCount, and require.Equal's
		// (expected, actual) arguments look swapped here — confirm.
		require.Equal(t, newTableData.Len(), uint64(100))
		seen := make([]bool, 100)
		if assert.Equal(t, uint64(tableEditorConcurrencyFinalCount), newTableData.Len()) {
			_ = newTableData.IterAll(context.Background(), func(key, value types.Value) error {
				dReadRow, err := row.FromNoms(tableSch, key.(types.Tuple), value.(types.Tuple))
				require.NoError(t, err)
				dReadVals, err := dReadRow.TaggedValues()
				require.NoError(t, err)
				idx, ok := dReadVals[0].(types.Int)
				assert.Equal(t, true, ok)
				seen[int(idx)] = true
				// Updated rows carry v1 = v0+1 and v2 = v0+1.
				val1, ok := dReadVals[1].(types.Int)
				assert.Equal(t, true, ok)
				assert.Equal(t, int(idx), int(val1)-1)
				val2, ok := dReadVals[2].(types.Int)
				assert.Equal(t, true, ok)
				assert.Equal(t, int(idx), int(val2)-1)
				return nil
			})
			// Every surviving v0 value in [0, final) must have been seen once.
			for _, v := range seen {
				assert.True(t, v)
			}
		}
	}
}
// TestKeylessTableEditorConcurrencyPostInsert is the same scenario as
// TestKeylessTableEditorConcurrency, except the initial inserts happen
// sequentially against one editor, and only the updates and deletes are
// concurrent against a fresh editor per iteration.
// NOTE(review): require.NoError inside the goroutines calls t.FailNow from a
// non-test goroutine — confirm/replace with t.Error-style reporting.
func TestKeylessTableEditorConcurrencyPostInsert(t *testing.T) {
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(context.Background(), format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(context.Background(), db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(context.Background(), db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(context.Background(), db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	tableEditor, err := newKeylessTableEditor(context.Background(), table, tableSch, tableName)
	require.NoError(t, err)
	// Seed rows (i, i, i) for i in [0, 2*final) sequentially.
	for i := 0; i < tableEditorConcurrencyFinalCount*2; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(context.Background(), dRow, nil))
	}
	// Materialize the seeded table; each iteration below starts from it.
	table, err = tableEditor.Table(context.Background())
	require.NoError(t, err)
	for i := 0; i < tableEditorConcurrencyIterations; i++ {
		tableEditor, err := newKeylessTableEditor(context.Background(), table, tableSch, tableName)
		require.NoError(t, err)
		wg := &sync.WaitGroup{}
		// Concurrently update the lower half to (val, val+1, val+1).
		for j := 0; j < tableEditorConcurrencyFinalCount; j++ {
			wg.Add(1)
			go func(val int) {
				dOldRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val),
					2: types.Int(val),
				})
				require.NoError(t, err)
				dNewRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val + 1),
					2: types.Int(val + 1),
				})
				require.NoError(t, err)
				require.NoError(t, tableEditor.UpdateRow(context.Background(), dOldRow, dNewRow, nil))
				wg.Done()
			}(j)
		}
		// Concurrently delete the upper half.
		for j := tableEditorConcurrencyFinalCount; j < tableEditorConcurrencyFinalCount*2; j++ {
			wg.Add(1)
			go func(val int) {
				dRow, err := row.New(format, tableSch, row.TaggedValues{
					0: types.Int(val),
					1: types.Int(val),
					2: types.Int(val),
				})
				require.NoError(t, err)
				require.NoError(t, tableEditor.DeleteRow(context.Background(), dRow))
				wg.Done()
			}(j)
		}
		wg.Wait()
		newTable, err := tableEditor.Table(context.Background())
		require.NoError(t, err)
		newTableData, err := newTable.GetRowData(context.Background())
		require.NoError(t, err)
		// NOTE(review): hard-coded 100 and swapped (expected, actual)
		// argument order — confirm against tableEditorConcurrencyFinalCount.
		require.Equal(t, newTableData.Len(), uint64(100))
		seen := make([]bool, 100)
		if assert.Equal(t, uint64(tableEditorConcurrencyFinalCount), newTableData.Len()) {
			_ = newTableData.IterAll(context.Background(), func(key, value types.Value) error {
				dReadRow, err := row.FromNoms(tableSch, key.(types.Tuple), value.(types.Tuple))
				require.NoError(t, err)
				dReadVals, err := dReadRow.TaggedValues()
				require.NoError(t, err)
				idx, ok := dReadVals[0].(types.Int)
				assert.Equal(t, true, ok)
				seen[int(idx)] = true
				// Updated rows carry v1 = v0+1 and v2 = v0+1.
				val1, ok := dReadVals[1].(types.Int)
				assert.Equal(t, true, ok)
				assert.Equal(t, int(idx), int(val1)-1)
				val2, ok := dReadVals[2].(types.Int)
				assert.Equal(t, true, ok)
				assert.Equal(t, int(idx), int(val2)-1)
				return nil
			})
			for _, v := range seen {
				assert.True(t, v)
			}
		}
	}
}
// TestKeylessTableEditorWriteAfterFlush verifies that a keylessTableEditor
// remains usable after materializing a table (which flushes pending edits):
// 20 rows are inserted, the table is flushed via Table(), then rows 10-19
// are deleted, and the final table must contain exactly rows 0-9.
func TestKeylessTableEditorWriteAfterFlush(t *testing.T) {
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(context.Background(), format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(context.Background(), db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(context.Background(), db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(context.Background(), db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	tableEditor, err := newKeylessTableEditor(context.Background(), table, tableSch, tableName)
	require.NoError(t, err)
	// Insert rows (i, i, i) for i in [0, 20).
	for i := 0; i < 20; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(context.Background(), dRow, nil))
	}
	// Force a flush; the editor must still accept edits afterwards.
	_, err = tableEditor.Table(context.Background())
	require.NoError(t, err)
	// Delete the second half.
	for i := 10; i < 20; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.DeleteRow(context.Background(), dRow))
	}
	newTable, err := tableEditor.Table(context.Background())
	require.NoError(t, err)
	newTableData, err := newTable.GetRowData(context.Background())
	require.NoError(t, err)
	seen := make([]bool, 10)
	if assert.Equal(t, uint64(10), newTableData.Len()) {
		_ = newTableData.IterAll(context.Background(), func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(tableSch, key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[0].(types.Int)
			assert.Equal(t, true, ok)
			seen[int(idx)] = true
			// Surviving rows were never updated: v1 == v2 == v0.
			val1, ok := dReadVals[1].(types.Int)
			assert.Equal(t, true, ok)
			assert.Equal(t, int(idx), int(val1))
			val2, ok := dReadVals[2].(types.Int)
			assert.Equal(t, true, ok)
			assert.Equal(t, int(idx), int(val2))
			return nil
		})
		for _, v := range seen {
			assert.True(t, v)
		}
	}
	// Calling Table() again without intervening edits must be a no-op.
	sameTable, err := tableEditor.Table(context.Background())
	require.NoError(t, err)
	sameTableData, err := sameTable.GetRowData(context.Background())
	require.NoError(t, err)
	assert.True(t, sameTableData.Equals(newTableData))
}
// TestKeylessTableEditorDuplicateKeyHandling verifies that re-inserting
// identical rows into a keyless table never raises ErrDuplicateKey (keyless
// tables track duplicates via cardinality instead of rejecting them).
// NOTE(review): the final assertion expects 10 distinct rows even though
// rows 0-2 were inserted twice — presumably Len() counts distinct rows,
// not cardinality; confirm against Map semantics.
func TestKeylessTableEditorDuplicateKeyHandling(t *testing.T) {
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(context.Background(), format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(context.Background(), db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(context.Background(), db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(context.Background(), db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	tableEditor, err := newKeylessTableEditor(context.Background(), table, tableSch, tableName)
	require.NoError(t, err)
	// First insertion of rows 0-2.
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(context.Background(), dRow, nil))
	}
	// Flush, then insert the same rows again: must NOT be a duplicate-key error.
	_, err = tableEditor.Table(context.Background())
	require.NoError(t, err)
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		err = tableEditor.InsertRow(context.Background(), dRow, nil)
		require.False(t, errors.Is(err, ErrDuplicateKey))
	}
	_, err = tableEditor.Table(context.Background())
	require.NoError(t, err)
	// Fill in distinct rows 3-9.
	for i := 3; i < 10; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(context.Background(), dRow, nil))
	}
	newTable, err := tableEditor.Table(context.Background())
	require.NoError(t, err)
	newTableData, err := newTable.GetRowData(context.Background())
	require.NoError(t, err)
	seen := make([]bool, 10)
	if assert.Equal(t, uint64(10), newTableData.Len()) {
		_ = newTableData.IterAll(context.Background(), func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(tableSch, key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[0].(types.Int)
			assert.Equal(t, true, ok)
			seen[int(idx)] = true
			val1, ok := dReadVals[1].(types.Int)
			assert.Equal(t, true, ok)
			assert.Equal(t, int(idx), int(val1))
			val2, ok := dReadVals[2].(types.Int)
			assert.Equal(t, true, ok)
			assert.Equal(t, int(idx), int(val2))
			return nil
		})
		for _, v := range seen {
			assert.True(t, v)
		}
	}
}
// TestKeylessTableEditorMultipleIndexErrorHandling verifies that a keyless
// table with two non-unique secondary indexes (idx_v1, idx_v2) accepts rows
// that collide on an indexed column without raising ErrDuplicateKey, and
// that both index maps end up consistent with the table data.
func TestKeylessTableEditorMultipleIndexErrorHandling(t *testing.T) {
	ctx := context.Background()
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(ctx, format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	// Two non-unique secondary indexes, one per value column.
	idxv1, err := tableSch.Indexes().AddIndexByColNames("idx_v1", []string{"v1"}, schema.IndexProperties{
		IsUnique: false,
	})
	require.NoError(t, err)
	idxv2, err := tableSch.Indexes().AddIndexByColNames("idx_v2", []string{"v2"}, schema.IndexProperties{
		IsUnique: false,
	})
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(ctx, db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(ctx, db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(ctx, db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	table, err = RebuildAllIndexes(ctx, table)
	require.NoError(t, err)
	tableEditor, err := newKeylessTableEditor(ctx, table, tableSch, tableName)
	require.NoError(t, err)
	// Seed rows (i, i, i) for i in [0, 3).
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(ctx, dRow, nil))
	}
	_, err = tableEditor.Table(ctx)
	require.NoError(t, err)
	// Insert rows that collide with the seed rows on v1 only, then on v2
	// only; neither may surface as a duplicate-key error for these
	// non-unique indexes.
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i + 10),
			1: types.Int(i),
			2: types.Int(i + 10),
		})
		require.NoError(t, err)
		err = tableEditor.InsertRow(ctx, dRow, nil)
		require.False(t, errors.Is(err, ErrDuplicateKey))
		dRow, err = row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i + 10),
			1: types.Int(i + 10),
			2: types.Int(i),
		})
		require.NoError(t, err)
		err = tableEditor.InsertRow(ctx, dRow, nil)
		require.False(t, errors.Is(err, ErrDuplicateKey))
	}
	table, err = tableEditor.Table(ctx)
	require.NoError(t, err)
	tableData, err := table.GetRowData(ctx)
	require.NoError(t, err)
	// Expect 9 rows: v0 values 0-2 (seed) and 10-12 (two rows each);
	// v0 values 3-9 were never inserted.
	seen := make([]bool, 13)
	if assert.Equal(t, uint64(9), tableData.Len()) {
		_ = tableData.IterAll(ctx, func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(tableSch, key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[0].(types.Int)
			assert.Equal(t, true, ok)
			seen[int(idx)] = true
			return nil
		})
		for i := 0; i < 3; i++ {
			assert.True(t, seen[i])
		}
		for i := 3; i < 10; i++ {
			assert.False(t, seen[i])
		}
		for i := 10; i < 13; i++ {
			assert.True(t, seen[i])
		}
	}
	// idx_v1 must contain every row keyed by v1 plus the hidden rowId column.
	idxv1Data, err := table.GetIndexRowData(ctx, "idx_v1")
	require.NoError(t, err)
	seen = make([]bool, 13)
	if assert.Equal(t, uint64(9), idxv1Data.Len()) {
		_ = idxv1Data.IterAll(ctx, func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(idxv1.Schema(), key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[1].(types.Int)
			assert.True(t, ok)
			seen[int(idx)] = true
			// Keyless index entries carry the synthetic row-id UUID.
			_, ok = dReadVals[schema.KeylessRowIdTag].(types.UUID)
			assert.True(t, ok)
			return nil
		})
		for i := 0; i < 3; i++ {
			assert.True(t, seen[i])
		}
		for i := 3; i < 10; i++ {
			assert.False(t, seen[i])
		}
		for i := 10; i < 13; i++ {
			assert.True(t, seen[i])
		}
	}
	// Same shape of checks for idx_v2, keyed by v2.
	idxv2Data, err := table.GetIndexRowData(ctx, "idx_v2")
	require.NoError(t, err)
	seen = make([]bool, 13)
	if assert.Equal(t, uint64(9), idxv2Data.Len()) {
		_ = idxv2Data.IterAll(ctx, func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(idxv2.Schema(), key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[2].(types.Int)
			assert.True(t, ok)
			seen[int(idx)] = true
			_, ok = dReadVals[schema.KeylessRowIdTag].(types.UUID)
			assert.True(t, ok)
			return nil
		})
		for i := 0; i < 3; i++ {
			assert.True(t, seen[i])
		}
		for i := 3; i < 10; i++ {
			assert.False(t, seen[i])
		}
		for i := 10; i < 13; i++ {
			assert.True(t, seen[i])
		}
	}
}
// TestKeylessTableEditorIndexCardinality verifies that duplicate inserts into
// a keyless table are reflected in the secondary index as a cardinality count
// stored in the index value tuple: row i is inserted i+1 times, so its idx_v1
// entry must carry cardinality i+1 while the index itself holds one entry per
// distinct row.
func TestKeylessTableEditorIndexCardinality(t *testing.T) {
	ctx := context.Background()
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(ctx, format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("v0", 0, types.IntKind, false),
		schema.NewColumn("v1", 1, types.IntKind, false),
		schema.NewColumn("v2", 2, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	// A single non-unique secondary index on v1.
	idxv1, err := tableSch.Indexes().AddIndexByColNames("idx_v1", []string{"v1"}, schema.IndexProperties{
		IsUnique: false,
	})
	require.NoError(t, err)
	tableSchVal, err := encoding.MarshalSchemaAsNomsValue(ctx, db, tableSch)
	require.NoError(t, err)
	emptyMap, err := types.NewMap(ctx, db)
	require.NoError(t, err)
	table, err := doltdb.NewTable(ctx, db, tableSchVal, emptyMap, emptyMap, nil)
	require.NoError(t, err)
	table, err = RebuildAllIndexes(ctx, table)
	require.NoError(t, err)
	tableEditor, err := newKeylessTableEditor(ctx, table, tableSch, tableName)
	require.NoError(t, err)
	// Insert row (i, i, i) exactly i+1 times, so cardinality grows with i.
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, tableSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
			2: types.Int(i),
		})
		require.NoError(t, err)
		require.NoError(t, tableEditor.InsertRow(ctx, dRow, nil))
		for j := 0; j < i; j++ {
			require.NoError(t, tableEditor.InsertRow(ctx, dRow, nil))
		}
	}
	table, err = tableEditor.Table(ctx)
	require.NoError(t, err)
	idxv1Data, err := table.GetIndexRowData(ctx, "idx_v1")
	require.NoError(t, err)
	seen := make([]bool, 3)
	// One index entry per distinct row, regardless of duplicate count.
	if assert.Equal(t, uint64(3), idxv1Data.Len()) {
		_ = idxv1Data.IterAll(ctx, func(key, value types.Value) error {
			dReadRow, err := row.FromNoms(idxv1.Schema(), key.(types.Tuple), value.(types.Tuple))
			require.NoError(t, err)
			dReadVals, err := dReadRow.TaggedValues()
			require.NoError(t, err)
			idx, ok := dReadVals[1].(types.Int)
			require.True(t, ok)
			seen[int(idx)] = true
			// Keyless index entries carry the synthetic row-id UUID.
			_, ok = dReadVals[schema.KeylessRowIdTag].(types.UUID)
			require.True(t, ok)
			// The cardinality lives at position 1 of the value tuple and
			// must equal the number of times the row was inserted (i+1).
			cardTuple := value.(types.Tuple)
			cardVal, err := cardTuple.Get(1)
			require.NoError(t, err)
			card, ok := cardVal.(types.Uint)
			require.True(t, ok)
			assert.Equal(t, int(card), int(idx)+1)
			return nil
		})
	}
}
@@ -293,6 +293,16 @@ func GetIndexedRowKVPs(ctx context.Context, te TableEditor, key types.Tuple, ind
return nil, err
}
lookupTags := make(map[uint64]int)
for i, tag := range te.Schema().GetPKCols().Tags {
lookupTags[tag] = i
}
// handle keyless case, where no columns are pk's and rowIdTag is the only lookup tag
if len(lookupTags) == 0 {
lookupTags[schema.KeylessRowIdTag] = 0
}
var rowKVPS [][2]types.Tuple
for {
k, err := indexIter.ReadKey(ctx)
@@ -303,37 +313,68 @@ func GetIndexedRowKVPs(ctx context.Context, te TableEditor, key types.Tuple, ind
return nil, err
}
indexRowTaggedValues, err := row.ParseTaggedValues(k)
pkTupleVal, err := indexKeyToTableKey(tbl.Format(), k, lookupTags)
if err != nil {
return nil, err
}
pkTuple := indexRowTaggedValues.NomsTupleForPKCols(te.Format(), te.Schema().GetPKCols())
pkTupleVal, err := pkTuple.Value(ctx)
fieldsVal, ok, err := rowData.MaybeGetTuple(ctx, pkTupleVal)
if err != nil {
return nil, err
}
fieldsVal, _, err := rowData.MaybeGet(ctx, pkTupleVal)
if err != nil {
return nil, err
if !ok {
return nil, nil
}
if fieldsVal == nil {
keyStr, err := formatKey(ctx, key)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("index key `%s` does not have a corresponding entry in table", keyStr)
}
rowKVPS = append(rowKVPS, [2]types.Tuple{pkTupleVal.(types.Tuple), fieldsVal.(types.Tuple)})
rowKVPS = append(rowKVPS, [2]types.Tuple{pkTupleVal, fieldsVal})
}
return rowKVPS, nil
}
// indexKeyToTableKey maps an index key tuple back to the key of the table row
// it references. lookupTags maps a column tag to its slot in the resulting
// key; for keyless tables this is just {KeylessRowIdTag: 0}. The index key is
// iterated as alternating (tag, value) pairs: tags found in lookupTags have
// their values copied into the result, all other pairs are skipped.
// NOTE(review): assumes every tag in lookupTags occurs in indexKey — a
// missing tag would leave a nil slot in resVals; confirm with callers.
func indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple, lookupTags map[uint64]int) (types.Tuple, error) {
	tplItr, err := indexKey.Iterator()
	if err != nil {
		return types.Tuple{}, err
	}
	// Result is (tag, value) pairs, so each lookup slot spans two positions.
	resVals := make([]types.Value, len(lookupTags)*2)
	for {
		_, tag, err := tplItr.NextUint64()
		if err != nil {
			if err == io.EOF {
				break
			}
			return types.Tuple{}, err
		}
		idx, inKey := lookupTags[tag]
		if inKey {
			// Consume the value element paired with this tag.
			_, valVal, err := tplItr.Next()
			if err != nil {
				return types.Tuple{}, err
			}
			resVals[idx*2] = types.Uint(tag)
			resVals[idx*2+1] = valVal
		} else {
			// Not part of the table key; skip the paired value element.
			err := tplItr.Skip()
			if err != nil {
				return types.Tuple{}, err
			}
		}
	}
	return types.NewTuple(nbf, resVals...)
}
// GetIndexedRows returns all matching rows for the given key on the index. The key is assumed to be in the format
// expected of the index, similar to searching on the index map itself.
func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexName string, idxSch schema.Schema) ([]row.Row, error) {
@@ -421,7 +462,7 @@ func (te *pkTableEditor) InsertKeyVal(ctx context.Context, key, val types.Tuple,
if err != nil {
return err
}
err = indexEd.InsertRow(ctx, fullKey, partialKey)
err = indexEd.InsertRow(ctx, fullKey, partialKey, types.EmptyTuple(te.nbf))
if uke, ok := err.(*uniqueKeyErr); ok {
tableTupleHash, err := uke.TableTuple.Hash(uke.TableTuple.Format())
if err != nil {
@@ -533,7 +574,7 @@ func (te *pkTableEditor) DeleteByKey(ctx context.Context, key types.Tuple, tagTo
if err != nil {
return err
}
err = indexEd.DeleteRow(ctx, fullKey, partialKey)
err = indexEd.DeleteRow(ctx, fullKey, partialKey, types.EmptyTuple(te.nbf))
if err != nil {
return err
}
@@ -609,20 +650,20 @@ func (te *pkTableEditor) UpdateRow(ctx context.Context, dOldRow row.Row, dNewRow
}()
for i, indexEd := range te.indexEds {
oldFullKey, oldPartialKey, err := row.ReduceToIndexKeys(indexEd.Index(), dOldRow)
oldFullKey, oldPartialKey, oldVal, err := dOldRow.ReduceToIndexKeys(indexEd.Index())
if err != nil {
return err
}
err = indexEd.DeleteRow(ctx, oldFullKey, oldPartialKey)
err = indexEd.DeleteRow(ctx, oldFullKey, oldPartialKey, oldVal)
if err != nil {
return err
}
indexOpsToUndo[i]++
newFullKey, newPartialKey, err := row.ReduceToIndexKeys(indexEd.Index(), dNewRow)
newFullKey, newPartialKey, newVal, err := dNewRow.ReduceToIndexKeys(indexEd.Index())
if err != nil {
return err
}
err = indexEd.InsertRow(ctx, newFullKey, newPartialKey)
err = indexEd.InsertRow(ctx, newFullKey, newPartialKey, newVal)
if uke, ok := err.(*uniqueKeyErr); ok {
tableTupleHash, err := uke.TableTuple.Hash(uke.TableTuple.Format())
if err != nil {
@@ -186,6 +186,11 @@ func (ste *sessionedTableEditor) handleReferencingRowsOnDelete(ctx context.Conte
if err != nil {
return err
}
//
//value, err := dRow.NomsMapValue(ste.tableEditor.Schema()).Value(ctx)
//if err != nil {
// return err
//}
return ste.onDeleteHandleRowsReferencingValues(ctx, key.(types.Tuple), dRowTaggedVals)
}
@@ -16,6 +16,7 @@ package noms
import (
"context"
"errors"
"io"
"github.com/dolthub/go-mysql-server/sql"
@@ -83,12 +84,13 @@ func NewRangeStartingAfter(key types.Tuple, inRangeCheck InRangeCheck) *ReadRang
// NomsRangeReader reads values in one or more ranges from a map
type NomsRangeReader struct {
sch schema.Schema
m types.Map
ranges []*ReadRange
idx int
itr types.MapIterator
currCheck InRangeCheck
sch schema.Schema
m types.Map
ranges []*ReadRange
idx int
itr types.MapIterator
currCheck InRangeCheck
cardCounter *CardinalityCounter
}
// NewNomsRangeReader creates a NomsRangeReader
@@ -100,6 +102,7 @@ func NewNomsRangeReader(sch schema.Schema, m types.Map, ranges []*ReadRange) *No
0,
nil,
nil,
NewCardinalityCounter(),
}
}
@@ -132,6 +135,15 @@ func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tupl
var k types.Tuple
var v types.Tuple
for nrr.itr != nil || nrr.idx < len(nrr.ranges) {
if !nrr.cardCounter.empty() {
if nrr.cardCounter.done() {
nrr.cardCounter.reset()
} else {
return nrr.cardCounter.next()
}
}
if nrr.itr == nil {
r := nrr.ranges[nrr.idx]
nrr.idx++
@@ -141,7 +153,6 @@ func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tupl
} else {
nrr.itr, err = nrr.m.IteratorFrom(ctx, r.Start)
}
if err != nil {
return types.Tuple{}, types.Tuple{}, err
}
@@ -170,6 +181,12 @@ func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tupl
}
if inRange {
if !v.Empty() {
nrr.cardCounter.updateWithKV(k, v)
if !nrr.cardCounter.empty() && !nrr.cardCounter.done() {
return nrr.cardCounter.next()
}
}
return k, v, nil
}
}
@@ -227,3 +244,82 @@ func SqlRowFromTuples(sch schema.Schema, key, val types.Tuple) (sql.Row, error)
return sql.NewRow(colVals...), nil
}
// CardinalityCounter lets a range reader replay an index entry once per unit
// of cardinality. When an index value tuple declares a cardinality N > 1, the
// counter is primed with that key/value pair and hands the same pair back N
// times via next() before reporting done().
type CardinalityCounter struct {
	key   *types.Tuple // primed index key, nil while the counter is idle
	value *types.Tuple // primed index value, nil while the counter is idle
	card  int          // cardinality to replay, -1 while idle
	idx   int          // number of pairs handed out so far, -1 while idle
}

// NewCardinalityCounter returns an idle counter.
func NewCardinalityCounter() *CardinalityCounter {
	return &CardinalityCounter{
		key:   nil,
		value: nil,
		card:  -1,
		idx:   -1,
	}
}

// updateWithKV inspects an index entry's value tuple and, if it carries a
// cardinality greater than one, primes the counter with the pair; otherwise
// the counter is left (or put back) in the idle state.
func (cc *CardinalityCounter) updateWithKV(k, v types.Tuple) error {
	if v.Empty() {
		return nil
	}
	tagVal, err := v.Get(0)
	if err != nil {
		return err
	}
	tag, isUint := tagVal.(types.Uint)
	if !isUint {
		return errors.New("index cardinality invalid tag type")
	}
	if uint64(tag) != schema.KeylessRowCardinalityTag {
		return errors.New("index cardinality tag invalid")
	}
	rawCard, err := v.Get(1)
	if err != nil {
		return err
	}
	cardinality, isUint := rawCard.(types.Uint)
	if !isUint {
		return errors.New("index cardinality value invalid type")
	}
	if int(cardinality) <= 1 {
		// Single-row entries need no replay; go back to idle.
		cc.reset()
		return nil
	}
	cc.card = int(cardinality)
	cc.idx = 0
	cc.key = &k
	cc.value = &v
	return nil
}

// empty reports whether the counter currently holds no primed pair.
func (cc *CardinalityCounter) empty() bool {
	if cc.key != nil && cc.value != nil {
		return false
	}
	return true
}

// done reports whether every replay of the primed pair has been handed out.
func (cc *CardinalityCounter) done() bool {
	return !(cc.card >= 1 && cc.idx < cc.card)
}

// next hands out the primed pair and advances the replay position.
func (cc *CardinalityCounter) next() (types.Tuple, types.Tuple, error) {
	if cc.empty() {
		return types.Tuple{}, types.Tuple{}, errors.New("cannot increment empty cardinality counter")
	}
	cc.idx++
	return *cc.key, *cc.value, nil
}

// reset returns the counter to the idle state.
func (cc *CardinalityCounter) reset() {
	cc.key, cc.value = nil, nil
	cc.card, cc.idx = -1, -1
}
File diff suppressed because it is too large Load Diff
+120 -4
View File
@@ -31,15 +31,15 @@ teardown() {
@test "keyless: feature indexes and foreign keys" {
run dolt sql -q "ALTER TABLE keyless ADD INDEX (c1);"
[ $status -ne 0 ]
[ $status -eq 0 ]
[[ ! "$output" =~ "panic" ]] || false
run dolt sql -q "CREATE TABLE bad (a int, b int, INDEX (b));"
[ $status -ne 0 ]
run dolt sql -q "CREATE TABLE fine (a int, b int, INDEX (b));"
[ $status -eq 0 ]
[[ ! "$output" =~ "panic" ]] || false
run dolt sql -q "CREATE TABLE worse (a int, b int, FOREIGN KEY (b) REFERENCES keyless(c1));"
[ $status -ne 0 ]
[ $status -eq 0 ]
[[ ! "$output" =~ "panic" ]] || false
}
@@ -820,3 +820,119 @@ SQL
[[ "${lines[3]}" = "1,1" ]] || false
[ "${#lines[@]}" -eq 4 ]
}
# Creating a secondary index on a keyless table must register it in SHOW INDEX
# and be used by the planner (IndexedTableAccess) for point lookups on c1.
@test "keyless: create secondary index" {
    dolt sql -q "create index idx on keyless (c1)"
    run dolt sql -q "show index from keyless" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "Table,Non_unique,Key_name,Seq_in_index,Column_name,Collation,Cardinality,Sub_part,Packed,Null,Index_type,Comment,Index_comment,Visible,Expression" ]] || false
    [[ "${lines[1]}" = "keyless,1,idx,1,c1,,0,,,YES,BTREE,\"\",\"\",YES," ]] || false
    # Range scan: duplicate row (1,1) must appear twice.
    run dolt sql -q "select * from keyless where c1 > 0 order by c0" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0,c1" ]] || false
    [[ "${lines[1]}" = "1,1" ]] || false
    [[ "${lines[2]}" = "1,1" ]] || false
    [[ "${lines[3]}" = "2,2" ]] || false
    [ "${#lines[@]}" -eq 4 ]
    # Point lookup: both duplicates of c1 = 1 must come back.
    run dolt sql -q "select c0 from keyless where c1 = 1" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "1" ]] || false
    [[ "${lines[2]}" = "1" ]] || false
    [ "${#lines[@]}" -eq 3 ]
    # The plan must go through the index, not a full scan.
    run dolt sql -q "describe select c0 from keyless where c1 = 1" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "plan" ]] || false
    [[ "${lines[1]}" =~ "Project(keyless.c0)" ]] || false
    [[ "${lines[2]}" =~ "Filter(keyless.c1 = 1)" ]] || false
    [[ "${lines[3]}" =~ "Projected table access on [c0 c1]" ]] || false
    [[ "${lines[4]}" =~ "IndexedTableAccess(keyless on [keyless.c1])" ]] || false
}
# Rows inserted after index creation must be visible through index lookups.
@test "keyless: secondary index insert" {
    dolt sql -q "create index idx on keyless (c1)"
    dolt sql -q "insert into keyless values (3,3), (4,4)"
    run dolt sql -q "select c0 from keyless where c1 = 4" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "4" ]] || false
    [ "${#lines[@]}" -eq 2 ]
}
# Inserting the same row twice must yield two hits in the index lookup
# (keyless tables store duplicate cardinality in the index entry).
@test "keyless: secondary index duplicate insert" {
    dolt sql -q "create index idx on keyless (c1)"
    dolt sql -q "insert into keyless values (3,3), (4,4), (4,4)"
    run dolt sql -q "select c0 from keyless where c1 = 4" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "4" ]] || false
    [[ "${lines[2]}" = "4" ]] || false
    [ "${#lines[@]}" -eq 3 ]
}
# Updating rows must keep the secondary index in sync: after shifting every
# c0 by one, an index lookup on c1 = 1 must return both duplicate rows that
# now carry c0 = 2.
@test "keyless: secondary index update" {
    dolt sql -q "create index idx on keyless (c1)"
    dolt sql -q "update keyless set c0 = c0 + 1"
    run dolt sql -q "select * from keyless order by c0" -r csv
    [ $status -eq 0 ]
    [[ "${lines[1]}" = "1,0" ]] || false
    [[ "${lines[2]}" = "2,1" ]] || false
    [[ "${lines[3]}" = "2,1" ]] || false
    [[ "${lines[4]}" = "3,2" ]] || false
    [ "${#lines[@]}" -eq 5 ]
    run dolt sql -q "select c0 from keyless where c1 = 1" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "2" ]] || false
    # Fix copy-paste bug: the second duplicate row is lines[2], not lines[1]
    # asserted twice — otherwise the third output line was never checked.
    [[ "${lines[2]}" = "2" ]] || false
    [ "${#lines[@]}" -eq 3 ]
}
# Deleting a row must remove its index entry: after deleting c0 = 4, lookups
# through both the index (c1 > 2) and a table scan (c0 > 2) agree.
@test "keyless: secondary index delete single" {
    dolt sql -q "create index idx on keyless (c1)"
    dolt sql -q "insert into keyless values (3,3), (4,4)"
    dolt sql -q "delete from keyless where c0 = 4"
    run dolt sql -q "select c0 from keyless where c1 > 2" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "3" ]] || false
    [ "${#lines[@]}" -eq 2 ]
    run dolt sql -q "select c0 from keyless where c0 > 2" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "3" ]] || false
    [ "${#lines[@]}" -eq 2 ]
}
# Deleting a duplicated row must remove ALL copies from the index: both
# (4,4) rows disappear, leaving only (3,3) via index and table-scan paths.
@test "keyless: secondary index delete duplicate" {
    dolt sql -q "create index idx on keyless (c1)"
    dolt sql -q "insert into keyless values (3,3), (4,4), (4,4)"
    dolt sql -q "delete from keyless where c0 = 4"
    run dolt sql -q "select c0 from keyless where c1 > 2" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "3" ]] || false
    [ "${#lines[@]}" -eq 2 ]
    run dolt sql -q "select c0 from keyless where c0 > 2" -r csv
    [ $status -eq 0 ]
    [[ "${lines[0]}" = "c0" ]] || false
    [[ "${lines[1]}" = "3" ]] || false
    [ "${#lines[@]}" -eq 2 ]
}