Started defining new schema for dolt_schemas

This commit is contained in:
Zach Musgrave
2023-02-14 18:44:21 -08:00
parent 8101359df1
commit 938ea2d4df
3 changed files with 111 additions and 43 deletions

View File

@@ -224,17 +224,19 @@ const (
// SchemasTableName is the name of the dolt schema fragment table
SchemasTableName = "dolt_schemas"
// SchemasTablesIdCol is an incrementing integer that represents the insertion index.
// Deprecated: This column is no longer used and will be removed in a future release.
SchemasTablesIdCol = "id"
// Currently: `view` or `trigger`.
// SchemasTablesTypeCol is the name of the column that stores the type of a schema fragment in the dolt_schemas table
SchemasTablesTypeCol = "type"
// The name of the database entity.
// SchemasTablesNameCol The name of the column that stores the name of a schema fragment in the dolt_schemas table
SchemasTablesNameCol = "name"
// The schema fragment associated with the database entity.
// For example, the SELECT statement for a CREATE VIEW.
// SchemasTablesFragmentCol The name of the column that stores the SQL fragment of a schema element in the
// dolt_schemas table
SchemasTablesFragmentCol = "fragment"
// The extra information for schema; currently contains creation time for triggers and views
// SchemasTablesExtraCol The name of the column that stores extra information about a schema element in the
// dolt_schemas table
SchemasTablesExtraCol = "extra"
// The name of the index that is on the table.
//
SchemasTablesIndexName = "fragment_name"
)

View File

