Prevent creating a table if it would clash with a nonlocal_tables rule.

This commit is contained in:
Nick Tobey
2025-10-22 16:20:25 -07:00
parent ecd150ab5e
commit 9ca908f441
2 changed files with 64 additions and 47 deletions
+60 -39
View File
@@ -61,6 +61,7 @@ var ErrReservedTableName = errors.NewKind("Invalid table name %s. Table names be
var ErrReservedDiffTableName = errors.NewKind("Invalid table name %s. Table names beginning with `__DATABASE__` are reserved for internal use")
var ErrSystemTableAlter = errors.NewKind("Cannot alter table %s: system tables cannot be dropped or altered")
var ErrInvalidNonlocalTableOptions = errors.NewKind("Invalid nonlocal table options %s: only valid value is 'immediate'.")
var ErrNonlocalTableName = errors.NewKind("Cannot create table name %s because it matches a name present in dolt_nonlocal_tables.")
type readNonlocalTablesFlag bool
@@ -943,32 +944,32 @@ func (db Database) getTableInsensitiveWithRoot(ctx *sql.Context, head *doltdb.Co
return resolveOverriddenNonexistentTable(ctx, tblName, db)
}
// getNonlocalTable checks whether the table name maps onto a table in another root via the dolt_nonlocal_tables system table
func (db Database) getNonlocalTable(ctx *sql.Context, root doltdb.RootValue, lwrName string) (sql.Table, bool, error) {
// getNonlocalTableEntry returns the first entry in the dolt_nonlocal_tables system table that matches the provided table name.
func (db Database) getNonlocalTableEntry(ctx *sql.Context, root doltdb.RootValue, lwrName string) (doltdb.NonlocalTableEntry, bool, error) {
_, nonlocalsTable, nonlocalsTableExists, err := db.resolveUserTable(ctx, root, doltdb.GetNonlocalTablesTableName())
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
if !nonlocalsTableExists {
return nil, false, nil
return doltdb.NonlocalTableEntry{}, false, nil
}
index, err := nonlocalsTable.GetRowData(ctx)
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
nonlocalTablesSchema, err := nonlocalsTable.GetSchema(ctx)
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
m := durable.MapFromIndex(index)
keyDesc, valueDesc := nonlocalTablesSchema.GetMapDescriptors(m.NodeStore())
nonlocalTablesMap, err := m.IterAll(ctx)
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
var nonlocalTableEntry doltdb.NonlocalTableEntry
// check if there's an entry for this table. If so, resolve that reference.
@@ -978,52 +979,60 @@ func (db Database) getNonlocalTable(ctx *sql.Context, root doltdb.RootValue, lwr
break
}
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
nonlocalsEntryTableName, err := doltdb.GetNonlocalTablesNameColumn(ctx, keyDesc, keyTuple)
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
nonlocalsEntryTableName = strings.ToLower(nonlocalsEntryTableName)
matchesPattern, err := doltdb.MatchTablePattern(nonlocalsEntryTableName, lwrName)
if err != nil {
return nil, false, err
return doltdb.NonlocalTableEntry{}, false, err
}
if !matchesPattern {
continue
}
nonlocalTableEntry = doltdb.GetNonlocalTablesRef(ctx, valueDesc, valueTuple)
if nonlocalTableEntry.NewTableName == "" {
nonlocalTableEntry.NewTableName = lwrName
}
if nonlocalTableEntry.Ref == "" {
nonlocalTableEntry.Ref = db.revision
}
if nonlocalTableEntry.Options != "immediate" {
return nil, false, ErrInvalidNonlocalTableOptions.New(nonlocalTableEntry.Options)
}
// If the ref is a branch, we get the working set, not the head.
_, exists, err := isBranch(ctx, db, nonlocalTableEntry.Ref)
if exists {
referencedBranch, err := RevisionDbForBranch(ctx, db, nonlocalTableEntry.Ref, db.requestedName)
if err != nil {
return nil, false, err
if matchesPattern {
nonlocalTableEntry = doltdb.GetNonlocalTablesRef(ctx, valueDesc, valueTuple)
if nonlocalTableEntry.NewTableName == "" {
nonlocalTableEntry.NewTableName = lwrName
}
return referencedBranch.(Database).getTableInsensitive(ctx, nonlocalTableEntry.NewTableName, dontReadNonlocalTables)
} else {
// If we couldn't resolve it as a database revision, treat it as a noms ref.
// This lets us resolve branch heads like 'heads/$branchName' or remotes refs like '$remote/$branchName'
return db.getTableInsensitiveAsOf(ctx, nonlocalTableEntry.NewTableName, nonlocalTableEntry.Ref, false)
if nonlocalTableEntry.Ref == "" {
nonlocalTableEntry.Ref = db.revision
}
return nonlocalTableEntry, true, nil
}
}
return doltdb.NonlocalTableEntry{}, false, nil
}
return nil, false, nil
// getNonlocalTable checks whether the table name maps onto a table in another root via the dolt_nonlocal_tables system table
func (db Database) getNonlocalTable(ctx *sql.Context, root doltdb.RootValue, lwrName string) (sql.Table, bool, error) {
nonlocalTableEntry, found, err := db.getNonlocalTableEntry(ctx, root, lwrName)
if err != nil || !found {
return nil, found, err
}
if nonlocalTableEntry.Options != "immediate" {
return nil, false, ErrInvalidNonlocalTableOptions.New(nonlocalTableEntry.Options)
}
// If the ref is a branch, we get the working set, not the head.
_, exists, err := isBranch(ctx, db, nonlocalTableEntry.Ref)
if exists {
referencedBranch, err := RevisionDbForBranch(ctx, db, nonlocalTableEntry.Ref, db.requestedName)
if err != nil {
return nil, false, err
}
return referencedBranch.(Database).getTableInsensitive(ctx, nonlocalTableEntry.NewTableName, dontReadNonlocalTables)
} else {
// If we couldn't resolve it as a database revision, treat it as a noms ref.
// This lets us resolve branch heads like 'heads/$branchName' or remotes refs like '$remote/$branchName'
return db.getTableInsensitiveAsOf(ctx, nonlocalTableEntry.NewTableName, nonlocalTableEntry.Ref, false)
}
}
// getRootForNonlocalRef resolves a reference found in a dolt_nonlocal_tables entry.
@@ -1807,6 +1816,18 @@ func (db Database) CreateTable(ctx *sql.Context, tableName string, sch sql.Prima
return ErrInvalidTableName.New(tableName)
}
root, err := db.GetRoot(ctx)
if err != nil {
return err
}
_, hasNonlocalTable, err := db.getNonlocalTableEntry(ctx, root, tableName)
if err != nil {
return err
}
if hasNonlocalTable {
return ErrNonlocalTableName.New(tableName)
}
return db.createSqlTable(ctx, tableName, db.schemaName, sch, collation, comment)
}
+4 -8
View File
@@ -357,20 +357,16 @@ SQL
! [[ "$output" =~ "table_alias_missing" ]] || false
}
@test "nonlocal: creating a nonlocal table creates it on the appropriate branch" {
skip
@test "nonlocal: creating a table matching a nonlocal table rule results in an error" {
dolt checkout -b other
dolt sql <<SQL
INSERT INTO dolt_nonlocal_tables(table_name, target_ref, options) VALUES
("nonlocal_table", "main", "immediate");
CREATE TABLE nonlocal_table (pk char(8) PRIMARY KEY);
INSERT INTO nonlocal_table VALUES ("amzmapqt");
SQL
run dolt ls main
[ "$status" -eq 0 ]
[[ "$output" =~ "nonlocal_table" ]] || false
run dolt sql -q "CREATE TABLE nonlocal_table (pk char(8) PRIMARY KEY);"
[ "$status" -eq 1 ]
[[ "$output" =~ "Cannot create table name nonlocal_table because it matches a name present in dolt_nonlocal_tables." ]] || false
}
@test "nonlocal: adding an existing table to nonlocal tables errors" {