This commit is contained in:
James Cor
2022-03-07 13:08:08 -08:00
parent 9f0e490a46
commit f0d47bd82f
4 changed files with 40 additions and 27 deletions

View File

@@ -52,7 +52,7 @@ func AddPrimaryKeyToTable(ctx context.Context, table *doltdb.Table, tableName st
for ord, col := range sch.GetAllCols().GetColumns() {
if i, ok := pkColOrdering[col.Name]; ok {
pkOrdinals[i] = ord
// Can't assign IsPartOfPK true, before checking if IsNullable, since it'll always be false
// Only add NOT NULL constraint iff it doesn't exist
if col.IsNullable() {
col.Constraints = append(col.Constraints, schema.NotNullConstraint{})
}
@@ -202,4 +202,4 @@ func fmtPrimaryKeyError(sch schema.Schema, keylessRow row.Row) error {
func duplicatePkFunction(keyString, indexName string, k, v types.Tuple, isPk bool) error {
return sql.NewUniqueKeyErr(fmt.Sprintf("%s", keyString), true, sql.Row{})
}
}

View File

@@ -39,10 +39,10 @@ func DropPrimaryKeyFromTable(ctx context.Context, table *doltdb.Table, nbf *type
// Modify the schema to convert the primary key cols into non primary key cols
newCollection := schema.MapColCollection(sch.GetAllCols(), func(col schema.Column) schema.Column {
col.IsPartOfPK = false
// TODO: should be impossible for column to have been a primary key, but not have a NOT NULL constraint, do I still bother to check?
//if col.IsNullable() {
// col.Constraints = append(col.Constraints, schema.NotNullConstraint{})
//}
// Remove PK, does not remove NOT NULL constraint, so add it back if it's somehow gone
if col.IsNullable() {
col.Constraints = append(col.Constraints, schema.NotNullConstraint{})
}
return col
})

View File

@@ -58,10 +58,16 @@ type encodedColumn struct {
}
func encodeAllColConstraints(constraints []schema.ColConstraint) []encodedConstraint {
nomsConstraints := make([]encodedConstraint, len(constraints))
for i, c := range constraints {
nomsConstraints[i] = encodeColConstraint(c)
var nomsConstraints []encodedConstraint
seenNotNull := false
for _, c := range constraints {
if c.GetConstraintType() == schema.NotNullConstraintType {
if seenNotNull {
continue
}
seenNotNull = true
}
nomsConstraints = append(nomsConstraints, encodeColConstraint(c))
}
return nomsConstraints
@@ -72,11 +78,18 @@ func decodeAllColConstraint(encConstraints []encodedConstraint) []schema.ColCons
return nil
}
constraints := make([]schema.ColConstraint, len(encConstraints))
for i, nc := range encConstraints {
var constraints []schema.ColConstraint
seenNotNull := false
for _, nc := range encConstraints {
c := nc.decodeColConstraint()
constraints[i] = c
// Prevent duplicate NOT NULL constraints
if c.GetConstraintType() == schema.NotNullConstraintType {
if seenNotNull {
continue
}
seenNotNull = true
}
constraints = append(constraints, c)
}
return constraints
@@ -316,10 +329,12 @@ func (sd schemaData) addChecksIndexesAndPkOrderingToSchema(sch schema.Schema) er
func MarshalSchemaAsNomsValue(ctx context.Context, vrw types.ValueReadWriter, sch schema.Schema) (types.Value, error) {
// Anyone calling this is going to serialize this to disk, so it's our last line of defense against defective schemas.
// Business logic should catch errors before this point, but this is a failsafe.
err := schema.ValidateColumnConstraints(sch.GetAllCols())
if err != nil {
return nil, err
}
schema.RemoveDuplicateNotNullColumnConstraints(sch.GetAllCols())
err := schema.ValidateForInsert(sch.GetAllCols())
err = schema.ValidateForInsert(sch.GetAllCols())
if err != nil {
return nil, err
}

View File

@@ -39,6 +39,7 @@ type schemaImpl struct {
}
var ErrInvalidPkOrdinals = errors.New("incorrect number of primary key ordinals")
var ErrMultipleNotNullConstraints = errors.New("multiple not null constraints on same column")
// SchemaFromCols creates a Schema from a collection of columns
func SchemaFromCols(allCols *ColCollection) (Schema, error) {
@@ -90,23 +91,20 @@ func MustSchemaFromCols(typedColColl *ColCollection) Schema {
return sch
}
// RemoveDuplicateNotNullColumnConstraints removes any duplicate NOT NULL column constraints from schemas.
func RemoveDuplicateNotNullColumnConstraints(allCols *ColCollection) {
for i, col := range allCols.cols {
var newColConstraints []ColConstraint
// ValidateColumnConstraints removes any duplicate NOT NULL column constraints from schemas.
func ValidateColumnConstraints(allCols *ColCollection) error {
for _, col := range allCols.cols {
seenNotNull := false
for _, cc := range col.Constraints {
if cc.GetConstraintType() == NotNullConstraintType {
if !seenNotNull {
newColConstraints = append(newColConstraints, cc)
seenNotNull = true
if seenNotNull {
return ErrMultipleNotNullConstraints
}
} else {
newColConstraints = append(newColConstraints, cc)
seenNotNull = true
}
}
allCols.cols[i].Constraints = newColConstraints
}
return nil
}
// ValidateForInsert returns an error if the given schema cannot be written to the dolt database.