[no-release-notes] implement dolt_diff system table and table function for new storage format (#3401)

* first pass at prolly diff iter

* implement dolt_diff system table and table function for new storage format

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* fix schema test and copyright

* also fix commit_diff_table

* pr edits

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* fix copyright

Co-authored-by: Andy Arthur <andy@dolthub.com>
Co-authored-by: druvv <druvv@users.noreply.github.com>
This commit is contained in:
Dhruv Sringari
2022-05-13 15:46:51 -07:00
committed by GitHub
parent d84b9a6cd2
commit db297cff8f
8 changed files with 717 additions and 240 deletions

View File

@@ -18,7 +18,6 @@ import (
"fmt"
"io"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
@@ -42,8 +41,9 @@ type DiffTableFunction struct {
database sql.Database
sqlSch sql.Schema
joiner *rowconv.Joiner
toSch schema.Schema
fromSch schema.Schema
toSch schema.Schema
diffTableSch schema.Schema
}
// NewInstance implements the TableFunction interface
@@ -104,7 +104,7 @@ func (dtf *DiffTableFunction) WithExpressions(expression ...sql.Expression) (sql
return nil, err
}
dtf.sqlSch, err = dtf.generateSchema(tableName, fromCommitVal, toCommitVal)
err = dtf.generateSchema(tableName, fromCommitVal, toCommitVal)
if err != nil {
return nil, err
}
@@ -127,10 +127,6 @@ func (dtf *DiffTableFunction) RowIter(ctx *sql.Context, _ sql.Row) (sql.RowIter,
return nil, err
}
if dtf.joiner == nil {
panic("schema and joiner haven't been initialized")
}
sqledb, ok := dtf.database.(Database)
if !ok {
panic("unable to get dolt database")
@@ -258,9 +254,9 @@ func (dtf *DiffTableFunction) evaluateArguments() (string, interface{}, interfac
return tableName, fromCommitVal, toCommitVal, nil
}
func (dtf *DiffTableFunction) generateSchema(tableName string, fromCommitVal, toCommitVal interface{}) (sql.Schema, error) {
func (dtf *DiffTableFunction) generateSchema(tableName string, fromCommitVal, toCommitVal interface{}) error {
if !dtf.Resolved() {
return nil, nil
return nil
}
sqledb, ok := dtf.database.(Database)
@@ -270,81 +266,62 @@ func (dtf *DiffTableFunction) generateSchema(tableName string, fromCommitVal, to
fromRoot, err := sqledb.rootAsOf(dtf.ctx, fromCommitVal)
if err != nil {
return nil, err
return err
}
fromTable, _, ok, err := fromRoot.GetTableInsensitive(dtf.ctx, tableName)
if err != nil {
return nil, err
return err
}
if !ok {
return nil, sql.ErrTableNotFound.New(tableName)
return sql.ErrTableNotFound.New(tableName)
}
toRoot, err := sqledb.rootAsOf(dtf.ctx, toCommitVal)
if err != nil {
return nil, err
return err
}
toTable, _, ok, err := toRoot.GetTableInsensitive(dtf.ctx, tableName)
if err != nil {
return nil, err
return err
}
if !ok {
return nil, sql.ErrTableNotFound.New(tableName)
return sql.ErrTableNotFound.New(tableName)
}
fromSchema, err := fromTable.GetSchema(dtf.ctx)
if err != nil {
return nil, err
return err
}
toSchema, err := toTable.GetSchema(dtf.ctx)
if err != nil {
return nil, err
return err
}
fromSchema = schema.MustSchemaFromCols(
fromSchema.GetAllCols().Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
dtf.fromSch = fromSchema
toSchema = schema.MustSchemaFromCols(
toSchema.GetAllCols().Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
dtf.toSch = toSchema
joiner, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: toSchema}, {Name: diff.From, Sch: fromSchema}},
map[string]rowconv.ColNamingFunc{
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
diffTableSch, j, err := dtables.GetDiffTableSchemaAndJoiner(toTable.Format(), fromSchema, toSchema)
if err != nil {
return nil, err
return err
}
sch := joiner.GetSchema()
sch = schema.MustSchemaFromCols(
sch.GetAllCols().Append(
schema.NewColumn("diff_type", schema.DiffTypeTag, types.StringKind, false)))
dtf.joiner = j
// TODO: sql.Columns include a Source that indicates the table it came from, but we don't have a real table
// when the column comes from a table function, so we omit the table name when we create these columns.
// This allows column projections to work correctly with table functions, but we will need to add a
// unique id (e.g. hash generated from method arguments) when we add support for aliasing and joining
// table functions in order for the analyzer to determine which table function result a column comes from.
sqlSchema, err := sqlutil.FromDoltSchema("", sch)
sqlSchema, err := sqlutil.FromDoltSchema("", diffTableSch)
if err != nil {
return nil, err
return err
}
dtf.joiner = joiner
dtf.sqlSch = sqlSchema.Schema
return sqlSchema.Schema, nil
return nil
}
// Schema implements the sql.Node interface

View File

@@ -24,7 +24,6 @@ import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
@@ -66,36 +65,16 @@ func NewCommitDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, ro
return nil, err
}
sch = schema.MustSchemaFromCols(sch.GetAllCols().Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false)))
if sch.GetAllCols().Size() <= 1 {
return nil, sql.ErrTableNotFound.New(diffTblName)
}
j, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: sch}, {Name: diff.From, Sch: sch}},
map[string]rowconv.ColNamingFunc{
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
diffTableSchema, j, err := GetDiffTableSchemaAndJoiner(ddb.Format(), sch, sch)
if err != nil {
return nil, err
}
sqlSch, err := sqlutil.FromDoltSchema(diffTblName, j.GetSchema())
sqlSch, err := sqlutil.FromDoltSchema(diffTblName, diffTableSchema)
if err != nil {
return nil, err
}
sqlSch.Schema = append(sqlSch.Schema, &sql.Column{
Name: diffTypeColName,
Type: sql.Text,
Nullable: false,
Source: diffTblName,
})
return &CommitDiffTable{
name: tblName,
ddb: ddb,

View File

@@ -0,0 +1,372 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dtables
import (
"context"
"errors"
"io"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
type diffRowItr struct {
ad diff.RowDiffer
diffSrc *diff.RowDiffSource
joiner *rowconv.Joiner
sch schema.Schema
fromCommitInfo commitInfo
toCommitInfo commitInfo
}
var _ sql.RowIter = &diffRowItr{}
type commitInfo struct {
name types.String
date *types.Timestamp
nameTag uint64
dateTag uint64
}
func newNomsDiffIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner *rowconv.Joiner, dp DiffPartition) (*diffRowItr, error) {
fromData, fromSch, err := tableData(ctx, dp.from, ddb)
if err != nil {
return nil, err
}
toData, toSch, err := tableData(ctx, dp.to, ddb)
if err != nil {
return nil, err
}
fromConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.fromSch, fromSch)
if err != nil {
return nil, err
}
toConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.toSch, toSch)
if err != nil {
return nil, err
}
sch := joiner.GetSchema()
toCol, _ := sch.GetAllCols().GetByName(toCommit)
fromCol, _ := sch.GetAllCols().GetByName(fromCommit)
toDateCol, _ := sch.GetAllCols().GetByName(toCommitDate)
fromDateCol, _ := sch.GetAllCols().GetByName(fromCommitDate)
fromCmInfo := commitInfo{types.String(dp.fromName), dp.fromDate, fromCol.Tag, fromDateCol.Tag}
toCmInfo := commitInfo{types.String(dp.toName), dp.toDate, toCol.Tag, toDateCol.Tag}
rd := diff.NewRowDiffer(ctx, fromSch, toSch, 1024)
// TODO (dhruv) don't cast to noms map
rd.Start(ctx, durable.NomsMapFromIndex(fromData), durable.NomsMapFromIndex(toData))
warnFn := func(code int, message string, args ...string) {
ctx.Warn(code, message, args)
}
src := diff.NewRowDiffSource(rd, joiner, warnFn)
src.AddInputRowConversion(fromConv, toConv)
return &diffRowItr{
ad: rd,
diffSrc: src,
joiner: joiner,
sch: joiner.GetSchema(),
fromCommitInfo: fromCmInfo,
toCommitInfo: toCmInfo,
}, nil
}
// Next returns the next row
func (itr *diffRowItr) Next(*sql.Context) (sql.Row, error) {
r, _, err := itr.diffSrc.NextDiff()
if err != nil {
return nil, err
}
toAndFromRows, err := itr.joiner.Split(r)
if err != nil {
return nil, err
}
_, hasTo := toAndFromRows[diff.To]
_, hasFrom := toAndFromRows[diff.From]
r, err = r.SetColVal(itr.toCommitInfo.nameTag, types.String(itr.toCommitInfo.name), itr.sch)
if err != nil {
return nil, err
}
r, err = r.SetColVal(itr.fromCommitInfo.nameTag, types.String(itr.fromCommitInfo.name), itr.sch)
if err != nil {
return nil, err
}
if itr.toCommitInfo.date != nil {
r, err = r.SetColVal(itr.toCommitInfo.dateTag, *itr.toCommitInfo.date, itr.sch)
if err != nil {
return nil, err
}
}
if itr.fromCommitInfo.date != nil {
r, err = r.SetColVal(itr.fromCommitInfo.dateTag, *itr.fromCommitInfo.date, itr.sch)
if err != nil {
return nil, err
}
}
sqlRow, err := sqlutil.DoltRowToSqlRow(r, itr.sch)
if err != nil {
return nil, err
}
if hasTo && hasFrom {
sqlRow = append(sqlRow, diffTypeModified)
} else if hasTo && !hasFrom {
sqlRow = append(sqlRow, diffTypeAdded)
} else {
sqlRow = append(sqlRow, diffTypeRemoved)
}
return sqlRow, nil
}
// Close closes the iterator
func (itr *diffRowItr) Close(*sql.Context) (err error) {
defer itr.ad.Close()
defer func() {
closeErr := itr.diffSrc.Close()
if err == nil {
err = closeErr
}
}()
return nil
}
type commitInfo2 struct {
name string
ts *time.Time
}
type prollyDiffIter struct {
from, to prolly.Map
fromSch, toSch schema.Schema
targetFromSch, targetToSch schema.Schema
fromConverter, toConverter ProllyRowConverter
fromCm commitInfo2
toCm commitInfo2
rows chan sql.Row
errChan chan error
cancel context.CancelFunc
}
var _ sql.RowIter = prollyDiffIter{}
// newProllyDiffIter produces dolt_diff system table and dolt_diff table
// function rows. The rows first have the "to" columns on the left and the
// "from" columns on the right. After the "to" and "from" columns, a commit
// name, and commit date is also present. The final column is the diff_type
// column.
//
// An example: to_pk, to_col1, to_commit, to_commit_date, from_pk, from_col1, from_commit, from_commit_date, diff_type
//
// |targetFromSchema| and |targetToSchema| defines what the schema should be for
// the row data on the "from" or "to" side. In the above example, both schemas are
// identical with two columns "pk" and "col1". The dolt diff table function for
// example can provide two different schemas.
//
// The |from| and |to| tables in the DiffPartition may have different schemas
// than |targetFromSchema| or |targetToSchema|. We convert the rows from the
// schema of |from| to |targetFromSchema| and the schema of |to| to
// |targetToSchema|. See the tablediff_prolly package.
func newProllyDiffIter(ctx *sql.Context, dp DiffPartition, ddb *doltdb.DoltDB, targetFromSchema, targetToSchema schema.Schema) (prollyDiffIter, error) {
if schema.IsKeyless(targetToSchema) {
return prollyDiffIter{}, errors.New("diffs with keyless schema have not been implemented yet")
}
fromCm := commitInfo2{
name: dp.fromName,
ts: (*time.Time)(dp.fromDate),
}
toCm := commitInfo2{
name: dp.toName,
ts: (*time.Time)(dp.toDate),
}
// dp.from may be nil
f, fSch, err := tableData(ctx, dp.from, ddb)
if err != nil {
return prollyDiffIter{}, nil
}
from := durable.ProllyMapFromIndex(f)
t, tSch, err := tableData(ctx, dp.to, ddb)
if err != nil {
return prollyDiffIter{}, nil
}
to := durable.ProllyMapFromIndex(t)
fromConverter, err := NewProllyRowConverter(fSch, targetFromSchema)
if err != nil {
return prollyDiffIter{}, err
}
toConverter, err := NewProllyRowConverter(tSch, targetToSchema)
if err != nil {
return prollyDiffIter{}, err
}
child, cancel := context.WithCancel(ctx)
iter := prollyDiffIter{
from: from,
to: to,
fromSch: fSch,
toSch: tSch,
targetFromSch: targetFromSchema,
targetToSch: targetToSchema,
fromConverter: fromConverter,
toConverter: toConverter,
fromCm: fromCm,
toCm: toCm,
rows: make(chan sql.Row, 64),
errChan: make(chan error),
cancel: cancel,
}
go func() {
iter.queueRows(child)
}()
return iter, nil
}
func (itr prollyDiffIter) Next(ctx *sql.Context) (sql.Row, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-itr.errChan:
return nil, err
case r, ok := <-itr.rows:
if !ok {
return nil, io.EOF
}
return r, nil
}
}
func (itr prollyDiffIter) Close(ctx *sql.Context) error {
itr.cancel()
return nil
}
func (itr prollyDiffIter) queueRows(ctx context.Context) {
err := prolly.DiffMaps(ctx, itr.from, itr.to, func(ctx context.Context, d tree.Diff) error {
r, err := itr.makeDiffRow(d)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case itr.rows <- r:
return nil
}
})
if err != nil && err != io.EOF {
select {
case <-ctx.Done():
case itr.errChan <- err:
}
return
}
// we need to drain itr.rows before returning io.EOF
close(itr.rows)
}
// todo(andy): copy string fields
func (itr prollyDiffIter) makeDiffRow(d tree.Diff) (r sql.Row, err error) {
n := itr.targetFromSch.GetAllCols().Size()
m := itr.targetToSch.GetAllCols().Size()
// 2 commit names, 2 commit dates, 1 diff_type
r = make(sql.Row, n+m+5)
// todo (dhruv): implement warnings for row column value coercions.
if d.Type != tree.RemovedDiff {
err = itr.toConverter.PutConverted(val.Tuple(d.Key), val.Tuple(d.To), r[0:n])
if err != nil {
return nil, err
}
}
o := n
r[o] = itr.toCm.name
r[o+1] = itr.toCm.ts
if d.Type != tree.AddedDiff {
err = itr.fromConverter.PutConverted(val.Tuple(d.Key), val.Tuple(d.From), r[n+2:n+2+m])
if err != nil {
return nil, err
}
}
o = n + 2 + m
r[o] = itr.fromCm.name
r[o+1] = itr.fromCm.ts
r[o+2] = diffTypeString(d)
return r, nil
}
func diffTypeString(d tree.Diff) (s string) {
switch d.Type {
case tree.AddedDiff:
s = diffTypeAdded
case tree.ModifiedDiff:
s = diffTypeModified
case tree.RemovedDiff:
s = diffTypeRemoved
}
return
}

