First pass at FK equality checking to handle resolved/unresolved FKs.

This commit is contained in:
Jason Fulghum
2023-05-15 15:26:21 -07:00
parent d80f046f98
commit 3899ddf05a
3 changed files with 127 additions and 34 deletions

View File

@@ -398,7 +398,29 @@ func (td TableDelta) CurName() string {
}
func (td TableDelta) HasFKChanges() bool {
return !fkSlicesAreEqual(td.FromFks, td.ToFks)
if len(td.FromFks) != len(td.ToFks) {
return true
}
sort.Slice(td.FromFks, func(i, j int) bool {
return td.FromFks[i].Name < td.FromFks[j].Name
})
sort.Slice(td.ToFks, func(i, j int) bool {
return td.ToFks[i].Name < td.ToFks[j].Name
})
fromSchemaMap := td.FromFksParentSch
fromSchemaMap[td.FromName] = td.FromSch
toSchemaMap := td.ToFksParentSch
toSchemaMap[td.ToName] = td.ToSch
for i := range td.FromFks {
if !td.FromFks[i].Equals(td.ToFks[i], fromSchemaMap, toSchemaMap) {
return true
}
}
return false
}
// GetSchemas returns the table's schema at the fromRoot and toRoot, or schema.Empty if the table did not exist.
@@ -538,26 +560,6 @@ func (td TableDelta) GetRowData(ctx context.Context) (from, to durable.Index, er
return from, to, nil
}
func fkSlicesAreEqual(from, to []doltdb.ForeignKey) bool {
if len(from) != len(to) {
return false
}
sort.Slice(from, func(i, j int) bool {
return from[i].Name < from[j].Name
})
sort.Slice(to, func(i, j int) bool {
return to[i].Name < to[j].Name
})
for i := range from {
if !from[i].DeepEquals(to[i]) {
return false
}
}
return true
}
// SqlSchemaDiff returns a slice of DDL statements that will transform the schema in the from delta to the schema in
// the to delta.
func SqlSchemaDiff(ctx context.Context, td TableDelta, toSchemas map[string]schema.Schema) ([]string, error) {

View File

@@ -124,7 +124,86 @@ func (fk ForeignKey) EqualDefs(other ForeignKey) bool {
fk.OnDelete == other.OnDelete
}
// DeepEquals compares all attributes of a foreign key to another, including name and table names.
// Equals compares this ForeignKey to |other| and returns true if they are equal. Foreign keys can either be in
// a "resolved" state, where the referenced columns in the parent and child tables are identified by column tags,
// or in an "unresolved" state where the reference columns in the parent and child are still identified by strings.
// If one foreign key is resolved and one is unresolved, the logic for comparing them requires resolving the string
// column names to column tags, which is why |fkSchemasByName| and |otherSchemasByName| are passed in. Each of these
// is a map of table schemas for |fk| and |other|, where the child table and every parent table referenced in the
// foreign key is present in the map.
func (fk ForeignKey) Equals(other ForeignKey, fkSchemasByName, otherSchemasByName map[string]schema.Schema) bool {
// If both FKs are resolved or unresolved, we can just deeply compare them
if fk.IsResolved() == other.IsResolved() {
return fk.DeepEquals(other)
}
// Otherwise, one FK is resolved and one is not, so we need to work a little harder
// to calculate equality since their referenced columns are represented differently.
// First check the attributes that don't change when an FK is resolved or unresolved.
if fk.Name != other.Name &&
fk.TableName != other.TableName &&
fk.ReferencedTableName != other.ReferencedTableName &&
fk.TableIndex != other.TableIndex &&
fk.ReferencedTableIndex != other.ReferencedTableIndex &&
fk.OnUpdate == other.OnUpdate &&
fk.OnDelete == other.OnDelete {
return false
}
// Sort out which FK is resolved and which is not
var resolvedFK, unresolvedFK ForeignKey
var resolvedSchemasByName map[string]schema.Schema
if fk.IsResolved() {
resolvedFK, unresolvedFK, resolvedSchemasByName = fk, other, fkSchemasByName
} else {
resolvedFK, unresolvedFK, resolvedSchemasByName = other, fk, otherSchemasByName
}
// Check the columns on the child table
if len(resolvedFK.TableColumns) != len(unresolvedFK.UnresolvedFKDetails.TableColumns) {
return false
}
for i, tag := range resolvedFK.TableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.TableColumns[i]
resolvedSch, ok := resolvedSchemasByName[resolvedFK.TableName]
if !ok {
return false
}
resolvedCol, ok := resolvedSch.GetAllCols().GetByTag(tag)
if !ok {
return false
}
if resolvedCol.Name != unresolvedColName {
return false
}
}
// Check the columns on the parent table
if len(resolvedFK.ReferencedTableColumns) != len(unresolvedFK.UnresolvedFKDetails.ReferencedTableColumns) {
return false
}
for i, tag := range resolvedFK.ReferencedTableColumns {
unresolvedColName := unresolvedFK.UnresolvedFKDetails.ReferencedTableColumns[i]
resolvedSch, ok := resolvedSchemasByName[unresolvedFK.ReferencedTableName]
if !ok {
return false
}
resolvedCol, ok := resolvedSch.GetAllCols().GetByTag(tag)
if !ok {
return false
}
if resolvedCol.Name != unresolvedColName {
return false
}
}
return true
}
// DeepEquals compares all attributes of a foreign key to another, including name and
// table names. Note that if one foreign key is resolved and the other is NOT resolved,
// then this function will not calculate equality correctly. When comparing a resolved
// FK with an unresolved FK, the ForeignKey.Equals() function should be used instead.
func (fk ForeignKey) DeepEquals(other ForeignKey) bool {
if !fk.EqualDefs(other) {
return false

View File

@@ -771,23 +771,35 @@ SQL
[[ "$output" =~ 'resolved foreign key' ]] || false
}
@test "diff: existing foreign key is resolved" {
@test "diff: resolved FKs don't show up in diff results" {
dolt sql <<SQL
set foreign_key_checks=0;
create table parent (i int primary key);
create table child (j int primary key, constraint fk foreign key (j) references parent (i));
SET @@foreign_key_checks=0;
CREATE TABLE dept_emp (
emp_no int NOT NULL,
dept_no char(4) COLLATE utf8mb4_0900_ai_ci NOT NULL,
PRIMARY KEY (emp_no,dept_no),
KEY dept_no (dept_no),
CONSTRAINT dept_emp_ibfk_1 FOREIGN KEY (emp_no) REFERENCES employees (emp_no) ON DELETE CASCADE
);
CREATE TABLE employees (
emp_no int NOT NULL,
nickname varchar(100),
PRIMARY KEY (emp_no)
);
insert into employees values (100, "bob");
insert into dept_emp values (100, 1);
SQL
run dolt diff
[ "$status" -eq 0 ]
[[ ! "$output" =~ 'resolved foreign key' ]] || false
dolt commit -Am "Importing data, with unresolved FKs"
dolt add -A
dolt commit -m "init commit"
dolt sql -q "delete from parent where i = 0"
# update a row to trigger FKs to be resolved
dolt sql -q "UPDATE employees SET nickname = 'bobby' WHERE emp_no = 100;"
dolt commit -am "Updating data, and resolving FKs"
run dolt diff
# check that the diff output doesn't mention FKs getting resolved or the dept_emp table
run dolt diff HEAD~ HEAD
[ "$status" -eq 0 ]
[[ "$output" =~ 'resolved foreign key' ]] || false
! [[ "$output" =~ "dept_emp" ]] || false
! [[ "$output" =~ "resolved foreign key" ]] || false
}
@test "diff: with index and foreign key changes" {