System Table to-commit/commit-hash lookups (#5137)

* System table commit indexes

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* tidy

* docstrings and bad hash tests

* exclude ld

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* bad import

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* skip LD system table indexing test

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* skip LD dolt scripts

* shift validation burden to indexes

* [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh

* fix bats

* fix non-determinism

* fix copyright header

Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
Maximilian Hoffman
2023-01-18 17:58:35 -08:00
committed by GitHub
parent f22cfc6524
commit d566687435
28 changed files with 1167 additions and 118 deletions

View File

@@ -58,7 +58,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3
github.com/cespare/xxhash v1.1.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/go-mysql-server v0.14.1-0.20230117235648-9c1bf0565164
github.com/dolthub/go-mysql-server v0.14.1-0.20230118214519-2a3c2d41e80e
github.com/google/flatbuffers v2.0.6+incompatible
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6
github.com/mitchellh/go-ps v1.0.0

View File

@@ -161,8 +161,8 @@ github.com/dolthub/flatbuffers v1.13.0-dh.1 h1:OWJdaPep22N52O/0xsUevxJ6Qfw1M2txC
github.com/dolthub/flatbuffers v1.13.0-dh.1/go.mod h1:CorYGaDmXjHz1Z7i50PYXG1Ricn31GcA2wNOTFIQAKE=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.14.1-0.20230117235648-9c1bf0565164 h1:Z1CcNXXQ6NfQVaO8EMdFXBbEjlcihSoHolURObCj1wA=
github.com/dolthub/go-mysql-server v0.14.1-0.20230117235648-9c1bf0565164/go.mod h1:ykkkC0nmCN0Dd7bpm+AeM6w4jcxfV9vIfLQEmajj20I=
github.com/dolthub/go-mysql-server v0.14.1-0.20230118214519-2a3c2d41e80e h1:3mP+4wj/TxEnIGk3OjlpQZW9l03XxAYiXFVrcarNzaM=
github.com/dolthub/go-mysql-server v0.14.1-0.20230118214519-2a3c2d41e80e/go.mod h1:ykkkC0nmCN0Dd7bpm+AeM6w4jcxfV9vIfLQEmajj20I=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488 h1:0HHu0GWJH0N6a6keStrHhUAK5/o9LVfkh44pvsV4514=
github.com/dolthub/ishell v0.0.0-20221214210346-d7db0b066488/go.mod h1:ehexgi1mPxRTk0Mok/pADALuHbvATulTh6gzr7NzZto=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=

View File

@@ -18,6 +18,8 @@ import (
"context"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/tree"
@@ -133,7 +135,7 @@ func (cmItr *commitItr) Next(ctx context.Context) (hash.Hash, *Commit, error) {
next := cmItr.unprocessed[numUnprocessed-1]
cmItr.unprocessed = cmItr.unprocessed[:numUnprocessed-1]
cmItr.curr, err = hashToCommit(ctx, cmItr.ddb.ValueReadWriter(), cmItr.ddb.ns, next)
cmItr.curr, err = HashToCommit(ctx, cmItr.ddb.ValueReadWriter(), cmItr.ddb.ns, next)
if err != nil {
return hash.Hash{}, nil, err
@@ -142,7 +144,7 @@ func (cmItr *commitItr) Next(ctx context.Context) (hash.Hash, *Commit, error) {
return next, cmItr.curr, nil
}
func hashToCommit(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeStore, h hash.Hash) (*Commit, error) {
func HashToCommit(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeStore, h hash.Hash) (*Commit, error) {
dc, err := datas.LoadCommitAddr(ctx, vrw, h)
if err != nil {
return nil, err
@@ -191,3 +193,110 @@ func (itr FilteringCommitItr) Next(ctx context.Context) (hash.Hash, *Commit, err
func (itr FilteringCommitItr) Reset(ctx context.Context) error {
return itr.itr.Reset(ctx)
}
func NewCommitSliceIter(cm []*Commit, h []hash.Hash) *CommitSliceIter {
return &CommitSliceIter{cm: cm, h: h}
}
type CommitSliceIter struct {
h []hash.Hash
cm []*Commit
i int
}
var _ CommitItr = (*CommitSliceIter)(nil)
func (i *CommitSliceIter) Next(ctx context.Context) (hash.Hash, *Commit, error) {
if i.i >= len(i.h) {
return hash.Hash{}, nil, io.EOF
}
i.i++
return i.h[i.i-1], i.cm[i.i-1], nil
}
func (i *CommitSliceIter) Reset(ctx context.Context) error {
i.i = 0
return nil
}
func NewOneCommitIter(cm *Commit, h hash.Hash, meta *datas.CommitMeta) *OneCommitIter {
return &OneCommitIter{cm: cm, h: h}
}
type OneCommitIter struct {
h hash.Hash
cm *Commit
m *datas.CommitMeta
done bool
}
var _ CommitItr = (*OneCommitIter)(nil)
func (i *OneCommitIter) Next(_ context.Context) (hash.Hash, *Commit, error) {
if i.done {
return hash.Hash{}, nil, io.EOF
}
i.done = true
return i.h, i.cm, nil
}
func (i *OneCommitIter) Reset(_ context.Context) error {
i.done = false
return nil
}
func NewCommitPart(h hash.Hash, cm *Commit, m *datas.CommitMeta) *CommitPart {
return &CommitPart{h: h, cm: cm, m: m}
}
type CommitPart struct {
h hash.Hash
m *datas.CommitMeta
cm *Commit
}
var _ sql.Partition = (*CommitPart)(nil)
func (c *CommitPart) Hash() hash.Hash {
return c.h
}
func (c *CommitPart) Commit() *Commit {
return c.cm
}
func (c *CommitPart) Meta() *datas.CommitMeta {
return c.m
}
func (c *CommitPart) Key() []byte {
return c.h[:]
}
func NewCommitSlicePartitionIter(h []hash.Hash, cm []*Commit, m []*datas.CommitMeta) *CommitSlicePartitionIter {
return &CommitSlicePartitionIter{h: h, cm: cm, m: m}
}
type CommitSlicePartitionIter struct {
h []hash.Hash
m []*datas.CommitMeta
cm []*Commit
i int
}
var _ sql.PartitionIter = (*CommitSlicePartitionIter)(nil)
func (i *CommitSlicePartitionIter) Next(ctx *sql.Context) (sql.Partition, error) {
if i.i >= len(i.cm) {
return nil, io.EOF
}
i.i++
return &CommitPart{h: i.h[i.i-1], m: i.m[i.i-1], cm: i.cm[i.i-1]}, nil
}
func (i *CommitSlicePartitionIter) Close(ctx *sql.Context) error {
return nil
}

View File

@@ -15,6 +15,8 @@
package dtables
import (
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
@@ -66,8 +68,45 @@ func (dt *CommitAncestorsTable) Partitions(*sql.Context) (sql.PartitionIter, err
}
// PartitionRows is a sql.Table interface function that gets a row iterator for a partition.
func (dt *CommitAncestorsTable) PartitionRows(sqlCtx *sql.Context, _ sql.Partition) (sql.RowIter, error) {
return NewCommitAncestorsRowItr(sqlCtx, dt.ddb)
func (dt *CommitAncestorsTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
switch p := p.(type) {
case *doltdb.CommitPart:
return &CommitAncestorsRowItr{
itr: doltdb.NewOneCommitIter(p.Commit(), p.Hash(), p.Meta()),
ddb: dt.ddb,
}, nil
default:
return NewCommitAncestorsRowItr(ctx, dt.ddb)
}
}
// GetIndexes implements sql.IndexAddressable
func (dt *CommitAncestorsTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
return index.DoltCommitIndexes(dt.Name(), dt.ddb, true)
}
// IndexedAccess implements sql.IndexAddressable
func (dt *CommitAncestorsTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
nt := *dt
return &nt
}
func (dt *CommitAncestorsTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
if lookup.Index.ID() == index.CommitHashIndexId {
hs, ok := index.LookupToPointSelectStr(lookup)
if !ok {
return nil, fmt.Errorf("failed to parse commit hash lookup: %s", sql.DebugString(lookup.Ranges))
}
hashes, commits, metas := index.HashesToCommits(ctx, dt.ddb, hs, nil, false)
if len(hashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
return doltdb.NewCommitSlicePartitionIter(hashes, commits, metas), nil
}
return dt.Partitions(ctx)
}
// CommitAncestorsRowItr is a sql.RowItr which iterates over each

View File

@@ -15,11 +15,15 @@
package dtables
import (
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
)
var _ sql.Table = (*CommitsTable)(nil)
@@ -69,8 +73,41 @@ func (dt *CommitsTable) Partitions(*sql.Context) (sql.PartitionIter, error) {
}
// PartitionRows is a sql.Table interface function that gets a row iterator for a partition.
func (dt *CommitsTable) PartitionRows(ctx *sql.Context, _ sql.Partition) (sql.RowIter, error) {
return NewCommitsRowItr(ctx, dt.ddb)
func (dt *CommitsTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
switch p := p.(type) {
case *doltdb.CommitPart:
return sql.RowsToRowIter(formatCommitTableRow(p.Hash(), p.Meta())), nil
default:
return NewCommitsRowItr(ctx, dt.ddb)
}
}
// GetIndexes implements sql.IndexAddressable
func (dt *CommitsTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
return index.DoltCommitIndexes(dt.Name(), dt.ddb, true)
}
// IndexedAccess implements sql.IndexAddressable
func (dt *CommitsTable) IndexedAccess(_ sql.IndexLookup) sql.IndexedTable {
nt := *dt
return &nt
}
func (dt *CommitsTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
if lookup.Index.ID() == index.CommitHashIndexId {
hashStrs, ok := index.LookupToPointSelectStr(lookup)
if !ok {
return nil, fmt.Errorf("failed to parse commit lookup ranges: %s", sql.DebugString(lookup.Ranges))
}
hashes, commits, metas := index.HashesToCommits(ctx, dt.ddb, hashStrs, nil, false)
if len(hashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
return doltdb.NewCommitSlicePartitionIter(hashes, commits, metas), nil
}
return dt.Partitions(ctx)
}
// CommitsRowItr is a sql.RowItr which iterates over each commit as if it's a row in the table.
@@ -101,10 +138,14 @@ func (itr CommitsRowItr) Next(ctx *sql.Context) (sql.Row, error) {
return nil, err
}
return sql.NewRow(h.String(), meta.Name, meta.Email, meta.Time(), meta.Description), nil
return formatCommitTableRow(h, meta), nil
}
// Close closes the iterator.
func (itr CommitsRowItr) Close(*sql.Context) error {
return nil
}
func formatCommitTableRow(h hash.Hash, meta *datas.CommitMeta) sql.Row {
return sql.NewRow(h.String(), meta.Name, meta.Email, meta.Time(), meta.Description)
}

View File

@@ -32,6 +32,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
@@ -194,11 +195,9 @@ func (dt *DiffTable) Filters() []sql.Expression {
// WithFilters returns a new sql.Table instance with the filters applied
func (dt *DiffTable) WithFilters(_ *sql.Context, filters []sql.Expression) sql.Table {
if dt.partitionFilters == nil {
dt.partitionFilters = FilterFilters(filters, ColumnPredicate(commitMetaColumns))
}
return dt
ret := *dt
ret.partitionFilters = FilterFilters(filters, ColumnPredicate(commitMetaColumns))
return &ret
}
func (dt *DiffTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
@@ -207,14 +206,135 @@ func (dt *DiffTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.Ro
}
func (dt *DiffTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
if lookup.Index.ID() == index.ToCommitIndexId {
hs, ok := index.LookupToPointSelectStr(lookup)
if !ok {
return nil, fmt.Errorf("failed to parse commit lookup ranges: %s", sql.DebugString(lookup.Ranges))
}
hashes, commits, metas := index.HashesToCommits(ctx, dt.ddb, hs, dt.head, false)
if len(hashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
return dt.toCommitLookupPartitions(ctx, hashes, commits, metas)
}
return dt.Partitions(ctx)
}
// toCommitLookupPartitions creates a diff partition iterator for a specific
// commit. The structure of the iter requires we pre-populate the parents of
// to_commit for diffing.
func (dt *DiffTable) toCommitLookupPartitions(ctx *sql.Context, hashes []hash.Hash, commits []*doltdb.Commit, metas []*datas.CommitMeta) (sql.PartitionIter, error) {
t, exactName, ok, err := dt.workingRoot.GetTableInsensitive(ctx, dt.name)
if err != nil {
return nil, err
} else if !ok {
return nil, errors.New(fmt.Sprintf("table: %s does not exist", dt.name))
}
working, err := dt.head.HashOf()
if err != nil {
return nil, err
}
var parentHashes []hash.Hash
cmHashToTblInfo := make(map[hash.Hash]TblInfoAtCommit)
var pCommits []*doltdb.Commit
for i, hs := range hashes {
cm := commits[i]
meta := metas[i]
var ph []hash.Hash
var toCmInfo TblInfoAtCommit
if hs == working && cm == nil {
wrTblHash, _, err := dt.workingRoot.GetTableHash(ctx, exactName)
if err != nil {
return nil, err
}
toCmInfo = TblInfoAtCommit{"WORKING", nil, t, wrTblHash}
cmHashToTblInfo[hs] = toCmInfo
parentHashes = append(parentHashes, hs)
pCommits = append(pCommits, dt.head)
continue
}
r, err := cm.GetRootValue(ctx)
if err != nil {
return nil, err
}
ph, err = cm.ParentHashes(ctx)
if err != nil {
return nil, err
}
tbl, exactName, ok, err := r.GetTableInsensitive(ctx, dt.name)
if err != nil {
return nil, err
}
if !ok {
return sql.PartitionsToPartitionIter(), nil
}
tblHash, _, err := r.GetTableHash(ctx, exactName)
if err != nil {
return nil, err
}
ts := types.Timestamp(meta.Time())
toCmInfo = NewTblInfoAtCommit(hs.String(), &ts, tbl, tblHash)
for i, pj := range ph {
pc, err := cm.GetParent(ctx, i)
if err != nil {
return nil, err
}
cmHashToTblInfo[pj] = toCmInfo
pCommits = append(pCommits, pc)
}
parentHashes = append(parentHashes, ph...)
}
if len(parentHashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
sf, err := SelectFuncForFilters(dt.ddb.Format(), dt.partitionFilters)
if err != nil {
return nil, err
}
cmItr := doltdb.NewCommitSliceIter(pCommits, parentHashes)
if err != nil {
return nil, err
}
return &DiffPartitions{
tblName: exactName,
cmItr: cmItr,
cmHashToTblInfo: cmHashToTblInfo,
selectFunc: sf,
toSch: dt.targetSch,
fromSch: dt.targetSch,
}, nil
}
// GetIndexes implements sql.IndexAddressable
func (dt *DiffTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
return index.DoltDiffIndexesFromTable(ctx, "", dt.name, dt.table)
}
func (dt *DiffTable) IndexedAccess(index sql.Index) sql.IndexedTable {
// IndexedAccess implements sql.IndexAddressable
func (dt *DiffTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
//if !types.IsFormat_DOLT(dt.ddb.Format()) {
// return nil
//}
//if lookup.Index.ID() == index.CommitHashIndexId {
// _, ok := index.LookupToPointSelectStr(lookup)
// if !ok {
// return nil
// }
//}
nt := *dt
return &nt
}

View File

@@ -15,14 +15,17 @@
package dtables
import (
"context"
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/types"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions/commitwalk"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions/commitwalk"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
)
var _ sql.Table = (*LogTable)(nil)
@@ -31,8 +34,10 @@ var _ sql.StatisticsTable = (*LogTable)(nil)
// LogTable is a sql.Table implementation that implements a system table which shows the dolt commit log
type LogTable struct {
ddb *doltdb.DoltDB
head *doltdb.Commit
ddb *doltdb.DoltDB
head *doltdb.Commit
headHash hash.Hash
headCommitClosure *prolly.CommitClosure
}
// NewLogTable creates a LogTable
@@ -90,8 +95,100 @@ func (dt *LogTable) Partitions(*sql.Context) (sql.PartitionIter, error) {
}
// PartitionRows is a sql.Table interface function that gets a row iterator for a partition
func (dt *LogTable) PartitionRows(ctx *sql.Context, _ sql.Partition) (sql.RowIter, error) {
return NewLogItr(ctx, dt.ddb, dt.head)
func (dt *LogTable) PartitionRows(ctx *sql.Context, p sql.Partition) (sql.RowIter, error) {
switch p := p.(type) {
case *doltdb.CommitPart:
return sql.RowsToRowIter(sql.NewRow(p.Hash().String(), p.Meta().Name, p.Meta().Email, p.Meta().Time(), p.Meta().Description)), nil
default:
return NewLogItr(ctx, dt.ddb, dt.head)
}
}
func (dt *LogTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
return index.DoltCommitIndexes(dt.Name(), dt.ddb, true)
}
// IndexedAccess implements sql.IndexAddressable
func (dt *LogTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
nt := *dt
return &nt
}
func (dt *LogTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
if lookup.Index.ID() == index.CommitHashIndexId {
return dt.commitHashPartitionIter(ctx, lookup)
}
return dt.Partitions(ctx)
}
func (dt *LogTable) commitHashPartitionIter(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
hashStrs, ok := index.LookupToPointSelectStr(lookup)
if !ok {
return nil, fmt.Errorf("failed to parse commit lookup ranges: %s", sql.DebugString(lookup.Ranges))
}
hashes, commits, metas := index.HashesToCommits(ctx, dt.ddb, hashStrs, nil, false)
if len(hashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
var partitions []sql.Partition
for i, h := range hashes {
height, err := commits[i].Height()
if err != nil {
return nil, err
}
ok, err = dt.CommitIsInScope(ctx, height, h)
if err != nil {
return nil, err
}
if !ok {
continue
}
partitions = append(partitions, doltdb.NewCommitPart(h, commits[i], metas[i]))
}
return sql.PartitionsToPartitionIter(partitions...), nil
}
// CommitIsInScope returns true if a given commit hash is head or is
// visible from the current head's ancestry graph.
func (dt *LogTable) CommitIsInScope(ctx context.Context, height uint64, h hash.Hash) (bool, error) {
cc, err := dt.HeadCommitClosure(ctx)
if err != nil {
return false, err
}
headHash, err := dt.HeadHash()
if err != nil {
return false, err
}
if headHash == h {
return true, nil
}
return cc.ContainsKey(ctx, h, height)
}
func (dt *LogTable) HeadCommitClosure(ctx context.Context) (*prolly.CommitClosure, error) {
if dt.headCommitClosure == nil {
cc, err := dt.head.GetCommitClosure(ctx)
dt.headCommitClosure = &cc
if err != nil {
return nil, err
}
}
return dt.headCommitClosure, nil
}
func (dt *LogTable) HeadHash() (hash.Hash, error) {
if dt.headHash.IsEmpty() {
var err error
dt.headHash, err = dt.head.HashOf()
if err != nil {
return hash.Hash{}, err
}
}
return dt.headHash, nil
}
// LogItr is a sql.RowItr implementation which iterates over each commit as if it's a row in the table.

View File

@@ -29,6 +29,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/diff"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
@@ -47,7 +48,7 @@ type UnscopedDiffTable struct {
ddb *doltdb.DoltDB
head *doltdb.Commit
partitionFilters []sql.Expression
cmItr doltdb.CommitItr
commitCheck doltdb.CommitFilter
}
// tableChange is an internal data structure used to hold the results of processing
@@ -60,8 +61,7 @@ type tableChange struct {
// NewUnscopedDiffTable creates an UnscopedDiffTable
func NewUnscopedDiffTable(_ *sql.Context, ddb *doltdb.DoltDB, head *doltdb.Commit) sql.Table {
cmItr := doltdb.CommitItrForRoots(ddb, head)
return &UnscopedDiffTable{ddb: ddb, head: head, cmItr: cmItr}
return &UnscopedDiffTable{ddb: ddb, head: head}
}
// Filters returns the list of filters that are applied to this table.
@@ -76,19 +76,13 @@ func (dt *UnscopedDiffTable) HandledFilters(filters []sql.Expression) []sql.Expr
}
// WithFilters returns a new sql.Table instance with the filters applied
func (dt *UnscopedDiffTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql.Table {
func (dt *UnscopedDiffTable) WithFilters(_ *sql.Context, filters []sql.Expression) sql.Table {
dt.partitionFilters = FilterFilters(filters, ColumnPredicate(filterColumnNameSet))
if len(dt.partitionFilters) > 0 {
commitCheck, err := commitFilterForDiffTableFilterExprs(dt.partitionFilters)
if err != nil {
return nil
}
ndt := *dt
ndt.cmItr = doltdb.NewFilteringCommitItr(dt.cmItr, commitCheck)
return &ndt
commitCheck, err := commitFilterForDiffTableFilterExprs(dt.partitionFilters)
if err != nil {
return nil
}
dt.commitCheck = commitCheck
return dt
}
@@ -134,15 +128,68 @@ func (dt *UnscopedDiffTable) Partitions(ctx *sql.Context) (sql.PartitionIter, er
// PartitionRows is a sql.Table interface function that gets a row iterator for a partition.
func (dt *UnscopedDiffTable) PartitionRows(ctx *sql.Context, partition sql.Partition) (sql.RowIter, error) {
if bytes.Equal(partition.Key(), workingSetPartitionKey) {
return dt.newWorkingSetRowItr(ctx)
} else if bytes.Equal(partition.Key(), commitHistoryPartitionKey) {
return dt.newCommitHistoryRowItr(ctx)
} else {
return nil, fmt.Errorf("unexpected partition: %v", partition)
switch p := partition.(type) {
case *doltdb.CommitPart:
return dt.newCommitHistoryRowItrFromCommits(ctx, []*doltdb.Commit{p.Commit()})
default:
if bytes.Equal(partition.Key(), workingSetPartitionKey) {
return dt.newWorkingSetRowItr(ctx)
} else if bytes.Equal(partition.Key(), commitHistoryPartitionKey) {
cms, hasCommitHashEquality := getCommitsFromCommitHashEquality(ctx, dt.ddb, dt.partitionFilters)
if hasCommitHashEquality {
return dt.newCommitHistoryRowItrFromCommits(ctx, cms)
}
iter := doltdb.CommitItrForRoots(dt.ddb, dt.head)
if dt.commitCheck != nil {
iter = doltdb.NewFilteringCommitItr(iter, dt.commitCheck)
}
return dt.newCommitHistoryRowItrFromItr(ctx, iter)
} else {
return nil, fmt.Errorf("unexpected partition: %v", partition)
}
}
}
// GetIndexes implements sql.IndexAddressable
func (dt *UnscopedDiffTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
return index.DoltCommitIndexes(dt.Name(), dt.ddb, true)
}
// IndexedAccess implements sql.IndexAddressable
func (dt *UnscopedDiffTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
nt := *dt
return &nt
}
func (dt *UnscopedDiffTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
if lookup.Index.ID() == index.CommitHashIndexId {
hs, ok := index.LookupToPointSelectStr(lookup)
if !ok {
return nil, fmt.Errorf("failed to parse commit lookup ranges: %s", sql.DebugString(lookup.Ranges))
}
hashes, commits, metas := index.HashesToCommits(ctx, dt.ddb, hs, dt.head, false)
if len(hashes) == 0 {
return sql.PartitionsToPartitionIter(), nil
}
headHash, err := dt.head.HashOf()
if err != nil {
return nil, err
}
var partitions []sql.Partition
for i, h := range hashes {
if h == headHash && commits[i] == nil {
partitions = append(partitions, newDoltDiffPartition(workingSetPartitionKey))
} else {
partitions = append(partitions, doltdb.NewCommitPart(h, commits[i], metas[i]))
}
}
return sql.PartitionsToPartitionIter(partitions...), nil
}
return dt.Partitions(ctx)
}
func (dt *UnscopedDiffTable) newWorkingSetRowItr(ctx *sql.Context) (sql.RowIter, error) {
sess := dsess.DSessFromSess(ctx.Session)
roots, ok := sess.GetRoots(ctx, ctx.GetCurrentDatabase())
@@ -243,18 +290,24 @@ type doltDiffCommitHistoryRowItr struct {
tableChangesIdx int
}
// newCommitHistoryRowItr creates a doltDiffCommitHistoryRowItr from the current environment.
func (dt *UnscopedDiffTable) newCommitHistoryRowItr(ctx *sql.Context) (*doltDiffCommitHistoryRowItr, error) {
// newCommitHistoryRowItr creates a doltDiffCommitHistoryRowItr from a CommitItr.
func (dt *UnscopedDiffTable) newCommitHistoryRowItrFromItr(ctx *sql.Context, iter doltdb.CommitItr) (*doltDiffCommitHistoryRowItr, error) {
dchItr := &doltDiffCommitHistoryRowItr{
ctx: ctx,
ddb: dt.ddb,
tableChangesIdx: -1,
child: iter,
}
cms, hasCommitHashEquality := getCommitsFromCommitHashEquality(ctx, dt.ddb, dt.partitionFilters)
if hasCommitHashEquality {
dchItr.commits = cms
} else {
dchItr.child = dt.cmItr
return dchItr, nil
}
// newCommitHistoryRowItr creates a doltDiffCommitHistoryRowItr from a list of commits.
func (dt *UnscopedDiffTable) newCommitHistoryRowItrFromCommits(ctx *sql.Context, commits []*doltdb.Commit) (*doltDiffCommitHistoryRowItr, error) {
dchItr := &doltDiffCommitHistoryRowItr{
ctx: ctx,
ddb: dt.ddb,
tableChangesIdx: -1,
commits: commits,
}
return dchItr, nil
}

View File

@@ -20,6 +20,7 @@ import (
"os"
"testing"
gms "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/enginetest"
"github.com/dolthub/go-mysql-server/enginetest/queries"
"github.com/dolthub/go-mysql-server/enginetest/scriptgen/setup"
@@ -266,6 +267,10 @@ func TestIntegrationQueryPlans(t *testing.T) {
}
func TestDoltDiffQueryPlans(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip("only new format support system table indexing")
}
harness := newDoltHarness(t).WithParallelism(2) // want Exchange nodes
harness.Setup(setup.SimpleSetup...)
e, err := harness.NewEngine(t)
@@ -1282,6 +1287,10 @@ func TestCommitDiffSystemTablePrepared(t *testing.T) {
}
func TestDiffSystemTable(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip("only new format support system table indexing")
}
harness := newDoltHarness(t)
harness.Setup(setup.MydbData)
for _, test := range DiffSystemTableScriptTests {
@@ -1299,6 +1308,10 @@ func TestDiffSystemTable(t *testing.T) {
}
func TestDiffSystemTablePrepared(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip("only new format support system table indexing")
}
harness := newDoltHarness(t)
harness.Setup(setup.MydbData)
for _, test := range DiffSystemTableScriptTests {
@@ -1315,6 +1328,76 @@ func TestDiffSystemTablePrepared(t *testing.T) {
}
}
func mustNewEngine(t *testing.T, h enginetest.Harness) *gms.Engine {
e, err := h.NewEngine(t)
if err != nil {
require.NoError(t, err)
}
return e
}
func TestSystemTableIndexes(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip("only new format support system table indexing")
}
for _, stt := range SystemTableIndexTests {
harness := newDoltHarness(t).WithParallelism(2)
harness.SkipSetupCommit()
e := mustNewEngine(t, harness)
defer e.Close()
ctx := enginetest.NewContext(harness)
for _, q := range stt.setup {
enginetest.RunQuery(t, e, harness, q)
}
for _, tt := range stt.queries {
t.Run(fmt.Sprintf("%s: %s", stt.name, tt.query), func(t *testing.T) {
if tt.skip {
t.Skip()
}
ctx = ctx.WithQuery(tt.query)
if tt.exp != nil {
enginetest.TestQueryWithContext(t, ctx, e, harness, tt.query, tt.exp, nil, nil)
}
})
}
}
}
func TestSystemTableIndexesPrepared(t *testing.T) {
if !types.IsFormat_DOLT(types.Format_Default) {
t.Skip("only new format support system table indexing")
}
for _, stt := range SystemTableIndexTests {
harness := newDoltHarness(t).WithParallelism(2)
harness.SkipSetupCommit()
e := mustNewEngine(t, harness)
defer e.Close()
ctx := enginetest.NewContext(harness)
for _, q := range stt.setup {
enginetest.RunQuery(t, e, harness, q)
}
for _, tt := range stt.queries {
t.Run(fmt.Sprintf("%s: %s", stt.name, tt.query), func(t *testing.T) {
if tt.skip {
t.Skip()
}
ctx = ctx.WithQuery(tt.query)
if tt.exp != nil {
enginetest.TestPreparedQueryWithContext(t, ctx, e, harness, tt.query, tt.exp, nil)
}
})
}
}
}
func TestReadOnlyDatabases(t *testing.T) {
enginetest.TestReadOnlyDatabases(t, newDoltHarness(t))
}

View File

@@ -39,16 +39,17 @@ import (
)
type DoltHarness struct {
t *testing.T
provider dsess.DoltDatabaseProvider
multiRepoEnv *env.MultiRepoEnv
session *dsess.DoltSession
branchControl *branch_control.Controller
parallelism int
skippedQueries []string
setupData []setup.SetupScript
resetData []setup.SetupScript
engine *gms.Engine
t *testing.T
provider dsess.DoltDatabaseProvider
multiRepoEnv *env.MultiRepoEnv
session *dsess.DoltSession
branchControl *branch_control.Controller
parallelism int
skippedQueries []string
setupData []setup.SetupScript
resetData []setup.SetupScript
engine *gms.Engine
skipSetupCommit bool
}
var _ enginetest.Harness = (*DoltHarness)(nil)
@@ -87,6 +88,10 @@ func (d *DoltHarness) Setup(setupData ...[]setup.SetupScript) {
}
}
func (d *DoltHarness) SkipSetupCommit() {
d.skipSetupCommit = true
}
// resetScripts returns a set of queries that will reset the given database
// names. If [autoInc], the queries for resetting autoincrement tables are
// included.
@@ -184,9 +189,11 @@ func (d *DoltHarness) NewEngine(t *testing.T) (*gms.Engine, error) {
dbs = append(dbs, db.Name())
}
e, err = enginetest.RunEngineScripts(ctx, e, commitScripts(dbs), d.SupportsNativeIndexCreation())
if err != nil {
return nil, err
if !d.skipSetupCommit {
e, err = enginetest.RunEngineScripts(ctx, e, commitScripts(dbs), d.SupportsNativeIndexCreation())
if err != nil {
return nil, err
}
}
return e, nil

View File

@@ -2703,3 +2703,194 @@ var CommitDiffSystemTableScriptTests = []queries.ScriptTest{
},
},
}
type systabScript struct {
name string
setup []string
queries []systabQuery
}
type systabQuery struct {
query string
exp []sql.Row
skip bool
}
var systabSetup = []string{
"create table xy (x int primary key, y varchar(20));",
"insert into xy values (0, 'row 0'), (1, 'row 1'), (2, 'row 2'), (3, 'row 3'), (4, 'row 4');",
"call dolt_add('.');",
"call dolt_commit('-m', 'commit 0');",
"update xy set y = y+1 where x < 10",
"insert into xy values (20, 'row 20'), (21, 'row 21'), (22, 'row 22'), (23, 'row 23'), (24, 'row 24');",
"call dolt_add('.');",
"call dolt_commit('-m', 'commit 1');",
"update xy set y = y+1 where x > 10 and x < 30",
"insert into xy values (40, 'row 40'), (41, 'row 41'), (42, 'row 42'), (43, 'row 43'), (44, 'row 44');",
"call dolt_add('.');",
"call dolt_commit('-m', 'commit 2');",
"update xy set y = y+1 where x > 30 and x < 50",
"insert into xy values (60, 'row 60'), (61, 'row 61'), (62, 'row 62'), (63, 'row 63'), (64, 'row 64');",
"call dolt_add('.');",
"call dolt_commit('-m', 'commit 3');",
"update xy set y = y+1 where x > 50 and x < 70",
"insert into xy values (80, 'row 80'), (81, 'row 81'), (82, 'row 82'), (83, 'row 83'), (84, 'row 84');",
"call dolt_add('.');",
"call dolt_commit('-m', 'commit 4');",
}
var SystemTableIndexTests = []systabScript{
{
name: "systab benchmarks",
setup: append(systabSetup,
"set @commit = (select commit_hash from dolt_log where message = 'commit 2');",
),
queries: []systabQuery{
{
query: "select from_x, to_x from dolt_diff_xy where to_commit = @commit;",
exp: []sql.Row{{20, 20}, {21, 21}, {22, 22}, {23, 23}, {24, 24}, {nil, 40}, {nil, 41}, {nil, 42}, {nil, 43}, {nil, 44}},
},
{
query: "select from_x, to_x from dolt_diff_xy where from_commit = @commit;",
exp: []sql.Row{{40, 40}, {41, 41}, {42, 42}, {43, 43}, {44, 44}, {nil, 60}, {nil, 61}, {nil, 62}, {nil, 63}, {nil, 64}},
},
{
query: "select count(*) from dolt_diff where commit_hash = @commit;",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_history_xy where commit_hash = @commit;",
exp: []sql.Row{{15}},
},
{
query: "select count(*) from dolt_log where commit_hash = @commit;",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_commits where commit_hash = @commit;",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_commit_ancestors where commit_hash = @commit;",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_diff_xy join dolt_log on commit_hash = to_commit",
exp: []sql.Row{{45}},
},
{
query: "select count(*) from dolt_diff_xy join dolt_log on commit_hash = from_commit",
exp: []sql.Row{{45}},
},
{
query: "select count(*) from dolt_blame_xy",
exp: []sql.Row{{25}},
},
{
query: `SELECT count(*)
FROM dolt_commits as cm
JOIN dolt_commit_ancestors as an
ON cm.commit_hash = an.parent_hash
ORDER BY cm.date, cm.message asc`,
exp: []sql.Row{{5}},
},
},
},
{
name: "commit indexing edge cases",
setup: append(systabSetup,
"call dolt_checkout('-b', 'feat');",
"call dolt_commit('--allow-empty', '-m', 'feat commit');",
"call dolt_checkout('main');",
"update xy set y = y+1 where x > 70 and x < 90;",
"set @commit = (select commit_hash from dolt_log where message = 'commit 1');",
"set @root_commit = (select commit_hash from dolt_log where message = 'Initialize data repository');",
"set @feat_head = hashof('feat');",
),
queries: []systabQuery{
{
query: "select from_x, to_x from dolt_diff_xy where to_commit = 'WORKING';",
exp: []sql.Row{{80, 80}, {81, 81}, {82, 82}, {83, 83}, {84, 84}},
},
{
// TODO from_commit should find all commits that reference it
// as a parent
query: "select * from dolt_diff_xy where from_commit = 'WORKING';",
exp: []sql.Row{},
},
{
query: "select count(*) from dolt_diff where commit_hash = 'WORKING';",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_history_xy where commit_hash = 'WORKING';",
exp: []sql.Row{{0}},
},
{
query: "select count(*) from dolt_commit_ancestors where commit_hash = 'WORKING';",
exp: []sql.Row{{0}},
},
{
query: "select sum(to_x) from dolt_diff_xy where to_commit in (@commit, 'WORKING');",
exp: []sql.Row{{530.0}},
},
{
// TODO from_commit optimization
query: "select sum(to_x) from dolt_diff_xy where from_commit in (@commit, 'WORKING');",
exp: []sql.Row{{320.0}},
},
{
query: "select count(*) from dolt_diff where commit_hash in (@commit, 'WORKING');",
exp: []sql.Row{{2}},
},
{
query: "select sum(x) from dolt_history_xy where commit_hash in (@commit, 'WORKING');",
exp: []sql.Row{{120.0}},
},
{
// init commit has nil ancestor
query: "select count(*) from dolt_commit_ancestors where commit_hash in (@commit, @root_commit);",
exp: []sql.Row{{2}},
},
{
query: "select count(*) from dolt_log where commit_hash in (@commit, @root_commit);",
exp: []sql.Row{{2}},
},
{
// log table cannot access commits in the feature branch
query: "select count(*) from dolt_log where commit_hash = @feat_head;",
exp: []sql.Row{{0}},
},
{
// commit table can access all commits
query: "select count(*) from dolt_commits where commit_hash = @feat_head;",
exp: []sql.Row{{1}},
},
{
query: "select count(*) from dolt_commits where commit_hash in (@commit, @root_commit);",
exp: []sql.Row{{2}},
},
// unknown
{
query: "select from_x, to_x from dolt_diff_xy where to_commit = 'unknown';",
exp: []sql.Row{},
},
{
query: "select * from dolt_diff_xy where from_commit = 'unknown';",
exp: []sql.Row{},
},
{
query: "select * from dolt_diff where commit_hash = 'unknown';",
exp: []sql.Row{},
},
{
query: "select * from dolt_history_xy where commit_hash = 'unknown';",
exp: []sql.Row{},
},
{
query: "select * from dolt_commit_ancestors where commit_hash = 'unknown';",
exp: []sql.Row{},
},
},
},
}

View File

@@ -16,6 +16,7 @@ package sqle
import (
"context"
"fmt"
"io"
"strings"
@@ -29,7 +30,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dtables"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/index"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
@@ -64,6 +64,7 @@ type HistoryTable struct {
doltTable *DoltTable
commitFilters []sql.Expression
cmItr doltdb.CommitItr
commitCheck doltdb.CommitFilter
indexLookup sql.IndexLookup
projectedCols []uint64
}
@@ -76,14 +77,57 @@ func (ht *HistoryTable) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
// For index pushdown to work, we need to represent the indexes from the underlying table as belonging to this one
// Our results will also not be ordered, so we need to declare them as such
return index.DoltHistoryIndexesFromTable(ctx, ht.doltTable.db.Name(), ht.Name(), tbl)
return index.DoltHistoryIndexesFromTable(ctx, ht.doltTable.db.Name(), ht.Name(), tbl, ht.doltTable.db.DbData().Ddb)
}
func (ht *HistoryTable) IndexedAccess(i sql.Index) sql.IndexedTable {
return ht
func (ht *HistoryTable) IndexedAccess(_ sql.IndexLookup) sql.IndexedTable {
ret := *ht
return &ret
}
// LookupPartitions implements sql.IndexedTable. For point lookups on the
// commit_hash index it resolves the selected hash strings directly to commits
// and iterates only those partitions; any other lookup falls back to the
// default full-history partitioning.
func (ht *HistoryTable) LookupPartitions(ctx *sql.Context, lookup sql.IndexLookup) (sql.PartitionIter, error) {
	if lookup.Index.ID() == index.CommitHashIndexId {
		hashStrs, ok := index.LookupToPointSelectStr(lookup)
		if !ok {
			return nil, fmt.Errorf("failed to parse commit hash lookup: %s", sql.DebugString(lookup.Ranges))
		}
		var hashes []hash.Hash
		var commits []*doltdb.Commit
		for _, hs := range hashStrs {
			// "WORKING" (and any other non-hash string) is rejected by
			// MaybeParse, so the history table yields no rows for it.
			h, ok := hash.MaybeParse(hs)
			if !ok {
				continue
			}
			hashes = append(hashes, h)
			cm, err := doltdb.HashToCommit(ctx, ht.doltTable.db.DbData().Ddb.ValueReadWriter(), ht.doltTable.db.DbData().Ddb.NodeStore(), h)
			if err != nil {
				return nil, err
			}
			commits = append(commits, cm)
			// validate that the commit metadata is readable; the metadata
			// itself is not needed for partitioning
			if _, err = cm.GetCommitMeta(ctx); err != nil {
				return nil, err
			}
		}
		if len(hashes) == 0 {
			// no valid hashes selected; return an empty partition set
			return sql.PartitionsToPartitionIter(), nil
		}
		iter, err := ht.filterIter(ctx, doltdb.NewCommitSliceIter(commits, hashes))
		if err != nil {
			return nil, err
		}
		return &commitPartitioner{cmItr: iter}, nil
	}
	ht.indexLookup = lookup
	return ht.Partitions(ctx)
}
@@ -144,20 +188,47 @@ func (ht *HistoryTable) Filters() []sql.Expression {
// WithFilters returns a new sql.Table instance with the filters applied. We handle filters on any commit columns.
func (ht *HistoryTable) WithFilters(ctx *sql.Context, filters []sql.Expression) sql.Table {
if ht.commitFilters == nil {
ht.commitFilters = dtables.FilterFilters(filters, dtables.ColumnPredicate(historyTableCommitMetaCols))
}
ret := *ht
ret.commitFilters = dtables.FilterFilters(filters, dtables.ColumnPredicate(historyTableCommitMetaCols))
return &ret
}
func (ht *HistoryTable) filterIter(ctx *sql.Context, iter doltdb.CommitItr) (doltdb.CommitItr, error) {
if len(ht.commitFilters) > 0 {
commitCheck, err := commitFilterForExprs(ctx, ht.commitFilters)
r, err := ht.doltTable.db.GetRoot(ctx)
if err != nil {
return sqlutil.NewStaticErrorTable(ht, err)
return doltdb.FilteringCommitItr{}, err
}
h, err := r.HashOf()
if err != nil {
return doltdb.FilteringCommitItr{}, err
}
filters := substituteWorkingHash(h, ht.commitFilters)
check, err := commitFilterForExprs(ctx, filters)
if err != nil {
return doltdb.FilteringCommitItr{}, err
}
ht.cmItr = doltdb.NewFilteringCommitItr(ht.cmItr, commitCheck)
return doltdb.NewFilteringCommitItr(iter, check), nil
}
return iter, nil
}
return ht
// substituteWorkingHash rewrites the given filter expressions, replacing every
// literal equal to the doltdb.Working sentinel with the supplied hash string so
// that the filters can match concrete commit hashes.
func substituteWorkingHash(h hash.Hash, f []sql.Expression) []sql.Expression {
	replaceWorking := func(e sql.Expression) (sql.Expression, transform.TreeIdentity, error) {
		lit, isLit := e.(*expression.Literal)
		if isLit && lit.Value() == doltdb.Working {
			return expression.NewLiteral(h.String(), lit.Type()), transform.NewTree, nil
		}
		return e, transform.SameTree, nil
	}
	out := make([]sql.Expression, len(f))
	for i, expr := range f {
		out[i], _, _ = transform.Expr(expr, replaceWorking)
	}
	return out
}
var historyTableCommitMetaCols = set.NewStrSet([]string{CommitHashCol, CommitDateCol, CommitterCol})
@@ -323,7 +394,11 @@ func (ht *HistoryTable) Collation() sql.CollationID {
// Partitions returns a PartitionIter which will be used in getting partitions each of which is used to create RowIter.
func (ht *HistoryTable) Partitions(ctx *sql.Context) (sql.PartitionIter, error) {
return &commitPartitioner{ht.cmItr}, nil
iter, err := ht.filterIter(ctx, ht.cmItr)
if err != nil {
return nil, err
}
return &commitPartitioner{cmItr: iter}, nil
}
// PartitionRows takes a partition and returns a row iterator for that partition
@@ -407,13 +482,15 @@ func newRowItrForTableAtCommit(ctx *sql.Context, tableName string, table *DoltTa
}
for _, idx := range indexes {
if idx.ID() == lookup.Index.ID() {
histTable = table.IndexedAccess(idx)
newLookup := sql.IndexLookup{Index: idx, Ranges: lookup.Ranges}
partIter, err = histTable.(sql.IndexedTable).LookupPartitions(ctx, newLookup)
if err != nil {
return nil, err
histTable = table.IndexedAccess(lookup)
if histTable != nil {
newLookup := sql.IndexLookup{Index: idx, Ranges: lookup.Ranges}
partIter, err = histTable.(sql.IndexedTable).LookupPartitions(ctx, newLookup)
if err != nil {
return nil, err
}
break
}
break
}
}
}

View File

@@ -27,6 +27,8 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
@@ -34,6 +36,11 @@ import (
"github.com/dolthub/dolt/go/store/val"
)
const (
CommitHashIndexId = "commit_hash"
ToCommitIndexId = "to_commit"
)
type DoltTableable interface {
DoltTable(*sql.Context) (*doltdb.Table, error)
DataCacheKey(*sql.Context) (doltdb.DataCacheKey, bool, error)
@@ -53,6 +60,43 @@ type DoltIndex interface {
lookupTags(s *durableIndexState) map[uint64]int
}
// NewCommitIndex wraps the given doltIndex as a CommitIndex.
func NewCommitIndex(i *doltIndex) *CommitIndex {
	return &CommitIndex{doltIndex: i}
}

// CommitIndex is a pseudo-index over the commit hash columns of system
// tables. It only supports single-column point lookups on string commit
// hashes (see CanSupport).
type CommitIndex struct {
	*doltIndex
}
// CanSupport implements sql.Index. A CommitIndex can only serve point
// lookups: every range must be a single-column exact match whose lower and
// upper bounds are the same string key.
func (p *CommitIndex) CanSupport(ranges ...sql.Range) bool {
	for _, r := range ranges {
		if len(r) != 1 {
			return false
		}
		lb, ok := r[0].LowerBound.(sql.Below)
		if !ok {
			return false
		}
		lk, ok := lb.Key.(string)
		if !ok {
			return false
		}
		ub, ok := r[0].UpperBound.(sql.Above)
		if !ok {
			return false
		}
		// a point lookup has identical string keys for both bounds
		uk, ok := ub.Key.(string)
		if !ok || uk != lk {
			return false
		}
	}
	return true
}
var _ DoltIndex = (*CommitIndex)(nil)
func DoltDiffIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.Table) (indexes []sql.Index, err error) {
sch, err := t.GetSchema(ctx)
if err != nil {
@@ -96,7 +140,49 @@ func DoltDiffIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.Tab
constrainedToLookupExpression: false,
}
return append(indexes, &toIndex), nil
indexes = append(indexes, &toIndex)
indexes = append(indexes, NewCommitIndex(&doltIndex{
id: ToCommitIndexId,
tblName: doltdb.DoltDiffTablePrefix + tbl,
dbName: db,
columns: []schema.Column{
schema.NewColumn("to_commit", schema.DiffCommitTag, types.StringKind, false),
},
indexSch: sch,
tableSch: sch,
unique: true,
comment: "",
vrw: t.ValueReadWriter(),
ns: t.NodeStore(),
order: sql.IndexOrderAsc,
constrainedToLookupExpression: false,
}))
return indexes, nil
}
// DoltCommitIndexes returns a commit_hash pseudo-index for the named system
// table |tab|. Commit indexing is only supported by the DOLT storage format;
// for other formats no indexes are returned. |unique| indicates whether a
// commit hash identifies at most one row in the table.
func DoltCommitIndexes(tab string, db *doltdb.DoltDB, unique bool) (indexes []sql.Index, err error) {
	if !types.IsFormat_DOLT(db.Format()) {
		return nil, nil
	}
	return []sql.Index{
		NewCommitIndex(&doltIndex{
			id:      CommitHashIndexId,
			tblName: tab,
			dbName:  "",
			columns: []schema.Column{
				// tag 0: commit hashes are surfaced as string columns
				schema.NewColumn(CommitHashIndexId, 0, types.StringKind, false),
			},
			// pseudo-index: no backing index or table schema
			indexSch:                      nil,
			tableSch:                      nil,
			unique:                        unique,
			comment:                       "",
			vrw:                           db.ValueReadWriter(),
			ns:                            db.NodeStore(),
			order:                         sql.IndexOrderNone,
			constrainedToLookupExpression: false,
		}),
	}, nil
}
func DoltIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.Table) (indexes []sql.Index, err error) {
@@ -173,7 +259,7 @@ func indexesMatch(a sql.Index, b sql.Index) bool {
return true
}
func DoltHistoryIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.Table) ([]sql.Index, error) {
func DoltHistoryIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.Table, ddb *doltdb.DoltDB) ([]sql.Index, error) {
indexes, err := DoltIndexesFromTable(ctx, db, tbl, t)
if err != nil {
return nil, err
@@ -189,6 +275,12 @@ func DoltHistoryIndexesFromTable(ctx context.Context, db, tbl string, t *doltdb.
unorderedIndexes[i] = di
}
cmIdx, err := DoltCommitIndexes(tbl, ddb, false)
if err != nil {
return nil, err
}
unorderedIndexes = append(unorderedIndexes, cmIdx...)
return unorderedIndexes, nil
}
@@ -977,3 +1069,110 @@ func SplitNullsFromRanges(rs []sql.Range) ([]sql.Range, error) {
}
return ret, nil
}
// LookupToPointSelectStr converts a set of point lookups on string
// fields into the selected string keys, returning a nil list and false
// if any range fails to convert (multi-column range, non-point bounds,
// or non-string keys). Ranges with nil bound keys (null ranges) are
// skipped rather than rejected.
func LookupToPointSelectStr(lookup sql.IndexLookup) ([]string, bool) {
	var selects []string
	for _, r := range lookup.Ranges {
		if len(r) != 1 {
			return nil, false
		}
		lb, ok := r[0].LowerBound.(sql.Below)
		if !ok {
			return nil, false
		}
		if lb.Key == nil {
			continue
		}
		lk, ok := lb.Key.(string)
		if !ok {
			return nil, false
		}
		ub, ok := r[0].UpperBound.(sql.Above)
		if !ok {
			return nil, false
		}
		if ub.Key == nil {
			continue
		}
		// a point lookup has identical string keys for both bounds
		uk, ok := ub.Key.(string)
		if !ok || uk != lk {
			return nil, false
		}
		selects = append(selects, uk)
	}
	return selects, true
}
// HashesToCommits converts a set of strings into hashes, commits,
// and commit metadata. Strings that are invalid hashes, or do
// not refer to commits, are filtered from the return lists.
//
// The doltdb.Working edge case is handled specially depending on
// whether we are: 1) interested in converting "WORKING" into a
// commit hash (or leave it as "WORKING"), and 2) whether we want
// to attempt to load a commit if WORKING == HEAD. The commit and
// metadata for a working hash will be nil if indicated.
func HashesToCommits(
	ctx *sql.Context,
	ddb *doltdb.DoltDB,
	hashStrs []string,
	head *doltdb.Commit,
	convertWorkingToCommit bool,
) ([]hash.Hash, []*doltdb.Commit, []*datas.CommitMeta) {
	var hashes []hash.Hash
	var commits []*doltdb.Commit
	var metas []*datas.CommitMeta
	for _, hs := range hashStrs {
		var h hash.Hash
		var cm *doltdb.Commit
		var meta *datas.CommitMeta
		switch hs {
		case doltdb.Working:
			if head == nil {
				continue
			}
			var err error
			h, err = head.HashOf()
			if err != nil {
				continue
			}
			if convertWorkingToCommit {
				// WORKING == HEAD only if the working hash resolves to a
				// commit; otherwise the commit and metadata stay nil. A
				// resolution failure must not leak into |err|, or the entry
				// would be dropped instead of appended with nil commit.
				if _, err = doltdb.HashToCommit(ctx, ddb.ValueReadWriter(), ddb.NodeStore(), h); err == nil {
					cm = head
					meta, err = cm.GetCommitMeta(ctx)
					if err != nil {
						continue
					}
				}
			}
		default:
			var ok bool
			h, ok = hash.MaybeParse(hs)
			if !ok {
				continue
			}
			var err error
			cm, err = doltdb.HashToCommit(ctx, ddb.ValueReadWriter(), ddb.NodeStore(), h)
			if err != nil {
				continue
			}
			meta, err = cm.GetCommitMeta(ctx)
			if err != nil {
				continue
			}
		}
		hashes = append(hashes, h)
		commits = append(commits, cm)
		metas = append(metas, meta)
	}
	return hashes, commits, metas
}

View File

@@ -21,7 +21,6 @@ import (
"testing"
"github.com/dolthub/go-mysql-server/sql"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
@@ -247,8 +246,5 @@ func testHistoryTable(t *testing.T, test historyTableTest, dEnv *env.DoltEnv) {
actRows, err := sqle.ExecuteSelect(dEnv, root, test.query)
require.NoError(t, err)
require.Equal(t, len(test.rows), len(actRows))
for i := range test.rows {
assert.Equal(t, test.rows[i], actRows[i])
}
require.ElementsMatch(t, test.rows, actRows)
}

View File

@@ -174,7 +174,7 @@ func (db *SingleTableInfoDatabase) GetForeignKeyEditor(ctx *sql.Context) sql.For
}
// IndexedAccess implements sql.IndexedTable.
func (db *SingleTableInfoDatabase) IndexedAccess(sql.Index) sql.IndexedTable {
func (db *SingleTableInfoDatabase) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
return db
}
@@ -301,7 +301,7 @@ func (idx fmtIndex) IsGenerated() bool {
return idx.generated
}
func (idx fmtIndex) IndexedAccess(index sql.IndexLookup) (sql.IndexedTable, error) {
func (idx fmtIndex) IndexedAccess(_ sql.IndexLookup) (sql.IndexedTable, error) {
panic("unimplemented")
}

View File

@@ -1255,17 +1255,6 @@ func TestSelect(t *testing.T) {
}
}
// TestDiffQueries runs the legacy diff-table select tests. These only
// apply to the LD_1 storage format; coverage for newer formats lives in
// the engine tests.
func TestDiffQueries(t *testing.T) {
	if types.Format_Default != types.Format_LD_1 {
		t.Skip("") // todo: convert to enginetests
	}
	for _, test := range SelectDiffTests {
		t.Run(test.Name, func(t *testing.T) {
			testSelectDiffQuery(t, test)
		})
	}
}
func TestAsOfQueries(t *testing.T) {
if types.Format_Default != types.Format_LD_1 {
t.Skip("") // todo: convert to enginetests

View File

@@ -101,7 +101,7 @@ func (e *StaticErrorEditor) Close(*sql.Context) error {
return nil
}
func (e *StaticErrorEditor) IndexedAccess(index sql.Index) sql.IndexedTable {
func (e *StaticErrorEditor) IndexedAccess(_ sql.IndexLookup) sql.IndexedTable {
return &StaticErrorTable{nil, e.err}
}

View File

@@ -183,8 +183,8 @@ var _ doltReadOnlyTableInterface = (*DoltTable)(nil)
//var _ sql.ProjectedTable = (*DoltTable)(nil)
// IndexedAccess implements sql.IndexAddressableTable
func (t *DoltTable) IndexedAccess(idx sql.Index) sql.IndexedTable {
return NewIndexedDoltTable(t, idx.(index.DoltIndex))
func (t *DoltTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
return NewIndexedDoltTable(t, lookup.Index.(index.DoltIndex))
}
// doltTable returns the underlying doltTable from the current session
@@ -506,8 +506,8 @@ func (t *WritableDoltTable) setRoot(ctx *sql.Context, newRoot *doltdb.RootValue)
return t.db.SetRoot(ctx, newRoot)
}
func (t *WritableDoltTable) IndexedAccess(idx sql.Index) sql.IndexedTable {
return NewWritableIndexedDoltTable(t, idx.(index.DoltIndex))
func (t *WritableDoltTable) IndexedAccess(lookup sql.IndexLookup) sql.IndexedTable {
return NewWritableIndexedDoltTable(t, lookup.Index.(index.DoltIndex))
}
// WithProjections implements sql.ProjectedTable

View File

@@ -252,7 +252,7 @@ func (t *TempTable) PartitionRows(ctx *sql.Context, partition sql.Partition) (sq
}
}
func (t *TempTable) IndexedAccess(idx sql.Index) sql.IndexedTable {
func (t *TempTable) IndexedAccess(_ sql.IndexLookup) sql.IndexedTable {
return t
}

View File

@@ -139,8 +139,8 @@ func (te *nomsTableWriter) SetAutoIncrementValue(ctx *sql.Context, val uint64) e
return te.flush(ctx)
}
func (te *nomsTableWriter) IndexedAccess(i sql.Index) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i)
func (te *nomsTableWriter) IndexedAccess(i sql.IndexLookup) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i.Index)
return &nomsFkIndexer{
writer: te,
idxName: idx.ID(),

View File

@@ -256,8 +256,8 @@ func (w *prollyTableWriter) GetIndexes(ctx *sql.Context) ([]sql.Index, error) {
}
// IndexedAccess implements sql.IndexAddressableTable.
func (w *prollyTableWriter) IndexedAccess(i sql.Index) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i)
func (w *prollyTableWriter) IndexedAccess(i sql.IndexLookup) sql.IndexedTable {
idx := index.DoltIndexFromSqlIndex(i.Index)
return &prollyFkIndexer{
writer: w,
index: idx,

View File

@@ -29,6 +29,7 @@ var run = flag.String("run", "", "the path to a test file")
var scriptDir = flag.String("script-dir", "", "the path to the script directory")
var config = flag.String("config", "", "the path to a config file")
var out = flag.String("out", "", "result output path")
var verbose = flag.Bool("verbose", true, "verbose output")
func main() {
flag.Parse()
@@ -42,7 +43,7 @@ func main() {
log.Fatalln(err)
}
conf = conf.WithScriptDir(*scriptDir)
conf = conf.WithScriptDir(*scriptDir).WithVerbose(*verbose)
if err := os.Chdir(*scriptDir); err != nil {
log.Fatalf("failed to 'cd %s'", *scriptDir)
}

View File

@@ -0,0 +1,17 @@
tests:
- name: "system table"
repos:
- name: dolt
server:
port: 3309
args: [ "--port", "3309", "--password", "password"]
scripts:
- gen/dolt_diff_log_join_on_commit.gen.lua
- name: "dummy system table"
repos:
- name: dolt
server:
port: 3309
args: [ "--port", "3309" ]
scripts:
- gen/dolt_diff_log_join_on_commit_dummy.gen.lua

View File

@@ -15,6 +15,8 @@ tests:
- gen/dolt_diffs_commit_filter.gen.lua
- gen/dolt_history_commit_filter.gen.lua
- gen/dolt_log_commit_filter.gen.lua
- gen/dolt_blame_basic.gen.lua
- gen/dolt_blame_commit_filter.gen.lua
- name: "dummy system table"
repos:
- name: dolt
@@ -30,4 +32,6 @@ tests:
- gen/dolt_diff_table_from_commit_filter_dummy.gen.lua
- gen/dolt_diffs_commit_filter_dummy.gen.lua
- gen/dolt_history_commit_filter_dummy.gen.lua
- gen/dolt_log_commit_filter_dummy.gen.lua
- gen/dolt_log_commit_filter_dummy.gen.lua
- gen/dolt_blame_basic_dummy.gen.lua
- gen/dolt_blame_commit_filter_dummy.gen.lua

View File

@@ -18,6 +18,7 @@ import (
"bytes"
"database/sql"
"fmt"
"log"
"math"
"os"
"os/exec"
@@ -53,6 +54,7 @@ type Config struct {
TableSize int `yaml:"tableSize"`
Histogram bool `yaml:"histogram"`
ScriptDir string `yaml:"scriptDir"`
Verbose bool `yaml:"verbose"`
}
func (c Config) WithScriptDir(dir string) Config {
@@ -60,6 +62,11 @@ func (c Config) WithScriptDir(dir string) Config {
return c
}
// WithVerbose returns a copy of the Config with Verbose set to v; the
// receiver is a value, so the original Config is unchanged.
func (c Config) WithVerbose(v bool) Config {
	c.Verbose = v
	return c
}
func (c Config) AsOpts() []string {
var ret []string
if c.DbDriver != "" {
@@ -491,7 +498,7 @@ func (test *Script) RunExternalServerTests(repoName string, s *driver.ExternalSe
func (test *Script) RunSqlServerTests(repo driver.TestRepo, user driver.DoltUser, conf Config) error {
return test.IterSysbenchScripts(conf, test.Scripts, func(script string, prep, run, clean *exec.Cmd) error {
//make a new server for every test
server, err := newServer(user, repo)
server, err := newServer(user, repo, conf)
if err != nil {
return err
}
@@ -515,6 +522,7 @@ func (test *Script) RunSqlServerTests(repo driver.TestRepo, user driver.DoltUser
run.Stdout = buf
err = run.Run()
if err != nil {
fmt.Println(buf)
return err
}
@@ -527,11 +535,14 @@ func (test *Script) RunSqlServerTests(repo driver.TestRepo, user driver.DoltUser
}
test.Results.Append(r)
if conf.Verbose {
return nil
}
return clean.Run()
})
}
func newServer(u driver.DoltUser, r driver.TestRepo) (*driver.SqlServer, error) {
func newServer(u driver.DoltUser, r driver.TestRepo, conf Config) (*driver.SqlServer, error) {
rs, err := u.MakeRepoStore()
if err != nil {
return nil, err
@@ -541,6 +552,10 @@ func newServer(u driver.DoltUser, r driver.TestRepo) (*driver.SqlServer, error)
if err != nil {
return nil, err
}
if conf.Verbose {
log.Printf("database at: '%s'", repo.Dir)
}
r.Server.Args = append(r.Server.Args, "")
server, err := MakeServer(repo, r.Server)
if err != nil {

View File

@@ -61,6 +61,8 @@ const (
commitName = "Commit"
)
var ErrCommitNotFound = errors.New("target commit not found")
type Commit struct {
val types.Value
addr hash.Hash
@@ -311,7 +313,7 @@ func LoadCommitRef(ctx context.Context, vr types.ValueReader, r types.Ref) (*Com
return nil, err
}
if v == nil {
return nil, errors.New("target commit not found")
return nil, ErrCommitNotFound
}
return commitPtr(vr.Format(), v, &r)
}

View File

@@ -100,6 +100,15 @@ func (c CommitClosure) IsEmpty() bool {
return c.Node().Size() == 0
}
// ContainsKey reports whether the commit closure contains the commit
// addressed by h at tree height |height|.
// NOTE(review): this returns cur.Valid() after seeking to the key. Confirm
// that NewCursorAtKey cannot position the cursor on a different (nearest)
// key and report a false positive for an absent commit.
func (c CommitClosure) ContainsKey(ctx context.Context, h hash.Hash, height uint64) (bool, error) {
	k := NewCommitClosureKey(c.closure.NodeStore.Pool(), height, h)
	cur, err := tree.NewCursorAtKey(ctx, c.closure.NodeStore, c.closure.Root, k, c.closure.Order)
	if err != nil {
		return false, err
	}
	return cur.Valid(), nil
}
func DecodeCommitClosureKey(key []byte) (height uint64, addr hash.Hash) {
height = binary.LittleEndian.Uint64(key)
addr = hash.New(key[8:])

View File

@@ -23,7 +23,7 @@ import (
"strings"
)
var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2019-2020|2019-2021|2019-2022|2020-2021|2020-2022) Dolthub, Inc.
var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2019-2020|2019-2021|2019-2022|2020-2021|2020-2022) Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 \(the "License"\);
// you may not use this file except in compliance with the License.
@@ -39,7 +39,7 @@ var ExpectedHeader = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2019-
`)
var ExpectedHeaderForFileFromNoms = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2019-2020|2019-2021|2019-2022|2020-2021|2020-2022) Dolthub, Inc.
var ExpectedHeaderForFileFromNoms = regexp.MustCompile(`// Copyright (2019|2020|2021|2022|2023|2019-2020|2019-2021|2019-2022|2020-2021|2020-2022) Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 \(the "License"\);
// you may not use this file except in compliance with the License.