diff --git a/go/libraries/doltcore/merge/violations_fk.go b/go/libraries/doltcore/merge/violations_fk.go index 2c3002c7fe..d9ac08c04a 100644 --- a/go/libraries/doltcore/merge/violations_fk.go +++ b/go/libraries/doltcore/merge/violations_fk.go @@ -503,16 +503,23 @@ func newConstraintViolationsLoadedTable(ctx context.Context, tblName, idxName st return nil, false, err } - // using primary key as index + // Create Primary Key Index if idxName == "PRIMARY" { - idx := sch.PkIndex() + pkCols := sch.GetPKCols() + pkIdxColl := schema.NewIndexCollection(pkCols, pkCols) + pkIdxProps := schema.IndexProperties{ + IsUnique: true, + IsUserDefined: false, + Comment: "", + } + pkIdx := schema.NewIndex("PRIMARY", pkCols.SortedTags, pkCols.SortedTags, pkIdxColl, pkIdxProps) return &constraintViolationsLoadedTable{ TableName: trueTblName, Table: tbl, Schema: sch, RowData: rowData, - Index: idx, - IndexSchema: idx.Schema(), + Index: pkIdx, + IndexSchema: pkIdx.Schema(), IndexData: rowData, }, true, nil } diff --git a/go/libraries/doltcore/schema/index.go b/go/libraries/doltcore/schema/index.go index 2a46a68fa7..b1af7c4332 100644 --- a/go/libraries/doltcore/schema/index.go +++ b/go/libraries/doltcore/schema/index.go @@ -68,12 +68,12 @@ type indexImpl struct { comment string } -func NewIndex(name string, tags, allTags []uint64, indexColl *indexCollectionImpl, props IndexProperties) Index { +func NewIndex(name string, tags, allTags []uint64, indexColl IndexCollection, props IndexProperties) Index { return &indexImpl{ name: name, tags: tags, allTags: allTags, - indexColl: indexColl, + indexColl: indexColl.(*indexCollectionImpl), isUnique: props.IsUnique, isUserDefined: props.IsUserDefined, comment: props.Comment, diff --git a/go/libraries/doltcore/schema/schema.go b/go/libraries/doltcore/schema/schema.go index d779de5f4d..34ac18f720 100644 --- a/go/libraries/doltcore/schema/schema.go +++ b/go/libraries/doltcore/schema/schema.go @@ -59,9 +59,6 @@ type Schema interface { // Indexes returns a collection of all indexes on the table that this schema belongs to. Indexes() IndexCollection - // PkIndex returns the primary key indexes on this table. - PkIndex() Index - // Checks returns a collection of all check constraints on the table that this schema belongs to. Checks() CheckCollection diff --git a/go/libraries/doltcore/schema/schema_impl.go b/go/libraries/doltcore/schema/schema_impl.go index 6c815aa60e..9f6e5d80ad 100644 --- a/go/libraries/doltcore/schema/schema_impl.go +++ b/go/libraries/doltcore/schema/schema_impl.go @@ -289,17 +289,6 @@ func (si *schemaImpl) Indexes() IndexCollection { return si.indexCollection } -// PkIndex creates a primary key index from the schema -func (si *schemaImpl) PkIndex() Index { - pkIdxColl := NewIndexCollection(si.pkCols, si.pkCols) - pkProps := IndexProperties{ - IsUnique: true, - IsUserDefined: false, - Comment: "", - } - return NewIndex("PRIMARY", si.pkCols.Tags, si.pkCols.Tags, pkIdxColl.(*indexCollectionImpl), pkProps) -} - func (si *schemaImpl) Checks() CheckCollection { return si.checkCollection } diff --git a/go/libraries/doltcore/sqle/tables.go b/go/libraries/doltcore/sqle/tables.go index f0df354893..1e04d7d1f5 100644 --- a/go/libraries/doltcore/sqle/tables.go +++ b/go/libraries/doltcore/sqle/tables.go @@ -1897,6 +1897,19 @@ func (t *AlterableDoltTable) RenameIndex(ctx *sql.Context, fromIndexName string, return t.updateFromRoot(ctx, newRoot) } +func isPkPrefix(fkColNames []string, pkCols []schema.Column) bool { + // can't be a prefix if it's longer + if len(fkColNames) > len(pkCols) { + return false + } + for i, fkColName := range fkColNames { + if strings.ToLower(fkColName) != strings.ToLower(pkCols[i].Name) { + return false + } + } + return true +} + // AddForeignKey implements sql.ForeignKeyTable func (t *AlterableDoltTable) AddForeignKey(ctx *sql.Context, sqlFk sql.ForeignKeyConstraint) error { if err := branch_control.CheckAccess(ctx, branch_control.Permissions_Write); err != nil { @@ -1971,24 +1984,31 @@ func (t *AlterableDoltTable) AddForeignKey(ctx *sql.Context, sqlFk sql.ForeignKe refColTags[i] = refCol.Tag } + var tableIndexName, refTableIndexName string tableIndex, ok, err := findIndexWithPrefix(t.sch, sqlFk.Columns) if err != nil { return err } if !ok { - idxReturn, err := creation.CreateIndex(ctx, tbl, "", sqlFk.Columns, false, false, "", editor.Options{ - ForeignKeyChecksDisabled: true, - Deaf: t.opts.Deaf, - Tempdir: t.opts.Tempdir, - }) - if err != nil { - return err - } - tableIndex = idxReturn.NewIndex - tbl = idxReturn.NewTable - root, err = root.PutTable(ctx, t.tableName, idxReturn.NewTable) - if sqlFk.IsSelfReferential() { - refTbl = idxReturn.NewTable + // check if foreign key is prefix of primary key + if isPkPrefix(sqlFk.Columns, t.sch.GetPKCols().GetColumns()) { + tableIndexName = "PRIMARY" + } else { + idxReturn, err := creation.CreateIndex(ctx, tbl, "", sqlFk.Columns, false, false, "", editor.Options{ + ForeignKeyChecksDisabled: true, + Deaf: t.opts.Deaf, + Tempdir: t.opts.Tempdir, + }) + if err != nil { + return err + } + tableIndex = idxReturn.NewIndex + tbl = idxReturn.NewTable + root, err = root.PutTable(ctx, t.tableName, idxReturn.NewTable) + if sqlFk.IsSelfReferential() { + refTbl = idxReturn.NewTable + } + tableIndexName = tableIndex.Name() } } @@ -1997,42 +2017,47 @@ func (t *AlterableDoltTable) AddForeignKey(ctx *sql.Context, sqlFk sql.ForeignKe return err } if !ok { - var refPkTags []uint64 - for _, i := range refSch.GetPkOrdinals() { - refPkTags = append(refPkTags, refSch.GetAllCols().GetByIndex(i).Tag) - } + // check if foreign key is prefix of primary key + if isPkPrefix(sqlFk.ParentColumns, refSch.GetPKCols().GetColumns()) { + refTableIndexName = "PRIMARY" + } else { + var refPkTags []uint64 + for _, i := range refSch.GetPkOrdinals() { + refPkTags = append(refPkTags, refSch.GetAllCols().GetByIndex(i).Tag) + } - var colNames []string - for _, t := range refColTags { - c, _ := refSch.GetAllCols().GetByTag(t) - colNames = append(colNames, c.Name) - } + var colNames []string + for _, t := range refColTags { + c, _ := refSch.GetAllCols().GetByTag(t) + colNames = append(colNames, c.Name) + } - // Our duplicate index is only unique if it's the entire primary key (which is by definition unique) - unique := len(refPkTags) == len(refColTags) - idxReturn, err := creation.CreateIndex(ctx, refTbl, "", colNames, unique, false, "", editor.Options{ - ForeignKeyChecksDisabled: true, - Deaf: t.opts.Deaf, - Tempdir: t.opts.Tempdir, - }) - if err != nil { - return err - } - refTbl = idxReturn.NewTable - refTableIndex = idxReturn.NewIndex - root, err = root.PutTable(ctx, sqlFk.ParentTable, idxReturn.NewTable) - if err != nil { - return err + // Our duplicate index is only unique if it's the entire primary key (which is by definition unique) + unique := len(refPkTags) == len(refColTags) + idxReturn, err := creation.CreateIndex(ctx, refTbl, "", colNames, unique, false, "", editor.Options{ + ForeignKeyChecksDisabled: true, + Deaf: t.opts.Deaf, + Tempdir: t.opts.Tempdir, + }) + if err != nil { + return err + } + refTbl = idxReturn.NewTable + refTableIndex = idxReturn.NewIndex + root, err = root.PutTable(ctx, sqlFk.ParentTable, idxReturn.NewTable) + if err != nil { + return err + } + refTableIndexName = refTableIndex.Name() } } - doltFk = doltdb.ForeignKey{ Name: sqlFk.Name, TableName: sqlFk.Table, - TableIndex: tableIndex.Name(), + TableIndex: tableIndexName, TableColumns: colTags, ReferencedTableName: sqlFk.ParentTable, - ReferencedTableIndex: refTableIndex.Name(), + ReferencedTableIndex: refTableIndexName, ReferencedTableColumns: refColTags, OnUpdate: onUpdateRefAction, OnDelete: onDeleteRefAction, @@ -2624,7 +2649,6 @@ func findIndexWithPrefix(sch schema.Schema, prefixCols []string) (schema.Index, prefixCols = lowercaseSlice(prefixCols) indexes := sch.Indexes().AllIndexes() - indexes = append(indexes, sch.PkIndex()) colLen := len(prefixCols) var indexesWithLen []idxWithLen for _, idx := range indexes {