mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-19 19:21:44 -05:00
Merge remote-tracking branch 'origin/main' into andy/auto-fix
This commit is contained in:
@@ -82,7 +82,7 @@ func AutoResolveTables(ctx context.Context, dEnv *env.DoltEnv, strategy AutoReso
|
||||
// ResolveTable resolves all conflicts in the given table according to the given
|
||||
// |strategy|. It errors if the schema of the conflict version you are choosing
|
||||
// differs from the current schema.
|
||||
func ResolveTable(ctx context.Context, dEnv *env.DoltEnv, root *doltdb.RootValue, tblName string, strategy AutoResolveStrategy) error {
|
||||
func ResolveTable(ctx context.Context, dEnv *env.DoltEnv, root *doltdb.RootValue, tblName string, strategy AutoResolveStrategy) (err error) {
|
||||
tbl, ok, err := root.GetTable(ctx, tblName)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -135,6 +135,27 @@ func ResolveTable(ctx context.Context, dEnv *env.DoltEnv, root *doltdb.RootValue
|
||||
return err
|
||||
}
|
||||
|
||||
v, err := getFirstColumn(sqlCtx, eng, "SELECT @@DOLT_ALLOW_COMMIT_CONFLICTS;")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldAllowCommitConflicts, ok := v.(int8)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected type of @DOLT_ALLOW_COMMIT_CONFLICTS: %T", v)
|
||||
}
|
||||
|
||||
// Resolving conflicts for one table, will not resolve conflicts on another.
|
||||
err = execute(sqlCtx, eng, "SET DOLT_ALLOW_COMMIT_CONFLICTS = 1;")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err2 := execute(sqlCtx, eng, fmt.Sprintf("SET DOLT_ALLOW_COMMIT_CONFLICTS = %d", oldAllowCommitConflicts))
|
||||
if err == nil {
|
||||
err = err2
|
||||
}
|
||||
}()
|
||||
|
||||
if !schema.IsKeyless(sch) {
|
||||
err = resolvePkTable(sqlCtx, tblName, sch, strategy, eng)
|
||||
} else {
|
||||
@@ -144,6 +165,11 @@ func ResolveTable(ctx context.Context, dEnv *env.DoltEnv, root *doltdb.RootValue
|
||||
return err
|
||||
}
|
||||
|
||||
err = execute(sqlCtx, eng, "COMMIT;")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
after, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -157,14 +183,23 @@ func ResolveTable(ctx context.Context, dEnv *env.DoltEnv, root *doltdb.RootValue
|
||||
return nil
|
||||
}
|
||||
|
||||
func resolvePkTable(sqlCtx *sql.Context, tblName string, sch schema.Schema, strategy AutoResolveStrategy, eng *engine.SqlEngine) error {
|
||||
queries := getResolveQueries(strategy, tblName, sch)
|
||||
for _, query := range queries {
|
||||
err := execute(sqlCtx, eng, query)
|
||||
func resolvePkTable(ctx *sql.Context, tblName string, sch schema.Schema, strategy AutoResolveStrategy, eng *engine.SqlEngine) error {
|
||||
identCols := getIdentifyingColumnNames(sch)
|
||||
allCols := sch.GetAllCols().GetColumnNames()
|
||||
|
||||
switch strategy {
|
||||
case AutoResolveStrategyOurs:
|
||||
err := oursPKResolver(ctx, eng, tblName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case AutoResolveStrategyTheirs:
|
||||
err := theirsPKResolver(ctx, eng, tblName, allCols, identCols)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -248,21 +283,120 @@ func resolveKeylessTable(sqlCtx *sql.Context, tblName string, sch schema.Schema,
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = execute(sqlCtx, eng, fmt.Sprintf("DELETE FROM dolt_conflicts_%s", tblName))
|
||||
err = execute(sqlCtx, eng, fmt.Sprintf("DELETE FROM dolt_conflicts_%s", tblName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func oursPKResolver(sqlCtx *sql.Context, eng *engine.SqlEngine, tblName string) error {
|
||||
del := fmt.Sprintf("DELETE FROM dolt_conflicts_%s;", tblName)
|
||||
err := execute(sqlCtx, eng, del)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getIdentifyingColumnNames(sch schema.Schema) []string {
|
||||
if schema.IsKeyless(sch) {
|
||||
return sch.GetAllCols().GetColumnNames()
|
||||
} else {
|
||||
return sch.GetPKCols().GetColumnNames()
|
||||
}
|
||||
}
|
||||
|
||||
func theirsPKResolver(sqlCtx *sql.Context, eng *engine.SqlEngine, tblName string, allCols []string, identCols []string) error {
|
||||
dstCols := strings.Join(allCols, ", ")
|
||||
srcCols := strings.Join(withPrefix(allCols, "their_"), ", ")
|
||||
q1 := fmt.Sprintf(
|
||||
`
|
||||
REPLACE INTO %s (%s) (
|
||||
SELECT %s
|
||||
FROM dolt_conflicts_%s
|
||||
WHERE their_diff_type = 'modified' OR their_diff_type = 'added'
|
||||
);
|
||||
`, tblName, dstCols, srcCols, tblName)
|
||||
err := execute(sqlCtx, eng, q1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
selCols := strings.Join(coalesced(identCols), ", ")
|
||||
q2 := fmt.Sprintf("SELECT %s from dolt_conflicts_%s WHERE their_diff_type = 'removed';", selCols, tblName)
|
||||
sch, itr, err := eng.Query(sqlCtx, q2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
row, err := itr.Next(sqlCtx)
|
||||
if err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
deleteFilter, err := buildFilter(identCols, row, sch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = execute(sqlCtx, eng, "COMMIT;")
|
||||
del := fmt.Sprintf("DELETE from %s WHERE %s;", tblName, deleteFilter)
|
||||
err = execute(sqlCtx, eng, del)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
q3 := fmt.Sprintf("DELETE FROM dolt_conflicts_%s;", tblName)
|
||||
err = execute(sqlCtx, eng, q3)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildFilter(columns []string, row sql.Row, rowSch sql.Schema) (string, error) {
|
||||
if len(columns) != len(row) {
|
||||
return "", errors.New("cannot build filter since number of columns does not match number of values")
|
||||
}
|
||||
vals, err := sqlfmt.SqlRowAsStrings(row, rowSch)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
var b strings.Builder
|
||||
var seen bool
|
||||
for i, v := range vals {
|
||||
_, _ = fmt.Fprintf(&b, "%s = %s", columns[i], v)
|
||||
if seen {
|
||||
_, _ = fmt.Fprintf(&b, "AND %s = %s", columns[i], v)
|
||||
}
|
||||
seen = true
|
||||
}
|
||||
return b.String(), nil
|
||||
}
|
||||
|
||||
func coalesced(cols []string) []string {
|
||||
out := make([]string, len(cols))
|
||||
for i := range out {
|
||||
out[i] = fmt.Sprintf("coalesce(base_%s, our_%s, their_%s)", cols[i], cols[i], cols[i])
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func withPrefix(arr []string, prefix string) []string {
|
||||
out := make([]string, len(arr))
|
||||
for i := range arr {
|
||||
out[i] = prefix + arr[i]
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func execute(ctx *sql.Context, eng *engine.SqlEngine, query string) error {
|
||||
_, itr, err := eng.Query(ctx, query)
|
||||
if err != nil {
|
||||
@@ -275,86 +409,19 @@ func execute(ctx *sql.Context, eng *engine.SqlEngine, query string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func getResolveQueries(strategy AutoResolveStrategy, tblName string, sch schema.Schema) (queries []string) {
|
||||
identCols := getIdentifyingColumnNames(sch)
|
||||
allCols := sch.GetAllCols().GetColumnNames()
|
||||
|
||||
r := autoResolverMap[strategy]
|
||||
queries = r(tblName, allCols, identCols)
|
||||
// auto_commit is off
|
||||
queries = append(queries, "COMMIT;")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getIdentifyingColumnNames(sch schema.Schema) []string {
|
||||
if schema.IsKeyless(sch) {
|
||||
return sch.GetAllCols().GetColumnNames()
|
||||
} else {
|
||||
return sch.GetPKCols().GetColumnNames()
|
||||
func getFirstColumn(ctx *sql.Context, eng *engine.SqlEngine, query string) (interface{}, error) {
|
||||
_, itr, err := eng.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var autoResolverMap = map[AutoResolveStrategy]autoResolver{
|
||||
AutoResolveStrategyOurs: ours,
|
||||
AutoResolveStrategyTheirs: theirs,
|
||||
}
|
||||
|
||||
type autoResolver func(tblName string, allCols []string, identCols []string) []string
|
||||
|
||||
func theirs(tblName string, allCols []string, identCols []string) []string {
|
||||
dstCols := strings.Join(allCols, ", ")
|
||||
srcCols := strings.Join(withPrefix(allCols, "their_"), ", ")
|
||||
q1 := fmt.Sprintf(
|
||||
`
|
||||
REPLACE INTO %s (%s) (
|
||||
SELECT %s
|
||||
FROM dolt_conflicts_%s
|
||||
WHERE their_diff_type = 'modified' OR their_diff_type = 'added'
|
||||
);
|
||||
`, tblName, dstCols, srcCols, tblName)
|
||||
|
||||
q2 := fmt.Sprintf(
|
||||
`
|
||||
DELETE t1
|
||||
FROM %s t1
|
||||
WHERE (
|
||||
SELECT count(*) from dolt_conflicts_%s t2
|
||||
WHERE %s AND t2.their_diff_type = 'removed'
|
||||
) > 0;
|
||||
`, tblName, tblName, buildJoinCond(identCols, "base_"))
|
||||
|
||||
q3 := fmt.Sprintf("DELETE FROM dolt_conflicts_%s;", tblName)
|
||||
|
||||
return []string{q1, q2, q3}
|
||||
}
|
||||
|
||||
func ours(tblName string, allCols []string, identCols []string) []string {
|
||||
|
||||
q3 := fmt.Sprintf("DELETE FROM dolt_conflicts_%s;", tblName)
|
||||
|
||||
return []string{q3}
|
||||
}
|
||||
|
||||
func buildJoinCond(identCols []string, prefix string) string {
|
||||
b := &strings.Builder{}
|
||||
var seenOne bool
|
||||
for _, col := range identCols {
|
||||
if seenOne {
|
||||
_, _ = b.WriteString(" AND ")
|
||||
}
|
||||
seenOne = true
|
||||
_, _ = fmt.Fprintf(b, "t1.%s = t2.%s%s", col, prefix, col)
|
||||
r, err := itr.Next(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func withPrefix(arr []string, prefix string) []string {
|
||||
out := make([]string, len(arr))
|
||||
for i := range arr {
|
||||
out[i] = prefix + arr[i]
|
||||
if len(r) == 0 {
|
||||
return nil, fmt.Errorf("no columns returned")
|
||||
}
|
||||
return out
|
||||
return r[0], nil
|
||||
}
|
||||
|
||||
func validateConstraintViolations(ctx context.Context, before, after *doltdb.RootValue, table string) error {
|
||||
|
||||
@@ -56,7 +56,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Version = "0.40.30"
|
||||
Version = "0.40.31"
|
||||
)
|
||||
|
||||
var dumpDocsCommand = &commands.DumpDocsCmd{}
|
||||
|
||||
@@ -23,12 +23,13 @@
|
||||
package eventsapi
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
durationpb "google.golang.org/protobuf/types/known/durationpb"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,6 +8,7 @@ package eventsapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -23,10 +23,11 @@
|
||||
package eventsapi
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -21,11 +21,12 @@
|
||||
package remotesapi
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,6 +8,7 @@ package remotesapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -21,10 +21,11 @@
|
||||
package remotesapi
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -8,6 +8,7 @@ package remotesapi
|
||||
|
||||
import (
|
||||
context "context"
|
||||
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
|
||||
@@ -748,6 +748,12 @@ func pruneEmptyRanges(sqlRanges []sql.Range) (pruned []sql.Range, err error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, ce := range sr {
|
||||
if lb, ok := ce.LowerBound.(sql.Below); ok && lb.Key == nil {
|
||||
empty = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !empty {
|
||||
pruned = append(pruned, sr)
|
||||
}
|
||||
@@ -762,19 +768,13 @@ func (di *doltIndex) prollyRangesFromSqlRanges(ctx context.Context, ns tree.Node
|
||||
}
|
||||
|
||||
pranges := make([]prolly.Range, len(ranges))
|
||||
for i := range pranges {
|
||||
pranges[i] = prolly.Range{
|
||||
Fields: make([]prolly.RangeField, len(ranges[i])),
|
||||
Desc: di.keyBld.Desc,
|
||||
}
|
||||
}
|
||||
|
||||
for k, rng := range ranges {
|
||||
prollyRange := pranges[k]
|
||||
fields := make([]prolly.RangeField, len(rng))
|
||||
for j, expr := range rng {
|
||||
if rangeCutIsBinding(expr.LowerBound) {
|
||||
bound := expr.LowerBound.TypeAsLowerBound()
|
||||
prollyRange.Fields[j].Lo = prolly.Bound{
|
||||
fields[j].Lo = prolly.Bound{
|
||||
Binding: true,
|
||||
Inclusive: bound == sql.Closed,
|
||||
}
|
||||
@@ -787,19 +787,19 @@ func (di *doltIndex) prollyRangesFromSqlRanges(ctx context.Context, ns tree.Node
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
prollyRange.Fields[j].Lo = prolly.Bound{}
|
||||
fields[j].Lo = prolly.Bound{}
|
||||
}
|
||||
}
|
||||
// BuildPermissive() allows nulls in non-null fields
|
||||
tup := tb.BuildPermissive(sharePool)
|
||||
for i := range prollyRange.Fields {
|
||||
prollyRange.Fields[i].Lo.Value = tup.GetField(i)
|
||||
for i := range fields {
|
||||
fields[i].Lo.Value = tup.GetField(i)
|
||||
}
|
||||
|
||||
for i, expr := range rng {
|
||||
if rangeCutIsBinding(expr.UpperBound) {
|
||||
bound := expr.UpperBound.TypeAsUpperBound()
|
||||
prollyRange.Fields[i].Hi = prolly.Bound{
|
||||
fields[i].Hi = prolly.Bound{
|
||||
Binding: true,
|
||||
Inclusive: bound == sql.Closed,
|
||||
}
|
||||
@@ -812,25 +812,29 @@ func (di *doltIndex) prollyRangesFromSqlRanges(ctx context.Context, ns tree.Node
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
prollyRange.Fields[i].Hi = prolly.Bound{}
|
||||
fields[i].Hi = prolly.Bound{}
|
||||
}
|
||||
}
|
||||
|
||||
tup = tb.BuildPermissive(sharePool)
|
||||
for i := range prollyRange.Fields {
|
||||
prollyRange.Fields[i].Hi.Value = tup.GetField(i)
|
||||
for i := range fields {
|
||||
fields[i].Hi.Value = tup.GetField(i)
|
||||
}
|
||||
|
||||
order := prollyRange.Desc.Comparator()
|
||||
for i, field := range prollyRange.Fields {
|
||||
order := di.keyBld.Desc.Comparator()
|
||||
for i, field := range fields {
|
||||
if !field.Hi.Binding || !field.Lo.Binding {
|
||||
prollyRange.Fields[i].Exact = false
|
||||
fields[i].Exact = false
|
||||
continue
|
||||
}
|
||||
// maybe set RangeField.Exact
|
||||
typ := prollyRange.Desc.Types[i]
|
||||
typ := di.keyBld.Desc.Types[i]
|
||||
cmp := order.CompareValues(i, field.Hi.Value, field.Lo.Value, typ)
|
||||
prollyRange.Fields[i].Exact = cmp == 0
|
||||
fields[i].Exact = cmp == 0
|
||||
}
|
||||
pranges[k] = prolly.Range{
|
||||
Fields: fields,
|
||||
Desc: di.keyBld.Desc,
|
||||
Tup: tup,
|
||||
}
|
||||
}
|
||||
return pranges, nil
|
||||
|
||||
@@ -22,11 +22,14 @@ import (
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
func RowIterForIndexLookup(ctx *sql.Context, t DoltTableable, lookup sql.IndexLookup, pkSch sql.PrimaryKeySchema, columns []uint64) (sql.RowIter, error) {
|
||||
@@ -181,11 +184,15 @@ func (rp rangePartition) Key() []byte {
|
||||
return rp.key
|
||||
}
|
||||
|
||||
// LookupBuilder generates secondary lookups for partitions and
|
||||
// encapsulates fast path optimizations for certain point lookups.
|
||||
type LookupBuilder interface {
|
||||
// NewRowIter returns a new index iter for the given partition
|
||||
NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error)
|
||||
Key() doltdb.DataCacheKey
|
||||
}
|
||||
|
||||
func NewLookupBuilder(part sql.Partition, idx DoltIndex, projections []uint64, pkSch sql.PrimaryKeySchema, isDoltFormat bool) LookupBuilder {
|
||||
func NewLookupBuilder(key doltdb.DataCacheKey, part sql.Partition, idx DoltIndex, projections []uint64, pkSch sql.PrimaryKeySchema, isDoltFormat bool) LookupBuilder {
|
||||
p := part.(rangePartition)
|
||||
if len(projections) == 0 {
|
||||
projections = idx.Schema().GetAllCols().Tags
|
||||
@@ -193,9 +200,17 @@ func NewLookupBuilder(part sql.Partition, idx DoltIndex, projections []uint64, p
|
||||
|
||||
base := &baseLookupBuilder{
|
||||
idx: idx.(*doltIndex),
|
||||
key: key,
|
||||
sch: pkSch,
|
||||
projections: projections,
|
||||
}
|
||||
|
||||
if isDoltFormat {
|
||||
base.sec = durable.ProllyMapFromIndex(p.durableState.Secondary)
|
||||
base.secKd, base.secVd = base.sec.Descriptors()
|
||||
base.ns = base.sec.NodeStore()
|
||||
}
|
||||
|
||||
switch {
|
||||
case !isDoltFormat:
|
||||
return &nomsLookupBuilder{
|
||||
@@ -205,59 +220,213 @@ func NewLookupBuilder(part sql.Partition, idx DoltIndex, projections []uint64, p
|
||||
return &keylessLookupBuilder{
|
||||
baseLookupBuilder: base,
|
||||
}
|
||||
case idx.coversColumns(p.durableState, projections) || idx.ID() == "PRIMARY":
|
||||
return &coveringLookupBuilder{
|
||||
baseLookupBuilder: base,
|
||||
}
|
||||
case idx.coversColumns(p.durableState, projections):
|
||||
return newCoveringLookupBuilder(base)
|
||||
default:
|
||||
return base
|
||||
return newNonCoveringLookupBuilder(p, base)
|
||||
}
|
||||
}
|
||||
|
||||
type baseLookupBuilder struct {
|
||||
idx *doltIndex
|
||||
sch sql.PrimaryKeySchema
|
||||
projections []uint64
|
||||
func newCoveringLookupBuilder(b *baseLookupBuilder) *coveringLookupBuilder {
|
||||
var keyMap, valMap, ordMap val.OrdinalMapping
|
||||
if b.idx.IsPrimaryKey() {
|
||||
keyMap, valMap, ordMap = primaryIndexMapping(b.idx, b.sch, b.projections)
|
||||
} else {
|
||||
keyMap, ordMap = coveringIndexMapping(b.idx, b.projections)
|
||||
}
|
||||
return &coveringLookupBuilder{
|
||||
baseLookupBuilder: b,
|
||||
keyMap: keyMap,
|
||||
valMap: valMap,
|
||||
ordMap: ordMap,
|
||||
}
|
||||
}
|
||||
|
||||
type coveringLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
}
|
||||
|
||||
type nomsLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
}
|
||||
|
||||
type keylessLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
func newNonCoveringLookupBuilder(p rangePartition, b *baseLookupBuilder) *nonCoveringLookupBuilder {
|
||||
primary := durable.ProllyMapFromIndex(p.durableState.Primary)
|
||||
priKd, _ := primary.Descriptors()
|
||||
tbBld := val.NewTupleBuilder(priKd)
|
||||
pkMap := ordinalMappingFromIndex(b.idx)
|
||||
keyProj, valProj, ordProj := projectionMappings(b.idx.Schema(), b.projections)
|
||||
return &nonCoveringLookupBuilder{
|
||||
baseLookupBuilder: b,
|
||||
pri: primary,
|
||||
priKd: priKd,
|
||||
pkBld: tbBld,
|
||||
pkMap: pkMap,
|
||||
keyMap: keyProj,
|
||||
valMap: valProj,
|
||||
ordMap: ordProj,
|
||||
}
|
||||
}
|
||||
|
||||
var _ LookupBuilder = (*baseLookupBuilder)(nil)
|
||||
var _ LookupBuilder = (*nomsLookupBuilder)(nil)
|
||||
var _ LookupBuilder = (*coveringLookupBuilder)(nil)
|
||||
var _ LookupBuilder = (*keylessLookupBuilder)(nil)
|
||||
var _ LookupBuilder = (*nonCoveringLookupBuilder)(nil)
|
||||
|
||||
func (lb *baseLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
return newProllyIndexIter(ctx, lb.idx, p.prollyRange, lb.sch, lb.projections, p.durableState.Primary, p.durableState.Secondary)
|
||||
// baseLookupBuilder is a common lookup builder for prolly covering and
|
||||
// non covering index lookups.
|
||||
type baseLookupBuilder struct {
|
||||
key doltdb.DataCacheKey
|
||||
|
||||
idx *doltIndex
|
||||
sch sql.PrimaryKeySchema
|
||||
projections []uint64
|
||||
|
||||
sec prolly.Map
|
||||
secKd, secVd val.TupleDesc
|
||||
ns tree.NodeStore
|
||||
|
||||
cur *tree.Cursor
|
||||
}
|
||||
|
||||
func (lb *baseLookupBuilder) Key() doltdb.DataCacheKey {
|
||||
return lb.key
|
||||
}
|
||||
|
||||
// NewRowIter implements IndexLookup
|
||||
func (lb *baseLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
panic("cannot call NewRowIter on baseLookupBuilder")
|
||||
}
|
||||
|
||||
// newPointLookup will create a cursor once, and then use the same cursor for
|
||||
// every subsequent point lookup. Note that equality joins can have a mix of
|
||||
// point lookups on concrete values, and range lookups for null matches.
|
||||
func (lb *baseLookupBuilder) newPointLookup(ctx *sql.Context, rang prolly.Range) (prolly.MapIter, error) {
|
||||
if lb.cur == nil {
|
||||
cur, err := tree.NewCursorFromCompareFn(ctx, lb.sec.NodeStore(), lb.sec.Node(), tree.Item(rang.Tup), lb.sec.CompareItems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !cur.Valid() {
|
||||
// map does not contain |rng|
|
||||
return prolly.EmptyPointLookup, nil
|
||||
}
|
||||
|
||||
lb.cur = cur
|
||||
}
|
||||
|
||||
err := lb.cur.Seek(ctx, tree.Item(rang.Tup), lb.sec.CompareItems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !lb.cur.Valid() {
|
||||
return prolly.EmptyPointLookup, nil
|
||||
}
|
||||
|
||||
key := val.Tuple(lb.cur.CurrentKey())
|
||||
value := val.Tuple(lb.cur.CurrentValue())
|
||||
|
||||
if !rang.Matches(key) {
|
||||
return prolly.EmptyPointLookup, nil
|
||||
}
|
||||
|
||||
return prolly.NewPointLookup(key, value), nil
|
||||
}
|
||||
|
||||
// coveringLookupBuilder constructs row iters for covering lookups,
|
||||
// where we only need to cursor seek on a single index to both identify
|
||||
// target keys and fill all requested projections
|
||||
type coveringLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
|
||||
// keyMap transforms secondary index key tuples into SQL tuples.
|
||||
// secondary index value tuples are assumed to be empty.
|
||||
keyMap, valMap, ordMap val.OrdinalMapping
|
||||
}
|
||||
|
||||
// NewRowIter implements IndexLookup
|
||||
func (lb *coveringLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
|
||||
var rangeIter prolly.MapIter
|
||||
var err error
|
||||
if p.prollyRange.IsPointLookup(lb.secKd) {
|
||||
rangeIter, err = lb.newPointLookup(ctx, p.prollyRange)
|
||||
} else {
|
||||
rangeIter, err = lb.sec.IterRange(ctx, p.prollyRange)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prollyCoveringIndexIter{
|
||||
idx: lb.idx,
|
||||
indexIter: rangeIter,
|
||||
keyDesc: lb.secKd,
|
||||
valDesc: lb.secVd,
|
||||
keyMap: lb.keyMap,
|
||||
valMap: lb.valMap,
|
||||
ordMap: lb.ordMap,
|
||||
sqlSch: lb.sch.Schema,
|
||||
ns: lb.ns,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// nonCoveringLookupBuilder constructs row iters for non-covering lookups,
|
||||
// where we need to seek on the secondary table for key identity, and then
|
||||
// the primary table to fill all requrested projections.
|
||||
type nonCoveringLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
|
||||
pri prolly.Map
|
||||
priKd val.TupleDesc
|
||||
pkBld *val.TupleBuilder
|
||||
|
||||
pkMap, keyMap, valMap, ordMap val.OrdinalMapping
|
||||
}
|
||||
|
||||
// NewRowIter implements IndexLookup
|
||||
func (lb *nonCoveringLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
var rangeIter prolly.MapIter
|
||||
var err error
|
||||
if p.prollyRange.IsPointLookup(lb.secKd) {
|
||||
rangeIter, err = lb.newPointLookup(ctx, p.prollyRange)
|
||||
} else {
|
||||
rangeIter, err = lb.sec.IterRange(ctx, p.prollyRange)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return prollyIndexIter{
|
||||
idx: lb.idx,
|
||||
indexIter: rangeIter,
|
||||
primary: lb.pri,
|
||||
pkBld: lb.pkBld,
|
||||
pkMap: lb.pkMap,
|
||||
keyMap: lb.keyMap,
|
||||
valMap: lb.valMap,
|
||||
ordMap: lb.ordMap,
|
||||
sqlSch: lb.sch.Schema,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// TODO keylessLookupBuilder should be similar to the non-covering
|
||||
// index case, where we will need to reference the primary index,
|
||||
// but can take advantage of point lookup optimizations
|
||||
type keylessLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
}
|
||||
|
||||
// NewRowIter implements IndexLookup
|
||||
func (lb *keylessLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
return newProllyKeylessIndexIter(ctx, lb.idx, p.prollyRange, lb.sch, lb.projections, p.durableState.Primary, p.durableState.Secondary)
|
||||
}
|
||||
|
||||
type nomsLookupBuilder struct {
|
||||
*baseLookupBuilder
|
||||
}
|
||||
|
||||
// NewRowIter implements IndexLookup
|
||||
func (lb *nomsLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
ranges := []*noms.ReadRange{p.nomsRange}
|
||||
return RowIterForNomsRanges(ctx, lb.idx, ranges, lb.projections, p.durableState)
|
||||
}
|
||||
|
||||
func (lb *coveringLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
return newProllyCoveringIndexIter(ctx, lb.idx, p.prollyRange, lb.sch, lb.projections, p.durableState.Secondary)
|
||||
}
|
||||
|
||||
func (lb *keylessLookupBuilder) NewRowIter(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
p := part.(rangePartition)
|
||||
return newProllyKeylessIndexIter(ctx, lb.idx, p.prollyRange, lb.sch, lb.projections, p.durableState.Primary, p.durableState.Secondary)
|
||||
}
|
||||
|
||||
// boundsCase determines the case upon which the bounds are tested.
|
||||
type boundsCase byte
|
||||
|
||||
|
||||
@@ -93,7 +93,6 @@ func (p prollyIndexIter) Next(ctx *sql.Context) (sql.Row, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for to := range p.pkMap {
|
||||
from := p.pkMap.MapOrdinal(to)
|
||||
p.pkBld.PutRaw(to, idxKey.GetField(from))
|
||||
|
||||
@@ -70,16 +70,24 @@ func (idt *IndexedDoltTable) Partitions(ctx *sql.Context) (sql.PartitionIter, er
|
||||
}
|
||||
|
||||
func (idt *IndexedDoltTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
if idt.lb == nil {
|
||||
idt.lb = index.NewLookupBuilder(part, idt.idx, nil, idt.table.sqlSch, idt.isDoltFormat)
|
||||
key, canCache, err := idt.table.DataCacheKey(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if idt.lb == nil || !canCache || idt.lb.Key() != key {
|
||||
idt.lb = index.NewLookupBuilder(key, part, idt.idx, nil, idt.table.sqlSch, idt.isDoltFormat)
|
||||
}
|
||||
|
||||
return idt.lb.NewRowIter(ctx, part)
|
||||
}
|
||||
|
||||
func (idt *IndexedDoltTable) PartitionRows2(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
if idt.lb == nil {
|
||||
idt.lb = index.NewLookupBuilder(part, idt.idx, nil, idt.table.sqlSch, idt.isDoltFormat)
|
||||
key, canCache, err := idt.table.DataCacheKey(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if idt.lb == nil || !canCache || idt.lb.Key() != key {
|
||||
idt.lb = index.NewLookupBuilder(key, part, idt.idx, nil, idt.table.sqlSch, idt.isDoltFormat)
|
||||
}
|
||||
|
||||
return idt.lb.NewRowIter(ctx, part)
|
||||
@@ -122,8 +130,12 @@ func (t *WritableIndexedDoltTable) Partitions(ctx *sql.Context) (sql.PartitionIt
|
||||
}
|
||||
|
||||
func (t *WritableIndexedDoltTable) PartitionRows(ctx *sql.Context, part sql.Partition) (sql.RowIter, error) {
|
||||
if t.lb == nil {
|
||||
t.lb = index.NewLookupBuilder(part, t.idx, t.projectedCols, t.sqlSch, t.isDoltFormat)
|
||||
key, canCache, err := t.DataCacheKey(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if t.lb == nil || !canCache || t.lb.Key() != key {
|
||||
t.lb = index.NewLookupBuilder(key, part, t.idx, t.projectedCols, t.sqlSch, t.isDoltFormat)
|
||||
}
|
||||
|
||||
return t.lb.NewRowIter(ctx, part)
|
||||
|
||||
@@ -196,7 +196,7 @@ func ProllyRowIterFromPartition(
|
||||
partition.end = uint64(c)
|
||||
}
|
||||
|
||||
iter, err := rows.IterOrdinalRange(ctx, partition.start, partition.end)
|
||||
iter, err := rows.FetchOrdinalRange(ctx, partition.start, partition.end)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -351,6 +351,22 @@ func SqlRowAsTupleString(r sql.Row, tableSch schema.Schema) (string, error) {
|
||||
return b.String(), nil
|
||||
}
|
||||
|
||||
// SqlRowAsStrings returns the string representation for each column of |r|
|
||||
// which should have schema |sch|.
|
||||
func SqlRowAsStrings(r sql.Row, sch sql.Schema) ([]string, error) {
|
||||
out := make([]string, len(r))
|
||||
for i := range out {
|
||||
v := r[i]
|
||||
sqlType := sch[i].Type
|
||||
s, err := sqlutil.SqlColToStr(sqlType, v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[i] = s
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// SqlRowAsDeleteStmt generates a sql statement. Non-zero |limit| adds a limit clause.
|
||||
func SqlRowAsDeleteStmt(r sql.Row, tableName string, tableSch schema.Schema, limit uint64) (string, error) {
|
||||
var b strings.Builder
|
||||
|
||||
@@ -214,6 +214,12 @@ func (m Map) IterOrdinalRange(ctx context.Context, start, stop uint64) (MapIter,
|
||||
return m.tuples.iterOrdinalRange(ctx, start, stop)
|
||||
}
|
||||
|
||||
// FetchOrdinalRange fetches all leaf Nodes for the ordinal range beginning at |start|
|
||||
// and ending before |stop| and returns an iterator over their Items.
|
||||
func (m Map) FetchOrdinalRange(ctx context.Context, start, stop uint64) (MapIter, error) {
|
||||
return m.tuples.fetchOrdinalRange(ctx, start, stop)
|
||||
}
|
||||
|
||||
// IterRange returns a mutableMapIter that iterates over a Range.
|
||||
func (m Map) IterRange(ctx context.Context, rng Range) (MapIter, error) {
|
||||
if rng.IsPointLookup(m.keyDesc) {
|
||||
@@ -236,6 +242,10 @@ func (m Map) Pool() pool.BuffPool {
|
||||
return m.tuples.ns.Pool()
|
||||
}
|
||||
|
||||
func (m Map) CompareItems(left, right tree.Item) int {
|
||||
return m.keyDesc.Compare(val.Tuple(left), val.Tuple(right))
|
||||
}
|
||||
|
||||
func (m Map) pointLookupFromRange(ctx context.Context, rng Range) (*pointLookup, error) {
|
||||
cur, err := tree.NewCursorFromSearchFn(ctx, m.tuples.ns, m.tuples.root, rangeStartSearchFn(rng))
|
||||
if err != nil {
|
||||
|
||||
@@ -60,7 +60,7 @@ func TestMap(t *testing.T) {
|
||||
testIterRange(t, prollyMap, tuples)
|
||||
})
|
||||
t.Run("iter ordinal range", func(t *testing.T) {
|
||||
testIterOrdinalRange(t, prollyMap.(ordinalMap), tuples)
|
||||
testIterOrdinalRange(t, prollyMap.(Map), tuples)
|
||||
})
|
||||
|
||||
indexMap, tuples2 := makeProllySecondaryIndex(t, s)
|
||||
|
||||
@@ -295,6 +295,41 @@ func (t orderedTree[K, V, O]) iterOrdinalRange(ctx context.Context, start, stop
|
||||
return &orderedTreeIter[K, V]{curr: lo, stop: stopF, step: lo.Advance}, nil
|
||||
}
|
||||
|
||||
func (t orderedTree[K, V, O]) fetchOrdinalRange(ctx context.Context, start, stop uint64) (*orderedLeafSpanIter[K, V], error) {
|
||||
if stop == start {
|
||||
return &orderedLeafSpanIter[K, V]{}, nil
|
||||
}
|
||||
if stop < start {
|
||||
return nil, fmt.Errorf("invalid ordinal bounds (%d, %d)", start, stop)
|
||||
} else {
|
||||
c, err := t.count()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if stop > uint64(c) {
|
||||
return nil, fmt.Errorf("stop index (%d) out of bounds", stop)
|
||||
}
|
||||
}
|
||||
|
||||
span, err := tree.FetchLeafNodeSpan(ctx, t.ns, t.root, start, stop)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nd, leaves := span.Leaves[0], span.Leaves[1:]
|
||||
c, s := span.LocalStart, nd.Count()
|
||||
if len(leaves) == 0 {
|
||||
s = span.LocalStop // one leaf span
|
||||
}
|
||||
|
||||
return &orderedLeafSpanIter[K, V]{
|
||||
nd: nd,
|
||||
curr: c,
|
||||
stop: s,
|
||||
leaves: leaves,
|
||||
final: span.LocalStop,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// searchNode returns the smallest index where nd[i] >= query
|
||||
// Adapted from search.Sort to inline comparison.
|
||||
func (t orderedTree[K, V, O]) searchNode(query tree.Item, nd tree.Node) int {
|
||||
@@ -376,3 +411,41 @@ func (it *orderedTreeIter[K, V]) iterate(ctx context.Context) (err error) {
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
type orderedLeafSpanIter[K, V ~[]byte] struct {
|
||||
// in-progress node
|
||||
nd tree.Node
|
||||
// current index,
|
||||
curr int
|
||||
// last index for |nd|
|
||||
stop int
|
||||
// remaining leaves
|
||||
leaves []tree.Node
|
||||
// stop index in last leaf node
|
||||
final int
|
||||
}
|
||||
|
||||
func (s *orderedLeafSpanIter[K, V]) Next(ctx context.Context) (key K, value V, err error) {
|
||||
if s.curr >= s.stop {
|
||||
// |s.nd| exhausted
|
||||
if len(s.leaves) == 0 {
|
||||
// span exhausted
|
||||
return nil, nil, io.EOF
|
||||
}
|
||||
|
||||
s.nd = s.leaves[0]
|
||||
s.curr = 0
|
||||
s.stop = s.nd.Count()
|
||||
|
||||
s.leaves = s.leaves[1:]
|
||||
if len(s.leaves) == 0 {
|
||||
// |s.nd| is the last leaf
|
||||
s.stop = s.final
|
||||
}
|
||||
}
|
||||
|
||||
key = K(s.nd.GetKey(s.curr))
|
||||
value = V(s.nd.GetValue(s.curr))
|
||||
s.curr++
|
||||
return
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ func PrefixRange(prefix val.Tuple, desc val.TupleDesc) Range {
|
||||
type Range struct {
|
||||
Fields []RangeField
|
||||
Desc val.TupleDesc
|
||||
Tup val.Tuple
|
||||
}
|
||||
|
||||
// RangeField bounds one dimension of a Range.
|
||||
|
||||
@@ -430,7 +430,7 @@ func intTuple(ints ...int32) val.Tuple {
|
||||
return tb.Build(sharedPool)
|
||||
}
|
||||
|
||||
func testIterOrdinalRange(t *testing.T, om ordinalMap, tuples [][2]val.Tuple) {
|
||||
func testIterOrdinalRange(t *testing.T, om Map, tuples [][2]val.Tuple) {
|
||||
cnt := len(tuples)
|
||||
t.Run("test two sided bounds", func(t *testing.T) {
|
||||
bounds := make([][2]int, 100)
|
||||
@@ -452,36 +452,56 @@ func testIterOrdinalRange(t *testing.T, om ordinalMap, tuples [][2]val.Tuple) {
|
||||
})
|
||||
}
|
||||
|
||||
func testIterOrdinalRangeWithBounds(t *testing.T, om ordinalMap, tuples [][2]val.Tuple, bounds [][2]int) {
|
||||
func testIterOrdinalRangeWithBounds(t *testing.T, om Map, tuples [][2]val.Tuple, bounds [][2]int) {
|
||||
ctx := context.Background()
|
||||
for _, bound := range bounds {
|
||||
start, stop := bound[0], bound[1]
|
||||
if start > stop {
|
||||
start, stop = stop, start
|
||||
}
|
||||
if start == stop {
|
||||
continue
|
||||
}
|
||||
|
||||
expected := tuples[start:stop]
|
||||
|
||||
iter, err := om.IterOrdinalRange(ctx, uint64(start), uint64(stop))
|
||||
require.NoError(t, err)
|
||||
|
||||
var actual [][2]val.Tuple
|
||||
var k, v val.Tuple
|
||||
|
||||
for {
|
||||
k, v, err = iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
t.Run("IterOrdinalRange", func(t *testing.T) {
|
||||
for _, bound := range bounds {
|
||||
start, stop := bound[0], bound[1]
|
||||
if start > stop {
|
||||
start, stop = stop, start
|
||||
} else if start == stop {
|
||||
continue
|
||||
}
|
||||
expected := tuples[start:stop]
|
||||
|
||||
iter, err := om.IterOrdinalRange(ctx, uint64(start), uint64(stop))
|
||||
require.NoError(t, err)
|
||||
actual = append(actual, [2]val.Tuple{k, v})
|
||||
actual := iterOrdinalRange(t, ctx, iter)
|
||||
assert.Equal(t, len(expected), len(actual),
|
||||
"expected equal tuple slices for bounds (%d, %d)", start, stop)
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
assert.Equal(t, len(expected), len(actual),
|
||||
"expected equal tuple slices for bounds (%d, %d)", start, stop)
|
||||
assert.Equal(t, expected, actual)
|
||||
assert.Equal(t, io.EOF, err)
|
||||
}
|
||||
})
|
||||
t.Run("FetchOrdinalRange", func(t *testing.T) {
|
||||
for _, bound := range bounds {
|
||||
start, stop := bound[0], bound[1]
|
||||
if start > stop {
|
||||
start, stop = stop, start
|
||||
} else if start == stop {
|
||||
continue
|
||||
}
|
||||
expected := tuples[start:stop]
|
||||
|
||||
iter, err := om.FetchOrdinalRange(ctx, uint64(start), uint64(stop))
|
||||
require.NoError(t, err)
|
||||
actual := iterOrdinalRange(t, ctx, iter)
|
||||
assert.Equal(t, len(expected), len(actual),
|
||||
"expected equal tuple slices for bounds (%d, %d)", start, stop)
|
||||
assert.Equal(t, expected, actual)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func iterOrdinalRange(t *testing.T, ctx context.Context, iter MapIter) (actual [][2]val.Tuple) {
|
||||
for {
|
||||
k, v, err := iter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(t, k)
|
||||
assert.NotNil(t, v)
|
||||
actual = append(actual, [2]val.Tuple{k, v})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -274,7 +274,7 @@ func (t *ImmutableTree) load(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
if leaf {
|
||||
t.buf = append(t.buf, n.getValue(0)...)
|
||||
t.buf = append(t.buf, n.GetValue(0)...)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -164,7 +164,7 @@ func TestWriteImmutableTree(t *testing.T) {
|
||||
}
|
||||
if leaf {
|
||||
byteCnt += len(n.values.Items)
|
||||
for _, i := range n.getValue(0) {
|
||||
for _, i := range n.GetValue(0) {
|
||||
sum += int(i)
|
||||
}
|
||||
keyCnt = len(n.values.Items)
|
||||
|
||||
@@ -82,7 +82,7 @@ func ApplyMutations[S message.Serializer](
|
||||
for newKey != nil {
|
||||
|
||||
// move |cur| to the NextMutation mutation point
|
||||
err = cur.seek(ctx, newKey, compare)
|
||||
err = cur.Seek(ctx, newKey, compare)
|
||||
if err != nil {
|
||||
return Node{}, err
|
||||
}
|
||||
|
||||
@@ -148,8 +148,8 @@ func (nd Node) GetKey(i int) Item {
|
||||
return nd.keys.GetItem(i)
|
||||
}
|
||||
|
||||
// getValue returns the |ith| value of this node.
|
||||
func (nd Node) getValue(i int) Item {
|
||||
// GetValue returns the |ith| value of this node.
|
||||
func (nd Node) GetValue(i int) Item {
|
||||
return nd.values.GetItem(i)
|
||||
}
|
||||
|
||||
@@ -178,7 +178,7 @@ func (nd Node) getSubtreeCount(i int) (uint64, error) {
|
||||
// getAddress returns the |ith| address of this node.
|
||||
// This method assumes values are 20-byte address hashes.
|
||||
func (nd Node) getAddress(i int) hash.Hash {
|
||||
return hash.New(nd.getValue(i))
|
||||
return hash.New(nd.GetValue(i))
|
||||
}
|
||||
|
||||
func (nd Node) empty() bool {
|
||||
@@ -215,7 +215,7 @@ func OutputProllyNode(w io.Writer, node Node) error {
|
||||
return err
|
||||
}
|
||||
if leaf {
|
||||
v := node.getValue(i)
|
||||
v := node.GetValue(i)
|
||||
vt := val.Tuple(v)
|
||||
|
||||
w.Write([]byte(" value: "))
|
||||
|
||||
@@ -23,6 +23,7 @@ package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
@@ -240,6 +241,84 @@ func NewLeafCursorAtItem(ctx context.Context, ns NodeStore, nd Node, item Item,
|
||||
return cur, nil
|
||||
}
|
||||
|
||||
type LeafSpan struct {
|
||||
Leaves []Node
|
||||
LocalStart int
|
||||
LocalStop int
|
||||
}
|
||||
|
||||
// FetchLeafNodeSpan returns the leaf Node span for the ordinal range [start, stop). It fetches the span using
|
||||
// an eager breadth-first search and makes batch read calls to the persistence layer via NodeStore.ReadMany.
|
||||
func FetchLeafNodeSpan(ctx context.Context, ns NodeStore, root Node, start, stop uint64) (LeafSpan, error) {
|
||||
leaves, localStart, err := fetchLeafNodeSpan(ctx, ns, []Node{root}, start, stop)
|
||||
if err != nil {
|
||||
return LeafSpan{}, err
|
||||
}
|
||||
|
||||
localStop := (stop - start) + localStart
|
||||
for i := 0; i < len(leaves)-1; i++ {
|
||||
localStop -= uint64(leaves[i].Count())
|
||||
}
|
||||
|
||||
return LeafSpan{
|
||||
Leaves: leaves,
|
||||
LocalStart: int(localStart),
|
||||
LocalStop: int(localStop),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fetchLeafNodeSpan(ctx context.Context, ns NodeStore, nodes []Node, start, stop uint64) ([]Node, uint64, error) {
|
||||
ok, err := nodes[0].IsLeaf()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
} else if ok {
|
||||
// verify leaf homogeneity
|
||||
for i := range nodes {
|
||||
ok, err = nodes[i].IsLeaf()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
} else if !ok {
|
||||
return nil, 0, errors.New("mixed leaf/non-leaf set")
|
||||
}
|
||||
}
|
||||
return nodes, start, nil
|
||||
}
|
||||
|
||||
gets := make(hash.HashSlice, 0, len(nodes)*nodes[0].Count())
|
||||
acc := uint64(0)
|
||||
|
||||
for _, nd := range nodes {
|
||||
if nd, err = nd.loadSubtrees(); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
for i := 0; i < nd.Count(); i++ {
|
||||
card, err := nd.getSubtreeCount(i)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if acc == 0 && card < start {
|
||||
start -= card
|
||||
stop -= card
|
||||
continue
|
||||
}
|
||||
|
||||
gets = append(gets, hash.New(nd.GetValue(i)))
|
||||
acc += card
|
||||
if acc >= stop {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
children, err := ns.ReadMany(ctx, gets)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return fetchLeafNodeSpan(ctx, ns, children, start, stop)
|
||||
}
|
||||
|
||||
func CurrentCursorItems(cur *Cursor) (key, value Item) {
|
||||
key = cur.nd.keys.GetItem(cur.idx)
|
||||
value = cur.nd.values.GetItem(cur.idx)
|
||||
@@ -258,7 +337,7 @@ func (cur *Cursor) CurrentKey() Item {
|
||||
}
|
||||
|
||||
func (cur *Cursor) CurrentValue() Item {
|
||||
return cur.nd.getValue(cur.idx)
|
||||
return cur.nd.GetValue(cur.idx)
|
||||
}
|
||||
|
||||
func (cur *Cursor) CurrentRef() hash.Hash {
|
||||
@@ -336,11 +415,11 @@ func (cur *Cursor) level() (uint64, error) {
|
||||
return uint64(lvl), nil
|
||||
}
|
||||
|
||||
// seek updates the cursor's node to one whose range spans the key's value, or the last
|
||||
// Seek updates the cursor's node to one whose range spans the key's value, or the last
|
||||
// node if the key is greater than all existing keys.
|
||||
// If a node does not contain the key, we recurse upwards to the parent cursor. If the
|
||||
// node contains a key, we recurse downwards into child nodes.
|
||||
func (cur *Cursor) seek(ctx context.Context, key Item, cb CompareFn) (err error) {
|
||||
func (cur *Cursor) Seek(ctx context.Context, key Item, cb CompareFn) (err error) {
|
||||
inBounds := true
|
||||
if cur.parent != nil {
|
||||
inBounds = inBounds && cb(key, cur.firstKey()) >= 0
|
||||
@@ -349,7 +428,7 @@ func (cur *Cursor) seek(ctx context.Context, key Item, cb CompareFn) (err error)
|
||||
|
||||
if !inBounds {
|
||||
// |item| is outside the bounds of |cur.nd|, search up the tree
|
||||
err = cur.parent.seek(ctx, key, cb)
|
||||
err = cur.parent.Seek(ctx, key, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -16,6 +16,7 @@ package tree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
@@ -32,6 +33,9 @@ type NodeStore interface {
|
||||
// Read reads a prolly tree Node from the store.
|
||||
Read(ctx context.Context, ref hash.Hash) (Node, error)
|
||||
|
||||
// ReadMany reads many prolly tree Nodes from the store.
|
||||
ReadMany(ctx context.Context, refs hash.HashSlice) ([]Node, error)
|
||||
|
||||
// Write writes a prolly tree Node to the store.
|
||||
Write(ctx context.Context, nd Node) (hash.Hash, error)
|
||||
|
||||
@@ -81,6 +85,44 @@ func (ns nodeStore) Read(ctx context.Context, ref hash.Hash) (Node, error) {
|
||||
return NodeFromBytes(c.Data())
|
||||
}
|
||||
|
||||
// ReadMany implements NodeStore.
|
||||
func (ns nodeStore) ReadMany(ctx context.Context, refs hash.HashSlice) ([]Node, error) {
|
||||
found := make(map[hash.Hash]chunks.Chunk)
|
||||
gets := hash.HashSet{}
|
||||
|
||||
for _, r := range refs {
|
||||
c, ok := ns.cache.get(r)
|
||||
if ok {
|
||||
found[r] = c
|
||||
} else {
|
||||
gets.Insert(r)
|
||||
}
|
||||
}
|
||||
|
||||
mu := new(sync.Mutex)
|
||||
err := ns.store.GetMany(ctx, gets, func(ctx context.Context, chunk *chunks.Chunk) {
|
||||
mu.Lock()
|
||||
found[chunk.Hash()] = *chunk
|
||||
mu.Unlock()
|
||||
ns.cache.insert(*chunk)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]Node, len(refs))
|
||||
for i, r := range refs {
|
||||
c, ok := found[r]
|
||||
if ok {
|
||||
nodes[i], err = NodeFromBytes(c.Data())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
// Write implements NodeStore.
|
||||
func (ns nodeStore) Write(ctx context.Context, nd Node) (hash.Hash, error) {
|
||||
c := chunks.NewChunk(nd.bytes())
|
||||
|
||||
@@ -48,7 +48,7 @@ func TestRoundTripInts(t *testing.T) {
|
||||
assert.Equal(t, len(keys), int(nd.count))
|
||||
for i := range keys {
|
||||
assert.Equal(t, keys[i], val.Tuple(nd.GetKey(i)))
|
||||
assert.Equal(t, values[i], val.Tuple(nd.getValue(i)))
|
||||
assert.Equal(t, values[i], val.Tuple(nd.GetValue(i)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,7 @@ func TestRoundTripNodeItems(t *testing.T) {
|
||||
assert.Equal(t, len(keys), int(nd.count))
|
||||
for i := range keys {
|
||||
assert.Equal(t, keys[i], nd.GetKey(i))
|
||||
assert.Equal(t, values[i], nd.getValue(i))
|
||||
assert.Equal(t, values[i], nd.GetValue(i))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +276,21 @@ func (v nodeStoreValidator) Read(ctx context.Context, ref hash.Hash) (Node, erro
|
||||
return nd, nil
|
||||
}
|
||||
|
||||
func (v nodeStoreValidator) ReadMany(ctx context.Context, refs hash.HashSlice) ([]Node, error) {
|
||||
nodes, err := v.ns.ReadMany(ctx, refs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for i := range nodes {
|
||||
actual := hash.Of(nodes[i].msg)
|
||||
if refs[i] != actual {
|
||||
err = fmt.Errorf("incorrect node hash (%s != %s)", refs[i], actual)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (v nodeStoreValidator) Write(ctx context.Context, nd Node) (hash.Hash, error) {
|
||||
h, err := v.ns.Write(ctx, nd)
|
||||
if err != nil {
|
||||
|
||||
@@ -37,13 +37,6 @@ type testMap interface {
|
||||
var _ testMap = Map{}
|
||||
var _ testMap = MutableMap{}
|
||||
|
||||
type ordinalMap interface {
|
||||
testMap
|
||||
IterOrdinalRange(ctx context.Context, start, stop uint64) (MapIter, error)
|
||||
}
|
||||
|
||||
var _ testMap = Map{}
|
||||
|
||||
func countOrderedMap(t *testing.T, om testMap) (cnt int) {
|
||||
iter, err := om.IterAll(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
Reference in New Issue
Block a user