Check constraints on commit

This commit is contained in:
Daylon Wilkins
2021-02-08 23:59:07 -08:00
committed by Daylon Wilkins
parent c22de45cee
commit 7dcc54b630
7 changed files with 206 additions and 93 deletions

View File

@@ -164,7 +164,6 @@ SQL
[ "$status" -eq "1" ]
[[ "$output" =~ "not valid type" ]] || false
skip "TEXT passed, BLOB not yet supported"
dolt sql <<SQL
CREATE TABLE parent2 (
id INT PRIMARY KEY,
@@ -939,6 +938,22 @@ SQL
dolt commit -m "passes now"
}
@test "foreign-keys: Add data to two tables and commit only one" {
dolt sql <<SQL
ALTER TABLE child ADD CONSTRAINT fk_v1 FOREIGN KEY (v1) REFERENCES parent(v1);
SQL
dolt add -A
dolt commit -m "added tables"
dolt sql <<SQL
INSERT INTO parent VALUES (0,0,0),(1,1,1);
INSERT INTO child VALUES (0,0,0),(1,1,1);
SQL
dolt add child
run dolt commit -m "should fail"
[ "$status" -eq "1" ]
[[ "$output" =~ "foreign key violation" ]] || false
}
@test "foreign-keys: Merge valid onto parent" {
dolt sql <<SQL
ALTER TABLE child ADD CONSTRAINT fk_name FOREIGN KEY (v1) REFERENCES parent(v1) ON DELETE CASCADE ON UPDATE CASCADE;
@@ -962,7 +977,7 @@ SET FOREIGN_KEY_CHECKS=0;
UPDATE parent SET v1 = v1 - 1;
SQL
dolt add -A
dolt commit -m "updated parent"
dolt commit --force -m "updated parent"
dolt checkout master
dolt merge other
@@ -1006,7 +1021,7 @@ SET FOREIGN_KEY_CHECKS=0;
UPDATE parent SET v1 = v1 - 1;
SQL
dolt add -A
dolt commit -m "updated parent"
dolt commit --force -m "updated parent"
dolt checkout master
run dolt merge other
[ "$status" -eq "1" ]
@@ -1037,7 +1052,7 @@ SET FOREIGN_KEY_CHECKS=0;
UPDATE child SET v1 = v1 + 1;
SQL
dolt add -A
dolt commit -m "updated child"
dolt commit --force -m "updated child"
dolt checkout master
dolt merge other
@@ -1081,7 +1096,7 @@ SET FOREIGN_KEY_CHECKS=0;
UPDATE child SET v1 = v1 - 1;
SQL
dolt add -A
dolt commit -m "updated child"
dolt commit --force -m "updated child"
dolt checkout master
run dolt merge other
[ "$status" -eq "1" ]
@@ -1113,7 +1128,7 @@ UPDATE parent SET v1 = v1 - 1;
UPDATE child SET v1 = v1 + 1;
SQL
dolt add -A
dolt commit -m "updated both"
dolt commit --force -m "updated both"
dolt checkout master
dolt merge other
@@ -1159,10 +1174,42 @@ UPDATE parent SET v1 = v1 - 1;
UPDATE child SET v1 = v1 + 1;
SQL
dolt add -A
dolt commit -m "updated both"
dolt commit --force -m "updated both"
dolt checkout master
run dolt merge other
[ "$status" -eq "1" ]
[[ "$output" =~ "violation" ]] || false
[[ "$output" =~ "4" ]] || false
}
@test "foreign-keys: Resolve catches violations" {
dolt sql <<SQL
ALTER TABLE child ADD CONSTRAINT fk_v1 FOREIGN KEY (v1) REFERENCES parent(v1);
INSERT INTO parent VALUES (0,0,0);
INSERT INTO child VALUES (0,0,0);
SQL
dolt add -A
dolt commit -m "added tables"
dolt branch other
dolt sql <<SQL
INSERT INTO parent VALUES (1,1,1);
INSERT INTO child VALUES (1,1,1);
SQL
dolt add -A
dolt commit -m "added 1s"
dolt checkout other
dolt sql <<SQL
INSERT INTO parent VALUES (1,2,2);
INSERT INTO child VALUES (1,2,2);
SQL
dolt add -A
dolt commit -m "added 2s"
dolt checkout master
dolt merge other
run dolt conflicts resolve --theirs parent
[ "$status" -eq "1" ]
[[ "$output" =~ "violation" ]] || false
run dolt conflicts resolve --theirs child
[ "$status" -eq "1" ]
[[ "$output" =~ "violation" ]] || false
}

