Prune table scan columns (#3903)

* starter

* fix projMappings

* add some row iter proj

* other noms row converters

* store_test.go

* GMS bump

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* iter changes

* more bug fixes

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* cleanup

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* GMS bump

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* merged GMS bump

* fix keyless row iter

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
Maximilian Hoffman authored on 2022-07-28 16:23:26 -07:00, committed by GitHub
parent c0e3d71648
commit 487ec00ce5
15 changed files with 280 additions and 194 deletions

View File

@@ -58,7 +58,7 @@ require (
 )

 require (
-	github.com/dolthub/go-mysql-server v0.12.1-0.20220728170427-82a31eff75d6
+	github.com/dolthub/go-mysql-server v0.12.1-0.20220728202257-cd27ab81b097
 	github.com/google/flatbuffers v2.0.6+incompatible
 	github.com/gosuri/uilive v0.0.4
 	github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6

View File

@@ -173,8 +173,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
 github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
 github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
 github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
-github.com/dolthub/go-mysql-server v0.12.1-0.20220728170427-82a31eff75d6 h1:txQTQU3kexgjfEFM4nVoBuFWkoAtFML78CTPJA5jrnE=
-github.com/dolthub/go-mysql-server v0.12.1-0.20220728170427-82a31eff75d6/go.mod h1:JgB3WpY0RMgyAda3YG5VHVncH2B8i1N9Mx9LOp41lIs=
+github.com/dolthub/go-mysql-server v0.12.1-0.20220728202257-cd27ab81b097 h1:pywBH+t2pdsVB27E67N2rAAdhv/QtGl5VK9G2aqKLrg=
+github.com/dolthub/go-mysql-server v0.12.1-0.20220728202257-cd27ab81b097/go.mod h1:JgB3WpY0RMgyAda3YG5VHVncH2B8i1N9Mx9LOp41lIs=
 github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371 h1:oyPHJlzumKta1vnOQqUnfdz+pk3EmnHS3Nd0cCT0I2g=
 github.com/dolthub/ishell v0.0.0-20220112232610-14e753f0f371/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
 github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=

View File

@@ -625,20 +625,19 @@ var HistorySystemTableScriptTests = []queries.ScriptTest{
 		{
 			Query: "explain select pk, c from dolt_history_t1 where pk = 3",
 			Expected: []sql.Row{
-				{"Project(dolt_history_t1.pk, dolt_history_t1.c)"},
-				{" └─ Filter(dolt_history_t1.pk = 3)"},
-				{" └─ Projected table access on [pk c]"},
-				{" └─ Exchange"},
-				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.pk] with ranges: [{[3, 3]}])"},
+				{"Projected table access on [pk c]"},
+				{" └─ Exchange"},
+				{" └─ Filter(dolt_history_t1.pk = 3)"},
+				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.pk] with ranges: [{[3, 3]}])"},
 			},
 		},
 		{
 			Query: "explain select pk, c from dolt_history_t1 where pk = 3 and committer = 'someguy'",
 			Expected: []sql.Row{
-				{"Project(dolt_history_t1.pk, dolt_history_t1.c)"},
-				{" └─ Filter((dolt_history_t1.pk = 3) AND (dolt_history_t1.committer = 'someguy'))"},
-				{" └─ Projected table access on [pk c committer]"},
-				{" └─ Exchange"},
+				{" └─ Projected table access on [pk c committer]"},
+				{" └─ Exchange"},
+				{" └─ Filter((dolt_history_t1.pk = 3) AND (dolt_history_t1.committer = 'someguy'))"},
 				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.pk] with ranges: [{[3, 3]}])"},
 			},
 		},
@@ -691,20 +690,19 @@ var HistorySystemTableScriptTests = []queries.ScriptTest{
 		{
 			Query: "explain select pk, c from dolt_history_t1 where c = 4",
 			Expected: []sql.Row{
-				{"Project(dolt_history_t1.pk, dolt_history_t1.c)"},
-				{" └─ Filter(dolt_history_t1.c = 4)"},
-				{" └─ Projected table access on [pk c]"},
-				{" └─ Exchange"},
-				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.c] with ranges: [{[4, 4]}])"},
+				{"Projected table access on [pk c]"},
+				{" └─ Exchange"},
+				{" └─ Filter(dolt_history_t1.c = 4)"},
+				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.c] with ranges: [{[4, 4]}])"},
 			},
 		},
 		{
 			Query: "explain select pk, c from dolt_history_t1 where c = 10 and committer = 'someguy'",
 			Expected: []sql.Row{
-				{"Project(dolt_history_t1.pk, dolt_history_t1.c)"},
-				{" └─ Filter((dolt_history_t1.c = 10) AND (dolt_history_t1.committer = 'someguy'))"},
-				{" └─ Projected table access on [pk c committer]"},
-				{" └─ Exchange"},
+				{" └─ Projected table access on [pk c committer]"},
+				{" └─ Exchange"},
+				{" └─ Filter((dolt_history_t1.c = 10) AND (dolt_history_t1.committer = 'someguy'))"},
 				{" └─ IndexedTableAccess(dolt_history_t1 on [dolt_history_t1.c] with ranges: [{[10, 10]}])"}},
 		},
 	},

View File

@@ -272,7 +272,40 @@ func (ht *HistoryTable) String() string {
 // Schema returns the schema for the history table
 func (ht *HistoryTable) Schema() sql.Schema {
-	return historyTableSchema(ht.Name(), ht.doltTable)
+	sch := historyTableSchema(ht.Name(), ht.doltTable)
+	if len(ht.projectedCols) == 0 {
+		return sch
+	}
+	projectedSch := make(sql.Schema, len(ht.projectedCols))
+	allCols := ht.doltTable.sch.GetAllCols()
+	for i, t := range ht.projectedCols {
+		if col, ok := allCols.TagToCol[t]; ok {
+			idx := sch.IndexOfColName(col.Name)
+			projectedSch[i] = sch[idx]
+		} else if t == schema.HistoryCommitterTag {
+			projectedSch[i] = &sql.Column{
+				Name:   CommitterCol,
+				Source: ht.Name(),
+				Type:   CommitterColType,
+			}
+		} else if t == schema.HistoryCommitHashTag {
+			projectedSch[i] = &sql.Column{
+				Name:   CommitHashCol,
+				Source: ht.Name(),
+				Type:   CommitHashColType,
+			}
+		} else if t == schema.HistoryCommitDateTag {
+			projectedSch[i] = &sql.Column{
+				Name:   CommitDateCol,
+				Source: ht.Name(),
+				Type:   sql.Datetime,
+			}
+		} else {
+			panic("column not found")
+		}
+	}
+	return projectedSch
 }

 // Partitions returns a PartitionIter which will be used in getting partitions each of which is used to create RowIter.
@@ -284,7 +317,7 @@ func (ht *HistoryTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error)
 func (ht *HistoryTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
 	cp := part.(*commitPartition)
-	return newRowItrForTableAtCommit(ctx, ht.Name(), ht.doltTable, cp.h, cp.cm, ht.indexLookup)
+	return newRowItrForTableAtCommit(ctx, ht.Name(), ht.doltTable, cp.h, cp.cm, ht.indexLookup, ht.projectedCols)
 }

 // commitPartition is a single commit
@@ -327,8 +360,8 @@ type historyIter struct {
 	nonExistentTable bool
 }

-func newRowItrForTableAtCommit(ctx *sql.Context, tableName string, table *DoltTable, h hash.Hash, cm *doltdb.Commit, lookup sql.IndexLookup) (*historyIter, error) {
-	targetSchema := historyTableSchema(tableName, table)
+func newRowItrForTableAtCommit(ctx *sql.Context, tableName string, table *DoltTable, h hash.Hash, cm *doltdb.Commit, lookup sql.IndexLookup, projections []uint64) (*historyIter, error) {
+	targetSchema := table.Schema().Copy()

 	root, err := cm.GetRootValue(ctx)
 	if err != nil {
@@ -380,7 +413,7 @@ func newRowItrForTableAtCommit(ctx *sql.Context, tableName string, table *DoltTa
 		return nil, err
 	}

-	converter := rowConverter(sqlTable.Schema(), targetSchema, h, meta)
+	converter := rowConverter(sqlTable.Schema(), targetSchema, h, meta, projections)

 	return &historyIter{
 		table: sqlTable,
@@ -426,9 +459,9 @@ func (i *historyIter) Close(ctx *sql.Context) error {
 	return nil
 }

-func rowConverter(srcSchema, targetSchema sql.Schema, h hash.Hash, meta *datas.CommitMeta) func(row sql.Row) sql.Row {
+func rowConverter(srcSchema, targetSchema sql.Schema, h hash.Hash, meta *datas.CommitMeta, projections []uint64) func(row sql.Row) sql.Row {
 	srcToTarget := make(map[int]int)
-	for i, col := range targetSchema[:len(targetSchema)-3] {
+	for i, col := range targetSchema {
 		srcIdx := srcSchema.IndexOfColName(col.Name)
 		if srcIdx >= 0 {
 			// only add a conversion if the type is the same
@@ -440,17 +473,22 @@ func rowConverter(srcSchema, targetSchema sql.Schema, h hash.Hash, meta *datas.C
 	}

 	return func(row sql.Row) sql.Row {
-		r := make(sql.Row, len(targetSchema))
-		for i := range row {
-			if idx, ok := srcToTarget[i]; ok {
-				r[idx] = row[i]
-			}
-		}
-		r[len(targetSchema)-3] = h.String()
-		r[len(targetSchema)-2] = meta.Name
-		r[len(targetSchema)-1] = meta.Time()
+		r := make(sql.Row, len(projections))
+		for i, t := range projections {
+			switch t {
+			case schema.HistoryCommitterTag:
+				r[i] = meta.Name
+			case schema.HistoryCommitDateTag:
+				r[i] = meta.Time()
+			case schema.HistoryCommitHashTag:
+				r[i] = h.String()
+			default:
+				if j, ok := srcToTarget[i]; ok {
+					r[j] = row[i]
+				}
+			}
+			i++
+		}
 		return r
 	}
 }
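For readers following the history-table hunks: the converter used to pad rows to the full history schema and write the three meta columns at fixed trailing offsets; it now shapes each output row by the projection list and writes meta values at whatever positions they were projected to. A minimal, self-contained sketch of that pattern — the tags, names, and types below are illustrative stand-ins, not Dolt's:

```go
package main

import "fmt"

// Stand-in tags for the history table's synthetic meta columns.
const (
	committerTag uint64 = 1000
	hashTag      uint64 = 1001
)

// makeConverter returns a closure shaping each source row by |projections|:
// meta tags are filled from commit metadata, ordinary tags are copied via
// srcToTarget, which maps an output position to its source-row index.
func makeConverter(srcToTarget map[int]int, projections []uint64, committer, hash string) func([]interface{}) []interface{} {
	return func(row []interface{}) []interface{} {
		out := make([]interface{}, len(projections))
		for i, t := range projections {
			switch t {
			case committerTag:
				out[i] = committer
			case hashTag:
				out[i] = hash
			default:
				if j, ok := srcToTarget[i]; ok {
					out[i] = row[j]
				}
			}
		}
		return out
	}
}

func main() {
	// Source rows are (pk, c); the query projects (c, committer).
	// Tag 42 is a made-up tag for column c.
	conv := makeConverter(map[int]int{0: 1}, []uint64{42, committerTag}, "someguy", "2f1a...")
	fmt.Println(conv([]interface{}{3, "hello"})) // [hello someguy]
}
```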

View File

@@ -49,7 +49,7 @@ type DoltIndex interface {
 	getDurableState(*sql.Context, DoltTableable) (*durableIndexState, error)
 	coversColumns(s *durableIndexState, columns []uint64) bool
-	sqlRowConverter(s *durableIndexState) *KVToSqlRowConverter
+	sqlRowConverter(*durableIndexState, []uint64) *KVToSqlRowConverter
 	lookupTags(s *durableIndexState) map[uint64]int
 }
@@ -260,6 +260,7 @@ type durableIndexState struct {
 	coversAllCols         uint32
 	cachedLookupTags      atomic.Value
 	cachedSqlRowConverter atomic.Value
+	cachedProjections     atomic.Value
 }

 func (s *durableIndexState) coversAllColumns(i *doltIndex) bool {
@@ -317,13 +318,29 @@ func (s *durableIndexState) lookupTags(i *doltIndex) map[uint64]int {
 	return cached.(map[uint64]int)
 }

-func (s *durableIndexState) sqlRowConverter(i *doltIndex) *KVToSqlRowConverter {
-	cached := s.cachedSqlRowConverter.Load()
-	if cached == nil {
-		cached = NewKVToSqlRowConverterForCols(i.Format(), i.Schema())
-		s.cachedSqlRowConverter.Store(cached)
-	}
-	return cached.(*KVToSqlRowConverter)
-}
+func projectionsEqual(x, y []uint64) bool {
+	if len(x) != len(y) {
+		return false
+	}
+	var i, j int
+	for i < len(x) && j < len(y) {
+		if x[i] != y[j] {
+			return false
+		}
+		i++
+		j++
+	}
+	return true
+}
+
+func (s *durableIndexState) sqlRowConverter(i *doltIndex, proj []uint64) *KVToSqlRowConverter {
+	cachedProjections := s.cachedProjections.Load()
+	cachedConverter := s.cachedSqlRowConverter.Load()
+	if cachedConverter == nil || !projectionsEqual(proj, cachedProjections.([]uint64)) {
+		cachedConverter = NewKVToSqlRowConverterForCols(i.Format(), i.Schema(), proj)
+		s.cachedSqlRowConverter.Store(cachedConverter)
+		s.cachedProjections.Store(proj)
+	}
+	return cachedConverter.(*KVToSqlRowConverter)
+}

 type cachedDurableIndexes struct {
@@ -575,8 +592,8 @@ RangeLoop:
 	}, nil
 }

-func (di *doltIndex) sqlRowConverter(s *durableIndexState) *KVToSqlRowConverter {
-	return s.sqlRowConverter(di)
+func (di *doltIndex) sqlRowConverter(s *durableIndexState, columns []uint64) *KVToSqlRowConverter {
+	return s.sqlRowConverter(di, columns)
 }

 func (di *doltIndex) lookupTags(s *durableIndexState) map[uint64]int {
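A side note on the caching hunk above: the rebuilt converter and the projections it was built for live in two separate atomic.Values, so the pair is only updated one field at a time — fine for the access pattern here, but worth knowing. A self-contained sketch of the same memoization with both halves kept behind a single atomic store (the string "converter" is a stand-in for *KVToSqlRowConverter):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cacheEntry pairs the cached converter with the projections that produced
// it, so both are swapped in one atomic store.
type cacheEntry struct {
	projections []uint64
	converter   string
}

type convCache struct{ v atomic.Value }

func projectionsEqual(x, y []uint64) bool {
	if len(x) != len(y) {
		return false
	}
	for i := range x {
		if x[i] != y[i] {
			return false
		}
	}
	return true
}

// get returns the cached converter when the projections match, and rebuilds
// (and re-caches) it otherwise — the same check-then-store shape as the patch.
func (c *convCache) get(proj []uint64, build func() string) string {
	if e, ok := c.v.Load().(*cacheEntry); ok && projectionsEqual(e.projections, proj) {
		return e.converter
	}
	e := &cacheEntry{projections: proj, converter: build()}
	c.v.Store(e)
	return e.converter
}

func main() {
	var c convCache
	fmt.Println(c.get([]uint64{1, 2}, func() string { return "conv[1 2]" }))
	fmt.Println(c.get([]uint64{1, 2}, func() string { return "rebuilt" })) // served from cache
	fmt.Println(c.get([]uint64{3}, func() string { return "conv[3]" }))    // rebuilt
}
```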

View File

@@ -82,15 +82,25 @@ func getValLocations(tagToSqlColIdx map[uint64]int, cols []schema.Column) (int,
 }

 // NewKVToSqlRowConverterForCols returns a KVToSqlConverter instance based on the list of columns passed in
-func NewKVToSqlRowConverterForCols(nbf *types.NomsBinFormat, sch schema.Schema) *KVToSqlRowConverter {
-	cols := sch.GetAllCols().GetColumns()
+func NewKVToSqlRowConverterForCols(nbf *types.NomsBinFormat, sch schema.Schema, columns []uint64) *KVToSqlRowConverter {
+	allCols := sch.GetAllCols().GetColumns()
 	tagToSqlColIdx := make(map[uint64]int)
-	for i, col := range cols {
-		tagToSqlColIdx[col.Tag] = i
+	var outCols []schema.Column
+	if len(columns) > 0 {
+		outCols = make([]schema.Column, len(columns))
+		for i, tag := range columns {
+			schIdx := sch.GetAllCols().TagToIdx[tag]
+			outCols[i] = allCols[schIdx]
+			tagToSqlColIdx[tag] = i
+		}
+	} else {
+		outCols = allCols
+		for i, col := range allCols {
+			tagToSqlColIdx[col.Tag] = i
+		}
 	}
-	return NewKVToSqlRowConverter(nbf, tagToSqlColIdx, cols, len(cols))
+	return NewKVToSqlRowConverter(nbf, tagToSqlColIdx, outCols, len(outCols))
 }

 // ConvertKVToSqlRow returns a sql.Row generated from the key and value provided.
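The constructor's new columns parameter is optional by convention: nil or empty (as the writer and benchmark call sites below pass) selects every column, while a non-empty tag list yields columns in projection order. A rough stand-alone sketch of that selection logic, with a stand-in column type:

```go
package main

import "fmt"

type column struct {
	Tag  uint64
	Name string
}

// selectColumns returns the output columns and a tag -> output-index map.
// An empty |tags| slice selects every column, matching the convention of
// NewKVToSqlRowConverterForCols(nbf, sch, nil) above.
func selectColumns(all []column, tags []uint64) ([]column, map[uint64]int) {
	tagToIdx := make(map[uint64]int)
	if len(tags) == 0 {
		for i, c := range all {
			tagToIdx[c.Tag] = i
		}
		return all, tagToIdx
	}
	byTag := make(map[uint64]column, len(all))
	for _, c := range all {
		byTag[c.Tag] = c
	}
	out := make([]column, len(tags))
	for i, t := range tags {
		out[i] = byTag[t] // projection order, not schema order
		tagToIdx[t] = i
	}
	return out, tagToIdx
}

func main() {
	all := []column{{0, "pk"}, {1, "a"}, {2, "b"}}
	out, idx := selectColumns(all, []uint64{2, 0})
	fmt.Println(out, idx) // [{2 b} {0 pk}] map[0:1 2:0]
}
```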

View File

@@ -79,6 +79,9 @@ func RowIterForProllyRange(ctx *sql.Context, idx DoltIndex, r prolly.Range, pkSc
 }

 func RowIterForNomsRanges(ctx *sql.Context, idx DoltIndex, ranges []*noms.ReadRange, columns []uint64, durableState *durableIndexState) (sql.RowIter, error) {
+	if len(columns) == 0 {
+		columns = idx.Schema().GetAllCols().Tags
+	}
 	m := durable.NomsMapFromIndex(durableState.Secondary)
 	nrr := noms.NewNomsRangeReader(idx.IndexSchema(), m, ranges)

@@ -86,7 +89,7 @@ func RowIterForNomsRanges(ctx *sql.Context, idx DoltIndex, ranges []*noms.ReadRa
 	if covers || idx.ID() == "PRIMARY" {
 		return NewCoveringIndexRowIterAdapter(ctx, idx, nrr, columns), nil
 	} else {
-		return NewIndexLookupRowIterAdapter(ctx, idx, durableState, nrr)
+		return NewIndexLookupRowIterAdapter(ctx, idx, durableState, nrr, columns)
 	}
 }
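The guard added at the top of RowIterForNomsRanges pins down the same convention from the caller's side: an empty projection list is normalized to the full tag set, so the converters and adapters downstream always receive concrete tags. A tiny sketch of the idea (allTags stands in for idx.Schema().GetAllCols().Tags):

```go
package main

import "fmt"

var allTags = []uint64{0, 1, 2} // stand-in for the schema's full tag list

// normalizeProjections applies the convention above: an empty projection
// list means "project everything".
func normalizeProjections(columns []uint64) []uint64 {
	if len(columns) == 0 {
		return allTags
	}
	return columns
}

func main() {
	fmt.Println(normalizeProjections(nil))         // [0 1 2]
	fmt.Println(normalizeProjections([]uint64{2})) // [2]
}
```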

View File

@@ -54,7 +54,7 @@ type indexLookupRowIterAdapter struct {
 }

 // NewIndexLookupRowIterAdapter returns a new indexLookupRowIterAdapter.
-func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, durableState *durableIndexState, keyIter nomsKeyIter) (*indexLookupRowIterAdapter, error) {
+func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, durableState *durableIndexState, keyIter nomsKeyIter, columns []uint64) (*indexLookupRowIterAdapter, error) {
 	rows := durable.NomsMapFromIndex(durableState.Primary)
 	resBuf := resultBufferPool.Get().(*async.RingBuffer)

@@ -66,7 +66,7 @@ func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, durableState
 		idx:        idx,
 		keyIter:    keyIter,
 		tableRows:  rows,
-		conv:       idx.sqlRowConverter(durableState),
+		conv:       idx.sqlRowConverter(durableState, columns),
 		lookupTags: idx.lookupTags(durableState),
 		cancelF:    cancelF,
 		resultBuf:  resBuf,
@@ -232,7 +232,16 @@ func NewCoveringIndexRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter *no
 	idxCols := idx.IndexSchema().GetPKCols()
 	tblPKCols := idx.Schema().GetPKCols()
 	sch := idx.Schema()
-	cols := sch.GetAllCols().GetColumns()
+	allCols := sch.GetAllCols().GetColumns()
+	cols := make([]schema.Column, 0)
+	for _, col := range allCols {
+		for _, tag := range resultCols {
+			if col.Tag == tag {
+				cols = append(cols, col)
+			}
+		}
+	}
 	tagToSqlColIdx := make(map[uint64]int)
 	isPrimaryKeyIdx := idx.ID() == "PRIMARY"
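Worth noting in the covering-adapter hunk: the filter iterates the schema columns in the outer loop, so the pruned column list keeps schema order rather than projection order (presumably reconciled by the tag-to-index map built just below). A small sketch of that filter with stand-in types:

```go
package main

import "fmt"

type column struct {
	Tag  uint64
	Name string
}

// filterByTags keeps the columns whose tags appear in |want|, preserving
// schema order because the schema is the outer loop, as in the adapter above.
func filterByTags(all []column, want []uint64) []column {
	out := make([]column, 0, len(want))
	for _, c := range all {
		for _, t := range want {
			if c.Tag == t {
				out = append(out, c)
				break
			}
		}
	}
	return out
}

func main() {
	all := []column{{0, "pk"}, {1, "a"}, {2, "b"}}
	fmt.Println(filterByTags(all, []uint64{2, 0})) // [{0 pk} {2 b}] — schema order
}
```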

View File

@@ -46,7 +46,9 @@ type prollyIndexIter struct {
 	// keyMap and valMap transform tuples from
 	// primary row storage into sql.Row's
 	keyMap, valMap val.OrdinalMapping
-	sqlSch         sql.Schema
+	// ordMap are output ordinals for |keyMap| and |valMap|
+	ordMap val.OrdinalMapping
+	sqlSch sql.Schema
 }

 var _ sql.RowIter = prollyIndexIter{}
@@ -71,7 +73,7 @@ func newProllyIndexIter(
 	kd, _ := primary.Descriptors()
 	pkBld := val.NewTupleBuilder(kd)
 	pkMap := ordinalMappingFromIndex(idx)
-	km, vm := projectionMappings(idx.Schema(), projections)
+	keyProj, valProj, ordProj := projectionMappings(idx.Schema(), projections)

 	eg, c := errgroup.WithContext(ctx)
@@ -83,8 +85,9 @@ func newProllyIndexIter(
 		pkMap:   pkMap,
 		eg:      eg,
 		rowChan: make(chan sql.Row, indexLookupBufSize),
-		keyMap:  km,
-		valMap:  vm,
+		keyMap:  keyProj,
+		valMap:  valProj,
+		ordMap:  ordProj,
 		sqlSch:  pkSch.Schema,
 	}
@@ -147,20 +150,17 @@ func (p prollyIndexIter) queueRows(ctx context.Context) error {
 func (p prollyIndexIter) rowFromTuples(ctx context.Context, key, value val.Tuple, r sql.Row) (err error) {
 	keyDesc, valDesc := p.primary.Descriptors()

-	for keyIdx, rowIdx := range p.keyMap {
-		if rowIdx == -1 {
-			continue
-		}
-		r[rowIdx], err = GetField(ctx, keyDesc, keyIdx, key, p.primary.NodeStore())
+	for i, idx := range p.keyMap {
+		outputIdx := p.ordMap[i]
+		r[outputIdx], err = GetField(ctx, keyDesc, idx, key, p.primary.NodeStore())
 		if err != nil {
 			return err
 		}
 	}
-	for valIdx, rowIdx := range p.valMap {
-		if rowIdx == -1 {
-			continue
-		}
-		r[rowIdx], err = GetField(ctx, valDesc, valIdx, value, p.primary.NodeStore())
+	for i, idx := range p.valMap {
+		outputIdx := p.ordMap[len(p.keyMap)+i]
+		r[outputIdx], err = GetField(ctx, valDesc, idx, value, p.primary.NodeStore())
 		if err != nil {
 			return err
 		}
@@ -206,8 +206,8 @@ type prollyCoveringIndexIter struct {
 	// secondary index value tuples are assumed to be empty.
 	// |keyMap| and |valMap| are both of len ==
-	keyMap, valMap val.OrdinalMapping
-	sqlSch         sql.Schema
+	keyMap, valMap, ordMap val.OrdinalMapping
+	sqlSch                 sql.Schema
 }

 var _ sql.RowIter = prollyCoveringIndexIter{}
@@ -228,11 +228,11 @@ func newProllyCoveringIndexIter(
 	}
 	keyDesc, valDesc := secondary.Descriptors()

-	var keyMap, valMap val.OrdinalMapping
+	var keyMap, valMap, ordMap val.OrdinalMapping
 	if idx.IsPrimaryKey() {
-		keyMap, valMap = primaryIndexMapping(idx, pkSch, projections)
+		keyMap, valMap, ordMap = primaryIndexMapping(idx, pkSch, projections)
 	} else {
-		keyMap = coveringIndexMapping(idx, projections)
+		keyMap, ordMap = coveringIndexMapping(idx, projections)
 	}

 	return prollyCoveringIndexIter{
@@ -242,6 +242,7 @@ func newProllyCoveringIndexIter(
 		valDesc: valDesc,
 		keyMap:  keyMap,
 		valMap:  valMap,
+		ordMap:  ordMap,
 		sqlSch:  pkSch.Schema,
 		ns:      secondary.NodeStore(),
 	}, nil
@@ -254,7 +255,7 @@ func (p prollyCoveringIndexIter) Next(ctx *sql.Context) (sql.Row, error) {
 		return nil, err
 	}

-	r := make(sql.Row, len(p.keyMap))
+	r := make(sql.Row, len(p.keyMap)+len(p.valMap))
 	if err := p.writeRowFromTuples(ctx, k, v, r); err != nil {
 		return nil, err
 	}
@@ -272,23 +273,17 @@ func (p prollyCoveringIndexIter) Next2(ctx *sql.Context, f *sql.RowFrame) error
 }

 func (p prollyCoveringIndexIter) writeRowFromTuples(ctx context.Context, key, value val.Tuple, r sql.Row) (err error) {
-	for to := range p.keyMap {
-		from := p.keyMap.MapOrdinal(to)
-		if from == -1 {
-			continue
-		}
-		r[to], err = GetField(ctx, p.keyDesc, from, key, p.ns)
+	for i, idx := range p.keyMap {
+		outputIdx := p.ordMap[i]
+		r[outputIdx], err = GetField(ctx, p.keyDesc, idx, key, p.ns)
 		if err != nil {
 			return err
 		}
 	}

-	for to := range p.valMap {
-		from := p.valMap.MapOrdinal(to)
-		if from == -1 {
-			continue
-		}
-		r[to], err = GetField(ctx, p.valDesc, from, value, p.ns)
+	for i, idx := range p.valMap {
+		outputIdx := p.ordMap[len(p.keyMap)+i]
+		r[outputIdx], err = GetField(ctx, p.valDesc, idx, value, p.ns)
 		if err != nil {
 			return err
 		}
@@ -330,48 +325,45 @@ func (p prollyCoveringIndexIter) Close(*sql.Context) error {
 	return nil
 }

-func coveringIndexMapping(d DoltIndex, projections []uint64) (keyMap val.OrdinalMapping) {
-	all := d.Schema().GetAllCols()
-	idx := d.IndexSchema().GetAllCols()
-	keyMap = make(val.OrdinalMapping, all.Size())
-	for i := range keyMap {
-		keyMap[i] = -1
-	}
-	for _, p := range projections {
-		i := all.TagToIdx[p]
-		if idx, ok := idx.TagToIdx[p]; ok {
-			keyMap[i] = idx
-		}
-	}
-	return
-}
+func coveringIndexMapping(d DoltIndex, projections []uint64) (keyMap, ordMap val.OrdinalMapping) {
+	idx := d.IndexSchema().GetAllCols()
+	allMap := make(val.OrdinalMapping, len(projections)*2)
+	var i int
+	for _, p := range projections {
+		if idx, ok := idx.TagToIdx[p]; ok {
+			allMap[i] = idx
+			allMap[len(projections)+i] = i
+			i++
+		}
+	}
+	keyMap = allMap[:len(projections)]
+	ordMap = allMap[len(projections):]
+	return
+}
-func primaryIndexMapping(idx DoltIndex, sqlSch sql.PrimaryKeySchema, projections []uint64) (keyMap, valMap val.OrdinalMapping) {
-	all := idx.Schema().GetAllCols()
-	pks := idx.Schema().GetPKCols()
-	nonPks := idx.Schema().GetNonPKCols()
-	keyMap = make(val.OrdinalMapping, all.Size())
-	for i := range keyMap {
-		keyMap[i] = -1
-	}
-	valMap = make(val.OrdinalMapping, all.Size())
-	for i := range valMap {
-		valMap[i] = -1
-	}
-	for _, p := range projections {
-		i := all.TagToIdx[p]
-		if iidx, ok := pks.TagToIdx[p]; ok {
-			keyMap[i] = iidx
-		}
-		j := all.TagToIdx[p]
-		if jidx, ok := nonPks.TagToIdx[p]; ok {
-			valMap[j] = jidx
-		}
-	}
-	return
-}
+func primaryIndexMapping(idx DoltIndex, sqlSch sql.PrimaryKeySchema, projections []uint64) (keyProj, valProj, ordProj val.OrdinalMapping) {
+	pks := idx.Schema().GetPKCols()
+	nonPks := idx.Schema().GetNonPKCols()
+	allMap := make([]int, len(projections)*2)
+	i := 0
+	j := len(projections) - 1
+	for k, p := range projections {
+		if idx, ok := pks.TagToIdx[p]; ok {
+			allMap[i] = idx
+			allMap[len(projections)+i] = k
+			i++
+		}
+		if idx, ok := nonPks.TagToIdx[p]; ok {
+			allMap[j] = idx
+			allMap[len(projections)+j] = k
+			j--
+		}
+	}
+	keyProj = allMap[:i]
+	valProj = allMap[i:len(projections)]
+	ordProj = allMap[len(projections):]
+	return
+}
@@ -400,6 +392,7 @@ type prollyKeylessIndexIter struct {
 	// valueMap transforms tuples from the
 	// clustered index into sql.Rows
 	valueMap  val.OrdinalMapping
+	ordMap    val.OrdinalMapping
 	valueDesc val.TupleDesc
 	sqlSch    sql.Schema
 }
@@ -426,7 +419,7 @@ func newProllyKeylessIndexIter(
 	indexMap := ordinalMappingFromIndex(idx)
 	keyBld := val.NewTupleBuilder(keyDesc)
 	sch := idx.Schema()
-	_, vm := projectionMappings(sch, projections)
+	_, vm, om := projectionMappings(sch, projections)

 	eg, c := errgroup.WithContext(ctx)
@@ -439,6 +432,7 @@ func newProllyKeylessIndexIter(
 		eg:        eg,
 		rowChan:   make(chan sql.Row, indexLookupBufSize),
 		valueMap:  vm,
+		ordMap:    om,
 		valueDesc: valDesc,
 		sqlSch:    pkSch.Schema,
 	}
@@ -510,13 +504,11 @@ func (p prollyKeylessIndexIter) queueRows(ctx context.Context) error {
 func (p prollyKeylessIndexIter) keylessRowsFromValueTuple(ctx context.Context, ns tree.NodeStore, value val.Tuple) (rows []sql.Row, err error) {
 	card := val.ReadKeylessCardinality(value)
 	rows = make([]sql.Row, card)
-	rows[0] = make(sql.Row, len(p.valueMap)-1) // omit cardinality field
+	rows[0] = make(sql.Row, len(p.valueMap))

-	for valIdx, rowIdx := range p.valueMap {
-		if rowIdx == -1 {
-			continue
-		}
-		rows[0][rowIdx], err = GetField(ctx, p.valueDesc, valIdx, value, ns)
+	for i, idx := range p.valueMap {
+		outputIdx := p.ordMap[i]
+		rows[0][outputIdx], err = GetField(ctx, p.valueDesc, idx, value, ns)
 		if err != nil {
 			return nil, err
 		}
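The two mapping functions above share one trick: a single backing slice of length 2*len(projections), whose front half records which tuple field to read and whose back half records each field's output ordinal; key columns fill from the front, non-key columns from the back, and the halves are then sliced apart. A runnable toy version for a table (pk BIGINT PRIMARY KEY, a INT, b INT) with tags 0, 1, 2 — the two maps below are stand-ins for the schema's TagToIdx collections:

```go
package main

import "fmt"

// Tag -> tuple-field position: the key tuple holds [pk], the value tuple [a, b].
var pks = map[uint64]int{0: 0}
var nonPks = map[uint64]int{1: 0, 2: 1}

// mapping mirrors the patch's primaryIndexMapping: keys fill the front of
// allMap, non-keys fill the back, and the trailing half carries output ordinals.
func mapping(projections []uint64) (keyProj, valProj, ordProj []int) {
	n := len(projections)
	allMap := make([]int, 2*n)
	i, j := 0, n-1
	for k, t := range projections {
		if idx, ok := pks[t]; ok {
			allMap[i] = idx
			allMap[n+i] = k
			i++
		} else if idx, ok := nonPks[t]; ok {
			allMap[j] = idx
			allMap[n+j] = k
			j--
		}
	}
	return allMap[:i], allMap[i:n], allMap[n:]
}

func main() {
	// SELECT b, pk -> projections by tag.
	keyProj, valProj, ordProj := mapping([]uint64{2, 0})
	fmt.Println(keyProj, valProj, ordProj) // [0] [1] [1 0]
	// Read key field 0 (pk) into output slot ordProj[0] = 1, and value
	// field 1 (b) into output slot ordProj[len(keyProj)+0] = 0.
}
```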

View File

@@ -54,8 +54,11 @@ type prollyRowIter struct {
 	sqlSch  sql.Schema
 	keyDesc val.TupleDesc
 	valDesc val.TupleDesc
 	keyProj []int
 	valProj []int
+	// ordProj is a concatenated list of output ordinals for |keyProj| and |valProj|
+	ordProj []int
 	rowLen  int
 }
@@ -67,7 +70,7 @@ func NewProllyRowIter(sch schema.Schema, sqlSch sql.Schema, rows prolly.Map, ite
 		projections = sch.GetAllCols().Tags
 	}

-	keyProj, valProj := projectionMappings(sch, projections)
+	keyProj, valProj, ordProj := projectionMappings(sch, projections)
 	kd, vd := rows.Descriptors()

 	if schema.IsKeyless(sch) {
@@ -75,7 +78,8 @@ func NewProllyRowIter(sch schema.Schema, sqlSch sql.Schema, rows prolly.Map, ite
 			iter:    iter,
 			valDesc: vd,
 			valProj: valProj,
-			rowLen:  len(sqlSch),
+			ordProj: ordProj,
+			rowLen:  len(projections),
 			ns:      rows.NodeStore(),
 		}, nil
 	}
@@ -87,37 +91,43 @@ func NewProllyRowIter(sch schema.Schema, sqlSch sql.Schema, rows prolly.Map, ite
 		valDesc: vd,
 		keyProj: keyProj,
 		valProj: valProj,
-		rowLen:  sch.GetAllCols().Size(),
+		ordProj: ordProj,
+		rowLen:  len(projections),
 		ns:      rows.NodeStore(),
 	}, nil
 }

-func projectionMappings(sch schema.Schema, projections []uint64) (keyMap, valMap val.OrdinalMapping) {
-	keyMap = make(val.OrdinalMapping, sch.GetPKCols().Size())
-	for i := range keyMap {
-		keyMap[i] = -1
-	}
-	valMap = make(val.OrdinalMapping, sch.GetNonPKCols().Size())
-	for i := range valMap {
-		valMap[i] = -1
-	}
-	all := sch.GetAllCols()
-	pks := sch.GetPKCols()
-	nonPks := sch.GetNonPKCols()
-	for _, t := range projections {
-		if i, ok := pks.TagToIdx[t]; ok {
-			keyMap[i] = all.TagToIdx[t]
-		}
-		if j, ok := nonPks.TagToIdx[t]; ok {
-			valMap[j] = all.TagToIdx[t]
-		}
-	}
-	if schema.IsKeyless(sch) {
-		skip := val.OrdinalMapping{-1}
-		keyMap = append(skip, keyMap...) // hashId
-		valMap = append(skip, valMap...) // cardinality
-	}
-	return
-}
+// projectionMappings returns data structures that specify 1) which fields we read
+// from key and value tuples, and 2) the position of those fields in the output row.
+func projectionMappings(sch schema.Schema, projections []uint64) (keyMap, valMap, ordMap val.OrdinalMapping) {
+	pks := sch.GetPKCols()
+	nonPks := sch.GetNonPKCols()
+	allMap := make([]int, 2*len(projections))
+	i := 0
+	j := len(projections) - 1
+	for k, t := range projections {
+		if idx, ok := pks.TagToIdx[t]; ok {
+			allMap[len(projections)+i] = k
+			allMap[i] = idx
+			i++
+		} else if idx, ok := nonPks.TagToIdx[t]; ok {
+			allMap[j] = idx
+			allMap[len(projections)+j] = k
+			j--
+		}
+	}
+	keyMap = allMap[:i]
+	valMap = allMap[i:len(projections)]
+	ordMap = allMap[len(projections):]
+
+	if schema.IsKeyless(sch) {
+		// skip the cardinality value, increment every index
+		for i := range keyMap {
+			keyMap[i]++
+		}
+		for i := range valMap {
+			valMap[i]++
+		}
+	}
+	return
+}
@@ -129,21 +139,16 @@ func (it prollyRowIter) Next(ctx *sql.Context) (sql.Row, error) {
 	}

 	row := make(sql.Row, it.rowLen)
-	for keyIdx, rowIdx := range it.keyProj {
-		if rowIdx == -1 {
-			continue
-		}
-		row[rowIdx], err = GetField(ctx, it.keyDesc, keyIdx, key, it.ns)
+	for i, idx := range it.keyProj {
+		outputIdx := it.ordProj[i]
+		row[outputIdx], err = GetField(ctx, it.keyDesc, idx, key, it.ns)
 		if err != nil {
 			return nil, err
 		}
 	}
-	for valIdx, rowIdx := range it.valProj {
-		if rowIdx == -1 {
-			continue
-		}
-		row[rowIdx], err = GetField(ctx, it.valDesc, valIdx, value, it.ns)
+	for i, idx := range it.valProj {
+		outputIdx := it.ordProj[len(it.keyProj)+i]
+		row[outputIdx], err = GetField(ctx, it.valDesc, idx, value, it.ns)
 		if err != nil {
 			return nil, err
 		}
@@ -197,6 +202,7 @@ type prollyKeylessIter struct {
 	valDesc val.TupleDesc
 	valProj []int
+	ordProj []int
 	rowLen  int

 	curr sql.Row
@@ -228,11 +234,9 @@ func (it *prollyKeylessIter) nextTuple(ctx *sql.Context) error {
 	it.card = val.ReadKeylessCardinality(value)
 	it.curr = make(sql.Row, it.rowLen)

-	for valIdx, rowIdx := range it.valProj {
-		if rowIdx == -1 {
-			continue
-		}
-		it.curr[rowIdx], err = GetField(ctx, it.valDesc, valIdx, value, it.ns)
+	for i, idx := range it.valProj {
+		outputIdx := it.ordProj[i]
+		it.curr[outputIdx], err = GetField(ctx, it.valDesc, idx, value, it.ns)
 		if err != nil {
 			return err
 		}
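On the consuming side, each iterator now walks keyProj/valProj positionally and writes through ordProj, with value fields offset by len(keyProj) into the shared ordinal list. A compact stand-alone sketch of that decode step (string tuples stand in for val.Tuple):

```go
package main

import "fmt"

// decodeRow mirrors prollyRowIter.Next: |keyProj| and |valProj| pick fields
// out of the key and value tuples, and |ordProj| (their concatenated output
// ordinals) says where each field lands in the result row.
func decodeRow(key, val []string, keyProj, valProj, ordProj []int) []string {
	row := make([]string, len(keyProj)+len(valProj))
	for i, idx := range keyProj {
		row[ordProj[i]] = key[idx]
	}
	for i, idx := range valProj {
		row[ordProj[len(keyProj)+i]] = val[idx]
	}
	return row
}

func main() {
	// Table (pk, a, b): key tuple [pk], value tuple [a, b].
	// SELECT b, pk -> keyProj=[0], valProj=[1], ordProj=[1, 0]
	// (see the mapping example after the index-iterator file above).
	fmt.Println(decodeRow([]string{"pk0"}, []string{"a0", "b0"},
		[]int{0}, []int{1}, []int{1, 0})) // [b0 pk0]

	// Keyless tables prepend a cardinality field to the value tuple, which is
	// why the patch increments every keyProj/valProj entry by one to skip it.
}
```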

View File

@@ -20271,8 +20271,10 @@ func TestExplain(t *testing.T) {
 	expectedExplain := "IndexedJoin(d.Symbol = t.Symbol)\n" +
 		" ├─ TableAlias(d)\n" +
-		" │ └─ Table(daily_summary)\n" +
+		" │ └─ Projected table access on [type symbol country tradingdate open high low close volume openint]\n" +
+		" │ └─ Table(daily_summary)\n" +
 		" └─ TableAlias(t)\n" +
-		" └─ IndexedTableAccess(symbols on [symbols.Symbol])"
+		" └─ Projected table access on [symbol name sector ipoyear]\n" +
+		" └─ IndexedTableAccess(symbols on [symbols.Symbol])"
 	assert.Equal(t, expectedExplain, strings.Join(rowStrings, "\n"))
 }

View File

@@ -76,7 +76,7 @@ func newRowIterator(ctx context.Context, tbl *doltdb.Table, sqlSch sql.Schema, p
 	}

 	if types.IsFormat_DOLT_1(tbl.Format()) {
-		return ProllyRowIterFromPartition(ctx, tbl, sqlSch, projCols, partition)
+		return ProllyRowIterFromPartition(ctx, sch, sqlSch, projCols, partition)
 	}

 	if schema.IsKeyless(sch) {
@@ -146,20 +146,24 @@ func getTagToResColIdx(ctx context.Context, tbl *doltdb.Table, projectedCols []u
 		return nil, nil, err
 	}

-	cols := sch.GetAllCols().GetColumns()
+	allCols := sch.GetAllCols().GetColumns()
 	tagToSqlColIdx := make(map[uint64]int)
-	resultColSet := make(map[uint64]bool)
-	for i := range projectedCols {
-		resultColSet[projectedCols[i]] = true
-	}
-	for i, col := range cols {
-		if len(projectedCols) == 0 || resultColSet[col.Tag] {
-			tagToSqlColIdx[col.Tag] = i
-		}
-	}
-	return cols, tagToSqlColIdx, nil
+	if len(projectedCols) > 0 {
+		outCols := make([]schema.Column, len(projectedCols))
+		for i := range projectedCols {
+			t := projectedCols[i]
+			idx := sch.GetAllCols().TagToIdx[t]
+			tagToSqlColIdx[t] = i
+			outCols[i] = allCols[idx]
+		}
+		return outCols, tagToSqlColIdx, nil
+	}
+	for i, col := range allCols {
+		tagToSqlColIdx[col.Tag] = i
+	}
+	return allCols, tagToSqlColIdx, nil
 }

 // Next returns the next row in this row iterator, or an io.EOF error if there aren't any more.
@@ -174,16 +178,12 @@ func (itr *doltTableRowIter) Close(*sql.Context) error {
 func ProllyRowIterFromPartition(
 	ctx context.Context,
-	tbl *doltdb.Table,
+	sch schema.Schema,
 	sqlSch sql.Schema,
 	projections []uint64,
 	partition doltTablePartition,
 ) (sql.RowIter, error) {
 	rows := durable.ProllyMapFromIndex(partition.rowData)
-	sch, err := tbl.GetSchema(ctx)
-	if err != nil {
-		return nil, err
-	}
 	if partition.end > uint64(rows.Count()) {
 		partition.end = uint64(rows.Count())
 	}
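ProllyRowIterFromPartition now accepts the schema instead of re-reading it from the table, dropping a per-partition GetSchema call and its error path. A toy illustration of the call-shape change, with hypothetical names:

```go
package main

import "fmt"

type schema struct{ cols []string }

// Hypothetical partition type; the point is only the call shape.
type partition struct{ start, end int }

// Before, each per-partition call re-derived the schema from the table;
// after, the caller resolves it once and threads it through.
func iterPartition(sch schema, p partition) string {
	return fmt.Sprintf("iter(%v, rows %d..%d)", sch.cols, p.start, p.end)
}

func main() {
	sch := schema{cols: []string{"pk", "a", "b"}} // resolved once
	for _, p := range []partition{{0, 10}, {10, 20}} {
		fmt.Println(iterPartition(sch, p))
	}
}
```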

View File

@@ -116,7 +116,8 @@ type DoltTable struct {
 	sch        schema.Schema
 	autoIncCol schema.Column

-	projectedCols []uint64
+	projectedCols   []uint64
+	projectedSchema sql.Schema

 	opts editor.Options
@@ -180,17 +181,17 @@ func (t DoltTable) LockedToRoot(ctx *sql.Context, root *doltdb.RootValue) (*Dolt
 		return nil, err
 	}

-	return &DoltTable{
-		tableName:     t.tableName,
-		db:            t.db,
-		nbf:           tbl.Format(),
-		sch:           sch,
-		sqlSch:        sqlSch,
-		autoIncCol:    autoCol,
-		projectedCols: t.projectedCols,
-		opts:          t.opts,
-		lockedToRoot:  root,
-	}, nil
+	dt := &DoltTable{
+		tableName:    t.tableName,
+		db:           t.db,
+		nbf:          tbl.Format(),
+		sch:          sch,
+		sqlSch:       sqlSch,
+		autoIncCol:   autoCol,
+		opts:         t.opts,
+		lockedToRoot: root,
+	}
+	return dt.WithProjections(t.Projections()).(*DoltTable), nil
 }
// Internal interface for declaring the interfaces that read-only dolt tables are expected to implement
@@ -365,6 +366,9 @@ func (t *DoltTable) Format() *types.NomsBinFormat {
 // Schema returns the schema for this table.
 func (t *DoltTable) Schema() sql.Schema {
+	if len(t.projectedSchema) > 0 {
+		return t.projectedSchema
+	}
 	return t.sqlSchema().Schema
 }
@@ -935,14 +939,23 @@ func (t *DoltTable) Projections() []string {
 // WithProjections implements sql.ProjectedTable
 func (t *DoltTable) WithProjections(colNames []string) sql.Table {
 	nt := *t
-	nt.projectedCols = make([]uint64, len(colNames))
+	nt.projectedCols = make([]uint64, 0, len(colNames))
+	nt.projectedSchema = make(sql.Schema, 0, len(colNames))
 	cols := t.sch.GetAllCols()
+	sch := t.Schema()
 	for i := range colNames {
-		col, ok := cols.LowerNameToCol[strings.ToLower(colNames[i])]
+		lowerName := strings.ToLower(colNames[i])
+		col, ok := cols.LowerNameToCol[lowerName]
 		if !ok {
-			panic("column does not exist")
+			// The history iter projects a new schema onto an
+			// older table. When a requested projection does not
+			// exist in the older schema, the table will ignore
+			// the field. The history table is responsible for
+			// filling the gaps with nil values.
+			continue
 		}
-		nt.projectedCols[i] = col.Tag
+		nt.projectedCols = append(nt.projectedCols, col.Tag)
+		nt.projectedSchema = append(nt.projectedSchema, sch[sch.IndexOfColName(lowerName)])
 	}
 	return &nt
 }
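Putting the DoltTable changes together: WithProjections records both the projected tags and the projected sql schema, Schema() prefers the projected schema when set, and unknown names are skipped instead of panicking so the history table can project a newer schema onto older revisions. A simplified model of that behavior (string slices stand in for sql.Schema):

```go
package main

import (
	"fmt"
	"strings"
)

type table struct {
	cols      []string // full schema, lower-case names
	projected []string
}

// withProjections mirrors the patch: unknown names are skipped rather than
// panicking, because a history query may project columns that did not exist
// at an older commit; the history layer fills those gaps with NULL.
func (t table) withProjections(names []string) table {
	nt := t
	nt.projected = nil
	for _, n := range names {
		n = strings.ToLower(n)
		for _, c := range t.cols {
			if c == n {
				nt.projected = append(nt.projected, c)
				break
			}
		}
	}
	return nt
}

// schema returns the projected schema when one is set.
func (t table) schema() []string {
	if len(t.projected) > 0 {
		return t.projected
	}
	return t.cols
}

func main() {
	t := table{cols: []string{"pk", "a", "b"}}
	old := t.withProjections([]string{"b", "added_later"}) // "added_later" absent at this revision
	fmt.Println(old.schema())                              // [b]
}
```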

View File

@@ -115,7 +115,7 @@ func (s *nomsWriteSession) GetTableWriter(ctx context.Context, table, db string,
 		return nil, err
 	}

-	conv := index.NewKVToSqlRowConverterForCols(t.Format(), sch)
+	conv := index.NewKVToSqlRowConverterForCols(t.Format(), sch, nil)

 	return &nomsTableWriter{
 		tableName: table,
View File

@@ -228,7 +228,7 @@ func BenchmarkMapItr(b *testing.B) {
 	sch, err := schema.SchemaFromCols(schema.NewColCollection(testDataCols...))
 	require.NoError(b, err)

-	dmItr := index.NewDoltMapIter(itr.NextTuple, closeFunc, index.NewKVToSqlRowConverterForCols(m.Format(), sch))
+	dmItr := index.NewDoltMapIter(itr.NextTuple, closeFunc, index.NewKVToSqlRowConverterForCols(m.Format(), sch, nil))

 	sqlCtx := sql.NewContext(ctx)
 	b.ResetTimer()