Merge pull request #5319 from dolthub/james/fk-stuff-again

don't create new index if matching on primary key
This commit is contained in:
James Cor
2023-02-09 13:38:44 -08:00
committed by GitHub
2 changed files with 114 additions and 215 deletions
+102 -215
View File
@@ -1921,6 +1921,103 @@ func (t *AlterableDoltTable) RenameIndex(ctx *sql.Context, fromIndexName string,
return t.updateFromRoot(ctx, newRoot)
}
// createForeignKey creates a doltdb.ForeignKey from a sql.ForeignKeyConstraint
func (t *AlterableDoltTable) createForeignKey(
ctx *sql.Context,
root *doltdb.RootValue,
tbl *doltdb.Table,
sqlFk sql.ForeignKeyConstraint,
onUpdateRefAction, onDeleteRefAction doltdb.ForeignKeyReferentialAction) (doltdb.ForeignKey, error) {
if !sqlFk.IsResolved {
return doltdb.ForeignKey{
Name: sqlFk.Name,
TableName: sqlFk.Table,
TableIndex: "",
TableColumns: nil,
ReferencedTableName: sqlFk.ParentTable,
ReferencedTableIndex: "",
ReferencedTableColumns: nil,
OnUpdate: onUpdateRefAction,
OnDelete: onDeleteRefAction,
UnresolvedFKDetails: doltdb.UnresolvedFKDetails{
TableColumns: sqlFk.Columns,
ReferencedTableColumns: sqlFk.ParentColumns,
},
}, nil
}
colTags := make([]uint64, len(sqlFk.Columns))
for i, col := range sqlFk.Columns {
tableCol, ok := t.sch.GetAllCols().GetByNameCaseInsensitive(col)
if !ok {
return doltdb.ForeignKey{}, fmt.Errorf("table `%s` does not have column `%s`", sqlFk.Table, col)
}
colTags[i] = tableCol.Tag
}
var refTbl *doltdb.Table
var refSch schema.Schema
if sqlFk.IsSelfReferential() {
refTbl = tbl
refSch = t.sch
} else {
var ok bool
var err error
refTbl, _, ok, err = root.GetTableInsensitive(ctx, sqlFk.ParentTable)
if err != nil {
return doltdb.ForeignKey{}, err
}
if !ok {
return doltdb.ForeignKey{}, fmt.Errorf("referenced table `%s` does not exist", sqlFk.ParentTable)
}
refSch, err = refTbl.GetSchema(ctx)
if err != nil {
return doltdb.ForeignKey{}, err
}
}
refColTags := make([]uint64, len(sqlFk.ParentColumns))
for i, name := range sqlFk.ParentColumns {
refCol, ok := refSch.GetAllCols().GetByNameCaseInsensitive(name)
if !ok {
return doltdb.ForeignKey{}, fmt.Errorf("table `%s` does not have column `%s`", sqlFk.ParentTable, name)
}
refColTags[i] = refCol.Tag
}
var tableIndexName, refTableIndexName string
tableIndex, ok, err := findIndexWithPrefix(t.sch, sqlFk.Columns)
if err != nil {
return doltdb.ForeignKey{}, err
}
// Use secondary index if found; otherwise it will use empty string, indicating primary key
if ok {
tableIndexName = tableIndex.Name()
}
refTableIndex, ok, err := findIndexWithPrefix(refSch, sqlFk.ParentColumns)
if err != nil {
return doltdb.ForeignKey{}, err
}
// Use secondary index if found; otherwise it will use empty string, indicating primary key
if ok {
refTableIndexName = refTableIndex.Name()
}
return doltdb.ForeignKey{
Name: sqlFk.Name,
TableName: sqlFk.Table,
TableIndex: tableIndexName,
TableColumns: colTags,
ReferencedTableName: sqlFk.ParentTable,
ReferencedTableIndex: refTableIndexName,
ReferencedTableColumns: refColTags,
OnUpdate: onUpdateRefAction,
OnDelete: onDeleteRefAction,
UnresolvedFKDetails: doltdb.UnresolvedFKDetails{
TableColumns: sqlFk.Columns,
ReferencedTableColumns: sqlFk.ParentColumns,
},
}, nil
}
// AddForeignKey implements sql.ForeignKeyTable
func (t *AlterableDoltTable) AddForeignKey(ctx *sql.Context, sqlFk sql.ForeignKeyConstraint) error {
if err := branch_control.CheckAccess(ctx, branch_control.Permissions_Write); err != nil {
@@ -1954,95 +2051,9 @@ func (t *AlterableDoltTable) AddForeignKey(ctx *sql.Context, sqlFk sql.ForeignKe
return err
}
var doltFk doltdb.ForeignKey
if sqlFk.IsResolved {
colTags := make([]uint64, len(sqlFk.Columns))
for i, col := range sqlFk.Columns {
tableCol, ok := t.sch.GetAllCols().GetByNameCaseInsensitive(col)
if !ok {
return fmt.Errorf("table `%s` does not have column `%s`", sqlFk.Table, col)
}
colTags[i] = tableCol.Tag
}
var refTbl *doltdb.Table
var ok bool
var refSch schema.Schema
if sqlFk.IsSelfReferential() {
refTbl = tbl
refSch = t.sch
} else {
refTbl, _, ok, err = root.GetTableInsensitive(ctx, sqlFk.ParentTable)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("referenced table `%s` does not exist", sqlFk.ParentTable)
}
refSch, err = refTbl.GetSchema(ctx)
if err != nil {
return err
}
}
refColTags := make([]uint64, len(sqlFk.ParentColumns))
for i, name := range sqlFk.ParentColumns {
refCol, ok := refSch.GetAllCols().GetByNameCaseInsensitive(name)
if !ok {
return fmt.Errorf("table `%s` does not have column `%s`", sqlFk.ParentTable, name)
}
refColTags[i] = refCol.Tag
}
var tableIndexName, refTableIndexName string
tableIndex, ok, err := findIndexWithPrefix(t.sch, sqlFk.Columns)
if err != nil {
return err
}
// Use secondary index if found; otherwise it will use empty string, indicating primary key
if ok {
tableIndexName = tableIndex.Name()
}
refTableIndex, ok, err := findIndexWithPrefix(refSch, sqlFk.ParentColumns)
if err != nil {
return err
}
// Use secondary index if found; otherwise it will use empty string, indicating primary key
if ok {
refTableIndexName = refTableIndex.Name()
}
doltFk = doltdb.ForeignKey{
Name: sqlFk.Name,
TableName: sqlFk.Table,
TableIndex: tableIndexName,
TableColumns: colTags,
ReferencedTableName: sqlFk.ParentTable,
ReferencedTableIndex: refTableIndexName,
ReferencedTableColumns: refColTags,
OnUpdate: onUpdateRefAction,
OnDelete: onDeleteRefAction,
UnresolvedFKDetails: doltdb.UnresolvedFKDetails{
TableColumns: sqlFk.Columns,
ReferencedTableColumns: sqlFk.ParentColumns,
},
}
} else {
doltFk = doltdb.ForeignKey{
Name: sqlFk.Name,
TableName: sqlFk.Table,
TableIndex: "",
TableColumns: nil,
ReferencedTableName: sqlFk.ParentTable,
ReferencedTableIndex: "",
ReferencedTableColumns: nil,
OnUpdate: onUpdateRefAction,
OnDelete: onDeleteRefAction,
UnresolvedFKDetails: doltdb.UnresolvedFKDetails{
TableColumns: sqlFk.Columns,
ReferencedTableColumns: sqlFk.ParentColumns,
},
}
doltFk, err := t.createForeignKey(ctx, root, tbl, sqlFk, onUpdateRefAction, onDeleteRefAction)
if err != nil {
return err
}
fkc, err := root.GetForeignKeyCollection(ctx)
@@ -2116,12 +2127,7 @@ func (t *AlterableDoltTable) UpdateForeignKey(ctx *sql.Context, fkName string, s
doltFk.UnresolvedFKDetails.TableColumns = sqlFk.Columns
doltFk.UnresolvedFKDetails.ReferencedTableColumns = sqlFk.ParentColumns
if doltFk.IsResolved() && !sqlFk.IsResolved { // Need to unresolve the foreign key
doltFk.TableIndex = ""
doltFk.TableColumns = nil
doltFk.ReferencedTableIndex = ""
doltFk.ReferencedTableColumns = nil
} else if !doltFk.IsResolved() && sqlFk.IsResolved { // Need to assign tags and indexes since it's resolved
if !doltFk.IsResolved() || !sqlFk.IsResolved {
tbl, _, ok, err := root.GetTableInsensitive(ctx, t.tableName)
if err != nil {
return err
@@ -2129,129 +2135,10 @@ func (t *AlterableDoltTable) UpdateForeignKey(ctx *sql.Context, fkName string, s
if !ok {
return sql.ErrTableNotFound.New(t.tableName)
}
colTags := make([]uint64, len(sqlFk.Columns))
for i, col := range sqlFk.Columns {
tableCol, ok := t.sch.GetAllCols().GetByNameCaseInsensitive(col)
if !ok {
return fmt.Errorf("table `%s` does not have column `%s`", sqlFk.Table, col)
}
colTags[i] = tableCol.Tag
}
var refTbl *doltdb.Table
var refSch schema.Schema
if sqlFk.IsSelfReferential() {
refTbl = tbl
refSch = t.sch
} else {
refTbl, _, ok, err = root.GetTableInsensitive(ctx, sqlFk.ParentTable)
if err != nil {
return err
}
if !ok {
return fmt.Errorf("referenced table `%s` does not exist", sqlFk.ParentTable)
}
refSch, err = refTbl.GetSchema(ctx)
if err != nil {
return err
}
}
refColTags := make([]uint64, len(sqlFk.ParentColumns))
for i, name := range sqlFk.ParentColumns {
refCol, ok := refSch.GetAllCols().GetByNameCaseInsensitive(name)
if !ok {
return fmt.Errorf("table `%s` does not have column `%s`", sqlFk.ParentTable, name)
}
refColTags[i] = refCol.Tag
}
tableIndex, ok, err := findIndexWithPrefix(t.sch, sqlFk.Columns)
doltFk, err = t.createForeignKey(ctx, root, tbl, sqlFk, doltFk.OnUpdate, doltFk.OnDelete)
if err != nil {
return err
}
if !ok {
// The engine matched on a primary key, and Dolt does not yet support using the primary key within the
// schema.Index interface (which is used internally to represent indexes across the codebase). In the
// meantime, we must generate a duplicate key over the primary key.
//TODO: use the primary key as-is
idxReturn, err := creation.CreateIndex(
ctx,
tbl,
"",
sqlFk.Columns,
nil,
false,
false,
"",
editor.Options{
ForeignKeyChecksDisabled: true,
Deaf: t.opts.Deaf,
Tempdir: t.opts.Tempdir,
})
if err != nil {
return err
}
tableIndex = idxReturn.NewIndex
tbl = idxReturn.NewTable
root, err = root.PutTable(ctx, t.tableName, idxReturn.NewTable)
if sqlFk.IsSelfReferential() {
refTbl = idxReturn.NewTable
}
}
refTableIndex, ok, err := findIndexWithPrefix(refSch, sqlFk.ParentColumns)
if err != nil {
return err
}
if !ok {
// The engine matched on a primary key, and Dolt does not yet support using the primary key within the
// schema.Index interface (which is used internally to represent indexes across the codebase). In the
// meantime, we must generate a duplicate key over the primary key.
//TODO: use the primary key as-is
var refPkTags []uint64
for _, i := range refSch.GetPkOrdinals() {
refPkTags = append(refPkTags, refSch.GetAllCols().GetByIndex(i).Tag)
}
var colNames []string
for _, t := range refColTags {
c, _ := refSch.GetAllCols().GetByTag(t)
colNames = append(colNames, c.Name)
}
// Our duplicate index is only unique if it's the entire primary key (which is by definition unique)
unique := len(refPkTags) == len(refColTags)
idxReturn, err := creation.CreateIndex(
ctx,
refTbl,
"",
colNames,
nil,
unique,
false,
"",
editor.Options{
ForeignKeyChecksDisabled: true,
Deaf: t.opts.Deaf,
Tempdir: t.opts.Tempdir,
})
if err != nil {
return err
}
refTbl = idxReturn.NewTable
refTableIndex = idxReturn.NewIndex
root, err = root.PutTable(ctx, sqlFk.ParentTable, idxReturn.NewTable)
if err != nil {
return err
}
}
doltFk.TableIndex = tableIndex.Name()
doltFk.TableColumns = colTags
doltFk.ReferencedTableIndex = refTableIndex.Name()
doltFk.ReferencedTableColumns = refColTags
}
err = fkc.AddKeys(doltFk)
@@ -2861,3 +2861,15 @@ SQL
[[ $output =~ "Automatic merge failed; 1 table(s) are unmerged." ]]
}
@test "constraint-violations: altering FKs over PKs does not create bad index" {
dolt sql <<SQL
set foreign_key_checks=0;
create table child (j int primary key, foreign key (j) references parent (i));
create table parent (i int primary key);
set foreign_key_checks=1;
delete from parent where i = 0;
SQL
run dolt index ls
[[ "$output" =~ "No indexes in the working set" ]] || false
}