mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-07 00:39:44 -06:00
Decrease the number of allocations for indexed reads (#1239)
Decrease the number of allocations for indexed reads optimize less and skipint/skipuint
This commit is contained in:
@@ -168,5 +168,5 @@ func (il *doltIndexLookup) RowIterForRanges(ctx *sql.Context, ranges []lookup.Ra
|
||||
}
|
||||
|
||||
type nomsKeyIter interface {
|
||||
ReadKey(ctx context.Context) (types.Value, error)
|
||||
ReadKey(ctx context.Context) (types.Tuple, error)
|
||||
}
|
||||
|
||||
@@ -92,13 +92,13 @@ func (i *indexLookupRowIterAdapter) queueRows() {
|
||||
shouldBreak := false
|
||||
pos := 0
|
||||
for ; pos < len(i.buffer); pos++ {
|
||||
var indexKey types.Value
|
||||
var indexKey types.Tuple
|
||||
indexKey, err = i.keyIter.ReadKey(i.ctx)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
exec.Execute(keyPos{
|
||||
key: indexKey.(types.Tuple),
|
||||
key: indexKey,
|
||||
position: pos,
|
||||
})
|
||||
}
|
||||
@@ -131,42 +131,41 @@ func (i *indexLookupRowIterAdapter) queueRows() {
|
||||
}
|
||||
}
|
||||
|
||||
func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple) (types.Value, error) {
|
||||
func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple) (types.Tuple, error) {
|
||||
tplItr, err := indexKey.Iterator()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return types.Tuple{}, err
|
||||
}
|
||||
|
||||
resVals := make([]types.Value, len(i.pkTags)*2)
|
||||
for {
|
||||
_, tagVal, err := tplItr.Next()
|
||||
_, tag, err := tplItr.NextUint64()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return types.Tuple{}, err
|
||||
}
|
||||
|
||||
if tagVal == nil {
|
||||
break
|
||||
}
|
||||
|
||||
tag := uint64(tagVal.(types.Uint))
|
||||
idx, inPK := i.pkTags[tag]
|
||||
|
||||
if inPK {
|
||||
_, valVal, err := tplItr.Next()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return types.Tuple{}, err
|
||||
}
|
||||
|
||||
resVals[idx*2] = tagVal
|
||||
resVals[idx*2] = types.Uint(tag)
|
||||
resVals[idx*2+1] = valVal
|
||||
} else {
|
||||
err := tplItr.Skip()
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return types.Tuple{}, err
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -185,16 +184,16 @@ func (i *indexLookupRowIterAdapter) processKey(_ context.Context, valInt interfa
|
||||
return err
|
||||
}
|
||||
|
||||
fieldsVal, _, err := tableData.MaybeGet(i.ctx, pkTupleVal)
|
||||
fieldsVal, ok, err := tableData.MaybeGetTuple(i.ctx, pkTupleVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if fieldsVal == nil {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
sqlRow, err := i.conv.ConvertKVToSqlRow(pkTupleVal, fieldsVal)
|
||||
sqlRow, err := i.conv.ConvertKVTuplesToSqlRow(pkTupleVal, fieldsVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -255,7 +254,7 @@ func (ci *coveringIndexRowIterAdapter) Next() (sql.Row, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ci.conv.ConvertKVToSqlRow(key, nil)
|
||||
return ci.conv.ConvertKVTuplesToSqlRow(key, types.Tuple{})
|
||||
}
|
||||
|
||||
func (ci *coveringIndexRowIterAdapter) Close() error {
|
||||
|
||||
@@ -102,7 +102,7 @@ func newKeyedRowIter(ctx context.Context, tbl *DoltTable, projectedCols []string
|
||||
}
|
||||
|
||||
conv := NewKVToSqlRowConverter(tbl.table.Format(), tagToSqlColIdx, cols, len(cols))
|
||||
return NewDoltMapIter(ctx, mapIter.Next, nil, conv), nil
|
||||
return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil
|
||||
}
|
||||
|
||||
// Next returns the next row in this row iterator, or an io.EOF error if there aren't any more.
|
||||
|
||||
@@ -281,13 +281,13 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do
|
||||
}
|
||||
|
||||
idx := 0
|
||||
keys := make([]types.Value, len(acc.deltas))
|
||||
keys := make([]types.Tuple, len(acc.deltas))
|
||||
for _, vd := range acc.deltas {
|
||||
keys[idx] = vd.key
|
||||
idx++
|
||||
}
|
||||
|
||||
err = types.SortWithErroringLess(types.ValueSort{Values: keys, Nbf: acc.nbf})
|
||||
err = types.SortWithErroringLess(types.TupleSort{Tuples: keys, Nbf: acc.nbf})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -296,25 +296,26 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do
|
||||
iter := table.NewMapPointReader(rowData, keys...)
|
||||
|
||||
for {
|
||||
k, v, err := iter.Next(ctx)
|
||||
k, v, err := iter.NextTuple(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
delta, err := acc.getRowDelta(k.(types.Tuple))
|
||||
delta, err := acc.getRowDelta(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ok bool
|
||||
if v == nil {
|
||||
if v.Empty() {
|
||||
// row does not yet exist
|
||||
v, ok, err = initializeCardinality(delta.val, delta.delta)
|
||||
} else {
|
||||
v, ok, err = modifyCardinalityWithDelta(v.(types.Tuple), delta.delta)
|
||||
v, ok, err = modifyCardinalityWithDelta(v, delta.delta)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -22,34 +22,61 @@ import (
|
||||
)
|
||||
|
||||
type PointReader struct {
|
||||
m types.Map
|
||||
keys []types.Value
|
||||
idx int
|
||||
m types.Map
|
||||
emptyTuple types.Tuple
|
||||
keys []types.Tuple
|
||||
idx int
|
||||
}
|
||||
|
||||
var _ types.MapIterator = &PointReader{}
|
||||
|
||||
// read the map values for a set of map keys
|
||||
func NewMapPointReader(m types.Map, keys ...types.Value) types.MapIterator {
|
||||
func NewMapPointReader(m types.Map, keys ...types.Tuple) types.MapIterator {
|
||||
return &PointReader{
|
||||
m: m,
|
||||
keys: keys,
|
||||
m: m,
|
||||
emptyTuple: types.EmptyTuple(m.Format()),
|
||||
keys: keys,
|
||||
}
|
||||
}
|
||||
|
||||
// Next implements types.MapIterator.
|
||||
func (pr *PointReader) Next(ctx context.Context) (k, v types.Value, err error) {
|
||||
if pr.idx >= len(pr.keys) {
|
||||
return nil, nil, io.EOF
|
||||
}
|
||||
kt, vt, err := pr.NextTuple(ctx)
|
||||
|
||||
k = pr.keys[pr.idx]
|
||||
// todo: optimize by implementing MapIterator.Seek()
|
||||
v, _, err = pr.m.MaybeGet(ctx, k)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
if !kt.Empty() {
|
||||
k = kt
|
||||
}
|
||||
|
||||
if !vt.Empty() {
|
||||
v = vt
|
||||
}
|
||||
|
||||
return k, v, nil
|
||||
}
|
||||
|
||||
// NextTuple implements types.MapIterator.
|
||||
func (pr *PointReader) NextTuple(ctx context.Context) (k, v types.Tuple, err error) {
|
||||
if pr.idx >= len(pr.keys) {
|
||||
return types.Tuple{}, types.Tuple{}, io.EOF
|
||||
}
|
||||
|
||||
k = pr.keys[pr.idx]
|
||||
v = pr.emptyTuple
|
||||
|
||||
var ok bool
|
||||
// todo: optimize by implementing MapIterator.Seek()
|
||||
v, ok, err = pr.m.MaybeGetTuple(ctx, k)
|
||||
pr.idx++
|
||||
|
||||
return k, v, err
|
||||
if err != nil {
|
||||
return types.Tuple{}, types.Tuple{}, err
|
||||
} else if !ok {
|
||||
return k, pr.emptyTuple, nil
|
||||
}
|
||||
|
||||
return k, v, nil
|
||||
}
|
||||
|
||||
@@ -118,20 +118,20 @@ func (nrr *NomsRangeReader) ReadRow(ctx context.Context) (row.Row, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return row.FromNoms(nrr.sch, k.(types.Tuple), v.(types.Tuple))
|
||||
return row.FromNoms(nrr.sch, k, v)
|
||||
}
|
||||
|
||||
func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Value, error) {
|
||||
func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Tuple, error) {
|
||||
k, _, err := nrr.ReadKV(ctx)
|
||||
|
||||
return k, err
|
||||
}
|
||||
|
||||
func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Value, types.Value, error) {
|
||||
func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tuple, error) {
|
||||
var err error
|
||||
var k types.Tuple
|
||||
var v types.Tuple
|
||||
for nrr.itr != nil || nrr.idx < len(nrr.ranges) {
|
||||
var k types.Value
|
||||
var v types.Value
|
||||
if nrr.itr == nil {
|
||||
r := nrr.ranges[nrr.idx]
|
||||
nrr.idx++
|
||||
@@ -143,46 +143,42 @@ func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Value, types.Valu
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return types.Tuple{}, types.Tuple{}, err
|
||||
}
|
||||
|
||||
nrr.currCheck = r.Check
|
||||
|
||||
k, v, err = nrr.itr.Next(ctx)
|
||||
k, v, err = nrr.itr.NextTuple(ctx)
|
||||
|
||||
if !r.Inclusive && r.Start.Equals(k) {
|
||||
k, v, err = nrr.itr.Next(ctx)
|
||||
if err == nil && !r.Inclusive && r.Start.Compare(k) == 0 {
|
||||
k, v, err = nrr.itr.NextTuple(ctx)
|
||||
}
|
||||
} else {
|
||||
k, v, err = nrr.itr.Next(ctx)
|
||||
k, v, err = nrr.itr.NextTuple(ctx)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
if err != nil && err != io.EOF {
|
||||
return types.Tuple{}, types.Tuple{}, err
|
||||
}
|
||||
|
||||
var inRange bool
|
||||
if k != nil {
|
||||
inRange, err = nrr.currCheck(k.(types.Tuple))
|
||||
if err != io.EOF {
|
||||
inRange, err = nrr.currCheck(k)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return types.Tuple{}, types.Tuple{}, err
|
||||
}
|
||||
|
||||
if !inRange {
|
||||
nrr.itr = nil
|
||||
nrr.currCheck = nil
|
||||
continue
|
||||
} else {
|
||||
if inRange {
|
||||
return k, v, nil
|
||||
}
|
||||
} else {
|
||||
nrr.itr = nil
|
||||
nrr.currCheck = nil
|
||||
}
|
||||
|
||||
nrr.itr = nil
|
||||
nrr.currCheck = nil
|
||||
}
|
||||
|
||||
return nil, nil, io.EOF
|
||||
return types.Tuple{}, types.Tuple{}, io.EOF
|
||||
}
|
||||
|
||||
// VerifySchema checks that the incoming schema matches the schema from the existing table
|
||||
|
||||
@@ -14,42 +14,52 @@
|
||||
|
||||
package store
|
||||
|
||||
/*import (
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
const (
|
||||
simIdxBenchDataset = "simulated_index_benchmark"
|
||||
numRows = 100000
|
||||
rangeSize = 10
|
||||
)
|
||||
|
||||
func poe(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
var benchmarkTmpDir = os.TempDir()
|
||||
|
||||
func getBenchmarkDB(ctx context.Context) datas.Database {
|
||||
cs, err := nbs.NewLocalStore(ctx, types.Format_Default.VersionString(), benchmarkTmpDir, 1<<28)
|
||||
func getDBAtDir(ctx context.Context, dir string) datas.Database {
|
||||
cs, err := nbs.NewLocalStore(ctx, types.Format_Default.VersionString(), dir, 1<<28)
|
||||
poe(err)
|
||||
|
||||
return datas.NewDatabase(nbs.NewNBSMetricWrapper(cs))
|
||||
}
|
||||
|
||||
const (
|
||||
simIdxBenchDataset = "simulated_index_benchmark"
|
||||
numRows = 100000
|
||||
rangeSize = 10
|
||||
)
|
||||
|
||||
var benchmarkTmpDir = os.TempDir()
|
||||
var genOnce = &sync.Once{}
|
||||
|
||||
func getBenchmarkDB(ctx context.Context) datas.Database {
|
||||
return getDBAtDir(ctx, benchmarkTmpDir)
|
||||
}
|
||||
|
||||
func writeTupleToDB(ctx context.Context, db datas.Database, dsID string, vals ...types.Value) {
|
||||
root, err := types.NewTuple(db.Format(), vals...)
|
||||
poe(err)
|
||||
@@ -83,44 +93,59 @@ func readTupleFromDB(ctx context.Context, t require.TestingT, dsID string) (*typ
|
||||
return db.Format(), valSlice
|
||||
}
|
||||
|
||||
func init() {
|
||||
ctx := context.Background()
|
||||
db := getBenchmarkDB(ctx)
|
||||
nbf := db.Format()
|
||||
var testDataCols = []schema.Column{
|
||||
schema.NewColumn("id", 0, types.IntKind, true),
|
||||
schema.NewColumn("fColh", 1, types.FloatKind, false),
|
||||
schema.NewColumn("bCol", 2, types.BoolKind, false),
|
||||
schema.NewColumn("uuidStrCol", 3, types.StringKind, false),
|
||||
schema.NewColumn("timeCol", 4, types.TimestampKind, false),
|
||||
schema.NewColumn("colInt1", 6, types.IntKind, false),
|
||||
schema.NewColumn("colInt2", 7, types.IntKind, false),
|
||||
schema.NewColumn("colInt3", 8, types.IntKind, false),
|
||||
schema.NewColumn("colInt4", 9, types.IntKind, false),
|
||||
}
|
||||
|
||||
m, err := types.NewMap(ctx, db)
|
||||
poe(err)
|
||||
func generateTestData(ctx context.Context) {
|
||||
genOnce.Do(func() {
|
||||
db := getBenchmarkDB(ctx)
|
||||
nbf := db.Format()
|
||||
|
||||
idx, err := types.NewMap(ctx, db)
|
||||
poe(err)
|
||||
|
||||
me := m.Edit()
|
||||
idxMe := idx.Edit()
|
||||
rng := rand.New(rand.NewSource(0))
|
||||
for i := 0; i <= numRows; i++ {
|
||||
k, err := types.NewTuple(nbf, types.Uint(0), types.Int(int64(i)))
|
||||
poe(err)
|
||||
randf := rng.Float64()
|
||||
v, err := types.NewTuple(nbf, types.Uint(1), types.Float(randf), types.Uint(2), types.Bool(i%2 == 0), types.Uint(3), types.String(uuid.New().String()), types.Uint(4), types.Timestamp(time.Now()))
|
||||
poe(err)
|
||||
idxKey, err := types.NewTuple(nbf, types.Uint(5), types.Float(randf), types.Uint(0), types.Int(int64(i)))
|
||||
m, err := types.NewMap(ctx, db)
|
||||
poe(err)
|
||||
|
||||
me = me.Set(k, v)
|
||||
idxMe = idxMe.Set(idxKey, types.NullValue)
|
||||
}
|
||||
idx, err := types.NewMap(ctx, db)
|
||||
poe(err)
|
||||
|
||||
m, err = me.Map(ctx)
|
||||
poe(err)
|
||||
me := m.Edit()
|
||||
idxMe := idx.Edit()
|
||||
rng := rand.New(rand.NewSource(0))
|
||||
for i := 0; i <= numRows; i++ {
|
||||
k, err := types.NewTuple(nbf, types.Uint(0), types.Int(int64(i)))
|
||||
poe(err)
|
||||
randf := rng.Float64()
|
||||
v, err := types.NewTuple(nbf, types.Uint(1), types.Float(randf), types.Uint(2), types.Bool(i%2 == 0), types.Uint(3), types.String(uuid.New().String()), types.Uint(4), types.Timestamp(time.Now()), types.Uint(6), types.Int(-100), types.Uint(7), types.Int(-1000), types.Uint(8), types.Int(-10000), types.Uint(9), types.Int(-1000000))
|
||||
poe(err)
|
||||
idxKey, err := types.NewTuple(nbf, types.Uint(5), types.Float(randf), types.Uint(0), types.Int(int64(i)))
|
||||
poe(err)
|
||||
|
||||
idx, err = idxMe.Map(ctx)
|
||||
poe(err)
|
||||
me = me.Set(k, v)
|
||||
idxMe = idxMe.Set(idxKey, types.NullValue)
|
||||
}
|
||||
|
||||
writeTupleToDB(ctx, db, simIdxBenchDataset, m, idx)
|
||||
m, err = me.Map(ctx)
|
||||
poe(err)
|
||||
|
||||
idx, err = idxMe.Map(ctx)
|
||||
poe(err)
|
||||
|
||||
writeTupleToDB(ctx, db, simIdxBenchDataset, m, idx)
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkSimulatedIndex(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
generateTestData(ctx)
|
||||
|
||||
rng := rand.New(rand.NewSource(0))
|
||||
nbf, vals := readTupleFromDB(ctx, b, simIdxBenchDataset)
|
||||
|
||||
@@ -158,6 +183,8 @@ func BenchmarkSimulatedIndex(b *testing.B) {
|
||||
|
||||
func BenchmarkSimulatedCoveringIndex(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
generateTestData(ctx)
|
||||
|
||||
rng := rand.New(rand.NewSource(0))
|
||||
nbf, vals := readTupleFromDB(ctx, b, simIdxBenchDataset)
|
||||
|
||||
@@ -182,4 +209,88 @@ func BenchmarkSimulatedCoveringIndex(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMapItr(b *testing.B) {
|
||||
ctx := context.Background()
|
||||
generateTestData(ctx)
|
||||
|
||||
require.True(b, b.N < numRows, "b.N:%d >= numRows:%d", b.N, numRows)
|
||||
|
||||
_, vals := readTupleFromDB(ctx, b, simIdxBenchDataset)
|
||||
m := vals[0].(types.Map)
|
||||
|
||||
itr, err := m.RangeIterator(ctx, 0, uint64(b.N))
|
||||
require.NoError(b, err)
|
||||
|
||||
var closeFunc func() error
|
||||
if cl, ok := itr.(io.Closer); ok {
|
||||
closeFunc = cl.Close
|
||||
}
|
||||
|
||||
dmItr := sqle.NewDoltMapIter(ctx, itr.NextTuple, closeFunc, sqle.NewKVToSqlRowConverterForCols(m.Format(), testDataCols))
|
||||
|
||||
b.ResetTimer()
|
||||
for {
|
||||
var r sql.Row
|
||||
r, err = dmItr.Next()
|
||||
|
||||
if r == nil || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
|
||||
if err != io.EOF {
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}
|
||||
|
||||
/*func BenchmarkFullScan(b *testing.B) {
|
||||
const dir = "dolt directory containing db with table to scan"
|
||||
const branch = "master"
|
||||
const tableName = "bigram_counts"
|
||||
|
||||
ctx := context.Background()
|
||||
ddb, err := doltdb.LoadDoltDB(ctx, types.Format_Default, dir)
|
||||
require.NoError(b, err)
|
||||
|
||||
cs, err := doltdb.NewCommitSpec("HEAD")
|
||||
require.NoError(b, err)
|
||||
|
||||
cm, err := ddb.Resolve(ctx, cs, ref.NewBranchRef(branch))
|
||||
require.NoError(b, err)
|
||||
|
||||
root, err := cm.GetRootValue()
|
||||
require.NoError(b, err)
|
||||
|
||||
tbl, ok, err := root.GetTable(ctx, tableName)
|
||||
require.NoError(b, err)
|
||||
require.True(b, ok)
|
||||
|
||||
m, err := tbl.GetRowData(ctx)
|
||||
require.NoError(b, err)
|
||||
require.True(b, uint64(b.N) < m.Len(), "b.N:%d >= numRows:%d", b.N, m.Len())
|
||||
|
||||
itr, err := m.RangeIterator(ctx, 0, uint64(b.N))
|
||||
require.NoError(b, err)
|
||||
|
||||
dmItr := sqle.NewDoltMapIter(ctx, itr.NextTuple, closeFunc, sqle.NewKVToSqlRowConverterForCols(m.Format(), testDataCols))
|
||||
|
||||
b.ResetTimer()
|
||||
for {
|
||||
var r sql.Row
|
||||
r, err = dmItr.Next()
|
||||
|
||||
if r == nil || err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
b.StopTimer()
|
||||
|
||||
if err != io.EOF {
|
||||
require.NoError(b, err)
|
||||
}
|
||||
}*/
|
||||
|
||||
|
||||
|
||||
@@ -79,6 +79,11 @@ func (cur *bufferedSequenceIterator) current() (sequenceItem, error) {
|
||||
return cur.seq.getItem(cur.idx)
|
||||
}
|
||||
|
||||
func (cur *bufferedSequenceIterator) currentTuple() (tupleMapEntry, error) {
|
||||
d.PanicIfFalse(cur.valid())
|
||||
return cur.seq.(mapLeafSequence).getTupleMapEntry(cur.idx)
|
||||
}
|
||||
|
||||
func (cur *bufferedSequenceIterator) valid() bool {
|
||||
return cur.idx >= 0 && cur.idx < cur.length()
|
||||
}
|
||||
|
||||
@@ -200,8 +200,13 @@ func (b *binaryNomsReader) skipFloat(nbf *NomsBinFormat) {
|
||||
}
|
||||
|
||||
func (b *binaryNomsReader) skipInt() {
|
||||
_, count := unrolledDecodeVarint(b.buff[b.offset:])
|
||||
b.offset += uint32(count)
|
||||
maxOffset := b.offset + 10
|
||||
for ; b.offset < maxOffset; b.offset++ {
|
||||
if b.buff[b.offset]&0x80 == 0 {
|
||||
b.offset++
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *binaryNomsReader) ReadInt() int64 {
|
||||
@@ -289,8 +294,13 @@ func unrolledDecodeVarint(buf []byte) (int64, int) {
|
||||
}
|
||||
|
||||
func (b *binaryNomsReader) skipUint() {
|
||||
_, count := unrolledDecodeUVarint(b.buff[b.offset:])
|
||||
b.offset += uint32(count)
|
||||
maxOffset := b.offset + 10
|
||||
for ; b.offset < maxOffset; b.offset++ {
|
||||
if b.buff[b.offset]&0x80 == 0 {
|
||||
b.offset++
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *binaryNomsReader) ReadBool() bool {
|
||||
|
||||
@@ -310,6 +310,17 @@ func (m Map) MaybeGet(ctx context.Context, key Value) (v Value, ok bool, err err
|
||||
return entry.value, true, nil
|
||||
}
|
||||
|
||||
func (m Map) MaybeGetTuple(ctx context.Context, key Tuple) (v Tuple, ok bool, err error) {
|
||||
var val Value
|
||||
val, ok, err = m.MaybeGet(ctx, key)
|
||||
|
||||
if val != nil {
|
||||
return val.(Tuple), ok, err
|
||||
}
|
||||
|
||||
return Tuple{}, ok, err
|
||||
}
|
||||
|
||||
func (m Map) Has(ctx context.Context, key Value) (bool, error) {
|
||||
cur, err := newCursorAtValue(ctx, m.orderedSequence, key, false, false)
|
||||
|
||||
|
||||
@@ -27,22 +27,21 @@ import (
|
||||
"io"
|
||||
)
|
||||
|
||||
// MapIterator is the interface used by iterators over Noms Maps.
|
||||
type MapIterator interface {
|
||||
Next(ctx context.Context) (k, v Value, err error)
|
||||
}
|
||||
|
||||
// MapTupleIterator is an iterator that returns map keys and values as types.Tuple instances and follow the standard go
|
||||
// convention of using io.EOF to mean that all the data has been read.
|
||||
type MapTupleIterator interface {
|
||||
Next(ctx context.Context) (k, v Tuple, err error)
|
||||
NextTuple(ctx context.Context) (k, v Tuple, err error)
|
||||
}
|
||||
|
||||
// MapIterator is the interface used by iterators over Noms Maps.
|
||||
type MapIterator interface {
|
||||
MapTupleIterator
|
||||
Next(ctx context.Context) (k, v Value, err error)
|
||||
}
|
||||
|
||||
// mapIterator can efficiently iterate through a Noms Map.
|
||||
type mapIterator struct {
|
||||
sequenceIter sequenceIterator
|
||||
currentKey Value
|
||||
currentValue Value
|
||||
}
|
||||
|
||||
// Next returns the subsequent entries from the Map, starting with the entry at which the iterator
|
||||
@@ -56,17 +55,38 @@ func (mi *mapIterator) Next(ctx context.Context) (k, v Value, err error) {
|
||||
}
|
||||
|
||||
entry := item.(mapEntry)
|
||||
mi.currentKey, mi.currentValue = entry.key, entry.value
|
||||
_, err = mi.sequenceIter.advance(ctx)
|
||||
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
} else {
|
||||
mi.currentKey, mi.currentValue = nil, nil
|
||||
}
|
||||
|
||||
return mi.currentKey, mi.currentValue, nil
|
||||
return entry.key, entry.value, nil
|
||||
} else {
|
||||
return nil, nil, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Next returns the subsequent entries from the Map, starting with the entry at which the iterator
|
||||
// was created. If there are no more entries, Next() returns nils.
|
||||
func (mi *mapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) {
|
||||
if mi.sequenceIter.valid() {
|
||||
entry, err := mi.sequenceIter.currentTuple()
|
||||
|
||||
if err != nil {
|
||||
return Tuple{}, Tuple{}, err
|
||||
}
|
||||
|
||||
_, err = mi.sequenceIter.advance(ctx)
|
||||
|
||||
if err != nil {
|
||||
return Tuple{}, Tuple{}, err
|
||||
}
|
||||
|
||||
return entry.key, entry.value, nil
|
||||
} else {
|
||||
return Tuple{}, Tuple{}, io.EOF
|
||||
}
|
||||
}
|
||||
|
||||
var errClosed = errors.New("closed")
|
||||
@@ -75,7 +95,7 @@ type mapRangeIter struct {
|
||||
collItr *collTupleRangeIter
|
||||
}
|
||||
|
||||
func (itr *mapRangeIter) Next(ctx context.Context) (k, v Tuple, err error) {
|
||||
func (itr *mapRangeIter) NextTuple(ctx context.Context) (k, v Tuple, err error) {
|
||||
if itr.collItr == nil {
|
||||
// only happens if there is nothing to iterate over
|
||||
return Tuple{}, Tuple{}, io.EOF
|
||||
|
||||
@@ -29,6 +29,11 @@ type mapLeafSequence struct {
|
||||
leafSequence
|
||||
}
|
||||
|
||||
type tupleMapEntry struct {
|
||||
key Tuple
|
||||
value Tuple
|
||||
}
|
||||
|
||||
type mapEntry struct {
|
||||
key Value
|
||||
value Value
|
||||
@@ -60,6 +65,22 @@ func readMapEntry(r *valueDecoder, nbf *NomsBinFormat) (mapEntry, error) {
|
||||
return mapEntry{k, v}, nil
|
||||
}
|
||||
|
||||
func readTupleMapEntry(r *valueDecoder, nbf *NomsBinFormat) (tupleMapEntry, error) {
|
||||
k, err := r.readTuple(nbf)
|
||||
|
||||
if err != nil {
|
||||
return tupleMapEntry{}, err
|
||||
}
|
||||
|
||||
v, err := r.readTuple(nbf)
|
||||
|
||||
if err != nil {
|
||||
return tupleMapEntry{}, err
|
||||
}
|
||||
|
||||
return tupleMapEntry{k, v}, nil
|
||||
}
|
||||
|
||||
func (entry mapEntry) equals(other mapEntry) bool {
|
||||
return entry.key.Equals(other.key) && entry.value.Equals(other.value)
|
||||
}
|
||||
@@ -131,6 +152,11 @@ func (ml mapLeafSequence) getItem(idx int) (sequenceItem, error) {
|
||||
return readMapEntry(&dec, ml.format())
|
||||
}
|
||||
|
||||
func (ml mapLeafSequence) getTupleMapEntry(idx int) (tupleMapEntry, error) {
|
||||
dec := ml.decoderSkipToIndex(idx)
|
||||
return readTupleMapEntry(&dec, ml.format())
|
||||
}
|
||||
|
||||
func (ml mapLeafSequence) WalkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
w := binaryNomsWriter{make([]byte, 4), 0}
|
||||
err := ml.writeTo(&w, ml.format())
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
type sequenceIterator interface {
|
||||
valid() bool
|
||||
current() (sequenceItem, error)
|
||||
currentTuple() (tupleMapEntry, error)
|
||||
advance(ctx context.Context) (bool, error)
|
||||
iter(ctx context.Context, cb cursorIterCallback) error
|
||||
}
|
||||
@@ -101,6 +102,12 @@ func (cur *sequenceCursor) current() (sequenceItem, error) {
|
||||
return cur.getItem(cur.idx)
|
||||
}
|
||||
|
||||
// currentTuple returns the tupleMapEntry at the current cursor position
|
||||
func (cur *sequenceCursor) currentTuple() (tupleMapEntry, error) {
|
||||
d.PanicIfFalse(cur.valid())
|
||||
return cur.seq.(mapLeafSequence).getTupleMapEntry(cur.idx)
|
||||
}
|
||||
|
||||
func (cur *sequenceCursor) valid() bool {
|
||||
return cur.idx >= 0 && cur.idx < cur.length()
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
@@ -137,6 +138,24 @@ func (itr *TupleIterator) Next() (uint64, Value, error) {
|
||||
return itr.count, nil, nil
|
||||
}
|
||||
|
||||
func (itr *TupleIterator) NextUint64() (uint64, uint64, error) {
|
||||
if itr.pos < itr.count {
|
||||
k := itr.dec.ReadKind()
|
||||
|
||||
if k != UintKind {
|
||||
return 0, 0, errors.New("NextUint64 called when the next value is not a Uint64")
|
||||
}
|
||||
|
||||
valPos := itr.pos
|
||||
val := itr.dec.ReadUint()
|
||||
itr.pos++
|
||||
|
||||
return valPos, val, nil
|
||||
}
|
||||
|
||||
return itr.count, 0, io.EOF
|
||||
}
|
||||
|
||||
func (itr *TupleIterator) CodecReader() (CodecReader, uint64) {
|
||||
return itr.dec, itr.count - itr.pos
|
||||
}
|
||||
@@ -337,6 +356,10 @@ func (t Tuple) PrefixEquals(ctx context.Context, other Tuple, prefixCount uint64
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (t Tuple) Compare(other Tuple) int {
|
||||
return bytes.Compare(t.buff, other.buff)
|
||||
}
|
||||
|
||||
func (t Tuple) typeOf() (*Type, error) {
|
||||
dec, count := t.decoderSkipToFields()
|
||||
ts := make(typeSlice, 0, count)
|
||||
@@ -569,52 +592,162 @@ func (t Tuple) splitFieldsAt(n uint64) (prolog, head, tail []byte, count uint64,
|
||||
return
|
||||
}
|
||||
|
||||
func (t Tuple) Less(nbf *NomsBinFormat, other LesserValuable) (bool, error) {
|
||||
if otherTuple, ok := other.(Tuple); ok {
|
||||
itrs := tupItrPairPool.Get().(*tupleItrPair)
|
||||
defer tupItrPairPool.Put(itrs)
|
||||
func (t Tuple) TupleLess(nbf *NomsBinFormat, otherTuple Tuple) (bool, error) {
|
||||
itrs := tupItrPairPool.Get().(*tupleItrPair)
|
||||
defer tupItrPairPool.Put(itrs)
|
||||
|
||||
itr := itrs.thisItr
|
||||
err := itr.InitForTuple(t)
|
||||
itr := itrs.thisItr
|
||||
err := itr.InitForTuple(t)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
otherItr := itrs.otherItr
|
||||
err = otherItr.InitForTuple(otherTuple)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
for itr.HasMore() {
|
||||
if !otherItr.HasMore() {
|
||||
// equal up til the end of other. other is shorter, therefore it is less
|
||||
return false, nil
|
||||
}
|
||||
|
||||
_, currVal, err := itr.Next()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
_, currOthVal, err := otherItr.Next()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if !currVal.Equals(currOthVal) {
|
||||
return currVal.Less(nbf, currOthVal)
|
||||
}
|
||||
}
|
||||
|
||||
return itr.Len() < otherItr.Len(), nil
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return TupleKind < other.Kind(), nil
|
||||
otherItr := itrs.otherItr
|
||||
err = otherItr.InitForTuple(otherTuple)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
smallerCount := itr.count
|
||||
if otherItr.count < smallerCount {
|
||||
smallerCount = otherItr.count
|
||||
}
|
||||
|
||||
dec := itr.dec
|
||||
otherDec := otherItr.dec
|
||||
for i := uint64(0); i < smallerCount; i++ {
|
||||
kind := dec.ReadKind()
|
||||
otherKind := otherDec.ReadKind()
|
||||
|
||||
if kind != otherKind {
|
||||
return kind < otherKind, nil
|
||||
}
|
||||
|
||||
var res int
|
||||
switch kind {
|
||||
case NullKind:
|
||||
continue
|
||||
|
||||
case BoolKind:
|
||||
res = int(dec.buff[dec.offset]) - int(otherDec.buff[otherDec.offset])
|
||||
dec.offset += 1
|
||||
otherDec.offset += 1
|
||||
|
||||
case StringKind, InlineBlobKind:
|
||||
size, otherSize := uint32(dec.readCount()), uint32(otherDec.readCount())
|
||||
start, otherStart := dec.offset, otherDec.offset
|
||||
dec.offset += size
|
||||
otherDec.offset += otherSize
|
||||
res = bytes.Compare(dec.buff[start:dec.offset], otherDec.buff[otherStart:otherDec.offset])
|
||||
|
||||
case UUIDKind:
|
||||
start, otherStart := dec.offset, otherDec.offset
|
||||
dec.offset += uuidNumBytes
|
||||
otherDec.offset += uuidNumBytes
|
||||
res = bytes.Compare(dec.buff[start:dec.offset], otherDec.buff[otherStart:otherDec.offset])
|
||||
|
||||
case IntKind:
|
||||
n := dec.ReadInt()
|
||||
otherN := otherDec.ReadInt()
|
||||
|
||||
if n == otherN {
|
||||
continue
|
||||
} else {
|
||||
return n < otherN, nil
|
||||
}
|
||||
|
||||
case UintKind:
|
||||
n := dec.ReadUint()
|
||||
otherN := otherDec.ReadUint()
|
||||
|
||||
if n == otherN {
|
||||
continue
|
||||
} else {
|
||||
return n < otherN, nil
|
||||
}
|
||||
|
||||
case DecimalKind:
|
||||
d, err := dec.ReadDecimal()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
otherD, err := otherDec.ReadDecimal()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
res = d.Cmp(otherD)
|
||||
|
||||
case FloatKind:
|
||||
f := dec.ReadFloat(nbf)
|
||||
otherF := otherDec.ReadFloat(nbf)
|
||||
res = int(f - otherF)
|
||||
|
||||
if f == otherF {
|
||||
continue
|
||||
} else {
|
||||
return f < otherF, nil
|
||||
}
|
||||
|
||||
case TimestampKind:
|
||||
tm, err := dec.ReadTimestamp()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
otherTm, err := otherDec.ReadTimestamp()
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if tm.Equal(otherTm) {
|
||||
continue
|
||||
} else {
|
||||
return tm.Before(otherTm), nil
|
||||
}
|
||||
|
||||
default:
|
||||
v, err := dec.readValue(nbf)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
otherV, err := otherDec.readValue(nbf)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if v.Equals(otherV) {
|
||||
continue
|
||||
} else {
|
||||
return v.Less(nbf, otherV)
|
||||
}
|
||||
}
|
||||
|
||||
if res != 0 {
|
||||
return res < 0, nil
|
||||
}
|
||||
}
|
||||
|
||||
return itr.Len() < otherItr.Len(), nil
|
||||
}
|
||||
|
||||
func (t Tuple) Less(nbf *NomsBinFormat, other LesserValuable) (bool, error) {
|
||||
otherTuple, ok := other.(Tuple)
|
||||
if !ok {
|
||||
return TupleKind < other.Kind(), nil
|
||||
}
|
||||
|
||||
return t.TupleLess(nbf, otherTuple)
|
||||
}
|
||||
|
||||
func (t Tuple) StartsWith(otherTuple Tuple) bool {
|
||||
|
||||
@@ -17,7 +17,9 @@ package types
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -285,3 +287,22 @@ func TestTupleStartsWith(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLess(b *testing.B) {
|
||||
nbf := Format_Default
|
||||
rng := rand.New(rand.NewSource(0))
|
||||
|
||||
tuples := make([]Tuple, b.N+1)
|
||||
for i := 0; i < len(tuples); i++ {
|
||||
randf := rng.Float64()
|
||||
v, err := NewTuple(nbf, Uint(1), Float(randf), Uint(2), Bool(i%2 == 0), Uint(3), String(uuid.New().String()), Uint(4), Timestamp(time.Now()), Uint(6), Int(-100), Uint(7), Int(-1000), Uint(8), Int(-10000), Uint(9), Int(-1000000))
|
||||
require.NoError(b, err)
|
||||
|
||||
tuples[i] = v
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
tuples[i].Less(nbf, tuples[i+1])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,6 +176,56 @@ type valueReadWriter interface {
|
||||
valueReadWriter() ValueReadWriter
|
||||
}
|
||||
|
||||
type TupleSlice []Tuple
|
||||
|
||||
func (vs TupleSlice) Equals(other TupleSlice) bool {
|
||||
if len(vs) != len(other) {
|
||||
return false
|
||||
}
|
||||
|
||||
for i, v := range vs {
|
||||
if !v.Equals(other[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (vs TupleSlice) Contains(nbf *NomsBinFormat, v Tuple) bool {
|
||||
for _, v := range vs {
|
||||
if v.Equals(v) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type TupleSort struct {
|
||||
Tuples []Tuple
|
||||
Nbf *NomsBinFormat
|
||||
}
|
||||
|
||||
func (vs TupleSort) Len() int {
|
||||
return len(vs.Tuples)
|
||||
}
|
||||
|
||||
func (vs TupleSort) Swap(i, j int) {
|
||||
vs.Tuples[i], vs.Tuples[j] = vs.Tuples[j], vs.Tuples[i]
|
||||
}
|
||||
|
||||
func (vs TupleSort) Less(i, j int) (bool, error) {
|
||||
return vs.Tuples[i].TupleLess(vs.Nbf, vs.Tuples[j])
|
||||
}
|
||||
|
||||
func (vs TupleSort) Equals(other TupleSort) bool {
|
||||
return TupleSlice(vs.Tuples).Equals(other.Tuples)
|
||||
}
|
||||
|
||||
func (vs TupleSort) Contains(v Tuple) bool {
|
||||
return TupleSlice(vs.Tuples).Contains(vs.Nbf, v)
|
||||
}
|
||||
|
||||
type valueImpl struct {
|
||||
vrw ValueReadWriter
|
||||
nbf *NomsBinFormat
|
||||
|
||||
Reference in New Issue
Block a user