use commitItr instead of commitwalk for querying dolt_diff table (#4243)

This commit is contained in:
jennifersp
2022-09-07 10:02:01 -07:00
committed by GitHub
parent ac04ed7ed1
commit 67ddf5e5a6
2 changed files with 186 additions and 27 deletions
@@ -22,22 +22,30 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions/commitwalk"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/transform"
)
var workingSetPartitionKey = []byte("workingset")
var commitHistoryPartitionKey = []byte("commithistory")
var commitHashCol = "commit_hash"
var filterColumnNameSet = set.NewStrSet([]string{commitHashCol})
var _ sql.FilteredTable = (*UnscopedDiffTable)(nil)
// UnscopedDiffTable is a sql.Table implementation of a system table that shows which tables have
// changed in each commit, across all branches.
type UnscopedDiffTable struct {
ddb *doltdb.DoltDB
head *doltdb.Commit
ddb *doltdb.DoltDB
head *doltdb.Commit
partitionFilters []sql.Expression
cmItr doltdb.CommitItr
}
// tableChange is an internal data structure used to hold the results of processing
@@ -50,7 +58,38 @@ type tableChange struct {
// NewUnscopedDiffTable creates an UnscopedDiffTable
func NewUnscopedDiffTable(_ *sql.Context, ddb *doltdb.DoltDB, head *doltdb.Commit) sql.Table {
return &UnscopedDiffTable{ddb: ddb, head: head}
cmItr := doltdb.CommitItrForRoots(ddb, head)
return &UnscopedDiffTable{ddb: ddb, head: head, cmItr: cmItr}
}
// Filters returns the list of filters that are applied to this table.
func (dt *UnscopedDiffTable) Filters() []sql.Expression {
return dt.partitionFilters
}
// HandledFilters returns the list of filters that will be handled by the table itself
func (dt *UnscopedDiffTable) HandledFilters(filters []sql.Expression) []sql.Expression {
dt.partitionFilters = FilterFilters(filters, ColumnPredicate(filterColumnNameSet))
return dt.partitionFilters
}
// WithFilters returns a new sql.Table instance with the filters applied
func (dt *UnscopedDiffTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql.Table {
if dt.partitionFilters == nil {
dt.partitionFilters = FilterFilters(filters, ColumnPredicate(filterColumnNameSet))
}
if len(dt.partitionFilters) > 0 {
commitCheck, err := commitFilterForDiffTableFilterExprs(dt.partitionFilters)
if err != nil {
return nil
}
ndt := *dt
ndt.cmItr = doltdb.NewFilteringCommitItr(dt.cmItr, commitCheck)
return &ndt
}
return dt
}
// Name is a sql.Table interface function which returns the name of the table which is defined by the constant
@@ -190,6 +229,7 @@ type doltDiffCommitHistoryRowItr struct {
ctx *sql.Context
ddb *doltdb.DoltDB
child doltdb.CommitItr
commits []*doltdb.Commit
meta *datas.CommitMeta
hash hash.Hash
tableChanges []tableChange
@@ -198,18 +238,18 @@ type doltDiffCommitHistoryRowItr struct {
// newCommitHistoryRowItr creates a doltDiffCommitHistoryRowItr from the current environment.
func (dt *UnscopedDiffTable) newCommitHistoryRowItr(ctx *sql.Context) (*doltDiffCommitHistoryRowItr, error) {
hash, err := dt.head.HashOf()
if err != nil {
return nil, err
}
child, err := commitwalk.GetTopologicalOrderIterator(ctx, dt.ddb, hash)
return &doltDiffCommitHistoryRowItr{
dchItr := &doltDiffCommitHistoryRowItr{
ctx: ctx,
ddb: dt.ddb,
child: child,
tableChangesIdx: -1,
}, nil
}
cms, hasCommitHashEquality := getCommitsFromCommitHashEquality(ctx, dt.ddb, dt.partitionFilters)
if hasCommitHashEquality {
dchItr.commits = cms
} else {
dchItr.child = dt.cmItr
}
return dchItr, nil
}
// incrementIndexes increments the table changes index, and if it's the end of the table changes array, moves
@@ -228,9 +268,25 @@ func (itr *doltDiffCommitHistoryRowItr) Next(ctx *sql.Context) (sql.Row, error)
defer itr.incrementIndexes()
for itr.tableChanges == nil {
err := itr.loadTableChanges(ctx)
if err != nil {
return nil, err
if itr.commits != nil {
for _, commit := range itr.commits {
err := itr.loadTableChanges(ctx, commit)
if err != nil {
return nil, err
}
}
itr.commits = nil
} else if itr.child != nil {
_, commit, err := itr.child.Next(ctx)
if err != nil {
return nil, err
}
err = itr.loadTableChanges(ctx, commit)
if err != nil {
return nil, err
}
} else {
return nil, io.EOF
}
}
@@ -250,14 +306,9 @@ func (itr *doltDiffCommitHistoryRowItr) Next(ctx *sql.Context) (sql.Row, error)
), nil
}
// loadTableChanges loads the next commit's table changes and metadata
// loadTableChanges loads the current commit's table changes and metadata
// into the iterator.
func (itr *doltDiffCommitHistoryRowItr) loadTableChanges(ctx context.Context) error {
hash, commit, err := itr.child.Next(ctx)
if err != nil {
return err
}
func (itr *doltDiffCommitHistoryRowItr) loadTableChanges(ctx context.Context, commit *doltdb.Commit) error {
tableChanges, err := itr.calculateTableChanges(ctx, commit)
if err != nil {
return err
@@ -274,7 +325,13 @@ func (itr *doltDiffCommitHistoryRowItr) loadTableChanges(ctx context.Context) er
return err
}
itr.meta = meta
itr.hash = hash
cmHash, err := commit.HashOf()
if err != nil {
return err
}
itr.hash = cmHash
return nil
}
@@ -399,3 +456,103 @@ func isTableDataEmpty(ctx *sql.Context, table *doltdb.Table) (bool, error) {
return rowData.Empty()
}
// commitFilterForDiffTableFilterExprs returns CommitFilter used for CommitItr.
func commitFilterForDiffTableFilterExprs(filters []sql.Expression) (doltdb.CommitFilter, error) {
filters = transformFilters(filters...)
return func(ctx context.Context, h hash.Hash, cm *doltdb.Commit) (filterOut bool, err error) {
sc := sql.NewContext(ctx)
meta, err := cm.GetCommitMeta(ctx)
if err != nil {
return false, err
}
for _, filter := range filters {
res, err := filter.Eval(sc, sql.Row{h.String(), meta.Name, meta.Time()})
if err != nil {
return false, err
}
b, ok := res.(bool)
if ok && !b {
return true, nil
}
}
return false, err
}, nil
}
// transformFilters return filter expressions with index specified for rows used in CommitFilter.
func transformFilters(filters ...sql.Expression) []sql.Expression {
for i := range filters {
filters[i], _, _ = transform.Expr(filters[i], func(e sql.Expression) (sql.Expression, transform.TreeIdentity, error) {
gf, ok := e.(*expression.GetField)
if !ok {
return e, transform.SameTree, nil
}
switch gf.Name() {
case commitHashCol:
return gf.WithIndex(0), transform.NewTree, nil
default:
return gf, transform.SameTree, nil
}
})
}
return filters
}
func getCommitsFromCommitHashEquality(ctx *sql.Context, ddb *doltdb.DoltDB, filters []sql.Expression) ([]*doltdb.Commit, bool) {
var commits []*doltdb.Commit
var isCommitHashEquality bool
for i := range filters {
switch f := filters[i].(type) {
case *expression.Equals:
v, err := f.Right().Eval(ctx, nil)
if err == nil {
isCommitHashEquality = true
cm := getCommitFromHash(ctx, ddb, v.(string))
if cm != nil {
commits = append(commits, cm)
}
}
case *expression.InTuple:
switch r := f.Right().(type) {
case expression.Tuple:
right, err := r.Eval(ctx, nil)
if err == nil && right != nil {
isCommitHashEquality = true
if len(r) == 1 {
cm := getCommitFromHash(ctx, ddb, right.(string))
if cm != nil {
commits = append(commits, cm)
}
} else {
for _, el := range right.([]interface{}) {
cm := getCommitFromHash(ctx, ddb, el.(string))
if cm != nil {
commits = append(commits, cm)
}
}
}
}
}
}
}
return commits, isCommitHashEquality
}
func getCommitFromHash(ctx *sql.Context, ddb *doltdb.DoltDB, val string) *doltdb.Commit {
cmSpec, err := doltdb.NewCommitSpec(val)
if err != nil {
return nil
}
headRef, err := dsess.DSessFromSess(ctx.Session).CWBHeadRef(ctx, ctx.GetCurrentDatabase())
if err != nil {
return nil
}
cm, err := ddb.Resolve(ctx, cmSpec, headRef)
if err != nil {
return nil
}
return cm
}
@@ -327,7 +327,8 @@ var DoltRevisionDbScripts = []queries.ScriptTest{
},
},
{
Name: "database revision specs: branch-qualified revision spec",
SkipPrepared: true,
Name: "database revision specs: branch-qualified revision spec",
SetUpScript: []string{
"create table t01 (pk int primary key, c1 int)",
"call dolt_add('.')",
@@ -4767,7 +4768,8 @@ var LargeJsonObjectScriptTests = []queries.ScriptTest{
var UnscopedDiffSystemTableScriptTests = []queries.ScriptTest{
{
Name: "working set changes",
SkipPrepared: true,
Name: "working set changes",
SetUpScript: []string{
"create table regularTable (a int primary key, b int, c int);",
"create table droppedTable (a int primary key, b int, c int);",
@@ -4830,7 +4832,7 @@ var UnscopedDiffSystemTableScriptTests = []queries.ScriptTest{
Expected: []sql.Row{{6}},
},
{
Query: "select table_name, schema_change, data_change from DOLT_DIFF where commit_hash in (@Commit1)",
Query: "select table_name, schema_change, data_change from DOLT_DIFF where commit_hash = @Commit1",
Expected: []sql.Row{{"x", true, true}, {"y", true, false}},
},
{