View File

@@ -56,11 +56,19 @@ type DiffTable struct {
workingRoot *doltdb.RootValue
head *doltdb.Commit
targetSch schema.Schema
joiner *rowconv.Joiner
// from and to need to be mapped to this schema
targetSch schema.Schema
// the schema for the diff table itself. Once from and to are converted to
// targetSch, the commit names and dates are inserted.
diffTableSch schema.Schema
sqlSch sql.PrimaryKeySchema
partitionFilters []sql.Expression
rowFilters []sql.Expression
// noms only
joiner *rowconv.Joiner
}
var PrimaryKeyChangeWarning = "cannot render full diff between commits %s and %s due to primary key set change"
@@ -82,48 +90,27 @@ func NewDiffTable(ctx *sql.Context, tblName string, ddb *doltdb.DoltDB, root *do
return nil, err
}
colCollection := sch.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
sch = schema.MustSchemaFromCols(colCollection)
if sch.GetAllCols().Size() <= 1 {
return nil, sql.ErrTableNotFound.New(diffTblName)
}
j, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: sch}, {Name: diff.From, Sch: sch}},
map[string]rowconv.ColNamingFunc{
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
diffTableSchema, j, err := GetDiffTableSchemaAndJoiner(ddb.Format(), sch, sch)
if err != nil {
return nil, err
}
sqlSch, err := sqlutil.FromDoltSchema(diffTblName, j.GetSchema())
sqlSch, err := sqlutil.FromDoltSchema(diffTblName, diffTableSchema)
if err != nil {
return nil, err
}
sqlSch.Schema = append(sqlSch.Schema, &sql.Column{
Name: diffTypeColName,
Type: sql.Text,
Nullable: false,
Source: diffTblName,
})
return &DiffTable{
name: tblName,
ddb: ddb,
workingRoot: root,
head: head,
targetSch: sch,
joiner: j,
diffTableSch: diffTableSchema,
sqlSch: sqlSch,
partitionFilters: nil,
rowFilters: nil,
joiner: j,
}, nil
}
@@ -247,97 +234,6 @@ func tableData(ctx *sql.Context, tbl *doltdb.Table, ddb *doltdb.DoltDB) (durable
return data, sch, nil
}
var _ sql.RowIter = (*diffRowItr)(nil)
type diffRowItr struct {
ad diff.RowDiffer
diffSrc *diff.RowDiffSource
joiner *rowconv.Joiner
sch schema.Schema
fromCommitInfo commitInfo
toCommitInfo commitInfo
}
type commitInfo struct {
name types.String
date *types.Timestamp
nameTag uint64
dateTag uint64
}
// Next returns the next row
func (itr *diffRowItr) Next(*sql.Context) (sql.Row, error) {
r, _, err := itr.diffSrc.NextDiff()
if err != nil {
return nil, err
}
toAndFromRows, err := itr.joiner.Split(r)
if err != nil {
return nil, err
}
_, hasTo := toAndFromRows[diff.To]
_, hasFrom := toAndFromRows[diff.From]
r, err = r.SetColVal(itr.toCommitInfo.nameTag, types.String(itr.toCommitInfo.name), itr.sch)
if err != nil {
return nil, err
}
r, err = r.SetColVal(itr.fromCommitInfo.nameTag, types.String(itr.fromCommitInfo.name), itr.sch)
if err != nil {
return nil, err
}
if itr.toCommitInfo.date != nil {
r, err = r.SetColVal(itr.toCommitInfo.dateTag, *itr.toCommitInfo.date, itr.sch)
if err != nil {
return nil, err
}
}
if itr.fromCommitInfo.date != nil {
r, err = r.SetColVal(itr.fromCommitInfo.dateTag, *itr.fromCommitInfo.date, itr.sch)
if err != nil {
return nil, err
}
}
sqlRow, err := sqlutil.DoltRowToSqlRow(r, itr.sch)
if err != nil {
return nil, err
}
if hasTo && hasFrom {
sqlRow = append(sqlRow, diffTypeModified)
} else if hasTo && !hasFrom {
sqlRow = append(sqlRow, diffTypeAdded)
} else {
sqlRow = append(sqlRow, diffTypeRemoved)
}
return sqlRow, nil
}
// Close closes the iterator
func (itr *diffRowItr) Close(*sql.Context) (err error) {
defer itr.ad.Close()
defer func() {
closeErr := itr.diffSrc.Close()
if err == nil {
err = closeErr
}
}()
return nil
}
type TblInfoAtCommit struct {
name string
date *types.Timestamp
@@ -361,8 +257,9 @@ type DiffPartition struct {
fromName string
toDate *types.Timestamp
fromDate *types.Timestamp
toSch *schema.Schema
fromSch *schema.Schema
// fromSch and toSch are usually identical. It is the schema of the table at head.
toSch *schema.Schema
fromSch *schema.Schema
}
func NewDiffPartition(to, from *doltdb.Table, toName, fromName string, toDate, fromDate *types.Timestamp, toSch, fromSch *schema.Schema) *DiffPartition {
@@ -383,58 +280,11 @@ func (dp DiffPartition) Key() []byte {
}
func (dp DiffPartition) GetRowIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner *rowconv.Joiner) (sql.RowIter, error) {
fromData, fromSch, err := tableData(ctx, dp.from, ddb)
if err != nil {
return nil, err
if types.IsFormat_DOLT_1(ddb.Format()) {
return newProllyDiffIter(ctx, dp, ddb, *dp.fromSch, *dp.toSch)
} else {
return newNomsDiffIter(ctx, ddb, joiner, dp)
}
toData, toSch, err := tableData(ctx, dp.to, ddb)
if err != nil {
return nil, err
}
fromConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.fromSch, fromSch)
if err != nil {
return nil, err
}
toConv, err := dp.rowConvForSchema(ctx, ddb.ValueReadWriter(), *dp.toSch, toSch)
if err != nil {
return nil, err
}
sch := joiner.GetSchema()
toCol, _ := sch.GetAllCols().GetByName(toCommit)
fromCol, _ := sch.GetAllCols().GetByName(fromCommit)
toDateCol, _ := sch.GetAllCols().GetByName(toCommitDate)
fromDateCol, _ := sch.GetAllCols().GetByName(fromCommitDate)
fromCmInfo := commitInfo{types.String(dp.fromName), dp.fromDate, fromCol.Tag, fromDateCol.Tag}
toCmInfo := commitInfo{types.String(dp.toName), dp.toDate, toCol.Tag, toDateCol.Tag}
rd := diff.NewRowDiffer(ctx, fromSch, toSch, 1024)
// TODO (dhruv) don't cast to noms map
rd.Start(ctx, durable.NomsMapFromIndex(fromData), durable.NomsMapFromIndex(toData))
warnFn := func(code int, message string, args ...string) {
ctx.Warn(code, message, args)
}
src := diff.NewRowDiffSource(rd, joiner, warnFn)
src.AddInputRowConversion(fromConv, toConv)
return &diffRowItr{
ad: rd,
diffSrc: src,
joiner: joiner,
sch: joiner.GetSchema(),
fromCommitInfo: fromCmInfo,
toCommitInfo: toCmInfo,
}, nil
}
// isDiffablePartition checks if the commit pair for this partition is "diffable".
@@ -551,7 +401,16 @@ func (dps *DiffPartitions) processCommit(ctx *sql.Context, cmHash hash.Hash, cm
var nextPartition *DiffPartition
if tblHash != toInfoForCommit.tblHash {
partition := DiffPartition{toInfoForCommit.tbl, tbl, toInfoForCommit.name, cmHashStr, toInfoForCommit.date, &ts, &dps.toSch, &dps.fromSch}
partition := DiffPartition{
to: toInfoForCommit.tbl,
from: tbl,
toName: toInfoForCommit.name,
fromName: cmHashStr,
toDate: toInfoForCommit.date,
fromDate: &ts,
fromSch: &dps.fromSch,
toSch: &dps.toSch,
}
selected, err := dps.selectFunc(ctx, partition)
if err != nil {
@@ -636,3 +495,96 @@ func (dp DiffPartition) rowConvForSchema(ctx context.Context, vrw types.ValueRea
return rowconv.NewRowConverter(ctx, vrw, fm)
}
// GetDiffTableSchemaAndJoiner returns the schema for the diff table given a
// target schema for a row |sch|. In the old storage format, it also returns the
// associated joiner.
func GetDiffTableSchemaAndJoiner(format *types.NomsBinFormat, fromTargetSch, toTargetSch schema.Schema) (diffTableSchema schema.Schema, j *rowconv.Joiner, err error) {
if format == types.Format_DOLT_1 {
diffTableSchema, err = CalculateDiffSchema(fromTargetSch, toTargetSch)
if err != nil {
return nil, nil, err
}
} else {
colCollection := toTargetSch.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
toTargetSch = schema.MustSchemaFromCols(colCollection)
colCollection = fromTargetSch.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
fromTargetSch = schema.MustSchemaFromCols(colCollection)
j, err = rowconv.NewJoiner(
[]rowconv.NamedSchema{{Name: diff.To, Sch: toTargetSch}, {Name: diff.From, Sch: fromTargetSch}},
map[string]rowconv.ColNamingFunc{
diff.To: diff.ToColNamer,
diff.From: diff.FromColNamer,
})
if err != nil {
return nil, nil, err
}
diffTableSchema = j.GetSchema()
colCollection = diffTableSchema.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn(diffTypeColName, schema.DiffTypeTag, types.StringKind, false),
)
diffTableSchema = schema.MustSchemaFromCols(colCollection)
}
return
}
// CalculateDiffSchema returns the schema for the dolt_diff table based on the
// schemas from the from and to tables.
func CalculateDiffSchema(fromSch schema.Schema, toSch schema.Schema) (schema.Schema, error) {
colCollection := fromSch.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
fromSch = schema.MustSchemaFromCols(colCollection)
colCollection = toSch.GetAllCols()
colCollection = colCollection.Append(
schema.NewColumn("commit", schema.DiffCommitTag, types.StringKind, false),
schema.NewColumn("commit_date", schema.DiffCommitDateTag, types.TimestampKind, false))
toSch = schema.MustSchemaFromCols(colCollection)
cols := make([]schema.Column, toSch.GetAllCols().Size()+fromSch.GetAllCols().Size()+1)
i := 0
err := toSch.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
toCol, err := schema.NewColumnWithTypeInfo("to_"+col.Name, uint64(i), col.TypeInfo, false, col.Default, false, col.Comment)
if err != nil {
return true, err
}
cols[i] = toCol
i++
return false, nil
})
if err != nil {
return nil, err
}
j := toSch.GetAllCols().Size()
err = fromSch.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
fromCol, err := schema.NewColumnWithTypeInfo("from_"+col.Name, uint64(i), col.TypeInfo, false, col.Default, false, col.Comment)
if err != nil {
return true, err
}
cols[j] = fromCol
j++
return false, nil
})
if err != nil {
return nil, err
}
cols[len(cols)-1] = schema.NewColumn("diff_type", schema.DiffTypeTag, types.StringKind, false)
return schema.UnkeyedSchemaFromCols(schema.NewColCollection(cols...)), nil
}

