diff where and limit

This commit is contained in:
Brian Hendriks
2019-11-06 11:53:03 -08:00
committed by GitHub
parent 93bf491296
commit e188794ce7
9 changed files with 855 additions and 289 deletions

3
.gitignore vendored
View File

@@ -1,3 +1,6 @@
**/.idea/
.vscode
go.sum
go.mod

View File

@@ -30,6 +30,7 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env/actions"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/rowconv"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/sql"
@@ -47,13 +48,17 @@ import (
)
const (
SchemaOnlyDiff = 1
DataOnlyDiff = 2
SchemaOnlyDiff = 1
DataOnlyDiff = 2
Summary = 4
SchemaAndDataDiff = SchemaOnlyDiff | DataOnlyDiff
DataFlag = "data"
SchemaFlag = "schema"
SummaryFlag = "summary"
whereParam = "where"
limitParam = "limit"
)
var diffShortDesc = "Show changes between commits, commit and working tree, etc"
@@ -67,11 +72,21 @@ dolt diff [--options] <commit> [<tables>...]
dolt diff [--options] <commit> <commit> [<tables>...]
This is to view the changes between two arbitrary <commit>.
The diffs displayed can be limited to show the first N by providing the parameter <b>--limit N</b> where N is the number of diffs to display.
In order to filter which diffs are displayed <b>--where key=value</b> can be used. The key in this case would be either to_COLUMN_NAME or from_COLUMN_NAME. where from_COLUMN_NAME=value would filter based on the original value and to_COLUMN_NAME would select based on its updated value.
`
var diffSynopsis = []string{
"[options] [<commit>] [--data|--schema] [<tables>...]",
"[options] <commit> <commit> [--data|--schema] [<tables>...]",
"[options] [options] [<commit>] [<tables>...]",
"[options] [options] <commit> <commit> [<tables>...]",
}
type diffArgs struct {
diffParts int
limit int
where string
}
func Diff(ctx context.Context, commandStr string, args []string, dEnv *env.DoltEnv) int {
@@ -79,6 +94,8 @@ func Diff(ctx context.Context, commandStr string, args []string, dEnv *env.DoltE
ap.SupportsFlag(DataFlag, "d", "Show only the data changes, do not show the schema changes (Both shown by default).")
ap.SupportsFlag(SchemaFlag, "s", "Show only the schema changes, do not show the data changes (Both shown by default).")
ap.SupportsFlag(SummaryFlag, "", "Show summary of data changes")
ap.SupportsString(whereParam, "", "column", "filters columns based on values in the diff. See dolt diff --help for details.")
ap.SupportsInt(limitParam, "", "record_count", "limits to the first N diffs.")
help, _ := cli.HelpAndUsagePrinters(commandStr, diffShortDesc, diffLongDesc, diffSynopsis, ap)
apr := cli.ParseArgs(ap, args, help)
@@ -89,12 +106,26 @@ func Diff(ctx context.Context, commandStr string, args []string, dEnv *env.DoltE
diffParts = SchemaOnlyDiff
}
r1, r2, tables, verr := getRoots(ctx, apr.Args(), dEnv)
summary := apr.Contains(SummaryFlag)
if summary {
if apr.Contains(SchemaFlag) || apr.Contains(DataFlag) {
cli.PrintErrln("Invalid Arguments: --summary cannot be combined with --schema or --data")
return 1
}
diffParts = Summary
}
r1, r2, tables, verr := getRoots(ctx, apr.Args(), dEnv)
// default value of 0 used to signal no limit.
limit, _ := apr.GetInt(limitParam)
if verr == nil {
verr = diffRoots(ctx, r1, r2, tables, diffParts, dEnv, summary)
whereClause := apr.GetValueOrDefault(whereParam, "")
verr = diffRoots(ctx, r1, r2, tables, dEnv, &diffArgs{diffParts, limit, whereClause})
}
if verr != nil {
@@ -197,7 +228,7 @@ func getRootForCommitSpecStr(ctx context.Context, csStr string, dEnv *env.DoltEn
return h.String(), r, nil
}
func diffRoots(ctx context.Context, r1, r2 *doltdb.RootValue, tblNames []string, diffParts int, dEnv *env.DoltEnv, summary bool) errhand.VerboseError {
func diffRoots(ctx context.Context, r1, r2 *doltdb.RootValue, tblNames []string, dEnv *env.DoltEnv, dArgs *diffArgs) errhand.VerboseError {
var err error
if len(tblNames) == 0 {
tblNames, err = actions.AllTables(ctx, r1, r2)
@@ -308,17 +339,17 @@ func diffRoots(ctx context.Context, r1, r2 *doltdb.RootValue, tblNames []string,
var verr errhand.VerboseError
if summary {
if dArgs.diffParts&Summary != 0 {
colLen := sch2.GetAllCols().Size()
verr = diffSummary(ctx, rowData1, rowData2, colLen)
}
if diffParts&SchemaOnlyDiff != 0 && sch1Hash != sch2Hash && !summary {
if dArgs.diffParts&SchemaOnlyDiff != 0 && sch1Hash != sch2Hash {
verr = diffSchemas(tblName, sch2, sch1)
}
if diffParts&DataOnlyDiff != 0 && !summary {
verr = diffRows(ctx, rowData1, rowData2, sch1, sch2)
if dArgs.diffParts&DataOnlyDiff != 0 {
verr = diffRows(ctx, rowData1, rowData2, sch1, sch2, dArgs)
}
if verr != nil {
@@ -419,77 +450,45 @@ func dumbDownSchema(in schema.Schema) (schema.Schema, error) {
return schema.SchemaFromCols(dumbColColl), nil
}
func diffRows(ctx context.Context, newRows, oldRows types.Map, newSch, oldSch schema.Schema) errhand.VerboseError {
dumbNewSch, err := dumbDownSchema(newSch)
func toNamer(name string) string {
return diff.To + "_" + name
}
if err != nil {
return errhand.BuildDError("").AddCause(err).Build()
}
func fromNamer(name string) string {
return diff.From + "_" + name
}
dumbOldSch, err := dumbDownSchema(oldSch)
func diffRows(ctx context.Context, newRows, oldRows types.Map, newSch, oldSch schema.Schema, dArgs *diffArgs) errhand.VerboseError {
joiner, err := rowconv.NewJoiner(
[]rowconv.NamedSchema{
{Name: diff.From, Sch: oldSch},
{Name: diff.To, Sch: newSch},
},
map[string]rowconv.ColNamingFunc{diff.To: toNamer, diff.From: fromNamer},
)
if err != nil {
return errhand.BuildDError("").AddCause(err).Build()
}
untypedUnionSch, err := untyped.UntypedSchemaUnion(dumbNewSch, dumbOldSch)
if err != nil {
return errhand.BuildDError("Failed to merge schemas").Build()
}
newToUnionConv := rowconv.IdentityConverter
if newSch != nil {
newToUnionMapping, err := rowconv.TagMapping(newSch, untypedUnionSch)
if err != nil {
return errhand.BuildDError("Error creating unioned mapping").AddCause(err).Build()
}
newToUnionConv, _ = rowconv.NewRowConverter(newToUnionMapping)
}
oldToUnionConv := rowconv.IdentityConverter
if oldSch != nil {
oldToUnionMapping, err := rowconv.TagMapping(oldSch, untypedUnionSch)
if err != nil {
return errhand.BuildDError("Error creating unioned mapping").AddCause(err).Build()
}
oldToUnionConv, _ = rowconv.NewRowConverter(oldToUnionMapping)
untypedUnionSch, ds, verr := createSplitter(newSch, oldSch, joiner)
if verr != nil {
return verr
}
ad := diff.NewAsyncDiffer(1024)
ad.Start(ctx, newRows, oldRows)
defer ad.Close()
src := diff.NewRowDiffSource(ad, oldToUnionConv, newToUnionConv, untypedUnionSch)
src := diff.NewRowDiffSource(ad, joiner)
defer src.Close()
oldColNames := make(map[uint64]string)
newColNames := make(map[uint64]string)
err = untypedUnionSch.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
oldCol, oldOk := oldSch.GetAllCols().GetByTag(tag)
newCol, newOk := newSch.GetAllCols().GetByTag(tag)
oldColNames, verr := mapTagToColName(oldSch, untypedUnionSch)
if oldOk {
oldColNames[tag] = oldCol.Name
} else {
oldColNames[tag] = ""
}
if verr != nil {
return verr
}
if newOk {
newColNames[tag] = newCol.Name
} else {
newColNames[tag] = ""
}
newColNames, verr := mapTagToColName(newSch, untypedUnionSch)
return false, nil
})
if err != nil {
return errhand.BuildDError("error: failed to map columns to tags").Build()
if verr != nil {
return verr
}
schemasEqual := reflect.DeepEqual(oldColNames, newColNames)
@@ -506,27 +505,22 @@ func diffRows(ctx context.Context, newRows, oldRows types.Map, newSch, oldSch sc
defer sink.Close()
fwtTr := fwt.NewAutoSizingFWTTransformer(untypedUnionSch, fwt.HashFillWhenTooLong, 1000)
nullPrinter := nullprinter.NewNullPrinter(untypedUnionSch)
transforms := pipeline.NewTransformCollection(
pipeline.NewNamedTransform(nullprinter.NULL_PRINTING_STAGE, nullPrinter.ProcessRow),
pipeline.NamedTransform{Name: fwtStageName, Func: fwtTr.TransformToFWT},
)
var verr errhand.VerboseError
var badRowVErr errhand.VerboseError
badRowCallback := func(trf *pipeline.TransformRowFailure) (quit bool) {
verr = errhand.BuildDError("Failed transforming row").AddDetails(trf.TransformName).AddDetails(trf.Details).Build()
badRowVErr = errhand.BuildDError("Failed transforming row").AddDetails(trf.TransformName).AddDetails(trf.Details).Build()
return true
}
sinkProcFunc := pipeline.ProcFuncForSinkFunc(sink.ProcRowWithProps)
p := pipeline.NewAsyncPipeline(pipeline.ProcFuncForSourceFunc(src.NextDiff), sinkProcFunc, transforms, badRowCallback)
p, verr := buildPipeline(dArgs, joiner, ds, untypedUnionSch, src, sink, badRowCallback)
if verr != nil {
return verr
}
if schemasEqual {
schRow, err := untyped.NewRowFromTaggedStrings(newRows.Format(), untypedUnionSch, newColNames)
if err != nil {
return errhand.BuildDError("error: creating diff header").AddCause(err).Build()
}
p.InjectRow(fwtStageName, schRow)
@@ -534,14 +528,14 @@ func diffRows(ctx context.Context, newRows, oldRows types.Map, newSch, oldSch sc
newSchRow, err := untyped.NewRowFromTaggedStrings(newRows.Format(), untypedUnionSch, oldColNames)
if err != nil {
return errhand.BuildDError("error: creating diff header").AddCause(err).Build()
}
p.InjectRowWithProps(fwtStageName, newSchRow, map[string]interface{}{diff.DiffTypeProp: diff.DiffModifiedOld})
oldSchRow, err := untyped.NewRowFromTaggedStrings(newRows.Format(), untypedUnionSch, newColNames)
if err != nil {
return errhand.BuildDError("error: creating diff header").AddCause(err).Build()
}
p.InjectRowWithProps(fwtStageName, oldSchRow, map[string]interface{}{diff.DiffTypeProp: diff.DiffModifiedNew})
@@ -552,7 +546,117 @@ func diffRows(ctx context.Context, newRows, oldRows types.Map, newSch, oldSch sc
return errhand.BuildDError("Error diffing: %v", err.Error()).Build()
}
return verr
if badRowVErr != nil {
return badRowVErr
}
return nil
}
func buildPipeline(dArgs *diffArgs, joiner *rowconv.Joiner, ds *diff.DiffSplitter, untypedUnionSch schema.Schema, src *diff.RowDiffSource, sink *diff.ColorDiffSink, badRowCB pipeline.BadRowCallback) (*pipeline.Pipeline, errhand.VerboseError) {
var where FilterFn
var selTrans *SelectTransform
where, err := ParseWhere(joiner.GetSchema(), dArgs.where)
if err != nil {
return nil, errhand.BuildDError("error: failed to parse where cause").AddCause(err).SetPrintUsage().Build()
}
transforms := pipeline.NewTransformCollection()
if where != nil || dArgs.limit != 0 {
if where == nil {
where = func(r row.Row) bool {
return true
}
}
selTrans = NewSelTrans(where, dArgs.limit)
transforms.AppendTransforms(pipeline.NewNamedTransform("select", selTrans.LimitAndFilter))
}
fwtTr := fwt.NewAutoSizingFWTTransformer(untypedUnionSch, fwt.HashFillWhenTooLong, 1000)
nullPrinter := nullprinter.NewNullPrinter(untypedUnionSch)
transforms.AppendTransforms(pipeline.NewNamedTransform("split_diffs", ds.SplitDiffIntoOldAndNew),
pipeline.NewNamedTransform(nullprinter.NULL_PRINTING_STAGE, nullPrinter.ProcessRow),
pipeline.NamedTransform{Name: fwtStageName, Func: fwtTr.TransformToFWT},
)
sinkProcFunc := pipeline.ProcFuncForSinkFunc(sink.ProcRowWithProps)
p := pipeline.NewAsyncPipeline(pipeline.ProcFuncForSourceFunc(src.NextDiff), sinkProcFunc, transforms, badRowCB)
if selTrans != nil {
selTrans.Pipeline = p
}
return p, nil
}
func mapTagToColName(sch, untypedUnionSch schema.Schema) (map[uint64]string, errhand.VerboseError) {
tagToCol := make(map[uint64]string)
allCols := sch.GetAllCols()
err := untypedUnionSch.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
col, ok := allCols.GetByTag(tag)
if ok {
tagToCol[tag] = col.Name
} else {
tagToCol[tag] = ""
}
return false, nil
})
if err != nil {
return nil, errhand.BuildDError("error: failed to map columns to tags").Build()
}
return tagToCol, nil
}
func createSplitter(newSch schema.Schema, oldSch schema.Schema, joiner *rowconv.Joiner) (schema.Schema, *diff.DiffSplitter, errhand.VerboseError) {
dumbNewSch, err := dumbDownSchema(newSch)
if err != nil {
return nil, nil, errhand.BuildDError("").AddCause(err).Build()
}
dumbOldSch, err := dumbDownSchema(oldSch)
if err != nil {
return nil, nil, errhand.BuildDError("").AddCause(err).Build()
}
untypedUnionSch, err := untyped.UntypedSchemaUnion(dumbNewSch, dumbOldSch)
if err != nil {
return nil, nil, errhand.BuildDError("Failed to merge schemas").Build()
}
newToUnionConv := rowconv.IdentityConverter
if newSch != nil {
newToUnionMapping, err := rowconv.TagMapping(newSch, untypedUnionSch)
if err != nil {
return nil, nil, errhand.BuildDError("Error creating unioned mapping").AddCause(err).Build()
}
newToUnionConv, _ = rowconv.NewRowConverter(newToUnionMapping)
}
oldToUnionConv := rowconv.IdentityConverter
if oldSch != nil {
oldToUnionMapping, err := rowconv.TagMapping(oldSch, untypedUnionSch)
if err != nil {
return nil, nil, errhand.BuildDError("Error creating unioned mapping").AddCause(err).Build()
}
oldToUnionConv, _ = rowconv.NewRowConverter(oldToUnionMapping)
}
ds := diff.NewDiffSplitter(joiner, oldToUnionConv, newToUnionConv)
return untypedUnionSch, ds, nil
}
var emptyHash = hash.Hash{}
@@ -560,12 +664,12 @@ var emptyHash = hash.Hash{}
func printTableDiffSummary(tblName string, tbl1, tbl2 *doltdb.Table) {
bold := color.New(color.Bold)
bold.Printf("diff --dolt a/%[1]s b/%[1]s\n", tblName)
_, _ = bold.Printf("diff --dolt a/%[1]s b/%[1]s\n", tblName)
if tbl1 == nil {
bold.Println("deleted table")
_, _ = bold.Println("deleted table")
} else if tbl2 == nil {
bold.Println("added table")
_, _ = bold.Println("added table")
} else {
h1, err := tbl1.HashOf()
@@ -573,7 +677,7 @@ func printTableDiffSummary(tblName string, tbl1, tbl2 *doltdb.Table) {
panic(err)
}
bold.Printf("--- a/%s @ %s\n", tblName, h1.String())
_, _ = bold.Printf("--- a/%s @ %s\n", tblName, h1.String())
h2, err := tbl2.HashOf()
@@ -581,7 +685,7 @@ func printTableDiffSummary(tblName string, tbl1, tbl2 *doltdb.Table) {
panic(err)
}
bold.Printf("+++ b/%s @ %s\n", tblName, h2.String())
_, _ = bold.Printf("+++ b/%s @ %s\n", tblName, h2.String())
}
}

View File

@@ -0,0 +1,96 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package commands
import (
"errors"
"strings"
"github.com/liquidata-inc/dolt/go/libraries/doltcore"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
"github.com/liquidata-inc/dolt/go/store/types"
)
type FilterFn = func(r row.Row) (matchesFilter bool)
func ParseWhere(sch schema.Schema, whereClause string) (FilterFn, error) {
if whereClause == "" {
return func(r row.Row) bool {
return true
}, nil
} else {
tokens := strings.Split(whereClause, "=")
if len(tokens) != 2 {
return nil, errors.New("'" + whereClause + "' is not in the format key=value")
}
key := tokens[0]
valStr := tokens[1]
col, ok := sch.GetAllCols().GetByName(key)
if !ok {
return nil, errors.New("where clause is invalid. '" + key + "' is not a known column.")
}
tag := col.Tag
convFunc, err := doltcore.GetConvFunc(types.StringKind, col.Kind)
if err != nil {
return nil, err
}
val, err := convFunc(types.String(valStr))
if err != nil {
return nil, errors.New("unable to convert '" + valStr + "' to " + col.KindString())
}
return func(r row.Row) bool {
rowVal, ok := r.GetColVal(tag)
if !ok {
return false
}
return val.Equals(rowVal)
}, nil
}
}
type SelectTransform struct {
Pipeline *pipeline.Pipeline
filter FilterFn
limit int
count int
}
func NewSelTrans(filter FilterFn, limit int) *SelectTransform {
return &SelectTransform{filter: filter, limit: limit}
}
func (st *SelectTransform) LimitAndFilter(inRow row.Row, props pipeline.ReadableMap) ([]*pipeline.TransformedRowResult, string) {
if st.limit <= 0 || st.count < st.limit {
if st.filter(inRow) {
st.count++
return []*pipeline.TransformedRowResult{{RowData: inRow, PropertyUpdates: nil}}, ""
}
} else if st.count == st.limit {
st.Pipeline.NoMore()
}
return nil, ""
}

View File

@@ -16,15 +16,12 @@ package tblcmds
import (
"context"
"errors"
"strings"
"github.com/fatih/color"
"github.com/liquidata-inc/dolt/go/cmd/dolt/cli"
"github.com/liquidata-inc/dolt/go/cmd/dolt/commands"
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
"github.com/liquidata-inc/dolt/go/libraries/doltcore"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
@@ -60,72 +57,6 @@ var selSynopsis = []string{
"[--limit <record_count>] [--where <col1=val1>] [--hide-conflicts] [<commit>] <table> [<column>...]",
}
type filterFn = func(r row.Row) (matchesFilter bool)
func parseWhere(sch schema.Schema, whereClause string) (filterFn, error) {
if whereClause == "" {
return func(r row.Row) bool {
return true
}, nil
} else {
tokens := strings.Split(whereClause, "=")
if len(tokens) != 2 {
return nil, errors.New("'" + whereClause + "' is not in the format key=value")
}
key := tokens[0]
valStr := tokens[1]
col, ok := sch.GetAllCols().GetByName(key)
if !ok {
return nil, errors.New("where clause is invalid. '" + key + "' is not a known column.")
}
tag := col.Tag
convFunc, err := doltcore.GetConvFunc(types.StringKind, col.Kind)
if err != nil {
return nil, err
}
val, err := convFunc(types.String(valStr))
if err != nil {
return nil, errors.New("unable to convert '" + valStr + "' to " + col.KindString())
}
return func(r row.Row) bool {
rowVal, ok := r.GetColVal(tag)
if !ok {
return false
}
return val.Equals(rowVal)
}, nil
}
}
type selectTransform struct {
p *pipeline.Pipeline
filter filterFn
limit int
count int
}
func (st *selectTransform) LimitAndFilter(inRow row.Row, props pipeline.ReadableMap) ([]*pipeline.TransformedRowResult, string) {
if st.limit == -1 || st.count < st.limit {
if st.filter(inRow) {
st.count++
return []*pipeline.TransformedRowResult{{RowData: inRow, PropertyUpdates: nil}}, ""
}
} else if st.count == st.limit {
st.p.NoMore()
}
return nil, ""
}
type SelectArgs struct {
tblName string
colNames []string
@@ -227,13 +158,13 @@ func printTable(ctx context.Context, root *doltdb.RootValue, selArgs *SelectArgs
return errhand.BuildDError("error: failed to get schema").AddCause(err).Build()
}
whereFn, err := parseWhere(tblSch, selArgs.whereClause)
whereFn, err := commands.ParseWhere(tblSch, selArgs.whereClause)
if err != nil {
return errhand.BuildDError("error: failed to parse where cause").AddCause(err).SetPrintUsage().Build()
}
selTrans := &selectTransform{nil, whereFn, selArgs.limit, 0}
selTrans := commands.NewSelTrans(whereFn, selArgs.limit)
transforms := pipeline.NewTransformCollection(pipeline.NewNamedTransform("select", selTrans.LimitAndFilter))
sch, err := maybeAddCnfColTransform(ctx, transforms, tbl, tblSch)
@@ -253,7 +184,7 @@ func printTable(ctx context.Context, root *doltdb.RootValue, selArgs *SelectArgs
return errhand.BuildDError("error: failed to setup pipeline").AddCause(err).Build()
}
selTrans.p = p
selTrans.Pipeline = p
p.Start()
err = p.Wait()

View File

@@ -23,68 +23,34 @@ import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/rowconv"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
"github.com/liquidata-inc/dolt/go/libraries/utils/valutil"
"github.com/liquidata-inc/dolt/go/store/types"
)
const (
DiffTypeProp = "difftype"
CollChangesProp = "collchanges"
From = "from"
To = "to"
)
type DiffChType int
const (
DiffAdded DiffChType = iota
DiffRemoved
DiffModifiedOld
DiffModifiedNew
)
type DiffTyped interface {
DiffType() DiffChType
}
type DiffRow struct {
row.Row
diffType DiffChType
}
func (dr *DiffRow) DiffType() DiffChType {
return dr.diffType
}
type RowDiffSource struct {
oldConv *rowconv.RowConverter
newConv *rowconv.RowConverter
ad *AsyncDiffer
outSch schema.Schema
bufferedRows []pipeline.RowWithProps
ad *AsyncDiffer
joiner *rowconv.Joiner
}
func NewRowDiffSource(ad *AsyncDiffer, oldConv, newConv *rowconv.RowConverter, outSch schema.Schema) *RowDiffSource {
func NewRowDiffSource(ad *AsyncDiffer, joiner *rowconv.Joiner) *RowDiffSource {
return &RowDiffSource{
oldConv,
newConv,
ad,
outSch,
make([]pipeline.RowWithProps, 0, 1024),
joiner,
}
}
// GetSchema gets the schema of the rows that this reader will return
func (rdRd *RowDiffSource) GetSchema() schema.Schema {
return rdRd.outSch
return rdRd.joiner.GetSchema()
}
// NextDiff reads a row from a table. If there is a bad row the returned error will be non nil, and callin IsBadRow(err)
// will be return true. This is a potentially non-fatal error and callers can decide if they want to continue on a bad row, or fail.
func (rdRd *RowDiffSource) NextDiff() (row.Row, pipeline.ImmutableProperties, error) {
if len(rdRd.bufferedRows) != 0 {
rowWithProps := rdRd.nextFromBuffer()
return rowWithProps.Row, rowWithProps.Props, nil
}
if rdRd.ad.isDone {
return nil, pipeline.NoProps, io.EOF
}
@@ -103,95 +69,39 @@ func (rdRd *RowDiffSource) NextDiff() (row.Row, pipeline.ImmutableProperties, er
return nil, pipeline.NoProps, errors.New("timeout")
}
outCols := rdRd.outSch.GetAllCols()
for _, d := range diffs {
var mappedOld row.Row
var mappedNew row.Row
originalNewSch := rdRd.outSch
if !rdRd.newConv.IdentityConverter {
originalNewSch = rdRd.newConv.SrcSch
}
originalOldSch := rdRd.outSch
if !rdRd.oldConv.IdentityConverter {
originalOldSch = rdRd.oldConv.SrcSch
}
if d.OldValue != nil {
oldRow, err := row.FromNoms(originalOldSch, d.KeyValue.(types.Tuple), d.OldValue.(types.Tuple))
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
mappedOld, _ = rdRd.oldConv.Convert(oldRow)
}
if d.NewValue != nil {
newRow, err := row.FromNoms(originalNewSch, d.KeyValue.(types.Tuple), d.NewValue.(types.Tuple))
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
mappedNew, _ = rdRd.newConv.Convert(newRow)
}
var oldProps = map[string]interface{}{DiffTypeProp: DiffRemoved}
var newProps = map[string]interface{}{DiffTypeProp: DiffAdded}
if d.OldValue != nil && d.NewValue != nil {
oldColDiffs := make(map[string]DiffChType)
newColDiffs := make(map[string]DiffChType)
err := outCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
oldVal, _ := mappedOld.GetColVal(tag)
newVal, _ := mappedNew.GetColVal(tag)
_, inOld := originalOldSch.GetAllCols().GetByTag(tag)
_, inNew := originalNewSch.GetAllCols().GetByTag(tag)
if inOld && inNew {
if !valutil.NilSafeEqCheck(oldVal, newVal) {
newColDiffs[col.Name] = DiffModifiedNew
oldColDiffs[col.Name] = DiffModifiedOld
}
} else if inOld {
oldColDiffs[col.Name] = DiffRemoved
} else {
newColDiffs[col.Name] = DiffAdded
}
return false, nil
})
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
oldProps = map[string]interface{}{DiffTypeProp: DiffModifiedOld, CollChangesProp: oldColDiffs}
newProps = map[string]interface{}{DiffTypeProp: DiffModifiedNew, CollChangesProp: newColDiffs}
}
if d.OldValue != nil {
rwp := pipeline.NewRowWithProps(mappedOld, oldProps)
rdRd.bufferedRows = append(rdRd.bufferedRows, rwp)
}
if d.NewValue != nil {
rwp := pipeline.NewRowWithProps(mappedNew, newProps)
rdRd.bufferedRows = append(rdRd.bufferedRows, rwp)
}
if len(diffs) != 1 {
panic("only a single diff requested, multiple returned. bug in AsyncDiffer")
}
rwp := rdRd.nextFromBuffer()
return rwp.Row, rwp.Props, nil
}
d := diffs[0]
rows := make(map[string]row.Row)
if d.OldValue != nil {
oldRow, err := row.FromNoms(rdRd.joiner.SchemaForName(From), d.KeyValue.(types.Tuple), d.OldValue.(types.Tuple))
func (rdRd *RowDiffSource) nextFromBuffer() pipeline.RowWithProps {
r := rdRd.bufferedRows[0]
rdRd.bufferedRows = rdRd.bufferedRows[1:]
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
return r
rows[From] = oldRow
}
if d.NewValue != nil {
newRow, err := row.FromNoms(rdRd.joiner.SchemaForName(To), d.KeyValue.(types.Tuple), d.NewValue.(types.Tuple))
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
rows[To] = newRow
}
joinedRow, err := rdRd.joiner.Join(rows)
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
}
return joinedRow, pipeline.ImmutableProperties{}, nil
}
// Close should release resources being held

View File

@@ -0,0 +1,159 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package diff
import (
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/rowconv"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
"github.com/liquidata-inc/dolt/go/libraries/utils/valutil"
)
const (
// DiffTypeProp is the name of a property added to each split row which tells if its added, removed, the modified
// old value, or the new value after modification
DiffTypeProp = "difftype"
// CollChangesProp is the name of a property added to each modified row which is a map from collumn name to the
// type of change.
CollChangesProp = "collchanges"
)
// DiffChType is an enum that represents the type of change
type DiffChType int
const (
// DiffAdded is the DiffTypeProp value for a row that was newly added (In new, but not in old)
DiffAdded DiffChType = iota
// DiffRemoved is the DiffTypeProp value for a row that was newly deleted (In old, but not in new)
DiffRemoved
// DiffModifiedOld is the DiffTypeProp value for the row which represents the old value of the row before it was changed.
DiffModifiedOld
// DiffModifiedNew is the DiffTypeProp value for the row which represents the new value of the row after it was changed.
DiffModifiedNew
)
// DiffTyped is an interface for an object that has a DiffChType
type DiffTyped interface {
// DiffType gets the DiffChType of an object
DiffType() DiffChType
}
// DiffRow is a row.Row with a change type associated with it.
type DiffRow struct {
row.Row
diffType DiffChType
}
// DiffType gets the DiffChType for the row.
func (dr *DiffRow) DiffType() DiffChType {
return dr.diffType
}
// DiffSplitter is a struct that can take a diff which is represented by a row with a column for every field in the old
// version, and a column for every field in the new version and split it into two rows with properties which annotate
// what each row is. This is used to show diffs as 2 lines, instead of 1.
type DiffSplitter struct {
joiner *rowconv.Joiner
oldConv *rowconv.RowConverter
newConv *rowconv.RowConverter
}
// NewDiffSplitter creates a DiffSplitter
func NewDiffSplitter(joiner *rowconv.Joiner, oldConv, newConv *rowconv.RowConverter) *DiffSplitter {
return &DiffSplitter{joiner, oldConv, newConv}
}
func convertNamedRow(rows map[string]row.Row, name string, rc *rowconv.RowConverter) (row.Row, error) {
r, ok := rows[name]
if !ok || r == nil {
return nil, nil
}
return rc.Convert(r)
}
// SplitDiffIntoOldAndNew is a pipeline.TransformRowFunc which can be used in a pipeline to split single row diffs,
// into 2 row diffs.
func (ds *DiffSplitter) SplitDiffIntoOldAndNew(inRow row.Row, props pipeline.ReadableMap) (rowData []*pipeline.TransformedRowResult, badRowDetails string) {
rows, err := ds.joiner.Split(inRow)
mappedOld, err := convertNamedRow(rows, From, ds.oldConv)
if err != nil {
return nil, err.Error()
}
mappedNew, err := convertNamedRow(rows, To, ds.newConv)
if err != nil {
return nil, err.Error()
}
originalNewSch := ds.joiner.SchemaForName(From)
originalOldSch := ds.joiner.SchemaForName(To)
var oldProps = map[string]interface{}{DiffTypeProp: DiffRemoved}
var newProps = map[string]interface{}{DiffTypeProp: DiffAdded}
if mappedOld != nil && mappedNew != nil {
oldColDiffs := make(map[string]DiffChType)
newColDiffs := make(map[string]DiffChType)
outSch := ds.newConv.DestSch
outCols := outSch.GetAllCols()
err := outCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
oldVal, _ := mappedOld.GetColVal(tag)
newVal, _ := mappedNew.GetColVal(tag)
_, inOld := originalOldSch.GetAllCols().GetByTag(tag)
_, inNew := originalNewSch.GetAllCols().GetByTag(tag)
if inOld && inNew {
if !valutil.NilSafeEqCheck(oldVal, newVal) {
newColDiffs[col.Name] = DiffModifiedNew
oldColDiffs[col.Name] = DiffModifiedOld
}
} else if inOld {
oldColDiffs[col.Name] = DiffRemoved
} else {
newColDiffs[col.Name] = DiffAdded
}
return false, nil
})
if err != nil {
return nil, err.Error()
}
oldProps = map[string]interface{}{DiffTypeProp: DiffModifiedOld, CollChangesProp: oldColDiffs}
newProps = map[string]interface{}{DiffTypeProp: DiffModifiedNew, CollChangesProp: newColDiffs}
}
var results []*pipeline.TransformedRowResult
if mappedOld != nil {
results = append(results, &pipeline.TransformedRowResult{RowData: mappedOld, PropertyUpdates: oldProps})
}
if mappedNew != nil {
results = append(results, &pipeline.TransformedRowResult{RowData: mappedNew, PropertyUpdates: newProps})
}
return results, ""
}

View File

@@ -0,0 +1,177 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rowconv
import (
"errors"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/store/types"
)
// ColNamingFunc defines a function signature which takes the name of a column, and returns the name that should be used
// for the column in the joined dataset.
type ColNamingFunc func(colName string) string
type stringUint64Tuple struct {
str string
u64 uint64
}
// NamedSchema is an object that associates a schema with a string
type NamedSchema struct {
// Name the name given to the schema
Name string
// Sch is the schema
Sch schema.Schema
}
// Joiner is an object that can be used to join multiple rows together into a single row (See Join), and also to reverse
// this operation by taking a joined row and getting back a map of rows (See Split).
type Joiner struct {
srcSchemas map[string]schema.Schema
tagMaps map[string]map[uint64]uint64
revTagMap map[uint64]stringUint64Tuple
joined schema.Schema
}
// NewJoiner creates a joiner from a slice of NamedSchemas and a map of ColNamingFuncs. A new schema for joined rows will
// be created, and the columns for joined schemas will be named according to the ColNamingFunc associated with each schema
// name.
func NewJoiner(namedSchemas []NamedSchema, namers map[string]ColNamingFunc) (*Joiner, error) {
tags := make(map[string][]uint64)
revTagMap := make(map[uint64]stringUint64Tuple)
tagMaps := make(map[string]map[uint64]uint64, len(namedSchemas))
srcSchemas := make(map[string]schema.Schema)
for _, namedSch := range namedSchemas {
tagMaps[namedSch.Name] = make(map[uint64]uint64)
srcSchemas[namedSch.Name] = namedSch.Sch
}
var cols []schema.Column
var destTag uint64
for _, namedSch := range namedSchemas {
sch := namedSch.Sch
name := namedSch.Name
allCols := sch.GetAllCols()
namer := namers[name]
err := allCols.Iter(func(srcTag uint64, col schema.Column) (stop bool, err error) {
newColName := namer(col.Name)
cols = append(cols, schema.NewColumn(newColName, destTag, col.Kind, false))
tagMaps[name][srcTag] = destTag
revTagMap[destTag] = stringUint64Tuple{str: name, u64: srcTag}
tags[name] = append(tags[name], destTag)
destTag++
return false, nil
})
if err != nil {
return nil, err
}
}
colColl, err := schema.NewColCollection(cols...)
if err != nil {
return nil, err
}
joined := schema.UnkeyedSchemaFromCols(colColl)
return &Joiner{srcSchemas, tagMaps, revTagMap, joined}, nil
}
// Join takes a map from schema name to row which has that schema, and returns a single joined row containing all the
// data
func (j *Joiner) Join(namedRows map[string]row.Row) (row.Row, error) {
var nbf *types.NomsBinFormat
colVals := make(row.TaggedValues)
for name, r := range namedRows {
if r == nil {
continue
}
if nbf == nil {
nbf = r.Format()
} else if nbf.VersionString() != r.Format().VersionString() {
return nil, errors.New("not all rows have the same format")
}
_, err := r.IterCols(func(tag uint64, val types.Value) (stop bool, err error) {
destTag := j.tagMaps[name][tag]
colVals[destTag] = val
return false, nil
})
if err != nil {
return nil, err
}
}
return row.New(nbf, j.joined, colVals)
}
// Split takes a row which has the created joined schema, and splits it into a map of rows where the key of the map is
// the name of the schema for the associated row.
func (j *Joiner) Split(r row.Row) (map[string]row.Row, error) {
colVals := make(map[string]row.TaggedValues, len(j.srcSchemas))
for name := range j.srcSchemas {
colVals[name] = make(row.TaggedValues)
}
_, err := r.IterCols(func(tag uint64, val types.Value) (stop bool, err error) {
schemaNameAndTag := j.revTagMap[tag]
colVals[schemaNameAndTag.str][schemaNameAndTag.u64] = val
return false, nil
})
if err != nil {
return nil, err
}
rows := make(map[string]row.Row, len(colVals))
for name, sch := range j.srcSchemas {
var err error
currColVals := colVals[name]
if len(currColVals) == 0 {
continue
}
rows[name], err = row.New(r.Format(), sch, currColVals)
if err != nil {
return nil, err
}
}
return rows, nil
}
// GetSchema returns the schema which all joined rows will have, and any row passed into split should have.
func (j *Joiner) GetSchema() schema.Schema {
return j.joined
}
// SchemaForName retrieves the original schema which has the given name.
func (j *Joiner) SchemaForName(name string) schema.Schema {
return j.srcSchemas[name]
}

View File

@@ -0,0 +1,186 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rowconv
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/store/types"
)
const (
firstTag uint64 = iota
lastTag
ageTag
cityTag
)
var peopleCols, _ = schema.NewColCollection(
schema.NewColumn("last", lastTag, types.StringKind, true),
schema.NewColumn("first", firstTag, types.StringKind, true),
schema.NewColumn("age", ageTag, types.IntKind, false),
schema.NewColumn("city", cityTag, types.StringKind, false),
)
var peopleSch = schema.SchemaFromCols(peopleCols)
type toJoinAndExpectedResult struct {
toJoinVals map[string]row.TaggedValues
expected map[string]types.Value
}
func TestJoiner(t *testing.T) {
tests := []struct {
name string
namedSchemas []NamedSchema
namers map[string]ColNamingFunc
toJoin []toJoinAndExpectedResult
}{
{
name: "join diff versions of row",
namedSchemas: []NamedSchema{{"to", peopleSch}, {"from", peopleSch}},
namers: map[string]ColNamingFunc{"to": toNamer, "from": fromNamer},
toJoin: []toJoinAndExpectedResult{
{
toJoinVals: map[string]row.TaggedValues{
"from": {
lastTag: types.String("Richardson"),
firstTag: types.String("Richard"),
ageTag: types.Int(42),
cityTag: types.String("San Francisco"),
},
"to": {
lastTag: types.String("Richardson"),
firstTag: types.String("Richard"),
ageTag: types.Int(43),
cityTag: types.String("Los Angeles"),
},
},
expected: map[string]types.Value{
"from_last": types.String("Richardson"),
"from_first": types.String("Richard"),
"from_city": types.String("San Francisco"),
"from_age": types.Int(42),
"to_last": types.String("Richardson"),
"to_first": types.String("Richard"),
"to_city": types.String("Los Angeles"),
"to_age": types.Int(43),
},
},
{
toJoinVals: map[string]row.TaggedValues{
"from": {
lastTag: types.String("Richardson"),
firstTag: types.String("Richard"),
ageTag: types.Int(42),
cityTag: types.String("San Francisco"),
},
},
expected: map[string]types.Value{
"from_last": types.String("Richardson"),
"from_first": types.String("Richard"),
"from_city": types.String("San Francisco"),
"from_age": types.Int(42),
},
},
{
toJoinVals: map[string]row.TaggedValues{
"to": {
lastTag: types.String("Richardson"),
firstTag: types.String("Richard"),
ageTag: types.Int(43),
cityTag: types.String("Los Angeles"),
},
},
expected: map[string]types.Value{
"to_last": types.String("Richardson"),
"to_first": types.String("Richard"),
"to_city": types.String("Los Angeles"),
"to_age": types.Int(43),
},
},
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
j, err := NewJoiner(test.namedSchemas, test.namers)
assert.NoError(t, err)
for _, tj := range test.toJoin {
rows := map[string]row.Row{}
for _, namedSch := range test.namedSchemas {
r, err := row.New(types.Format_Default, namedSch.Sch, tj.toJoinVals[namedSch.Name])
assert.NoError(t, err)
rows[namedSch.Name] = r
}
joinedRow, err := j.Join(rows)
assert.NoError(t, err)
joinedSch := j.GetSchema()
_, err = joinedRow.IterCols(func(tag uint64, val types.Value) (stop bool, err error) {
col, ok := joinedSch.GetAllCols().GetByTag(tag)
assert.True(t, ok)
expectedVal := tj.expected[col.Name]
assert.Equal(t, val, expectedVal)
return false, nil
})
assert.NoError(t, err)
splitRows, err := j.Split(joinedRow)
assert.NoError(t, err)
assert.Equal(t, len(tj.toJoinVals), len(splitRows))
for _, namedSch := range test.namedSchemas {
name := namedSch.Name
sch := namedSch.Sch
actual := splitRows[name]
expectedVals := tj.toJoinVals[name]
if actual == nil && expectedVals == nil {
continue
}
assert.False(t, actual == nil || expectedVals == nil)
expected, err := row.New(types.Format_Default, sch, expectedVals)
assert.NoError(t, err)
assert.True(t, row.AreEqual(actual, expected, sch))
}
}
})
}
}
func fromNamer(name string) string {
return "from_" + name
}
func toNamer(name string) string {
return "to_" + name
}

View File

@@ -27,8 +27,8 @@ func NewTransformCollection(namedTransforms ...NamedTransform) *TransformCollect
// AppendTransform will mutate the internal slice of transforms by appending this new transform to the slice of
// Transforms
func (tc *TransformCollection) AppendTransforms(nt NamedTransform) {
tc.Transforms = append(tc.Transforms, nt)
func (tc *TransformCollection) AppendTransforms(nt ...NamedTransform) {
tc.Transforms = append(tc.Transforms, nt...)
}
// NumTransforms returns the number of NamedTransforms in the collection