go/doltcore/{doltdb, sqle}: first pass at dolt_schema_conflicts system table

This commit is contained in:
Andy Arthur
2023-04-18 14:55:29 -07:00
parent 256395b3bc
commit abe3fdbcdc
9 changed files with 309 additions and 2 deletions

View File

@@ -353,7 +353,24 @@ func (rcv *MergeState) FromCommitSpecStr() []byte {
return nil
}
const MergeStateNumFields = 3
func (rcv *MergeState) SchemaConflictTables(j int) []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.ByteVector(a + flatbuffers.UOffsetT(j*4))
}
return nil
}
func (rcv *MergeState) SchemaConflictTablesLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
const MergeStateNumFields = 4
func MergeStateStart(builder *flatbuffers.Builder) {
builder.StartObject(MergeStateNumFields)
@@ -373,6 +390,12 @@ func MergeStateStartFromCommitAddrVector(builder *flatbuffers.Builder, numElems
func MergeStateAddFromCommitSpecStr(builder *flatbuffers.Builder, fromCommitSpecStr flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(fromCommitSpecStr), 0)
}
func MergeStateAddSchemaConflictTables(builder *flatbuffers.Builder, schemaConflictTables flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(schemaConflictTables), 0)
}
func MergeStateStartSchemaConflictTablesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(4, numElems, 4)
}
func MergeStateEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

View File