View File

@@ -0,0 +1,168 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dtables
import (
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/val"
)
// ProllyRowConverter can be used to convert key, value val.Tuple's from |inSchema|
// to |outSchema|. Columns are matched based on names and primary key
// membership. The output of the conversion process is a sql.Row.
type ProllyRowConverter struct {
inSchema schema.Schema
outSchema schema.Schema
keyProj, valProj val.OrdinalMapping
keyDesc val.TupleDesc
valDesc val.TupleDesc
pkTargetTypes []sql.Type
nonPkTargetTypes []sql.Type
}
func NewProllyRowConverter(inSch, outSch schema.Schema) (ProllyRowConverter, error) {
keyProj, valProj, err := MapSchemaBasedOnName(inSch, outSch)
if err != nil {
return ProllyRowConverter{}, err
}
pkTargetTypes := make([]sql.Type, inSch.GetPKCols().Size())
nonPkTargetTypes := make([]sql.Type, inSch.GetNonPKCols().Size())
// Populate pkTargetTypes and nonPkTargetTypes with non-nil sql.Type if we need to do a type conversion
for i, j := range keyProj {
if j == -1 {
continue
}
inColType := inSch.GetPKCols().GetByIndex(i).TypeInfo.ToSqlType()
outColType := outSch.GetAllCols().GetByIndex(j).TypeInfo.ToSqlType()
if !inColType.Equals(outColType) {
pkTargetTypes[i] = outColType
}
}
for i, j := range valProj {
if j == -1 {
continue
}
inColType := inSch.GetNonPKCols().GetByIndex(i).TypeInfo.ToSqlType()
outColType := outSch.GetAllCols().GetByIndex(j).TypeInfo.ToSqlType()
if !inColType.Equals(outColType) {
nonPkTargetTypes[i] = outColType
}
}
kd, vd := prolly.MapDescriptorsFromScheam(inSch)
return ProllyRowConverter{
inSchema: inSch,
outSchema: outSch,
keyProj: keyProj,
valProj: valProj,
keyDesc: kd,
valDesc: vd,
pkTargetTypes: pkTargetTypes,
nonPkTargetTypes: nonPkTargetTypes,
}, nil
}
// PutConverted converts the |key| and |value| val.Tuple from |inSchema| to |outSchema|
// and places the converted row in |dstRow|.
func (c ProllyRowConverter) PutConverted(key, value val.Tuple, dstRow []interface{}) error {
for i, j := range c.keyProj {
if j == -1 {
continue
}
f, err := index.GetField(c.keyDesc, i, key)
if err != nil {
return err
}
if t := c.pkTargetTypes[i]; t != nil {
dstRow[j], err = t.Convert(f)
if err != nil {
return err
}
} else {
dstRow[j] = f
}
}
for i, j := range c.valProj {
if j == -1 {
continue
}
f, err := index.GetField(c.valDesc, i, value)
if err != nil {
return err
}
if t := c.nonPkTargetTypes[i]; t != nil {
dstRow[j], err = t.Convert(f)
if err != nil {
return err
}
} else {
dstRow[j] = f
}
}
return nil
}
// MapSchemaBasedOnName can be used to map column values from one schema to
// another schema. A column in |inSch| is mapped to |outSch| if they share the
// same name and primary key membership status. It returns ordinal mappings that
// can be use to map key, value val.Tuple's of schema |inSch| to a sql.Row of
// |outSch|. The first ordinal map is for keys, and the second is for values. If
// a column of |inSch| is missing in |outSch| then that column's index in the
// ordinal map holds -1.
func MapSchemaBasedOnName(inSch, outSch schema.Schema) (val.OrdinalMapping, val.OrdinalMapping, error) {
keyMapping := make(val.OrdinalMapping, inSch.GetPKCols().Size())
valMapping := make(val.OrdinalMapping, inSch.GetNonPKCols().Size())
err := inSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
i := inSch.GetPKCols().TagToIdx[tag]
if col, ok := outSch.GetPKCols().GetByName(col.Name); ok {
j := outSch.GetAllCols().TagToIdx[col.Tag]
keyMapping[i] = j
} else {
return true, fmt.Errorf("could not map primary key column %s", col.Name)
}
return false, nil
})
if err != nil {
return nil, nil, err
}
err = inSch.GetNonPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
i := inSch.GetNonPKCols().TagToIdx[col.Tag]
if col, ok := outSch.GetNonPKCols().GetByName(col.Name); ok {
j := outSch.GetAllCols().TagToIdx[col.Tag]
valMapping[i] = j
} else {
valMapping[i] = -1
}
return false, nil
})
if err != nil {
return nil, nil, err
}
return keyMapping, valMapping, nil
}