View File

@@ -22,7 +22,6 @@ import (
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)
@@ -114,7 +113,7 @@ func (cmd VerifyConstraintsCmd) Exec(ctx context.Context, commandStr string, arg
return HandleVErrAndExitCode(errhand.BuildDError("Unable to get index data for %s.", fk.ReferencedTableIndex).AddCause(err).Build(), nil)
}
err = table.ForeignKeyIsSatisfied(ctx, fk, childIdxRowData, parentIdxRowData, childIdx, parentIdx)
err = fk.ValidateData(ctx, childIdxRowData, parentIdxRowData, childIdx, parentIdx)
if err != nil {
accumulatedConstraintErrors = append(accumulatedConstraintErrors, err.Error())
}

View File

@@ -19,9 +19,14 @@ import (
"context"
"encoding/binary"
"fmt"
"io"
"sort"
"strings"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/hash"
@@ -285,11 +290,12 @@ func (fkc *ForeignKeyCollection) Iter(cb func(fk ForeignKey) (stop bool, err err
// all foreign keys in which this table is the referenced table. If the table contains a self-referential foreign key,
// it will be present in both declaresFk and referencedByFk. Each array is sorted by name ascending.
func (fkc *ForeignKeyCollection) KeysForTable(tableName string) (declaredFk, referencedByFk []ForeignKey) {
lowercaseTblName := strings.ToLower(tableName)
for _, foreignKey := range fkc.foreignKeys {
if foreignKey.TableName == tableName {
if strings.ToLower(foreignKey.TableName) == lowercaseTblName {
declaredFk = append(declaredFk, foreignKey)
}
if foreignKey.ReferencedTableName == tableName {
if strings.ToLower(foreignKey.ReferencedTableName) == lowercaseTblName {
referencedByFk = append(referencedByFk, foreignKey)
}
}
@@ -423,3 +429,75 @@ func (fkc *ForeignKeyCollection) copy() *ForeignKeyCollection {
}
return &ForeignKeyCollection{copiedForeignKeys}
}
// ValidateData ensures that the foreign key is valid by comparing the index data from the given table
// against the index data from the referenced table.
func (fk ForeignKey) ValidateData(ctx context.Context, childIdx, parentIdx types.Map, childDef, parentDef schema.Index) error {
if fk.ReferencedTableIndex != parentDef.Name() {
return fmt.Errorf("cannot validate data as wrong referenced index was given: expected `%s` but received `%s`",
fk.ReferencedTableIndex, parentDef.Name())
}
tagMap := make(map[uint64]uint64, len(fk.TableColumns))
for i, childTag := range fk.TableColumns {
tagMap[childTag] = fk.ReferencedTableColumns[i]
}
// FieldMappings ignore columns not in the tagMap
fm, err := rowconv.NewFieldMapping(childDef.Schema(), parentDef.Schema(), tagMap)
if err != nil {
return err
}
vrw := types.NewMemoryValueStore() // We are checking fks rather than persisting any values, so an internal VRW can be used
rc, err := rowconv.NewRowConverter(ctx, vrw, fm)
if err != nil {
return err
}
rdr, err := noms.NewNomsMapReader(ctx, childIdx, childDef.Schema())
if err != nil {
return err
}
for {
childIdxRow, err := rdr.ReadRow(ctx)
if err == io.EOF {
break
}
if err != nil {
return err
}
parentIdxRow, err := rc.Convert(childIdxRow)
if err != nil {
return err
}
if row.IsEmpty(parentIdxRow) {
continue
}
partial, err := row.ReduceToIndexPartialKey(parentDef, parentIdxRow)
if err != nil {
return err
}
indexIter := noms.NewNomsRangeReader(parentDef.Schema(), parentIdx,
[]*noms.ReadRange{{Start: partial, Inclusive: true, Reverse: false, Check: func(tuple types.Tuple) (bool, error) {
return tuple.StartsWith(partial), nil
}}},
)
switch _, err = indexIter.ReadRow(ctx); err {
case nil:
continue // parent table contains child key
case io.EOF:
indexKeyStr, _ := types.EncodedValue(ctx, partial)
return fmt.Errorf("foreign key violation on `%s`.`%s`: `%s`", fk.Name, fk.TableName, indexKeyStr)
default:
return err
}
}
return nil
}

View File

@@ -1065,10 +1065,10 @@ func (root *RootValue) PutForeignKeyCollection(ctx context.Context, fkc *Foreign
return &RootValue{root.vrw, rootValSt, fkc.copy()}, nil
}
// ValidateForeignKeys ensures that all foreign keys' tables are present, removing any foreign keys where the declared
// table is missing, and returning an error if a key is in an invalid state or a referenced table is missing.
//TODO: validate that rows that were not modified still adhere to the foreign key constraints
func (root *RootValue) ValidateForeignKeys(ctx context.Context) (*RootValue, error) {
// ValidateForeignKeysOnSchemas ensures that all foreign keys' tables are present, removing any foreign keys where the declared
// table is missing, and returning an error if a key is in an invalid state or a referenced table is missing. Does not
// check any tables' row data.
func (root *RootValue) ValidateForeignKeysOnSchemas(ctx context.Context) (*RootValue, error) {
fkCollection, err := root.GetForeignKeyCollection(ctx)
if err != nil {
return nil, err

View File

@@ -17,6 +17,7 @@ package actions
import (
"context"
"errors"
"fmt"
"sort"
"time"
@@ -130,7 +131,7 @@ func CommitStaged(ctx context.Context, dbData env.DbData, props CommitStagedProp
}
if props.CheckForeignKeys {
srt, err = srt.ValidateForeignKeys(ctx)
srt, err = ValidateForeignKeysOnCommit(ctx, srt, stagedTblNames)
if err != nil {
return "", err
}
@@ -179,6 +180,70 @@ func CommitStaged(ctx context.Context, dbData env.DbData, props CommitStagedProp
return h.String(), err
}
func ValidateForeignKeysOnCommit(ctx context.Context, srt *doltdb.RootValue, stagedTblNames []string) (*doltdb.RootValue, error) {
// Validate schemas
srt, err := srt.ValidateForeignKeysOnSchemas(ctx)
if err != nil {
return nil, err
}
// Validate data
//TODO: make this more efficient, perhaps by leveraging diffs?
fkColl, err := srt.GetForeignKeyCollection(ctx)
if err != nil {
return nil, err
}
fksToCheck := make(map[string]doltdb.ForeignKey)
for _, tblName := range stagedTblNames {
declaredFk, referencedByFk := fkColl.KeysForTable(tblName)
for _, fk := range declaredFk {
fksToCheck[fk.Name] = fk
}
for _, fk := range referencedByFk {
fksToCheck[fk.Name] = fk
}
}
for _, fk := range fksToCheck {
childTbl, _, ok, err := srt.GetTableInsensitive(ctx, fk.TableName)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("foreign key '%s' references missing table '%s'", fk.Name, fk.TableName)
}
childSch, err := childTbl.GetSchema(ctx)
if err != nil {
return nil, err
}
childIdx := childSch.Indexes().GetByName(fk.TableIndex)
childIdxRowData, err := childTbl.GetIndexRowData(ctx, fk.TableIndex)
if err != nil {
return nil, err
}
parentTbl, _, ok, err := srt.GetTableInsensitive(ctx, fk.ReferencedTableName)
if err != nil {
return nil, err
}
if !ok {
return nil, fmt.Errorf("foreign key '%s' references missing table '%s'", fk.Name, fk.ReferencedTableName)
}
parentTblSch, err := parentTbl.GetSchema(ctx)
if err != nil {
return nil, err
}
parentIdx := parentTblSch.Indexes().GetByName(fk.ReferencedTableIndex)
parentIdxRowData, err := parentTbl.GetIndexRowData(ctx, fk.ReferencedTableIndex)
if err != nil {
return nil, err
}
err = fk.ValidateData(ctx, childIdxRowData, parentIdxRowData, childIdx, parentIdx)
if err != nil {
return nil, err
}
}
return srt, nil
}
// TimeSortedCommits returns a reverse-chronological (latest-first) list of the most recent `n` ancestors of `commit`.
// Passing a negative value for `n` will result in all ancestors being returned.
func TimeSortedCommits(ctx context.Context, ddb *doltdb.DoltDB, commit *doltdb.Commit, n int) ([]*doltdb.Commit, error) {

View File

@@ -34,7 +34,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema/encoding"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/types"
@@ -1036,7 +1035,7 @@ func (t *AlterableDoltTable) CreateForeignKey(
if err != nil {
return err
}
err = table.ForeignKeyIsSatisfied(ctx, foreignKey, tableIndexData, refTableIndexData, tableIndex, refTableIndex)
err = foreignKey.ValidateData(ctx, tableIndexData, refTableIndexData, tableIndex, refTableIndex)
if err != nil {
return err
}

View File

@@ -17,14 +17,11 @@ package table
import (
"context"
"errors"
"fmt"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)
@@ -115,75 +112,3 @@ func ReadAllRows(ctx context.Context, rd TableReader, contOnBadRow bool) ([]row.
return nil, badRowCount, err
}
// ForeignKeyIsSatisfied ensures that the foreign key is valid by comparing the index data from the given table
// against the index data from the referenced table.
func ForeignKeyIsSatisfied(ctx context.Context, fk doltdb.ForeignKey, childIdx, parentIdx types.Map, childDef, parentDef schema.Index) error {
if fk.ReferencedTableIndex != parentDef.Name() {
return fmt.Errorf("cannot validate data as wrong referenced index was given: expected `%s` but received `%s`",
fk.ReferencedTableIndex, parentDef.Name())
}
tagMap := make(map[uint64]uint64, len(fk.TableColumns))
for i, childTag := range fk.TableColumns {
tagMap[childTag] = fk.ReferencedTableColumns[i]
}
// FieldMappings ignore columns not in the tagMap
fm, err := rowconv.NewFieldMapping(childDef.Schema(), parentDef.Schema(), tagMap)
if err != nil {
return err
}
vrw := types.NewMemoryValueStore() // We are checking fks rather than persisting any values, so an internal VRW can be used
rc, err := rowconv.NewRowConverter(ctx, vrw, fm)
if err != nil {
return err
}
rdr, err := noms.NewNomsMapReader(ctx, childIdx, childDef.Schema())
if err != nil {
return err
}
for {
childIdxRow, err := rdr.ReadRow(ctx)
if err == io.EOF {
break
}
if err != nil {
return err
}
parentIdxRow, err := rc.Convert(childIdxRow)
if err != nil {
return err
}
if row.IsEmpty(parentIdxRow) {
continue
}
partial, err := row.ReduceToIndexPartialKey(parentDef, parentIdxRow)
if err != nil {
return err
}
indexIter := noms.NewNomsRangeReader(parentDef.Schema(), parentIdx,
[]*noms.ReadRange{{Start: partial, Inclusive: true, Reverse: false, Check: func(tuple types.Tuple) (bool, error) {
return tuple.StartsWith(partial), nil
}}},
)
switch _, err = indexIter.ReadRow(ctx); err {
case nil:
continue // parent table contains child key
case io.EOF:
indexKeyStr, _ := types.EncodedValue(ctx, partial)
return fmt.Errorf("foreign key violation on `%s`.`%s`: `%s`", fk.Name, fk.TableName, indexKeyStr)
default:
return err
}
}
return nil
}