@@ -277,6 +277,9 @@ const (
// TableOfTablesWithViolationsName is the constraint violations system table name
TableOfTablesWithViolationsName = "dolt_constraint_violations"
// SchemaConflictsTableName is the schema conflicts system table name
SchemaConflictsTableName = "dolt_schema_conflicts"
// BranchesTableName is the branches system table name
BranchesTableName = "dolt_branches"

View File

@@ -19,6 +19,8 @@ import (
"fmt"
"time"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
@@ -32,6 +34,16 @@ type MergeState struct {
// the spec string that was used to specify |commit|
commitSpecStr string
preMergeWorking *RootValue
sourceCommit hash.Hash
schemaConflictTables []string
}
type SchemaConflict struct {
ToSch, FromSch schema.Schema
ToFks, FromFks []ForeignKey
ToParentSchemas map[string]schema.Schema
FromParentSchemas map[string]schema.Schema
}
// TodoWorkingSetMeta returns an incomplete WorkingSetMeta, suitable for methods that don't have the means to construct
@@ -63,6 +75,73 @@ func (m MergeState) PreMergeWorkingRoot() *RootValue {
return m.preMergeWorking
}
type SchemaConflictFn func(table string, conflict SchemaConflict) error
func (m MergeState) HasSchemaConflicts() bool {
return len(m.schemaConflictTables) > 0
}
func (m MergeState) IterSchemaConflicts(ctx context.Context, ddb *DoltDB, cb SchemaConflictFn) error {
var to, from *RootValue
to = m.preMergeWorking
rcm, err := ddb.ReadCommit(ctx, m.sourceCommit)
if err != nil {
return err
}
if from, err = rcm.GetRootValue(ctx); err != nil {
return err
}
toFKs, err := to.GetForeignKeyCollection(ctx)
if err != nil {
return err
}
toSchemas, err := to.GetAllSchemas(ctx)
if err != nil {
return err
}
fromFKs, err := from.GetForeignKeyCollection(ctx)
if err != nil {
return err
}
fromSchemas, err := from.GetAllSchemas(ctx)
if err != nil {
return err
}
for _, name := range m.schemaConflictTables {
var toTbl, fromTbl *Table
if toTbl, _, err = to.GetTable(ctx, name); err != nil {
return err
}
// todo: rename resolution
if fromTbl, _, err = from.GetTable(ctx, name); err != nil {
return err
}
var sc SchemaConflict
if sc.ToSch, err = toTbl.GetSchema(ctx); err != nil {
return err
}
if sc.FromSch, err = fromTbl.GetSchema(ctx); err != nil {
return err
}
sc.ToFks, _ = toFKs.KeysForTable(name)
sc.ToParentSchemas = toSchemas
sc.FromFks, _ = fromFKs.KeysForTable(name)
sc.FromParentSchemas = fromSchemas
if err = cb(name, sc); err != nil {
return err
}
}
return nil
}
type WorkingSet struct {
Name string
meta *datas.WorkingSetMeta

View File

@@ -369,6 +369,8 @@ func (db Database) getTableInsensitive(ctx *sql.Context, head *doltdb.Commit, ds
dt, found = dtables.NewTableOfTablesInConflict(ctx, db.name, db.ddb), true
case doltdb.TableOfTablesWithViolationsName:
dt, found = dtables.NewTableOfTablesConstraintViolations(ctx, root), true
case doltdb.SchemaConflictsTableName:
dt, found = dtables.NewSchemaConflictsTable(ctx, db.name, db.ddb), true
case doltdb.BranchesTableName:
dt, found = dtables.NewBranchesTable(ctx, db.ddb), true
case doltdb.RemoteBranchesTableName:

View File

@@ -0,0 +1,175 @@
// Copyright 2020 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dtables
import (
"errors"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
)
var _ sql.Table = (*SchemaConflictsTable)(nil)
// SchemaConflictsTable is a sql.Table implementation that implements a system table which shows the current conflicts
type SchemaConflictsTable struct {
dbName string
ddb *doltdb.DoltDB
}
// NewSchemaConflictsTable creates a SchemaConflictsTable
func NewSchemaConflictsTable(_ *sql.Context, dbName string, ddb *doltdb.DoltDB) sql.Table {
return &SchemaConflictsTable{dbName: dbName, ddb: ddb}
}
// Name is a sql.Table interface function which returns the name of the table which is defined by the constant
// SchemaConflictsTableName
func (dt *SchemaConflictsTable) Name() string {
return doltdb.SchemaConflictsTableName
}
// String is a sql.Table interface function which returns the name of the table which is defined by the constant
// SchemaConflictsTableName
func (dt *SchemaConflictsTable) String() string {
return doltdb.SchemaConflictsTableName
}
// Schema is a sql.Table interface function that gets the sql.Schema of the log system table.
func (dt *SchemaConflictsTable) Schema() sql.Schema {
return []*sql.Column{
{Name: "table", Type: types.Text, Source: doltdb.SchemaConflictsTableName, PrimaryKey: true},
{Name: "from_schema", Type: types.Text, Source: doltdb.SchemaConflictsTableName, PrimaryKey: false},
{Name: "to_schema", Type: types.Text, Source: doltdb.SchemaConflictsTableName, PrimaryKey: false},
{Name: "description", Type: types.Text, Source: doltdb.SchemaConflictsTableName, PrimaryKey: false},
}
}
// Collation implements the sql.Table interface.
func (dt *SchemaConflictsTable) Collation() sql.CollationID {
return sql.Collation_Default
}
// Partitions is a sql.Table interface function that returns a partition of the data. Conflict data is partitioned by table.
func (dt *SchemaConflictsTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
sess := dsess.DSessFromSess(ctx.Session)
ws, err := sess.WorkingSet(ctx, dt.dbName)
if err != nil {
return nil, err
}
dbd, _ := sess.GetDbData(ctx, dt.dbName)
if ws.MergeState() == nil || !ws.MergeState().HasSchemaConflicts() {
return sql.PartitionsToPartitionIter(), nil
}
return sql.PartitionsToPartitionIter(schemaConflictsPartition{
state: ws.MergeState(),
ddb: dbd.Ddb,
}), nil
}
// PartitionRows is a sql.Table interface function that gets a row iterator for a partition
func (dt *SchemaConflictsTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
p, ok := part.(schemaConflictsPartition)
if !ok {
return nil, errors.New("unexpected partition for schema conflicts table")
}
var conflicts []schemaConflict
err := p.state.IterSchemaConflicts(ctx, p.ddb, func(table string, cnf doltdb.SchemaConflict) error {
c, err := newSchemaConflict(table, cnf)
if err != nil {
return err
}
conflicts = append(conflicts, c)
return nil
})
if err != nil {
return nil, err
}
return &schemaConflictsIter{
conflicts: conflicts,
}, nil
}
type schemaConflictsPartition struct {
state *doltdb.MergeState
ddb *doltdb.DoltDB
}
func (p schemaConflictsPartition) Key() []byte {
return []byte(doltdb.SchemaConflictsTableName)
}
type schemaConflict struct {
table string
toSch, fromSch string
description string
}
func newSchemaConflict(table string, c doltdb.SchemaConflict) (schemaConflict, error) {
to, err := getCreateTableStatement(table, c.ToSch, c.ToFks, c.ToParentSchemas)
if err != nil {
return schemaConflict{}, err
}
from, err := getCreateTableStatement(table, c.FromSch, c.FromFks, c.FromParentSchemas)
if err != nil {
return schemaConflict{}, err
}
return schemaConflict{
table: table,
toSch: to,
fromSch: from,
// todo(andy): description
}, nil
}
func getCreateTableStatement(table string, sch schema.Schema, fks []doltdb.ForeignKey, parents map[string]schema.Schema) (string, error) {
pkSch, err := sqlutil.FromDoltSchema(table, sch)
if err != nil {
return "", err
}
return diff.GenerateCreateTableStatement(table, sch, pkSch, fks, parents)
}
type schemaConflictsIter struct {
conflicts []schemaConflict
toSchemas map[string]schema.Schema
fromSchemas map[string]schema.Schema
}
func (it *schemaConflictsIter) Next(ctx *sql.Context) (sql.Row, error) {
if len(it.conflicts) == 0 {
return nil, io.EOF
}
c := it.conflicts[0] // pop next conflict
it.conflicts = it.conflicts[1:]
return sql.NewRow(c.table, c.toSch, c.fromSch, c.description), nil
}
func (it *schemaConflictsIter) Close(ctx *sql.Context) error {
it.conflicts = nil
return nil
}

View File

@@ -1324,6 +1324,17 @@ func TestDoltMergeArtifacts(t *testing.T) {
}
}
func TestDoltSchemaConflicts(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip()
}
for _, script := range SchemaConflictScripts {
h := newDoltHarness(t)
enginetest.TestScript(t, h, script)
h.Close()
}
}
// these tests are temporary while there is a difference between the old format
// and new format merge behaviors.
func TestOldFormatMergeConflictsAndCVs(t *testing.T) {

View File

@@ -2880,6 +2880,18 @@ var MergeArtifactsScripts = []queries.ScriptTest{
},
}
var SchemaConflictScripts = []queries.ScriptTest{
{
Name: "Schema Conflict smoke test",
Assertions: []queries.ScriptTestAssertion{
{
Query: "select * from dolt_schema_conflicts",
Expected: []sql.Row{},
},
},
},
}
// OldFormatMergeConflictsAndCVsScripts tests old format merge behavior
// where violations are appended and merges are aborted if there are existing
// violations and/or conflicts.

View File

@@ -36,6 +36,8 @@ table MergeState {
// The spec that was used to identify the commit that we are merging. Optional
// for backwards compatibility.
from_commit_spec_str:string;
schema_conflict_tables:[string];
}
// KEEP THIS IN SYNC WITH fileidentifiers.go

View File

@@ -324,7 +324,7 @@ func LoadCommitAddr(ctx context.Context, vr types.ValueReader, addr hash.Hash) (
return nil, err
}
if v == nil {
return nil, errors.New("target commit not found")
return nil, errors.New("target commit (" + addr.String() + ") not found")
}
return commitFromValue(vr.Format(), v)
}