implementing projection pushdowns

This commit is contained in:
Andy Arthur
2022-07-05 15:21:59 -07:00
parent 8299c204ff
commit 928ba2756d
5 changed files with 169 additions and 70 deletions

View File

@@ -84,29 +84,92 @@ func TestSingleScript(t *testing.T) {
var scripts = []queries.ScriptTest{
{
Name: "new table",
Name: "Group Concat Queries",
SetUpScript: []string{
"create table t1 (a int primary key, b int)",
"insert into t1 values (1,2)",
"CREATE TABLE x (pk int)",
"INSERT INTO x VALUES (1),(2),(3),(4),(NULL)",
"create table t (o_id int, attribute longtext, value longtext)",
"INSERT INTO t VALUES (2, 'color', 'red'), (2, 'fabric', 'silk')",
"INSERT INTO t VALUES (3, 'color', 'green'), (3, 'shape', 'square')",
"create table nulls(pk int)",
"INSERT INTO nulls VALUES (NULL)",
},
Assertions: []queries.ScriptTestAssertion{
{
Query: "select to_a, to_b, from_commit, to_commit, diff_type from dolt_diff('t1', 'HEAD', 'WORKING')",
Expected: []sql.Row{{1, 2, "HEAD", "WORKING", "added"}},
Query: `SELECT group_concat(pk ORDER BY pk) FROM x;`,
Expected: []sql.Row{{"1,2,3,4"}},
},
{
Query: "select to_a, from_b, from_commit, to_commit, diff_type from dolt_diff('t1', 'HEAD', 'WORKING')",
ExpectedErr: sql.ErrColumnNotFound,
Query: `SELECT group_concat(DISTINCT pk ORDER BY pk) FROM x;`,
Expected: []sql.Row{{"1,2,3,4"}},
},
{
Query: "select from_a, from_b, from_commit, to_commit, diff_type from dolt_diff('t1', 'WORKING', 'HEAD')",
Expected: []sql.Row{{1, 2, "WORKING", "HEAD", "removed"}},
Query: `SELECT group_concat(DISTINCT pk ORDER BY pk SEPARATOR '-') FROM x;`,
Expected: []sql.Row{{"1-2-3-4"}},
},
{
Query: "SELECT group_concat(`attribute` ORDER BY `attribute`) FROM t group by o_id order by o_id asc",
Expected: []sql.Row{{"color,fabric"}, {"color,shape"}},
},
{
Query: "SELECT group_concat(DISTINCT `attribute` ORDER BY value DESC SEPARATOR ';') FROM t group by o_id order by o_id asc",
Expected: []sql.Row{{"fabric;color"}, {"shape;color"}},
},
{
Query: "SELECT group_concat(DISTINCT `attribute` ORDER BY `attribute`) FROM t",
Expected: []sql.Row{{"color,fabric,shape"}},
},
{
Query: "SELECT group_concat(`attribute` ORDER BY `attribute`) FROM t",
Expected: []sql.Row{{"color,color,fabric,shape"}},
},
{
Query: `SELECT group_concat((SELECT 2)) FROM x;`,
Expected: []sql.Row{{"2,2,2,2,2"}},
},
{
Query: `SELECT group_concat(DISTINCT (SELECT 2)) FROM x;`,
Expected: []sql.Row{{"2"}},
},
{
Query: "SELECT group_concat(DISTINCT `attribute` ORDER BY `attribute` ASC) FROM t",
Expected: []sql.Row{{"color,fabric,shape"}},
},
{
Query: "SELECT group_concat(DISTINCT `attribute` ORDER BY `attribute` DESC) FROM t",
Expected: []sql.Row{{"shape,fabric,color"}},
},
{
Query: `SELECT group_concat(pk) FROM nulls`,
Expected: []sql.Row{{nil}},
},
{
Query: `SELECT group_concat((SELECT * FROM t LIMIT 1)) from t`,
ExpectedErr: sql.ErrInvalidOperandColumns,
},
{
Query: `SELECT group_concat((SELECT * FROM x)) from t`,
ExpectedErr: sql.ErrExpectedSingleRow,
},
{
Query: "SELECT group_concat(`attribute`) FROM t where o_id=2 order by attribute",
Expected: []sql.Row{{"color,fabric"}},
},
{
Query: "SELECT group_concat(DISTINCT `attribute` ORDER BY value DESC SEPARATOR ';') FROM t group by o_id order by o_id asc",
Expected: []sql.Row{{"fabric;color"}, {"shape;color"}},
},
{
Query: "SELECT group_concat(o_id) FROM t WHERE `attribute`='color' order by o_id",
Expected: []sql.Row{{"2,3"}},
},
},
},
}
harness := newDoltHarness(t)
harness := newDoltHarness(t).WithParallelism(1)
harness.Setup(setup.MydbData)
for _, test := range scripts {
enginetest.TestScript(t, harness, test)

View File

@@ -60,18 +60,20 @@ func RowIterForIndexLookup(ctx *sql.Context, t DoltTableable, ilu sql.IndexLooku
}
}
func RowIterForProllyRange(ctx *sql.Context, idx DoltIndex, r prolly.Range, pkSch sql.PrimaryKeySchema, columns []string, durableState *durableIndexState) (sql.RowIter2, error) {
func RowIterForProllyRange(ctx *sql.Context, idx DoltIndex, r prolly.Range, pkSch sql.PrimaryKeySchema, projections []string, durableState *durableIndexState) (sql.RowIter2, error) {
if projections == nil {
projections = idx.Schema().GetAllCols().GetColumnNames()
}
if sql.IsKeyless(pkSch.Schema) {
// in order to resolve row cardinality, keyless indexes must always perform
// an indirect lookup through the clustered index.
return newProllyKeylessIndexIter(ctx, idx, r, pkSch, durableState.Primary, durableState.Secondary)
return newProllyKeylessIndexIter(ctx, idx, r, pkSch, projections, durableState.Primary, durableState.Secondary)
}
covers := idx.coversColumns(durableState, columns)
covers := idx.coversColumns(durableState, projections)
if covers {
return newProllyCoveringIndexIter(ctx, idx, r, pkSch, durableState.Secondary)
return newProllyCoveringIndexIter(ctx, idx, r, pkSch, projections, durableState.Secondary)
}
return newProllyIndexIter(ctx, idx, r, pkSch, durableState.Primary, durableState.Secondary)
return newProllyIndexIter(ctx, idx, r, pkSch, projections, durableState.Primary, durableState.Secondary)
}
func RowIterForNomsRanges(ctx *sql.Context, idx DoltIndex, ranges []*noms.ReadRange, columns []string, durableState *durableIndexState) (sql.RowIter, error) {

View File

@@ -52,7 +52,14 @@ var _ sql.RowIter = prollyIndexIter{}
var _ sql.RowIter2 = prollyIndexIter{}
// NewProllyIndexIter returns a new prollyIndexIter.
func newProllyIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range, pkSch sql.PrimaryKeySchema, dprimary, dsecondary durable.Index) (prollyIndexIter, error) {
func newProllyIndexIter(
ctx *sql.Context,
idx DoltIndex,
rng prolly.Range,
pkSch sql.PrimaryKeySchema,
projections []string,
dprimary, dsecondary durable.Index,
) (prollyIndexIter, error) {
secondary := durable.ProllyMapFromIndex(dsecondary)
indexIter, err := secondary.IterRange(ctx, rng)
if err != nil {
@@ -64,7 +71,7 @@ func newProllyIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range, pkSch
pkBld := val.NewTupleBuilder(kd)
pkMap := ordinalMappingFromIndex(idx)
sch := idx.Schema()
km, vm := projectionMappings(sch, sch.GetAllCols().GetColumnNames())
km, vm := projectionMappings(sch, projections)
eg, c := errgroup.WithContext(ctx)
@@ -216,7 +223,14 @@ type prollyCoveringIndexIter struct {
var _ sql.RowIter = prollyCoveringIndexIter{}
var _ sql.RowIter2 = prollyCoveringIndexIter{}
func newProllyCoveringIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range, pkSch sql.PrimaryKeySchema, indexdata durable.Index) (prollyCoveringIndexIter, error) {
func newProllyCoveringIndexIter(
ctx *sql.Context,
idx DoltIndex,
rng prolly.Range,
pkSch sql.PrimaryKeySchema,
projections []string,
indexdata durable.Index,
) (prollyCoveringIndexIter, error) {
secondary := durable.ProllyMapFromIndex(indexdata)
indexIter, err := secondary.IterRange(ctx, rng)
if err != nil {
@@ -226,12 +240,12 @@ func newProllyCoveringIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Rang
var keyMap, valMap val.OrdinalMapping
if idx.IsPrimaryKey() {
keyMap, valMap = primaryIndexMapping(idx, pkSch)
keyMap, valMap = primaryIndexMapping(idx, pkSch, projections)
} else {
keyMap = coveringIndexMapping(idx)
keyMap = coveringIndexMapping(idx, projections)
}
iter := prollyCoveringIndexIter{
return prollyCoveringIndexIter{
idx: idx,
indexIter: indexIter,
keyDesc: keyDesc,
@@ -240,9 +254,7 @@ func newProllyCoveringIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Rang
valMap: valMap,
sqlSch: pkSch.Schema,
ns: secondary.NodeStore(),
}
return iter, nil
}, nil
}
// Next returns the next row from the iterator.
@@ -328,14 +340,14 @@ func (p prollyCoveringIndexIter) Close(*sql.Context) error {
return nil
}
func coveringIndexMapping(idx DoltIndex) (keyMap val.OrdinalMapping) {
func coveringIndexMapping(idx DoltIndex, projections []string) (keyMap val.OrdinalMapping) {
allCols := idx.Schema().GetAllCols()
idxCols := idx.IndexSchema().GetAllCols()
keyMap = make(val.OrdinalMapping, allCols.Size())
for i, col := range allCols.GetColumns() {
j, ok := idxCols.TagToIdx[col.Tag]
if ok {
if ok && contains(projections, col.Name) {
keyMap[i] = j
} else {
keyMap[i] = -1
@@ -344,17 +356,23 @@ func coveringIndexMapping(idx DoltIndex) (keyMap val.OrdinalMapping) {
return
}
func primaryIndexMapping(idx DoltIndex, pkSch sql.PrimaryKeySchema) (keyMap, valMap val.OrdinalMapping) {
func primaryIndexMapping(idx DoltIndex, pkSch sql.PrimaryKeySchema, projections []string) (km, vm val.OrdinalMapping) {
sch := idx.Schema()
keyMap = make(val.OrdinalMapping, len(pkSch.Schema))
km = make(val.OrdinalMapping, len(pkSch.Schema))
for i, col := range pkSch.Schema {
// if |col.Name| not found, IndexOf returns -1
keyMap[i] = sch.GetPKCols().IndexOf(col.Name)
km[i] = -1
if contains(projections, col.Name) {
// if |col.Name| not found, IndexOf returns -1
km[i] = sch.GetPKCols().IndexOf(col.Name)
}
}
valMap = make(val.OrdinalMapping, len(pkSch.Schema))
vm = make(val.OrdinalMapping, len(pkSch.Schema))
for i, col := range pkSch.Schema {
// if |col.Name| not found, IndexOf returns -1
valMap[i] = sch.GetNonPKCols().IndexOf(col.Name)
vm[i] = -1
if contains(projections, col.Name) {
// if |col.Name| not found, IndexOf returns -1
vm[i] = sch.GetNonPKCols().IndexOf(col.Name)
}
}
return
}
@@ -382,7 +400,14 @@ type prollyKeylessIndexIter struct {
var _ sql.RowIter = prollyKeylessIndexIter{}
var _ sql.RowIter2 = prollyKeylessIndexIter{}
func newProllyKeylessIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range, pkSch sql.PrimaryKeySchema, rows, dsecondary durable.Index) (prollyKeylessIndexIter, error) {
func newProllyKeylessIndexIter(
ctx *sql.Context,
idx DoltIndex,
rng prolly.Range,
pkSch sql.PrimaryKeySchema,
projections []string,
rows, dsecondary durable.Index,
) (prollyKeylessIndexIter, error) {
secondary := durable.ProllyMapFromIndex(dsecondary)
indexIter, err := secondary.IterRange(ctx, rng)
if err != nil {
@@ -394,7 +419,7 @@ func newProllyKeylessIndexIter(ctx *sql.Context, idx DoltIndex, rng prolly.Range
indexMap := ordinalMappingFromIndex(idx)
keyBld := val.NewTupleBuilder(keyDesc)
sch := idx.Schema()
_, vm := projectionMappings(sch, sch.GetAllCols().GetColumnNames())
_, vm := projectionMappings(sch, projections)
eg, c := errgroup.WithContext(ctx)

View File

@@ -64,16 +64,12 @@ type prollyRowIter struct {
var _ sql.RowIter = prollyRowIter{}
var _ sql.RowIter2 = prollyRowIter{}
func NewProllyRowIter(sch schema.Schema, schSch sql.Schema, rows prolly.Map, iter prolly.MapIter, projections []string) (sql.RowIter, error) {
func NewProllyRowIter(sch schema.Schema, sqlSch sql.Schema, rows prolly.Map, iter prolly.MapIter, projections []string) (sql.RowIter, error) {
if len(projections) == 0 {
projections = sch.GetAllCols().GetColumnNames()
}
// todo(andy): NomsRangeReader seemingly ignores projections
//if projections == nil {
// projections = sch.GetAllCols().GetColumnNames()
//}
projections = sch.GetAllCols().GetColumnNames()
keyProj, valProj := projectionMappings(sch, projections)
kd, vd := rows.Descriptors()
if schema.IsKeyless(sch) {
@@ -81,54 +77,51 @@ func NewProllyRowIter(sch schema.Schema, schSch sql.Schema, rows prolly.Map, ite
iter: iter,
valDesc: vd,
valProj: valProj,
rowLen: len(projections),
rowLen: len(sqlSch),
ns: rows.NodeStore(),
}, nil
}
return prollyRowIter{
iter: iter,
sqlSch: schSch,
sqlSch: sqlSch,
keyDesc: kd,
valDesc: vd,
keyProj: keyProj,
valProj: valProj,
rowLen: len(projections),
rowLen: sch.GetAllCols().Size(),
ns: rows.NodeStore(),
}, nil
}
func projectionMappings(sch schema.Schema, projs []string) (keyMap, valMap val.OrdinalMapping) {
func projectionMappings(sch schema.Schema, projections []string) (keyMap, valMap val.OrdinalMapping) {
keyMap = make(val.OrdinalMapping, sch.GetPKCols().Size())
for idx := range keyMap {
keyMap[idx] = -1
idxCol := sch.GetPKCols().GetAtIndex(idx)
for j, proj := range projs {
if strings.ToLower(idxCol.Name) == strings.ToLower(proj) {
keyMap[idx] = j
break
}
}
}
valMap = make(val.OrdinalMapping, sch.GetNonPKCols().Size())
for idx := range valMap {
valMap[idx] = -1
idxCol := sch.GetNonPKCols().GetAtIndex(idx)
for j, proj := range projs {
if strings.ToLower(idxCol.Name) == strings.ToLower(proj) {
valMap[idx] = j
break
}
}
for i := range keyMap {
keyMap[i] = -1
}
for i := range valMap {
valMap[i] = -1
}
all := sch.GetAllCols()
for i := range keyMap {
col := sch.GetPKCols().GetAtIndex(i)
if contains(projections, col.Name) {
keyMap[i] = all.IndexOf(col.Name)
}
}
for i := range valMap {
col := sch.GetNonPKCols().GetAtIndex(i)
if contains(projections, col.Name) {
valMap[i] = all.IndexOf(col.Name)
}
}
if schema.IsKeyless(sch) {
skip := val.OrdinalMapping{-1}
keyMap = append(skip, keyMap...) // hashId
valMap = append(skip, valMap...) // cardinality
}
return
}
@@ -253,3 +246,12 @@ func (it *prollyKeylessIter) nextTuple(ctx *sql.Context) error {
func (it *prollyKeylessIter) Close(ctx *sql.Context) error {
return nil
}
// contains reports whether str appears in slice, comparing
// case-insensitively (column names are matched without regard to case).
func contains(slice []string, str string) bool {
	for _, x := range slice {
		// EqualFold avoids allocating two lowered copies per comparison,
		// and we can stop at the first match instead of scanning the rest.
		if strings.EqualFold(x, str) {
			return true
		}
	}
	return false
}

View File

@@ -60,6 +60,13 @@ func BenchmarkOltpPointSelect(b *testing.B) {
})
}
// BenchmarkProjectionAggregation runs a grouped aggregation over sbtest1
// with a random lower bound on |k|; presumably added to exercise projection
// pushdown, since only |c| and |id| are referenced by the query.
func BenchmarkProjectionAggregation(b *testing.B) {
	const query = "SELECT c, count(id) FROM sbtest1 WHERE k > %d GROUP BY c ORDER BY c"
	benchmarkSysbenchQuery(b, func(int) string {
		return fmt.Sprintf(query, rand.Intn(tableSize))
	})
}
func BenchmarkSelectRandomPoints(b *testing.B) {
benchmarkSysbenchQuery(b, func(int) string {
var sb strings.Builder
@@ -78,7 +85,7 @@ func BenchmarkSelectRandomPoints(b *testing.B) {
func benchmarkSysbenchQuery(b *testing.B, getQuery func(int) string) {
ctx, eng := setupBenchmark(b, dEnv)
for i := 0; i < b.N; i++ {
_, iter, err := commands.ProcessQuery(ctx, getQuery(i), eng)
_, iter, err := eng.Query(ctx, getQuery(i))
require.NoError(b, err)
for {
if _, err = iter.Next(ctx); err != nil {