Implemented ExecuteSelect to return naked rows by running a pipeline with an in-memory sink. Added tests for all public functions in sqlselect.go and a couple of private functions.

Zach Musgrave
2019-04-01 11:52:02 -07:00
parent c54d76b9cd
commit f498b37df8
4 changed files with 429 additions and 12 deletions
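
For context, a minimal usage sketch (not part of this commit) of the new ExecuteSelect entry point. The ExecuteSelect signature and the "people" test query come from the changes below; the selectAdults helper name and the doltdb import path are assumptions for illustration.

package sql

import (
	"fmt"

	"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/doltdb"
	"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/row"
	"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/schema"
	"github.com/xwb1989/sqlparser"
)

// selectAdults is a hypothetical caller of the new ExecuteSelect entry point.
func selectAdults(root *doltdb.RootValue) ([]row.Row, schema.Schema, error) {
	query := "select first, last from people where age >= 40"

	stmt, err := sqlparser.Parse(query)
	if err != nil {
		return nil, nil, err
	}

	sel, ok := stmt.(*sqlparser.Select)
	if !ok {
		return nil, nil, fmt.Errorf("not a select statement: %v", query)
	}

	// ExecuteSelect builds the select pipeline, attaches an in-memory row sink
	// and a bad-row callback, runs it to completion, and returns the collected
	// rows along with the schema of the result set.
	return ExecuteSelect(root, sel, query)
}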


@@ -160,6 +160,7 @@ github.com/skratchdot/open-golang v0.0.0-20190104022628-a2dfa6d0dab6 h1:cGT4dcuE
github.com/skratchdot/open-golang v0.0.0-20190104022628-a2dfa6d0dab6/go.mod h1:sUM3LWHvSMaG192sy56D9F7CNvL7jUJVXoqM1QKLnog=
github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=


@@ -28,10 +28,10 @@ type selectStatement struct {
type filterFn = func(r row.Row) (matchesFilter bool)
type selectTransform struct {
p *pipeline.Pipeline
filter filterFn
limit int
count int
noMoreCallback func()
filter filterFn
limit int
count int
}
func (st *selectTransform) limitAndFilter(inRow row.Row, props pipeline.ReadableMap) ([]*pipeline.TransformedRowResult, string) {
@@ -41,12 +41,48 @@ func (st *selectTransform) limitAndFilter(inRow row.Row, props pipeline.Readable
return []*pipeline.TransformedRowResult{{inRow, nil}}, ""
}
} else {
st.p.NoMore()
st.noMoreCallback()
}
return nil, ""
}
// ExecuteSelect executes the given select query and returns the resultant rows accompanied by their output schema.
func ExecuteSelect(root *doltdb.RootValue, s *sqlparser.Select, query string) ([]row.Row, schema.Schema, error) {
p, schema, err := BuildSelectQueryPipeline(root, s, query)
if err != nil {
return nil, nil, err
}
var rows []row.Row // your boat
rowSink := pipeline.ProcFuncForSinkFunc(
func(r row.Row, props pipeline.ReadableMap) error {
rows = append(rows, r)
return nil
})
var executionErr error
errSink := func(failure *pipeline.TransformRowFailure) (quit bool) {
executionErr = errors.New(fmt.Sprintf("Query execution failed at stage %v for row %v: error was %v",
failure.TransformName, failure.Row, failure.Details))
return true
}
p.SetOutput(rowSink)
p.SetBadRowCallback(errSink)
p.Start()
err = p.Wait()
if err != nil {
return nil, nil, err
}
if executionErr != nil {
return nil, nil, executionErr
}
return rows, schema, nil
}
// BuildSelectQueryPipeline interprets the select statement given, builds a pipeline to execute it, and returns the pipeline
// for the caller to mutate and execute, as well as the schema of the result set. The pipeline will not have any output
// set; one must be assigned before execution.
@@ -65,9 +101,9 @@ func BuildSelectQueryPipeline(root *doltdb.RootValue, s *sqlparser.Select, query
case sqlparser.TableName:
tableName = e.Name.String()
case *sqlparser.Subquery:
quitErr("Subqueries are not supported: %v.", query)
return quitErr("Subqueries are not supported: %v.", query)
default:
quitErr("Unrecognized expression: %v", nodeToString(e))
return quitErr("Unrecognized expression: %v", nodeToString(e))
}
case *sqlparser.ParenTableExpr:
return quitErr("Parenthetical table expressions are not supported: %v,", query)
@@ -145,7 +181,7 @@ func processWhereClause(selectStmt *selectStatement, s *sqlparser.Select, query
}
if whereClause.Type != sqlparser.WhereStr {
errors.New(fmt.Sprintf("Having clause not supported: %v", query))
return errors.New(fmt.Sprintf("Having clause not supported: %v", query))
}
switch expr := whereClause.Expr.(type) {
@@ -167,7 +203,7 @@ func processWhereClause(selectStmt *selectStatement, s *sqlparser.Select, query
colName, ok = colExpr.(*sqlparser.ColName)
if !ok {
errors.New(fmt.Sprintf("Only column names and value literals are supported"))
return errors.New(fmt.Sprintf("Only column names and value literals are supported"))
}
colNameStr := colName.Name.String()
@@ -346,7 +382,7 @@ func nodeToString(node sqlparser.SQLNode) string {
// Returns an error with the message specified. Return type includes a nil Pipeline object to conform to the needs of
// BuildSelectQueryPipeline.
func quitErr(fmtMsg string, args ...interface{}) (*pipeline.Pipeline, schema.Schema, error) {
return nil, nil, errors.New(fmt.Sprintf(fmtMsg, args))
return nil, nil, errors.New(fmt.Sprintf(fmtMsg, args...))
}
// createPipeline constructs a pipeline to execute the statement and returns it. The constructed pipeline doesn't have
@@ -367,6 +403,8 @@ func createPipeline(root *doltdb.RootValue, statement *selectStatement) (*pipeli
rdProcFunc := pipeline.ProcFuncForReader(rd)
p := pipeline.NewPartialPipeline(rdProcFunc, transforms)
selTrans.noMoreCallback = func() {p.NoMore()}
p.RunAfter(func() { rd.Close() })
return p, outSchema, nil
@@ -377,7 +415,7 @@ func addColumnMapTransform(statement *selectStatement, tableSch schema.Schema, t
colColl := tableSch.GetAllCols()
if len(statement.colNames) > 0 {
cols := make([]schema.Column, 0, len(statement.colNames)+1)
cols := make([]schema.Column, 0, len(statement.colNames))
for _, name := range statement.colNames {


@@ -0,0 +1,378 @@
package sql
import (
"github.com/attic-labs/noms/go/types"
"github.com/liquidata-inc/ld/dolt/go/cmd/dolt/dtestutils"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/env"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/rowconv"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/table"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/table/untyped"
"github.com/stretchr/testify/assert"
"testing"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/row"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/ld/dolt/go/libraries/doltcore/table/pipeline"
"github.com/xwb1989/sqlparser"
)
const (
idTag = 0
firstTag = 1
lastTag = 2
isMarriedTag = 3
ageTag = 4
emptyTag = 5
)
var testSch = createTestSchema()
var untypedSch = untyped.UntypeSchema(testSch)
var tableName = "people"
func createTestSchema() schema.Schema {
colColl, _ := schema.NewColCollection(
schema.NewColumn("id", idTag, types.IntKind, true, schema.NotNullConstraint{}),
schema.NewColumn("first", firstTag, types.StringKind, false, schema.NotNullConstraint{}),
schema.NewColumn("last", lastTag, types.StringKind, false, schema.NotNullConstraint{}),
schema.NewColumn("is_married", isMarriedTag, types.BoolKind, false),
schema.NewColumn("age", ageTag, types.UintKind, false),
schema.NewColumn("empty", emptyTag, types.IntKind, false),
)
return schema.SchemaFromCols(colColl)
}
func newRow(id int, first, last string, isMarried bool, age int) row.Row {
vals := row.TaggedValues{
idTag: types.Int(id),
firstTag: types.String(first),
lastTag: types.String(last),
isMarriedTag: types.Bool(isMarried),
ageTag: types.Uint(age),
}
return row.New(testSch, vals)
}
var homer = newRow(0, "Homer", "Simpson", true, 40)
var marge = newRow(1, "Marge", "Simpson", true, 38)
var bart = newRow(2, "Bart", "Simpson", false, 10)
var lisa = newRow(3, "Lisa", "Simpson", false, 8)
var moe = newRow(4, "Moe", "Szyslak", false, 48)
var barney = newRow(5, "Barney", "Gumble", false, 40)
// Tests the basic filter and limit behavior of selectTransform.limitAndFilter
func Test_selectTransform_limitAndFilter(t *testing.T) {
var noMoreCallbackCalled = false
var noMoreCallback = func() {
noMoreCallbackCalled = true
}
type fields struct {
noMoreCallback func()
filter filterFn
limit int
count int
}
type args struct {
inRow row.Row
props pipeline.ReadableMap
}
tests := []struct {
name string
fields fields
args args
expectedRow []*pipeline.TransformedRowResult
expectedBadRowDetails string
noMoreCalled bool
}{
{
name: "true no limit",
fields: fields{
noMoreCallback: noMoreCallback,
filter: func(r row.Row) (matchesFilter bool) { return true },
limit: -1,
},
args: args{ homer, pipeline.NoProps },
expectedRow: transformedRowResults(homer),
expectedBadRowDetails: "",
},
{
name: "false no limit",
fields: fields{
noMoreCallback: noMoreCallback,
filter: func(r row.Row) (matchesFilter bool) { return false },
limit: -1,
},
args: args{ homer, pipeline.NoProps },
expectedRow: transformedRowResults(),
expectedBadRowDetails: "",
},
{
name: "true limit 1",
fields: fields{
noMoreCallback: noMoreCallback,
filter: func(r row.Row) (matchesFilter bool) { return true },
limit: 1,
count: 1,
},
args: args{ homer, pipeline.NoProps },
expectedRow: transformedRowResults(),
expectedBadRowDetails: "",
noMoreCalled: true,
},
{
name: "false limit 1",
fields: fields{
noMoreCallback: noMoreCallback,
filter: func(r row.Row) (matchesFilter bool) { return false },
limit: 1,
count: 1,
},
args: args{ homer, pipeline.NoProps },
expectedRow: transformedRowResults(),
expectedBadRowDetails: "",
noMoreCalled: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
noMoreCallbackCalled = false
st := &selectTransform{
noMoreCallback: tt.fields.noMoreCallback,
filter: tt.fields.filter,
limit: tt.fields.limit,
count: tt.fields.count,
}
row, badRowDetails := st.limitAndFilter(tt.args.inRow, tt.args.props)
assert.Equal(t, tt.expectedRow, row)
assert.Equal(t, tt.expectedBadRowDetails, badRowDetails)
assert.Equal(t, tt.noMoreCalled, noMoreCallbackCalled)
})
}
}
func transformedRowResults(rows... row.Row) []*pipeline.TransformedRowResult {
var r []*pipeline.TransformedRowResult
for _, v := range rows {
r = append(r, transformedRowWithoutProps(v))
}
return r
}
func transformedRowWithoutProps(r row.Row) *pipeline.TransformedRowResult {
return &pipeline.TransformedRowResult{r, nil}
}
func TestExecuteSelect(t *testing.T) {
tests := []struct {
name string
query string
columns []string
expectedRows []row.Row
expectedSchema schema.Schema
expectedErr bool
}{
{
name: "Test select * ",
query: "select * from people",
expectedRows: rs(homer, marge, bart, lisa, moe, barney),
expectedSchema: untypedSch,
},
{
name: "Test select *, where < int",
query: "select * from people where age < 40",
expectedRows: rs(marge, bart, lisa),
expectedSchema: untypedSch,
},
{
name: "Test select *, where <= int",
query: "select * from people where age <= 40",
expectedRows: rs(homer, marge, bart, lisa, barney),
expectedSchema: untypedSch,
},
{
name: "Test select *, where > int",
query: "select * from people where age > 40",
expectedRows: rs(moe),
expectedSchema: untypedSch,
},
{
name: "Test select *, where >= int",
query: "select * from people where age >= 40",
expectedRows: rs(homer, moe, barney),
expectedSchema: untypedSch,
},
{
name: "Test select subset of cols",
query: "select first,last from people where age >= 40",
columns: []string { "first", "last" },
expectedRows: rs(homer, moe, barney),
expectedSchema: subsetSchema(untypedSch, "first", "last"),
},
{
name: "Test select *, not equals",
query: "select * from people where age <> 40",
expectedRows: rs(marge, bart, lisa, moe),
expectedSchema: untypedSch,
},
{
name: "Test empty result set",
query: "select * from people where age > 80",
expectedRows: rs(),
expectedSchema: untypedSch,
},
{
name: "Test empty result set with columns",
query: "select id, age from people where age > 80",
expectedRows: rs(),
expectedSchema: subsetSchema(untypedSch, "id", "age"),
},
{
name: "Test unsupported comparison",
query: "select * from people where first in ('homer')",
expectedRows: nil, // not the same as empty result set
expectedErr: true,
},
}
for _, tt := range tests {
dEnv := dtestutils.CreateTestEnv()
createTestDatabase(dEnv, t)
root, _ := dEnv.WorkingRoot()
sqlStatement, _ := sqlparser.Parse(tt.query)
s := sqlStatement.(*sqlparser.Select)
t.Run(tt.name, func(t *testing.T) {
rows, sch, err := ExecuteSelect(root, s, tt.query)
untypedRows := untypeRows(t, tt.expectedRows, tt.columns, testSch)
assert.Equal(t, tt.expectedErr, err != nil)
assert.Equal(t, untypedRows, rows)
assert.Equal(t, tt.expectedSchema, sch)
})
}
}
func TestBuildSelectQueryPipeline(t *testing.T) {
tests := []struct {
name string
query string
expectedSchema schema.Schema
expectedNumRows int
}{
{
name: "Test select * ",
query: "select * from people",
expectedNumRows: len([]row.Row{homer, marge, bart, lisa, moe, barney}),
expectedSchema: untypedSch,
},
{
name: "Test select columns ",
query: "select age, id from people",
expectedNumRows: len([]row.Row{homer, marge, bart, lisa, moe, barney}),
expectedSchema: subsetSchema(untypedSch, "age", "id"),
},
}
for _, tt := range tests {
dEnv := dtestutils.CreateTestEnv()
createTestDatabase(dEnv, t)
root, _ := dEnv.WorkingRoot()
sqlStatement, _ := sqlparser.Parse(tt.query)
s := sqlStatement.(*sqlparser.Select)
t.Run(tt.name, func(t *testing.T) {
p, sch, _ := BuildSelectQueryPipeline(root, s, tt.query)
var outputRows int
p.SetOutput(pipeline.ProcFuncForSinkFunc(
func(r row.Row, props pipeline.ReadableMap) error {
outputRows++
return nil
}))
p.SetBadRowCallback(func(*pipeline.TransformRowFailure) (quit bool) {
return true
})
p.Start()
p.Wait()
assert.Equal(t, tt.expectedNumRows, outputRows)
assert.Equal(t, tt.expectedSchema, sch)
})
}
}
func rs(rows... row.Row) []row.Row {
return rows
}
// Returns a subset of the schema given
func subsetSchema(sch schema.Schema, colNames ...string) schema.Schema {
colColl := sch.GetAllCols()
if len(colNames) > 0 {
cols := make([]schema.Column, 0, len(colNames))
for _, name := range colNames {
if col, ok := colColl.GetByName(name); !ok {
panic("Unrecognized name")
} else {
cols = append(cols, col)
}
}
colColl, _ = schema.NewColCollection(cols...)
}
return schema.SchemaFromCols(colColl)
}
// Converts the rows given, having the schema given, to untyped (string-typed) rows. Only the column names specified
// will be included.
func untypeRows(t *testing.T, rows []row.Row, colNames []string, tableSch schema.Schema) []row.Row {
// Zero typing makes the nil slice and the empty slice appear equal to most functions, but they are semantically
// distinct.
if rows == nil {
return nil
}
untyped := make([]row.Row, 0, len(rows))
for _, r := range rows {
untyped = append(untyped, untypeRow(t, r, colNames, tableSch))
}
return untyped
}
// Converts the row given, having the schema given, to an untyped (string-typed) row. Only the column names specified
// will be included.
func untypeRow(t *testing.T, r row.Row, colNames []string, tableSch schema.Schema) row.Row {
outSch := subsetSchema(tableSch, colNames...)
mapping, err := rowconv.TagMapping(tableSch, untyped.UntypeSchema(outSch))
assert.Nil(t, err, "failed to create untyped mapping")
rConv, _ := rowconv.NewRowConverter(mapping)
untyped, err := rConv.Convert(r)
assert.Nil(t, err, "failed to untyped row to untyped")
return untyped
}
func createTestDatabase(dEnv *env.DoltEnv, t *testing.T) {
imt := table.NewInMemTable(testSch)
for _, r := range []row.Row{homer, marge, bart, lisa, moe, barney} {
imt.AppendRow(r)
}
rd := table.NewInMemTableReader(imt)
wr := noms.NewNomsMapCreator(dEnv.DoltDB.ValueReadWriter(), testSch)
_, _, err := table.PipeRows(rd, wr, false)
rd.Close()
wr.Close()
assert.Nil(t, err, "Failed to seed initial data")
err = dEnv.PutTableToWorking(*wr.GetMap(), wr.GetSchema(), tableName)
assert.Nil(t, err,"Unable to put initial value of table in in mem noms db")
}


@@ -54,4 +54,4 @@ type RowWithProps struct {
// NewRowWithProps creates a RowWith props from a row and a map of properties
func NewRowWithProps(r row.Row, props map[string]interface{}) RowWithProps {
return RowWithProps{r, ImmutableProperties{props}}
}
}