mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-10 10:30:57 -06:00
Bh/delete optimizations (#1743)
This commit is contained in:
@@ -523,14 +523,18 @@ func applyPkChange(ctx context.Context, sch schema.Schema, tableEditor editor.Ta
|
||||
}
|
||||
stats.Modifications++
|
||||
case types.DiffChangeRemoved:
|
||||
oldRow, err := row.FromNoms(sch, change.Key.(types.Tuple), change.OldValue.(types.Tuple))
|
||||
key := change.Key.(types.Tuple)
|
||||
value := change.OldValue.(types.Tuple)
|
||||
tv, err := row.TaggedValuesFromTupleKeyAndValue(key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tableEditor.DeleteRow(ctx, oldRow)
|
||||
|
||||
err = tableEditor.DeleteByKey(ctx, key, tv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stats.Deletes++
|
||||
}
|
||||
|
||||
@@ -613,14 +617,18 @@ func applyKeylessChange(ctx context.Context, sch schema.Schema, tableEditor edit
|
||||
}
|
||||
stats.Modifications++
|
||||
case types.DiffChangeRemoved:
|
||||
oldRow, err := row.FromNoms(sch, ch.Key.(types.Tuple), ch.OldValue.(types.Tuple))
|
||||
key := change.Key.(types.Tuple)
|
||||
value := change.OldValue.(types.Tuple)
|
||||
tv, err := row.TaggedValuesFromTupleKeyAndValue(key, value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tableEditor.DeleteRow(ctx, oldRow)
|
||||
|
||||
err = tableEditor.DeleteByKey(ctx, key, tv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
stats.Deletes++
|
||||
}
|
||||
return nil
|
||||
|
||||
@@ -465,6 +465,6 @@ func TestMergeCommits(t *testing.T) {
|
||||
t.Error("index contents are incorrect")
|
||||
}
|
||||
} else {
|
||||
assert.Fail(t, "%v and %v do not equal", h, eh)
|
||||
assert.Fail(t, "%s and %s do not equal", h.String(), eh.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -257,3 +257,77 @@ func AreEqual(row1, row2 Row, sch schema.Schema) bool {
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func TaggedValsEqualForSch(tv, other TaggedValues, sch schema.Schema) bool {
|
||||
if tv == nil && other == nil {
|
||||
return true
|
||||
} else if tv == nil || other == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, tag := range sch.GetAllCols().Tags {
|
||||
val1, _ := tv[tag]
|
||||
val2, _ := other[tag]
|
||||
|
||||
if !valutil.NilSafeEqCheck(val1, val2) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func KeyAndTaggedValuesForRow(r Row, sch schema.Schema) (types.Tuple, TaggedValues, error) {
|
||||
switch typed := r.(type) {
|
||||
case nomsRow:
|
||||
pkCols := sch.GetPKCols()
|
||||
keyVals := make([]types.Value, 0, pkCols.Size()*2)
|
||||
tv := make(TaggedValues)
|
||||
err := pkCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
|
||||
val, ok := typed.key[tag]
|
||||
if !ok || types.IsNull(val) {
|
||||
return false, errors.New("invalid key contains null values")
|
||||
}
|
||||
|
||||
tv[tag] = val
|
||||
keyVals = append(keyVals, types.Uint(tag))
|
||||
keyVals = append(keyVals, val)
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
nonPkCols := sch.GetNonPKCols()
|
||||
_, err = typed.value.Iter(func(tag uint64, val types.Value) (stop bool, err error) {
|
||||
if _, ok := nonPkCols.TagToIdx[tag]; ok {
|
||||
tv[tag] = val
|
||||
}
|
||||
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
t, err := types.NewTuple(r.Format(), keyVals...)
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
return t, tv, nil
|
||||
|
||||
case keylessRow:
|
||||
tv, err := typed.TaggedValues()
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
return typed.key, tv, nil
|
||||
|
||||
default:
|
||||
panic("unknown row type")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,6 +62,24 @@ func (tvs TupleVals) Less(nbf *types.NomsBinFormat, other types.LesserValuable)
|
||||
return types.TupleKind < other.Kind(), nil
|
||||
}
|
||||
|
||||
func (tt TaggedValues) ToRow(ctx context.Context, nbf *types.NomsBinFormat, sch schema.Schema) (Row, error) {
|
||||
keyVals := tt.NomsTupleForNonPKCols(nbf, sch.GetPKCols())
|
||||
valVals := tt.NomsTupleForNonPKCols(nbf, sch.GetNonPKCols())
|
||||
key, err := keyVals.Value(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
val, err := valVals.Value(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return FromNoms(sch, key.(types.Tuple), val.(types.Tuple))
|
||||
}
|
||||
|
||||
func (tt TaggedValues) NomsTupleForPKCols(nbf *types.NomsBinFormat, pkCols *schema.ColCollection) TupleVals {
|
||||
return tt.nomsTupleForTags(nbf, pkCols.Tags, true)
|
||||
}
|
||||
|
||||
@@ -135,6 +135,64 @@ func DoltKeyValueAndMappingFromSqlRow(ctx context.Context, vrw types.ValueReadWr
|
||||
return keyTuple, valTuple, tagToVal, nil
|
||||
}
|
||||
|
||||
// DoltKeyValueAndMappingFromSqlRow converts a sql.Row to key tuple and keeps a mapping from tag to value that
|
||||
// can be used to speed up index key generation for foreign key checks.
|
||||
func DoltKeyAndMappingFromSqlRow(ctx context.Context, vrw types.ValueReadWriter, r sql.Row, doltSchema schema.Schema) (types.Tuple, map[uint64]types.Value, error) {
|
||||
allCols := doltSchema.GetAllCols()
|
||||
pkCols := doltSchema.GetPKCols()
|
||||
|
||||
numCols := allCols.Size()
|
||||
numPKCols := pkCols.Size()
|
||||
pkVals := make([]types.Value, numPKCols*2)
|
||||
tagToVal := make(map[uint64]types.Value, numCols)
|
||||
|
||||
if len(r) < numCols {
|
||||
numCols = len(r)
|
||||
}
|
||||
|
||||
// values for the pk tuple are in schema order
|
||||
pkIdx := 0
|
||||
for i := 0; i < numCols; i++ {
|
||||
schCol := allCols.GetAtIndex(i)
|
||||
val := r[i]
|
||||
if val == nil {
|
||||
if !schCol.IsNullable() {
|
||||
return types.Tuple{}, nil, fmt.Errorf("column <%v> received nil but is non-nullable", schCol.Name)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
tag := schCol.Tag
|
||||
nomsVal, err := schCol.TypeInfo.ConvertValueToNomsValue(ctx, vrw, val)
|
||||
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
tagToVal[tag] = nomsVal
|
||||
|
||||
if schCol.IsPartOfPK {
|
||||
pkVals[pkIdx] = types.Uint(tag)
|
||||
pkVals[pkIdx+1] = nomsVal
|
||||
pkIdx += 2
|
||||
}
|
||||
}
|
||||
|
||||
// no nulls in keys
|
||||
if pkIdx != len(pkVals) {
|
||||
return types.Tuple{}, nil, errors.New("not all pk columns have a value")
|
||||
}
|
||||
|
||||
nbf := vrw.Format()
|
||||
keyTuple, err := types.NewTuple(nbf, pkVals...)
|
||||
|
||||
if err != nil {
|
||||
return types.Tuple{}, nil, err
|
||||
}
|
||||
|
||||
return keyTuple, tagToVal, nil
|
||||
}
|
||||
|
||||
func pkDoltRowFromSqlRow(ctx context.Context, vrw types.ValueReadWriter, r sql.Row, doltSchema schema.Schema) (row.Row, error) {
|
||||
taggedVals := make(row.TaggedValues)
|
||||
allCols := doltSchema.GetAllCols()
|
||||
|
||||
@@ -112,12 +112,22 @@ func (te *sqlTableEditor) Insert(ctx *sql.Context, sqlRow sql.Row) error {
|
||||
}
|
||||
|
||||
func (te *sqlTableEditor) Delete(ctx *sql.Context, sqlRow sql.Row) error {
|
||||
dRow, err := sqlutil.SqlRowToDoltRow(ctx, te.vrw, sqlRow, te.sch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !schema.IsKeyless(te.sch) {
|
||||
k, tagToVal, err := sqlutil.DoltKeyAndMappingFromSqlRow(ctx, te.vrw, sqlRow, te.sch)
|
||||
|
||||
return te.tableEditor.DeleteRow(ctx, dRow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return te.tableEditor.DeleteByKey(ctx, k, tagToVal)
|
||||
} else {
|
||||
dRow, err := sqlutil.SqlRowToDoltRow(ctx, te.vrw, sqlRow, te.sch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return te.tableEditor.DeleteRow(ctx, dRow)
|
||||
}
|
||||
}
|
||||
|
||||
func (te *sqlTableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
|
||||
|
||||
@@ -492,7 +492,7 @@ func (ie *IndexEditor) flush() {
|
||||
func (ie *IndexEditor) autoFlush() {
|
||||
ie.flushMutex.RLock()
|
||||
ie.writeMutex.Lock()
|
||||
runFlush := ie.iea.opCount >= tableEditorMaxOps
|
||||
runFlush := uint64(ie.iea.opCount) >= tableEditorMaxOps
|
||||
ie.writeMutex.Unlock()
|
||||
ie.flushMutex.RUnlock()
|
||||
|
||||
|
||||
@@ -138,6 +138,37 @@ func (kte *keylessTableEditor) InsertKeyVal(ctx context.Context, key, val types.
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (kte *keylessTableEditor) DeleteByKey(ctx context.Context, key types.Tuple, tagToVal map[uint64]types.Value) (err error) {
|
||||
kte.mu.Lock()
|
||||
defer kte.mu.Unlock()
|
||||
|
||||
defer func() { err = kte.autoFlush(ctx) }()
|
||||
|
||||
nonPkCols := kte.sch.GetNonPKCols()
|
||||
tplVals := make([]types.Value, 0, 2*nonPkCols.Size())
|
||||
err = nonPkCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
|
||||
var val types.Value = types.NullValue
|
||||
if rowVal, ok := tagToVal[tag]; ok {
|
||||
val = rowVal
|
||||
}
|
||||
|
||||
tplVals = append(tplVals, types.Uint(tag))
|
||||
tplVals = append(tplVals, val)
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
val, err := types.NewTuple(kte.tbl.Format(), tplVals...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return kte.acc.decrement(key, val)
|
||||
}
|
||||
|
||||
// InsertRow implements TableEditor.
|
||||
func (kte *keylessTableEditor) InsertRow(ctx context.Context, r row.Row, _ PKDuplicateErrFunc) (err error) {
|
||||
kte.mu.Lock()
|
||||
|
||||
@@ -34,14 +34,14 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
tableEditorMaxOps int64 = 16384
|
||||
ErrDuplicateKey = errors.New("duplicate key error")
|
||||
tableEditorMaxOps uint64 = 256 * 1024
|
||||
ErrDuplicateKey = errors.New("duplicate key error")
|
||||
)
|
||||
|
||||
func init() {
|
||||
if maxOpsEnv := os.Getenv("DOLT_EDIT_TABLE_BUFFER_ROWS"); maxOpsEnv != "" {
|
||||
if v, err := strconv.ParseUint(maxOpsEnv, 10, 63); err == nil {
|
||||
tableEditorMaxOps = int64(v)
|
||||
tableEditorMaxOps = v
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -50,6 +50,7 @@ type PKDuplicateErrFunc func(keyString, indexName string, k, v types.Tuple, isPk
|
||||
|
||||
type TableEditor interface {
|
||||
InsertKeyVal(ctx context.Context, key, val types.Tuple, tagToVal map[uint64]types.Value, errFunc PKDuplicateErrFunc) error
|
||||
DeleteByKey(ctx context.Context, key types.Tuple, tagToVal map[uint64]types.Value) error
|
||||
|
||||
InsertRow(ctx context.Context, r row.Row, errFunc PKDuplicateErrFunc) error
|
||||
UpdateRow(ctx context.Context, old, new row.Row, errFunc PKDuplicateErrFunc) error
|
||||
@@ -268,9 +269,9 @@ func ContainsIndexedKey(ctx context.Context, te TableEditor, key types.Tuple, in
|
||||
}
|
||||
}
|
||||
|
||||
// GetIndexedRows returns all matching rows for the given key on the index. The key is assumed to be in the format
|
||||
// GetIndexedRowKVPs returns all matching row keys and values for the given key on the index. The key is assumed to be in the format
|
||||
// expected of the index, similar to searching on the index map itself.
|
||||
func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexName string, idxSch schema.Schema) ([]row.Row, error) {
|
||||
func GetIndexedRowKVPs(ctx context.Context, te TableEditor, key types.Tuple, indexName string, idxSch schema.Schema) ([][2]types.Tuple, error) {
|
||||
tbl, err := te.Table(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -292,17 +293,17 @@ func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexN
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var rows []row.Row
|
||||
var rowKVPS [][2]types.Tuple
|
||||
for {
|
||||
r, err := indexIter.ReadRow(ctx)
|
||||
k, err := indexIter.ReadKey(ctx)
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
indexRowTaggedValues, err := r.TaggedValues()
|
||||
indexRowTaggedValues, err := row.ParseTaggedValues(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -317,6 +318,7 @@ func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexN
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if fieldsVal == nil {
|
||||
keyStr, err := formatKey(ctx, key)
|
||||
if err != nil {
|
||||
@@ -325,12 +327,27 @@ func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexN
|
||||
return nil, fmt.Errorf("index key `%s` does not have a corresponding entry in table", keyStr)
|
||||
}
|
||||
|
||||
tableRow, err := row.FromNoms(te.Schema(), pkTupleVal.(types.Tuple), fieldsVal.(types.Tuple))
|
||||
rowKVPS = append(rowKVPS, [2]types.Tuple{pkTupleVal.(types.Tuple), fieldsVal.(types.Tuple)})
|
||||
}
|
||||
|
||||
return rowKVPS, nil
|
||||
|
||||
}
|
||||
|
||||
// GetIndexedRows returns all matching rows for the given key on the index. The key is assumed to be in the format
|
||||
// expected of the index, similar to searching on the index map itself.
|
||||
func GetIndexedRows(ctx context.Context, te TableEditor, key types.Tuple, indexName string, idxSch schema.Schema) ([]row.Row, error) {
|
||||
rowKVPS, err := GetIndexedRowKVPs(ctx, te, key, indexName, idxSch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows := make([]row.Row, len(rowKVPS))
|
||||
for i, rowKVP := range rowKVPS {
|
||||
rows[i], err = row.FromNoms(te.Schema(), rowKVP[0], rowKVP[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rows = append(rows, tableRow)
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
@@ -490,22 +507,11 @@ func (te *pkTableEditor) InsertRow(ctx context.Context, dRow row.Row, errFunc PK
|
||||
return te.InsertKeyVal(ctx, key.(types.Tuple), val.(types.Tuple), tagToVal, errFunc)
|
||||
}
|
||||
|
||||
// DeleteRow removes the given row from the table.
|
||||
func (te *pkTableEditor) DeleteRow(ctx context.Context, dRow row.Row) (retErr error) {
|
||||
func (te *pkTableEditor) DeleteByKey(ctx context.Context, key types.Tuple, tagToVal map[uint64]types.Value) (retErr error) {
|
||||
defer te.autoFlush()
|
||||
te.flushMutex.RLock()
|
||||
defer te.flushMutex.RUnlock()
|
||||
|
||||
key, err := dRow.NomsMapKey(te.tSch).Value(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
keyHash, err := key.Hash(te.nbf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Regarding the lock's position here, refer to the comment in InsertKeyVal
|
||||
te.writeMutex.Lock()
|
||||
defer te.writeMutex.Unlock()
|
||||
@@ -523,7 +529,7 @@ func (te *pkTableEditor) DeleteRow(ctx context.Context, dRow row.Row) (retErr er
|
||||
}()
|
||||
|
||||
for i, indexEd := range te.indexEds {
|
||||
fullKey, partialKey, err := row.ReduceToIndexKeys(indexEd.Index(), dRow)
|
||||
fullKey, partialKey, err := row.ReduceToIndexKeysFromTagMap(te.nbf, indexEd.Index(), tagToVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -534,6 +540,11 @@ func (te *pkTableEditor) DeleteRow(ctx context.Context, dRow row.Row) (retErr er
|
||||
indexOpsToUndo[i]++
|
||||
}
|
||||
|
||||
keyHash, err := key.Hash(te.nbf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
delete(te.tea.addedKeys, keyHash)
|
||||
te.tea.removedKeys[keyHash] = key
|
||||
|
||||
@@ -541,6 +552,17 @@ func (te *pkTableEditor) DeleteRow(ctx context.Context, dRow row.Row) (retErr er
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteRow removes the given row from the table. This essentially acts as a convenience function for DeleteKey, while
|
||||
// ensuring proper thread safety.
|
||||
func (te *pkTableEditor) DeleteRow(ctx context.Context, dRow row.Row) (retErr error) {
|
||||
key, tv, err := row.KeyAndTaggedValuesForRow(dRow, te.tSch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return te.DeleteByKey(ctx, key, tv)
|
||||
}
|
||||
|
||||
// UpdateRow takes the current row and new rows, and updates it accordingly.
|
||||
func (te *pkTableEditor) UpdateRow(ctx context.Context, dOldRow row.Row, dNewRow row.Row, errFunc PKDuplicateErrFunc) (retErr error) {
|
||||
defer te.autoFlush()
|
||||
@@ -861,7 +883,7 @@ func (te *pkTableEditor) flush() {
|
||||
func (te *pkTableEditor) autoFlush() {
|
||||
te.flushMutex.RLock()
|
||||
te.writeMutex.Lock()
|
||||
runFlush := te.tea.opCount >= tableEditorMaxOps
|
||||
runFlush := uint64(te.tea.opCount) >= tableEditorMaxOps
|
||||
te.writeMutex.Unlock()
|
||||
te.flushMutex.RUnlock()
|
||||
|
||||
|
||||
@@ -52,6 +52,20 @@ func (ste *sessionedTableEditor) InsertKeyVal(ctx context.Context, key, val type
|
||||
return ti.InsertKeyVal(ctx, key, val, tagToVal, errFunc)
|
||||
}
|
||||
|
||||
func (ste *sessionedTableEditor) DeleteByKey(ctx context.Context, key types.Tuple, tagToVal map[uint64]types.Value) error {
|
||||
ste.tableEditSession.writeMutex.RLock()
|
||||
defer ste.tableEditSession.writeMutex.RUnlock()
|
||||
|
||||
if !ste.tableEditSession.Props.ForeignKeyChecksDisabled && len(ste.referencingTables) > 0 {
|
||||
err := ste.onDeleteHandleRowsReferencingValues(ctx, key, tagToVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return ste.tableEditor.DeleteByKey(ctx, key, tagToVal)
|
||||
}
|
||||
|
||||
// InsertRow adds the given row to the table. If the row already exists, use UpdateRow.
|
||||
func (ste *sessionedTableEditor) InsertRow(ctx context.Context, dRow row.Row, errFunc PKDuplicateErrFunc) error {
|
||||
ste.tableEditSession.writeMutex.RLock()
|
||||
@@ -163,25 +177,36 @@ func (ste *sessionedTableEditor) Close() error {
|
||||
|
||||
// handleReferencingRowsOnDelete handles updating referencing foreign keys on delete operations
|
||||
func (ste *sessionedTableEditor) handleReferencingRowsOnDelete(ctx context.Context, dRow row.Row) error {
|
||||
//TODO: all self referential logic assumes non-composite keys
|
||||
if ste.tableEditSession.Props.ForeignKeyChecksDisabled {
|
||||
return nil
|
||||
}
|
||||
dRowTaggedVals, err := dRow.TaggedValues()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
key, err := dRow.NomsMapKey(ste.tableEditor.Schema()).Value(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ste.onDeleteHandleRowsReferencingValues(ctx, key.(types.Tuple), dRowTaggedVals)
|
||||
}
|
||||
|
||||
func (ste *sessionedTableEditor) onDeleteHandleRowsReferencingValues(ctx context.Context, key types.Tuple, dRowTaggedVals row.TaggedValues) error {
|
||||
//TODO: all self referential logic assumes non-composite keys
|
||||
if ste.tableEditSession.Props.ForeignKeyChecksDisabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ste.indexSchemaCache == nil {
|
||||
ste.indexSchemaCache = make(map[string]schema.Schema)
|
||||
}
|
||||
|
||||
nbf := ste.Format()
|
||||
for _, foreignKey := range ste.referencingTables {
|
||||
referencingSte, ok := ste.tableEditSession.tables[foreignKey.TableName]
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to get table editor as `%s` is missing", foreignKey.TableName)
|
||||
}
|
||||
indexKey, hasNulls, err := ste.reduceRowAndConvert(ste.tableEditor.Format(), foreignKey.ReferencedTableColumns, foreignKey.TableColumns, dRowTaggedVals)
|
||||
indexKey, hasNulls, err := ste.reduceRowAndConvert(nbf, foreignKey.ReferencedTableColumns, foreignKey.TableColumns, dRowTaggedVals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -196,43 +221,63 @@ func (ste *sessionedTableEditor) handleReferencingRowsOnDelete(ctx context.Conte
|
||||
idxSch = referencingSte.tableEditor.Schema().Indexes().GetByName(foreignKey.TableIndex).Schema()
|
||||
ste.indexSchemaCache[cacheKey] = idxSch
|
||||
}
|
||||
referencingRows, err := GetIndexedRows(ctx, referencingSte.tableEditor, indexKey, foreignKey.TableIndex, idxSch)
|
||||
|
||||
referencingRowKVPs, err := GetIndexedRowKVPs(ctx, referencingSte.tableEditor, indexKey, foreignKey.TableIndex, idxSch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(referencingRows) == 0 {
|
||||
if len(referencingRowKVPs) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
var shouldSkip bool
|
||||
switch foreignKey.OnDelete {
|
||||
case doltdb.ForeignKeyReferenceOption_Cascade:
|
||||
for _, rowToDelete := range referencingRows {
|
||||
ctx, shouldSkip, err = ste.shouldSkipDeleteCascade(ctx, foreignKey, dRow, rowToDelete)
|
||||
for _, kvpToDelete := range referencingRowKVPs {
|
||||
ctx, shouldSkip, err = ste.shouldSkipDeleteCascade(ctx, foreignKey, key, kvpToDelete[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if shouldSkip {
|
||||
continue
|
||||
}
|
||||
err = referencingSte.DeleteRow(ctx, rowToDelete)
|
||||
taggedVals, err := row.TaggedValuesFromTupleKeyAndValue(kvpToDelete[0], kvpToDelete[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = referencingSte.DeleteByKey(ctx, kvpToDelete[0], taggedVals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case doltdb.ForeignKeyReferenceOption_SetNull:
|
||||
for _, unalteredNewRow := range referencingRows {
|
||||
if foreignKey.IsSelfReferential() && row.AreEqual(dRow, unalteredNewRow, ste.tableEditor.Schema()) {
|
||||
for _, unalteredNewKVP := range referencingRowKVPs {
|
||||
taggedVals, err := row.TaggedValuesFromTupleKeyAndValue(unalteredNewKVP[0], unalteredNewKVP[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sch := referencingSte.tableEditor.Schema()
|
||||
oldRow, err := row.FromNoms(sch, unalteredNewKVP[0], unalteredNewKVP[1])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if foreignKey.IsSelfReferential() && row.TaggedValsEqualForSch(taggedVals, dRowTaggedVals, sch) {
|
||||
continue
|
||||
}
|
||||
newRow := unalteredNewRow
|
||||
|
||||
for _, colTag := range foreignKey.TableColumns {
|
||||
newRow, err = newRow.SetColVal(colTag, types.NullValue, referencingSte.tableEditor.Schema())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
taggedVals[colTag] = types.NullValue
|
||||
}
|
||||
err = referencingSte.updateRow(ctx, unalteredNewRow, newRow, false, nil)
|
||||
|
||||
newRow, err := taggedVals.ToRow(ctx, nbf, sch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = referencingSte.updateRow(ctx, oldRow, newRow, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -261,12 +306,13 @@ func (ste *sessionedTableEditor) handleReferencingRowsOnUpdate(ctx context.Conte
|
||||
ste.indexSchemaCache = make(map[string]schema.Schema)
|
||||
}
|
||||
|
||||
nbf := ste.Format()
|
||||
for _, foreignKey := range ste.referencingTables {
|
||||
referencingSte, ok := ste.tableEditSession.tables[foreignKey.TableName]
|
||||
if !ok {
|
||||
return fmt.Errorf("unable to get table editor as `%s` is missing", foreignKey.TableName)
|
||||
}
|
||||
indexKey, hasNulls, err := ste.reduceRowAndConvert(ste.tableEditor.Format(), foreignKey.ReferencedTableColumns, foreignKey.TableColumns, dOldRowTaggedVals)
|
||||
indexKey, hasNulls, err := ste.reduceRowAndConvert(nbf, foreignKey.ReferencedTableColumns, foreignKey.TableColumns, dOldRowTaggedVals)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -367,7 +413,7 @@ func (ste *sessionedTableEditor) handleReferencingRowsOnUpdate(ctx context.Conte
|
||||
// shouldSkipDeleteCascade determines whether the next row should be deleted, based on if a loop has been detected in
|
||||
// cascading deletes. Stores the previous delete hashes in the context, and returns a new context if the old one did not
|
||||
// have any hashes. Only applies to self referential foreign keys.
|
||||
func (ste *sessionedTableEditor) shouldSkipDeleteCascade(ctx context.Context, foreignKey doltdb.ForeignKey, oldRow, newRow row.Row) (context.Context, bool, error) {
|
||||
func (ste *sessionedTableEditor) shouldSkipDeleteCascade(ctx context.Context, foreignKey doltdb.ForeignKey, oldKey, newKey types.Tuple) (context.Context, bool, error) {
|
||||
//TODO: all self referential logic assumes non-composite keys
|
||||
if !foreignKey.IsSelfReferential() {
|
||||
return ctx, false, nil
|
||||
@@ -383,13 +429,7 @@ func (ste *sessionedTableEditor) shouldSkipDeleteCascade(ctx context.Context, fo
|
||||
ctx = context.WithValue(ctx, contextValueName, deleteKeys)
|
||||
}
|
||||
|
||||
// We immediately store the old key in the map. We don't need to see if it was already there.
|
||||
// We can also catch deletions that loop on the same row this way.
|
||||
oldKey, err := oldRow.NomsMapKey(ste.tableEditor.Schema()).Value(ctx)
|
||||
if err != nil {
|
||||
return ctx, false, err
|
||||
}
|
||||
oldKeyHash, err := oldKey.Hash(oldRow.Format())
|
||||
oldKeyHash, err := oldKey.Hash(ste.Format())
|
||||
if err != nil {
|
||||
return ctx, false, err
|
||||
}
|
||||
@@ -397,11 +437,7 @@ func (ste *sessionedTableEditor) shouldSkipDeleteCascade(ctx context.Context, fo
|
||||
|
||||
// We don't need to store the new key. If it also causes a cascade then it will become an old key as the logic
|
||||
// progresses. We're only interested in whether the new key is already present in the map.
|
||||
newKey, err := newRow.NomsMapKey(ste.tableEditor.Schema()).Value(ctx)
|
||||
if err != nil {
|
||||
return ctx, false, err
|
||||
}
|
||||
newKeyHash, err := newKey.Hash(newRow.Format())
|
||||
newKeyHash, err := newKey.Hash(ste.Format())
|
||||
if err != nil {
|
||||
return ctx, false, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user