@@ -52,6 +52,10 @@ var (
StringDefaultType = &varStringType{gmstypes.MustCreateStringWithDefaults(sqltypes.VarChar, 16383)}
)
func CreateVarStringTypeFromSqlType(stringType sql.StringType) TypeInfo {
return &varStringType{stringType}
}
func CreateVarStringTypeFromParams(params map[string]string) (TypeInfo, error) {
var length int64
var collation sql.CollationID

View File

@@ -21,6 +21,7 @@ import (
"github.com/dolthub/go-mysql-server/sql"
gmstypes "github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/vitess/go/vt/proto/query"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
@@ -33,7 +34,6 @@ import (
)
var errDoltSchemasTableFormat = fmt.Errorf("`%s` schema in unexpected format", doltdb.SchemasTableName)
var noSchemaIndexDefined = fmt.Errorf("could not find index `%s` on system table `%s`", doltdb.SchemasTablesIndexName, doltdb.SchemasTableName)
const (
viewFragment = "view"
@@ -44,6 +44,46 @@ type Extra struct {
CreatedAt int64
}
func mustNewColWithTypeInfo(name string, tag uint64, typeInfo typeinfo.TypeInfo, partOfPK bool, defaultVal string, autoIncrement bool, comment string, constraints ...schema.ColConstraint) schema.Column {
col, err := schema.NewColumnWithTypeInfo(name, tag, typeInfo,partOfPK, defaultVal, autoIncrement, comment, constraints...)
if err != nil {
panic(err)
}
return col
}
func mustCreateStringType(baseType query.Type, length int64, collation sql.CollationID) sql.StringType {
ti, err := gmstypes.CreateString(baseType, length, collation)
if err != nil {
panic(err)
}
return ti
}
// old dolt_schemas columns, only used for migration
var schemasTableColsWithId = schema.NewColCollection(
mustNewColWithTypeInfo(doltdb.SchemasTablesTypeCol, schema.DoltSchemasTypeTag, typeinfo.StringDefaultType, false, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesNameCol, schema.DoltSchemasNameTag, typeinfo.StringDefaultType, false, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesFragmentCol, schema.DoltSchemasFragmentTag, typeinfo.StringDefaultType, false, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesIdCol, schema.DoltSchemasIdTag, typeinfo.Int64Type, true, "", false, "", schema.NotNullConstraint{}),
mustNewColWithTypeInfo(doltdb.SchemasTablesExtraCol, schema.DoltSchemasExtraTag, typeinfo.JSONType, false, "", false, ""),
)
var schemaTableWithIdColSchema = schema.MustSchemaFromCols(schemasTableColsWithId)
// dolt_schemas columns
var schemasTableCols = schema.NewColCollection(
mustNewColWithTypeInfo(doltdb.SchemasTablesTypeCol, schema.DoltSchemasTypeTag, typeinfo.CreateVarStringTypeFromSqlType(mustCreateStringType(query.Type_VARCHAR, 64, sql.Collation_utf8mb4_0900_ai_ci)), true, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesNameCol, schema.DoltSchemasNameTag, typeinfo.CreateVarStringTypeFromSqlType(mustCreateStringType(query.Type_VARCHAR, 64, sql.Collation_utf8mb4_0900_ai_ci)), true, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesFragmentCol, schema.DoltSchemasFragmentTag, typeinfo.CreateVarStringTypeFromSqlType(gmstypes.LongText), false, "", false, ""),
mustNewColWithTypeInfo(doltdb.SchemasTablesExtraCol, schema.DoltSchemasExtraTag, typeinfo.JSONType, false, "", false, ""),
)
var schemaTableSchema = schema.MustSchemaFromCols(schemasTableCols)
var schemaTableKd = schemaTableSchema.GetKeyDescriptor()
var schemaTabldVd = schemaTableSchema.GetValueDescriptor()
// The fixed dolt schema for the `dolt_schemas` table.
func SchemasTableSchema() schema.Schema {
typeCol, err := schema.NewColumnWithTypeInfo(doltdb.SchemasTablesTypeCol, schema.DoltSchemasTypeTag, typeinfo.StringDefaultType, false, "", false, "")
@@ -79,8 +119,8 @@ func GetOrCreateDoltSchemasTable(ctx *sql.Context, db Database) (retTbl *Writabl
var rowsToAdd []sql.Row
if found {
schemasTable := tbl.(*WritableDoltTable)
// Old schemas table does not contain the `id` or `extra` column.
if !tbl.Schema().Contains(doltdb.SchemasTablesIdCol, doltdb.SchemasTableName) || !tbl.Schema().Contains(doltdb.SchemasTablesExtraCol, doltdb.SchemasTableName) {
// Old schemas table contains the `id` column.
if tbl.Schema().Contains(doltdb.SchemasTablesIdCol, doltdb.SchemasTableName) {
root, err := db.GetRoot(ctx)
if err != nil {
return nil, err
@@ -165,40 +205,61 @@ func migrateOldSchemasTableToNew(
[]sql.Row,
error,
) {
// Copy all of the old data over and add an index column and an extra column
var rowsToAdd []sql.Row
table, err := schemasTable.DoltTable.DoltTable(ctx)
if err != nil {
return nil, nil, err
}
rowData, err := table.GetNomsRowData(ctx)
rowData, err := table.GetRowData(ctx)
if err != nil {
return nil, nil, err
}
id := int64(1)
err = rowData.IterAll(ctx, func(key, val types.Value) error {
dRow, err := row.FromNoms(schemasTable.sch, key.(types.Tuple), val.(types.Tuple))
// Copy all of the old row data over minus the id column
var rowsToAdd []sql.Row
if types.IsFormat_DOLT(rowData.Format()) {
pm := durable.ProllyMapFromIndex(rowData)
iter, err := pm.IterAll(ctx)
if err != nil {
return err
return nil, nil, err
}
sqlRow, err := sqlutil.DoltRowToSqlRow(dRow, schemasTable.sch)
for {
_, v, err := iter.Next(ctx)
if err != nil {
return nil, nil, err
}
}
} else {
nomsMap := durable.NomsMapFromIndex(rowData)
id := int64(1)
err = nomsMap.IterAll(ctx, func(key, val types.Value) error {
dRow, err := row.FromNoms(schemasTable.sch, key.(types.Tuple), val.(types.Tuple))
if err != nil {
return err
}
sqlRow, err := sqlutil.DoltRowToSqlRow(dRow, schemasTable.sch)
if err != nil {
return err
}
// append the new id to row, if missing
if !schemasTable.sqlSchema().Contains(doltdb.SchemasTablesIdCol, doltdb.SchemasTableName) {
sqlRow = append(sqlRow, id)
}
// append the extra cols to row
sqlRow = append(sqlRow, nil)
rowsToAdd = append(rowsToAdd, sqlRow)
id++
return nil
})
if err != nil {
return err
return nil, nil, err
}
// append the new id to row, if missing
if !schemasTable.sqlSchema().Contains(doltdb.SchemasTablesIdCol, doltdb.SchemasTableName) {
sqlRow = append(sqlRow, id)
}
// append the extra cols to row
sqlRow = append(sqlRow, nil)
rowsToAdd = append(rowsToAdd, sqlRow)
id++
return nil
})
if err != nil {
return nil, nil, err
}
err = db.DropTable(ctx, doltdb.SchemasTableName)
if err != nil {
return nil, nil, err
@@ -256,20 +317,21 @@ func nextSchemasTableIndex(ctx *sql.Context, root *doltdb.RootValue) (int64, err
// fragFromSchemasTable returns the row with the given schema fragment if it exists.
func fragFromSchemasTable(ctx *sql.Context, tbl *WritableDoltTable, fragType string, name string) (sql.Row, bool, error) {
indexes, err := tbl.GetIndexes(ctx)
if err != nil {
return nil, false, err
}
// indexes, err := tbl.GetIndexes(ctx)
// if err != nil {
// return nil, false, err
// }
var fragNameIndex sql.Index
for _, index := range indexes {
if index.ID() == doltdb.SchemasTablesIndexName {
fragNameIndex = index
break
}
}
if fragNameIndex == nil {
return nil, false, noSchemaIndexDefined
}
// TODO: replace with primary key lookup
// for _, index := range indexes {
// if index.ID() == doltdb.SchemasTablesIndexName {
// fragNameIndex = index
// break
// }
// }
// if fragNameIndex == nil {
// return nil, false, noSchemaIndexDefined
// }
exprs := fragNameIndex.Expressions()
lookup, err := sql.NewIndexBuilder(fragNameIndex).Equals(ctx, exprs[0], fragType).Equals(ctx, exprs[1], name).Build(ctx)