View File

@@ -743,7 +743,6 @@ func TestHistorySystemTable(t *testing.T) {
}
func TestUnscopedDiffSystemTable(t *testing.T) {
skipNewFormat(t)
harness := newDoltHarness(t)
for _, test := range UnscopedDiffSystemTableScriptTests {
databases := harness.NewDatabases("mydb")

View File

@@ -1414,6 +1414,21 @@ var DiffSystemTableScriptTests = []enginetest.ScriptTest{
},
},
},
{
Name: "table with commit column should maintain its data in diff",
SetUpScript: []string{
"CREATE TABLE t (pk int PRIMARY KEY, commit text);",
"CALL dolt_commit('-am', 'creating table t');",
"INSERT INTO t VALUES (1, 'hi');",
"CALL dolt_commit('-am', 'insert data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_commit, from_pk, from_commit, diff_type from dolt_diff_t;",
Expected: []sql.Row{{1, "hi", nil, nil, "added"}},
},
},
},
}
var DiffTableFunctionScriptTests = []enginetest.ScriptTest{
@@ -1774,6 +1789,21 @@ var DiffTableFunctionScriptTests = []enginetest.ScriptTest{
},
},
},
{
Name: "table with commit column should maintain its data in diff",
SetUpScript: []string{
"CREATE TABLE t (pk int PRIMARY KEY, commit text);",
"set @Commit1 = dolt_commit('-am', 'creating table t');",
"INSERT INTO t VALUES (1, 'hi');",
"set @Commit2 = dolt_commit('-am', 'insert data');",
},
Assertions: []enginetest.ScriptTestAssertion{
{
Query: "SELECT to_pk, to_commit, from_pk, from_commit, diff_type from dolt_diff('t', @Commit1, @Commit2);",
Expected: []sql.Row{{1, "hi", nil, nil, "added"}},
},
},
},
}
var UnscopedDiffSystemTableScriptTests = []enginetest.ScriptTest{

View File

@@ -795,7 +795,7 @@ var sqlDiffSchema = sql.Schema{
&sql.Column{Name: "from_first_name", Type: typeinfo.StringDefaultType.ToSqlType()},
&sql.Column{Name: "from_last_name", Type: typeinfo.StringDefaultType.ToSqlType()},
&sql.Column{Name: "from_addr", Type: typeinfo.StringDefaultType.ToSqlType()},
&sql.Column{Name: "diff_type", Type: sql.Text},
&sql.Column{Name: "diff_type", Type: typeinfo.StringDefaultType.ToSqlType()},
}
var SelectDiffTests = []SelectTest{