From 377de80b1102b8da03db59cbb92e6d373a36b8cd Mon Sep 17 00:00:00 2001 From: Brian Hendriks Date: Tue, 26 Jan 2021 12:38:47 -0800 Subject: [PATCH] Decrease the number of allocations for indexed reads (#1239) Decrease the number of allocations for indexed reads optimize less and skipint/skipuint --- go/libraries/doltcore/sqle/index_lookup.go | 2 +- go/libraries/doltcore/sqle/index_row_iter.go | 35 ++- go/libraries/doltcore/sqle/rows.go | 2 +- .../table/editor/keyless_table_editor.go | 13 +- .../doltcore/table/map_point_reader.go | 53 +++-- .../doltcore/table/typed/noms/range_reader.go | 44 ++-- go/store/store_test.go | 187 ++++++++++++--- go/store/types/buffered_sequence_iterator.go | 5 + go/store/types/codec.go | 18 +- go/store/types/map.go | 11 + go/store/types/map_iterator.go | 48 ++-- go/store/types/map_leaf_sequence.go | 26 +++ go/store/types/sequence_cursor.go | 7 + go/store/types/tuple.go | 217 ++++++++++++++---- go/store/types/tuple_test.go | 21 ++ go/store/types/value.go | 50 ++++ 16 files changed, 578 insertions(+), 161 deletions(-) diff --git a/go/libraries/doltcore/sqle/index_lookup.go b/go/libraries/doltcore/sqle/index_lookup.go index 0254d2c997..aec142dbc9 100644 --- a/go/libraries/doltcore/sqle/index_lookup.go +++ b/go/libraries/doltcore/sqle/index_lookup.go @@ -168,5 +168,5 @@ func (il *doltIndexLookup) RowIterForRanges(ctx *sql.Context, ranges []lookup.Ra } type nomsKeyIter interface { - ReadKey(ctx context.Context) (types.Value, error) + ReadKey(ctx context.Context) (types.Tuple, error) } diff --git a/go/libraries/doltcore/sqle/index_row_iter.go b/go/libraries/doltcore/sqle/index_row_iter.go index 85d286a852..77ed95fe17 100644 --- a/go/libraries/doltcore/sqle/index_row_iter.go +++ b/go/libraries/doltcore/sqle/index_row_iter.go @@ -92,13 +92,13 @@ func (i *indexLookupRowIterAdapter) queueRows() { shouldBreak := false pos := 0 for ; pos < len(i.buffer); pos++ { - var indexKey types.Value + var indexKey types.Tuple indexKey, err = 
i.keyIter.ReadKey(i.ctx) if err != nil { break } exec.Execute(keyPos{ - key: indexKey.(types.Tuple), + key: indexKey, position: pos, }) } @@ -131,42 +131,41 @@ func (i *indexLookupRowIterAdapter) queueRows() { } } -func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple) (types.Value, error) { +func (i *indexLookupRowIterAdapter) indexKeyToTableKey(nbf *types.NomsBinFormat, indexKey types.Tuple) (types.Tuple, error) { tplItr, err := indexKey.Iterator() if err != nil { - return nil, err + return types.Tuple{}, err } resVals := make([]types.Value, len(i.pkTags)*2) for { - _, tagVal, err := tplItr.Next() + _, tag, err := tplItr.NextUint64() if err != nil { - return nil, err + if err == io.EOF { + break + } + + return types.Tuple{}, err } - if tagVal == nil { - break - } - - tag := uint64(tagVal.(types.Uint)) idx, inPK := i.pkTags[tag] if inPK { _, valVal, err := tplItr.Next() if err != nil { - return nil, err + return types.Tuple{}, err } - resVals[idx*2] = tagVal + resVals[idx*2] = types.Uint(tag) resVals[idx*2+1] = valVal } else { err := tplItr.Skip() if err != nil { - return nil, err + return types.Tuple{}, err } } } @@ -185,16 +184,16 @@ func (i *indexLookupRowIterAdapter) processKey(_ context.Context, valInt interfa return err } - fieldsVal, _, err := tableData.MaybeGet(i.ctx, pkTupleVal) + fieldsVal, ok, err := tableData.MaybeGetTuple(i.ctx, pkTupleVal) if err != nil { return err } - if fieldsVal == nil { + if !ok { return nil } - sqlRow, err := i.conv.ConvertKVToSqlRow(pkTupleVal, fieldsVal) + sqlRow, err := i.conv.ConvertKVTuplesToSqlRow(pkTupleVal, fieldsVal) if err != nil { return err } @@ -255,7 +254,7 @@ func (ci *coveringIndexRowIterAdapter) Next() (sql.Row, error) { return nil, err } - return ci.conv.ConvertKVToSqlRow(key, nil) + return ci.conv.ConvertKVTuplesToSqlRow(key, types.Tuple{}) } func (ci *coveringIndexRowIterAdapter) Close() error { diff --git a/go/libraries/doltcore/sqle/rows.go 
b/go/libraries/doltcore/sqle/rows.go index a923cb32ff..db24b363be 100644 --- a/go/libraries/doltcore/sqle/rows.go +++ b/go/libraries/doltcore/sqle/rows.go @@ -102,7 +102,7 @@ func newKeyedRowIter(ctx context.Context, tbl *DoltTable, projectedCols []string } conv := NewKVToSqlRowConverter(tbl.table.Format(), tagToSqlColIdx, cols, len(cols)) - return NewDoltMapIter(ctx, mapIter.Next, nil, conv), nil + return NewDoltMapIter(ctx, mapIter.NextTuple, nil, conv), nil } // Next returns the next row in this row iterator, or an io.EOF error if there aren't any more. diff --git a/go/libraries/doltcore/table/editor/keyless_table_editor.go b/go/libraries/doltcore/table/editor/keyless_table_editor.go index a4b9478874..5804643fb2 100644 --- a/go/libraries/doltcore/table/editor/keyless_table_editor.go +++ b/go/libraries/doltcore/table/editor/keyless_table_editor.go @@ -281,13 +281,13 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do } idx := 0 - keys := make([]types.Value, len(acc.deltas)) + keys := make([]types.Tuple, len(acc.deltas)) for _, vd := range acc.deltas { keys[idx] = vd.key idx++ } - err = types.SortWithErroringLess(types.ValueSort{Values: keys, Nbf: acc.nbf}) + err = types.SortWithErroringLess(types.TupleSort{Tuples: keys, Nbf: acc.nbf}) if err != nil { return nil, err } @@ -296,25 +296,26 @@ func applyEdits(ctx context.Context, tbl *doltdb.Table, acc keylessEditAcc) (*do iter := table.NewMapPointReader(rowData, keys...) 
for { - k, v, err := iter.Next(ctx) + k, v, err := iter.NextTuple(ctx) if err == io.EOF { break } + if err != nil { return nil, err } - delta, err := acc.getRowDelta(k.(types.Tuple)) + delta, err := acc.getRowDelta(k) if err != nil { return nil, err } var ok bool - if v == nil { + if v.Empty() { // row does not yet exist v, ok, err = initializeCardinality(delta.val, delta.delta) } else { - v, ok, err = modifyCardinalityWithDelta(v.(types.Tuple), delta.delta) + v, ok, err = modifyCardinalityWithDelta(v, delta.delta) } if err != nil { return nil, err diff --git a/go/libraries/doltcore/table/map_point_reader.go b/go/libraries/doltcore/table/map_point_reader.go index d23c19f84f..240d36348d 100644 --- a/go/libraries/doltcore/table/map_point_reader.go +++ b/go/libraries/doltcore/table/map_point_reader.go @@ -22,34 +22,61 @@ import ( ) type PointReader struct { - m types.Map - keys []types.Value - idx int + m types.Map + emptyTuple types.Tuple + keys []types.Tuple + idx int } var _ types.MapIterator = &PointReader{} // read the map values for a set of map keys -func NewMapPointReader(m types.Map, keys ...types.Value) types.MapIterator { +func NewMapPointReader(m types.Map, keys ...types.Tuple) types.MapIterator { return &PointReader{ - m: m, - keys: keys, + m: m, + emptyTuple: types.EmptyTuple(m.Format()), + keys: keys, } } // Next implements types.MapIterator. func (pr *PointReader) Next(ctx context.Context) (k, v types.Value, err error) { - if pr.idx >= len(pr.keys) { - return nil, nil, io.EOF - } + kt, vt, err := pr.NextTuple(ctx) - k = pr.keys[pr.idx] - // todo: optimize by implementing MapIterator.Seek() - v, _, err = pr.m.MaybeGet(ctx, k) if err != nil { return nil, nil, err } + + if !kt.Empty() { + k = kt + } + + if !vt.Empty() { + v = vt + } + + return k, v, nil +} + +// NextTuple implements types.MapIterator. 
+func (pr *PointReader) NextTuple(ctx context.Context) (k, v types.Tuple, err error) { + if pr.idx >= len(pr.keys) { + return types.Tuple{}, types.Tuple{}, io.EOF + } + + k = pr.keys[pr.idx] + v = pr.emptyTuple + + var ok bool + // todo: optimize by implementing MapIterator.Seek() + v, ok, err = pr.m.MaybeGetTuple(ctx, k) pr.idx++ - return k, v, err + if err != nil { + return types.Tuple{}, types.Tuple{}, err + } else if !ok { + return k, pr.emptyTuple, nil + } + + return k, v, nil } diff --git a/go/libraries/doltcore/table/typed/noms/range_reader.go b/go/libraries/doltcore/table/typed/noms/range_reader.go index 526717090f..89fd8f54ed 100644 --- a/go/libraries/doltcore/table/typed/noms/range_reader.go +++ b/go/libraries/doltcore/table/typed/noms/range_reader.go @@ -118,20 +118,20 @@ func (nrr *NomsRangeReader) ReadRow(ctx context.Context) (row.Row, error) { return nil, err } - return row.FromNoms(nrr.sch, k.(types.Tuple), v.(types.Tuple)) + return row.FromNoms(nrr.sch, k, v) } -func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Value, error) { +func (nrr *NomsRangeReader) ReadKey(ctx context.Context) (types.Tuple, error) { k, _, err := nrr.ReadKV(ctx) return k, err } -func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Value, types.Value, error) { +func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Tuple, types.Tuple, error) { var err error + var k types.Tuple + var v types.Tuple for nrr.itr != nil || nrr.idx < len(nrr.ranges) { - var k types.Value - var v types.Value if nrr.itr == nil { r := nrr.ranges[nrr.idx] nrr.idx++ @@ -143,46 +143,42 @@ func (nrr *NomsRangeReader) ReadKV(ctx context.Context) (types.Value, types.Valu } if err != nil { - return nil, nil, err + return types.Tuple{}, types.Tuple{}, err } nrr.currCheck = r.Check - k, v, err = nrr.itr.Next(ctx) + k, v, err = nrr.itr.NextTuple(ctx) - if !r.Inclusive && r.Start.Equals(k) { - k, v, err = nrr.itr.Next(ctx) + if err == nil && !r.Inclusive && r.Start.Compare(k) == 0 { 
+ k, v, err = nrr.itr.NextTuple(ctx) } } else { - k, v, err = nrr.itr.Next(ctx) + k, v, err = nrr.itr.NextTuple(ctx) } - if err != nil { - return nil, nil, err + if err != nil && err != io.EOF { + return types.Tuple{}, types.Tuple{}, err } var inRange bool - if k != nil { - inRange, err = nrr.currCheck(k.(types.Tuple)) + if err != io.EOF { + inRange, err = nrr.currCheck(k) if err != nil { - return nil, nil, err + return types.Tuple{}, types.Tuple{}, err } - if !inRange { - nrr.itr = nil - nrr.currCheck = nil - continue - } else { + if inRange { return k, v, nil } - } else { - nrr.itr = nil - nrr.currCheck = nil } + + nrr.itr = nil + nrr.currCheck = nil } - return nil, nil, io.EOF + return types.Tuple{}, types.Tuple{}, io.EOF } // VerifySchema checks that the incoming schema matches the schema from the existing table diff --git a/go/store/store_test.go b/go/store/store_test.go index c865edd7e8..5118544729 100644 --- a/go/store/store_test.go +++ b/go/store/store_test.go @@ -14,42 +14,52 @@ package store -/*import ( +import ( "context" + "io" "math/rand" "os" + "sync" "testing" "time" + "github.com/dolthub/go-mysql-server/sql" "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/nbs" "github.com/dolthub/dolt/go/store/types" ) -const ( - simIdxBenchDataset = "simulated_index_benchmark" - numRows = 100000 - rangeSize = 10 -) - func poe(err error) { if err != nil { panic(err) } } -var benchmarkTmpDir = os.TempDir() - -func getBenchmarkDB(ctx context.Context) datas.Database { - cs, err := nbs.NewLocalStore(ctx, types.Format_Default.VersionString(), benchmarkTmpDir, 1<<28) +func getDBAtDir(ctx context.Context, dir string) datas.Database { + cs, err := nbs.NewLocalStore(ctx, types.Format_Default.VersionString(), dir, 1<<28) poe(err) return 
datas.NewDatabase(nbs.NewNBSMetricWrapper(cs)) } +const ( + simIdxBenchDataset = "simulated_index_benchmark" + numRows = 100000 + rangeSize = 10 +) + +var benchmarkTmpDir = os.TempDir() +var genOnce = &sync.Once{} + +func getBenchmarkDB(ctx context.Context) datas.Database { + return getDBAtDir(ctx, benchmarkTmpDir) +} + func writeTupleToDB(ctx context.Context, db datas.Database, dsID string, vals ...types.Value) { root, err := types.NewTuple(db.Format(), vals...) poe(err) @@ -83,44 +93,59 @@ func readTupleFromDB(ctx context.Context, t require.TestingT, dsID string) (*typ return db.Format(), valSlice } -func init() { - ctx := context.Background() - db := getBenchmarkDB(ctx) - nbf := db.Format() +var testDataCols = []schema.Column{ + schema.NewColumn("id", 0, types.IntKind, true), + schema.NewColumn("fColh", 1, types.FloatKind, false), + schema.NewColumn("bCol", 2, types.BoolKind, false), + schema.NewColumn("uuidStrCol", 3, types.StringKind, false), + schema.NewColumn("timeCol", 4, types.TimestampKind, false), + schema.NewColumn("colInt1", 6, types.IntKind, false), + schema.NewColumn("colInt2", 7, types.IntKind, false), + schema.NewColumn("colInt3", 8, types.IntKind, false), + schema.NewColumn("colInt4", 9, types.IntKind, false), +} - m, err := types.NewMap(ctx, db) - poe(err) +func generateTestData(ctx context.Context) { + genOnce.Do(func() { + db := getBenchmarkDB(ctx) + nbf := db.Format() - idx, err := types.NewMap(ctx, db) - poe(err) - - me := m.Edit() - idxMe := idx.Edit() - rng := rand.New(rand.NewSource(0)) - for i := 0; i <= numRows; i++ { - k, err := types.NewTuple(nbf, types.Uint(0), types.Int(int64(i))) - poe(err) - randf := rng.Float64() - v, err := types.NewTuple(nbf, types.Uint(1), types.Float(randf), types.Uint(2), types.Bool(i%2 == 0), types.Uint(3), types.String(uuid.New().String()), types.Uint(4), types.Timestamp(time.Now())) - poe(err) - idxKey, err := types.NewTuple(nbf, types.Uint(5), types.Float(randf), types.Uint(0), types.Int(int64(i))) + m, 
err := types.NewMap(ctx, db) poe(err) - me = me.Set(k, v) - idxMe = idxMe.Set(idxKey, types.NullValue) - } + idx, err := types.NewMap(ctx, db) + poe(err) - m, err = me.Map(ctx) - poe(err) + me := m.Edit() + idxMe := idx.Edit() + rng := rand.New(rand.NewSource(0)) + for i := 0; i <= numRows; i++ { + k, err := types.NewTuple(nbf, types.Uint(0), types.Int(int64(i))) + poe(err) + randf := rng.Float64() + v, err := types.NewTuple(nbf, types.Uint(1), types.Float(randf), types.Uint(2), types.Bool(i%2 == 0), types.Uint(3), types.String(uuid.New().String()), types.Uint(4), types.Timestamp(time.Now()), types.Uint(6), types.Int(-100), types.Uint(7), types.Int(-1000), types.Uint(8), types.Int(-10000), types.Uint(9), types.Int(-1000000)) + poe(err) + idxKey, err := types.NewTuple(nbf, types.Uint(5), types.Float(randf), types.Uint(0), types.Int(int64(i))) + poe(err) - idx, err = idxMe.Map(ctx) - poe(err) + me = me.Set(k, v) + idxMe = idxMe.Set(idxKey, types.NullValue) + } - writeTupleToDB(ctx, db, simIdxBenchDataset, m, idx) + m, err = me.Map(ctx) + poe(err) + + idx, err = idxMe.Map(ctx) + poe(err) + + writeTupleToDB(ctx, db, simIdxBenchDataset, m, idx) + }) } func BenchmarkSimulatedIndex(b *testing.B) { ctx := context.Background() + generateTestData(ctx) + rng := rand.New(rand.NewSource(0)) nbf, vals := readTupleFromDB(ctx, b, simIdxBenchDataset) @@ -158,6 +183,8 @@ func BenchmarkSimulatedIndex(b *testing.B) { func BenchmarkSimulatedCoveringIndex(b *testing.B) { ctx := context.Background() + generateTestData(ctx) + rng := rand.New(rand.NewSource(0)) nbf, vals := readTupleFromDB(ctx, b, simIdxBenchDataset) @@ -182,4 +209,88 @@ func BenchmarkSimulatedCoveringIndex(b *testing.B) { } } } +} + +func BenchmarkMapItr(b *testing.B) { + ctx := context.Background() + generateTestData(ctx) + + require.True(b, b.N < numRows, "b.N:%d >= numRows:%d", b.N, numRows) + + _, vals := readTupleFromDB(ctx, b, simIdxBenchDataset) + m := vals[0].(types.Map) + + itr, err := m.RangeIterator(ctx, 0, 
uint64(b.N)) + require.NoError(b, err) + + var closeFunc func() error + if cl, ok := itr.(io.Closer); ok { + closeFunc = cl.Close + } + + dmItr := sqle.NewDoltMapIter(ctx, itr.NextTuple, closeFunc, sqle.NewKVToSqlRowConverterForCols(m.Format(), testDataCols)) + + b.ResetTimer() + for { + var r sql.Row + r, err = dmItr.Next() + + if r == nil || err != nil { + break + } + } + b.StopTimer() + + if err != io.EOF { + require.NoError(b, err) + } +} + +/*func BenchmarkFullScan(b *testing.B) { + const dir = "dolt directory containing db with table to scan" + const branch = "master" + const tableName = "bigram_counts" + + ctx := context.Background() + ddb, err := doltdb.LoadDoltDB(ctx, types.Format_Default, dir) + require.NoError(b, err) + + cs, err := doltdb.NewCommitSpec("HEAD") + require.NoError(b, err) + + cm, err := ddb.Resolve(ctx, cs, ref.NewBranchRef(branch)) + require.NoError(b, err) + + root, err := cm.GetRootValue() + require.NoError(b, err) + + tbl, ok, err := root.GetTable(ctx, tableName) + require.NoError(b, err) + require.True(b, ok) + + m, err := tbl.GetRowData(ctx) + require.NoError(b, err) + require.True(b, uint64(b.N) < m.Len(), "b.N:%d >= numRows:%d", b.N, m.Len()) + + itr, err := m.RangeIterator(ctx, 0, uint64(b.N)) + require.NoError(b, err) + + dmItr := sqle.NewDoltMapIter(ctx, itr.NextTuple, closeFunc, sqle.NewKVToSqlRowConverterForCols(m.Format(), testDataCols)) + + b.ResetTimer() + for { + var r sql.Row + r, err = dmItr.Next() + + if r == nil || err != nil { + break + } + } + b.StopTimer() + + if err != io.EOF { + require.NoError(b, err) + } }*/ + + diff --git a/go/store/types/buffered_sequence_iterator.go b/go/store/types/buffered_sequence_iterator.go index 9f829ea443..30be8844e7 100644 --- a/go/store/types/buffered_sequence_iterator.go +++ b/go/store/types/buffered_sequence_iterator.go @@ -79,6 +79,11 @@ func (cur *bufferedSequenceIterator) current() (sequenceItem, error) { return cur.seq.getItem(cur.idx) } +func (cur *bufferedSequenceIterator) 
currentTuple() (tupleMapEntry, error) { + d.PanicIfFalse(cur.valid()) + return cur.seq.(mapLeafSequence).getTupleMapEntry(cur.idx) +} + func (cur *bufferedSequenceIterator) valid() bool { return cur.idx >= 0 && cur.idx < cur.length() } diff --git a/go/store/types/codec.go b/go/store/types/codec.go index e6e9d03a59..65da549517 100644 --- a/go/store/types/codec.go +++ b/go/store/types/codec.go @@ -200,8 +200,13 @@ func (b *binaryNomsReader) skipFloat(nbf *NomsBinFormat) { } func (b *binaryNomsReader) skipInt() { - _, count := unrolledDecodeVarint(b.buff[b.offset:]) - b.offset += uint32(count) + maxOffset := b.offset + 10 + for ; b.offset < maxOffset; b.offset++ { + if b.buff[b.offset]&0x80 == 0 { + b.offset++ + return + } + } } func (b *binaryNomsReader) ReadInt() int64 { @@ -289,8 +294,13 @@ func unrolledDecodeVarint(buf []byte) (int64, int) { } func (b *binaryNomsReader) skipUint() { - _, count := unrolledDecodeUVarint(b.buff[b.offset:]) - b.offset += uint32(count) + maxOffset := b.offset + 10 + for ; b.offset < maxOffset; b.offset++ { + if b.buff[b.offset]&0x80 == 0 { + b.offset++ + return + } + } } func (b *binaryNomsReader) ReadBool() bool { diff --git a/go/store/types/map.go b/go/store/types/map.go index 88569492ef..bf90a79f69 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -310,6 +310,17 @@ func (m Map) MaybeGet(ctx context.Context, key Value) (v Value, ok bool, err err return entry.value, true, nil } +func (m Map) MaybeGetTuple(ctx context.Context, key Tuple) (v Tuple, ok bool, err error) { + var val Value + val, ok, err = m.MaybeGet(ctx, key) + + if val != nil { + return val.(Tuple), ok, err + } + + return Tuple{}, ok, err +} + func (m Map) Has(ctx context.Context, key Value) (bool, error) { cur, err := newCursorAtValue(ctx, m.orderedSequence, key, false, false) diff --git a/go/store/types/map_iterator.go b/go/store/types/map_iterator.go index 221c2bde32..1bb8ec035f 100644 --- a/go/store/types/map_iterator.go +++ 
b/go/store/types/map_iterator.go @@ -27,22 +27,21 @@ import ( "io" ) -// MapIterator is the interface used by iterators over Noms Maps. -type MapIterator interface { - Next(ctx context.Context) (k, v Value, err error) -} - // MapTupleIterator is an iterator that returns map keys and values as types.Tuple instances and follow the standard go // convention of using io.EOF to mean that all the data has been read. type MapTupleIterator interface { - Next(ctx context.Context) (k, v Tuple, err error) + NextTuple(ctx context.Context) (k, v Tuple, err error) +} + +// MapIterator is the interface used by iterators over Noms Maps. +type MapIterator interface { + MapTupleIterator + Next(ctx context.Context) (k, v Value, err error) } // mapIterator can efficiently iterate through a Noms Map. type mapIterator struct { sequenceIter sequenceIterator - currentKey Value - currentValue Value } // Next returns the subsequent entries from the Map, starting with the entry at which the iterator @@ -56,17 +55,38 @@ func (mi *mapIterator) Next(ctx context.Context) (k, v Value, err error) { } entry := item.(mapEntry) - mi.currentKey, mi.currentValue = entry.key, entry.value _, err = mi.sequenceIter.advance(ctx) if err != nil { return nil, nil, err } - } else { - mi.currentKey, mi.currentValue = nil, nil - } - return mi.currentKey, mi.currentValue, nil + return entry.key, entry.value, nil + } else { + return nil, nil, nil + } +} + +// NextTuple returns the subsequent entries from the Map as Tuples, starting with the entry at which the iterator +// was created. If there are no more entries, NextTuple() returns empty Tuples and io.EOF. 
+func (mi *mapIterator) NextTuple(ctx context.Context) (k, v Tuple, err error) { + if mi.sequenceIter.valid() { + entry, err := mi.sequenceIter.currentTuple() + + if err != nil { + return Tuple{}, Tuple{}, err + } + + _, err = mi.sequenceIter.advance(ctx) + + if err != nil { + return Tuple{}, Tuple{}, err + } + + return entry.key, entry.value, nil + } else { + return Tuple{}, Tuple{}, io.EOF + } } var errClosed = errors.New("closed") @@ -75,7 +95,7 @@ type mapRangeIter struct { collItr *collTupleRangeIter } -func (itr *mapRangeIter) Next(ctx context.Context) (k, v Tuple, err error) { +func (itr *mapRangeIter) NextTuple(ctx context.Context) (k, v Tuple, err error) { if itr.collItr == nil { // only happens if there is nothing to iterate over return Tuple{}, Tuple{}, io.EOF diff --git a/go/store/types/map_leaf_sequence.go b/go/store/types/map_leaf_sequence.go index 5b08730b1c..ad9aa55977 100644 --- a/go/store/types/map_leaf_sequence.go +++ b/go/store/types/map_leaf_sequence.go @@ -29,6 +29,11 @@ type mapLeafSequence struct { leafSequence } +type tupleMapEntry struct { + key Tuple + value Tuple +} + type mapEntry struct { key Value value Value @@ -60,6 +65,22 @@ func readMapEntry(r *valueDecoder, nbf *NomsBinFormat) (mapEntry, error) { return mapEntry{k, v}, nil } +func readTupleMapEntry(r *valueDecoder, nbf *NomsBinFormat) (tupleMapEntry, error) { + k, err := r.readTuple(nbf) + + if err != nil { + return tupleMapEntry{}, err + } + + v, err := r.readTuple(nbf) + + if err != nil { + return tupleMapEntry{}, err + } + + return tupleMapEntry{k, v}, nil +} + func (entry mapEntry) equals(other mapEntry) bool { return entry.key.Equals(other.key) && entry.value.Equals(other.value) } @@ -131,6 +152,11 @@ func (ml mapLeafSequence) getItem(idx int) (sequenceItem, error) { return readMapEntry(&dec, ml.format()) } +func (ml mapLeafSequence) getTupleMapEntry(idx int) (tupleMapEntry, error) { + dec := ml.decoderSkipToIndex(idx) + return readTupleMapEntry(&dec, ml.format()) +} + func 
(ml mapLeafSequence) WalkRefs(nbf *NomsBinFormat, cb RefCallback) error { w := binaryNomsWriter{make([]byte, 4), 0} err := ml.writeTo(&w, ml.format()) diff --git a/go/store/types/sequence_cursor.go b/go/store/types/sequence_cursor.go index 43fd8d9fda..da1ac64187 100644 --- a/go/store/types/sequence_cursor.go +++ b/go/store/types/sequence_cursor.go @@ -31,6 +31,7 @@ import ( type sequenceIterator interface { valid() bool current() (sequenceItem, error) + currentTuple() (tupleMapEntry, error) advance(ctx context.Context) (bool, error) iter(ctx context.Context, cb cursorIterCallback) error } @@ -101,6 +102,12 @@ func (cur *sequenceCursor) current() (sequenceItem, error) { return cur.getItem(cur.idx) } +// currentTuple returns the tupleMapEntry at the current cursor position +func (cur *sequenceCursor) currentTuple() (tupleMapEntry, error) { + d.PanicIfFalse(cur.valid()) + return cur.seq.(mapLeafSequence).getTupleMapEntry(cur.idx) +} + func (cur *sequenceCursor) valid() bool { return cur.idx >= 0 && cur.idx < cur.length() } diff --git a/go/store/types/tuple.go b/go/store/types/tuple.go index 73d13da358..ed7d49b577 100644 --- a/go/store/types/tuple.go +++ b/go/store/types/tuple.go @@ -26,6 +26,7 @@ import ( "context" "errors" "fmt" + "io" "strings" "sync" @@ -137,6 +138,24 @@ func (itr *TupleIterator) Next() (uint64, Value, error) { return itr.count, nil, nil } +func (itr *TupleIterator) NextUint64() (uint64, uint64, error) { + if itr.pos < itr.count { + k := itr.dec.ReadKind() + + if k != UintKind { + return 0, 0, errors.New("NextUint64 called when the next value is not a Uint64") + } + + valPos := itr.pos + val := itr.dec.ReadUint() + itr.pos++ + + return valPos, val, nil + } + + return itr.count, 0, io.EOF +} + func (itr *TupleIterator) CodecReader() (CodecReader, uint64) { return itr.dec, itr.count - itr.pos } @@ -337,6 +356,10 @@ func (t Tuple) PrefixEquals(ctx context.Context, other Tuple, prefixCount uint64 return true, nil } +func (t Tuple) Compare(other Tuple) 
int { + return bytes.Compare(t.buff, other.buff) +} + func (t Tuple) typeOf() (*Type, error) { dec, count := t.decoderSkipToFields() ts := make(typeSlice, 0, count) @@ -569,52 +592,162 @@ func (t Tuple) splitFieldsAt(n uint64) (prolog, head, tail []byte, count uint64, return } -func (t Tuple) Less(nbf *NomsBinFormat, other LesserValuable) (bool, error) { - if otherTuple, ok := other.(Tuple); ok { - itrs := tupItrPairPool.Get().(*tupleItrPair) - defer tupItrPairPool.Put(itrs) +func (t Tuple) TupleLess(nbf *NomsBinFormat, otherTuple Tuple) (bool, error) { + itrs := tupItrPairPool.Get().(*tupleItrPair) + defer tupItrPairPool.Put(itrs) - itr := itrs.thisItr - err := itr.InitForTuple(t) + itr := itrs.thisItr + err := itr.InitForTuple(t) - if err != nil { - return false, err - } - - otherItr := itrs.otherItr - err = otherItr.InitForTuple(otherTuple) - - if err != nil { - return false, err - } - - for itr.HasMore() { - if !otherItr.HasMore() { - // equal up til the end of other. other is shorter, therefore it is less - return false, nil - } - - _, currVal, err := itr.Next() - - if err != nil { - return false, err - } - - _, currOthVal, err := otherItr.Next() - - if err != nil { - return false, err - } - - if !currVal.Equals(currOthVal) { - return currVal.Less(nbf, currOthVal) - } - } - - return itr.Len() < otherItr.Len(), nil + if err != nil { + return false, err } - return TupleKind < other.Kind(), nil + otherItr := itrs.otherItr + err = otherItr.InitForTuple(otherTuple) + + if err != nil { + return false, err + } + + smallerCount := itr.count + if otherItr.count < smallerCount { + smallerCount = otherItr.count + } + + dec := itr.dec + otherDec := otherItr.dec + for i := uint64(0); i < smallerCount; i++ { + kind := dec.ReadKind() + otherKind := otherDec.ReadKind() + + if kind != otherKind { + return kind < otherKind, nil + } + + var res int + switch kind { + case NullKind: + continue + + case BoolKind: + res = int(dec.buff[dec.offset]) - 
int(otherDec.buff[otherDec.offset]) + dec.offset += 1 + otherDec.offset += 1 + + case StringKind, InlineBlobKind: + size, otherSize := uint32(dec.readCount()), uint32(otherDec.readCount()) + start, otherStart := dec.offset, otherDec.offset + dec.offset += size + otherDec.offset += otherSize + res = bytes.Compare(dec.buff[start:dec.offset], otherDec.buff[otherStart:otherDec.offset]) + + case UUIDKind: + start, otherStart := dec.offset, otherDec.offset + dec.offset += uuidNumBytes + otherDec.offset += uuidNumBytes + res = bytes.Compare(dec.buff[start:dec.offset], otherDec.buff[otherStart:otherDec.offset]) + + case IntKind: + n := dec.ReadInt() + otherN := otherDec.ReadInt() + + if n == otherN { + continue + } else { + return n < otherN, nil + } + + case UintKind: + n := dec.ReadUint() + otherN := otherDec.ReadUint() + + if n == otherN { + continue + } else { + return n < otherN, nil + } + + case DecimalKind: + d, err := dec.ReadDecimal() + + if err != nil { + return false, err + } + + otherD, err := otherDec.ReadDecimal() + + if err != nil { + return false, err + } + + res = d.Cmp(otherD) + + case FloatKind: + f := dec.ReadFloat(nbf) + otherF := otherDec.ReadFloat(nbf) + res = int(f - otherF) + + if f == otherF { + continue + } else { + return f < otherF, nil + } + + case TimestampKind: + tm, err := dec.ReadTimestamp() + + if err != nil { + return false, err + } + + otherTm, err := otherDec.ReadTimestamp() + + if err != nil { + return false, err + } + + if tm.Equal(otherTm) { + continue + } else { + return tm.Before(otherTm), nil + } + + default: + v, err := dec.readValue(nbf) + + if err != nil { + return false, err + } + + otherV, err := otherDec.readValue(nbf) + + if err != nil { + return false, err + } + + if v.Equals(otherV) { + continue + } else { + return v.Less(nbf, otherV) + } + } + + if res != 0 { + return res < 0, nil + } + } + + return itr.Len() < otherItr.Len(), nil +} + +func (t Tuple) Less(nbf *NomsBinFormat, other LesserValuable) (bool, error) { + 
otherTuple, ok := other.(Tuple) + if !ok { + return TupleKind < other.Kind(), nil + } + + return t.TupleLess(nbf, otherTuple) } func (t Tuple) StartsWith(otherTuple Tuple) bool { diff --git a/go/store/types/tuple_test.go b/go/store/types/tuple_test.go index 13f476d915..23d0defb74 100644 --- a/go/store/types/tuple_test.go +++ b/go/store/types/tuple_test.go @@ -17,7 +17,9 @@ package types import ( "context" "fmt" + "math/rand" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -285,3 +287,22 @@ func TestTupleStartsWith(t *testing.T) { }) } } + +func BenchmarkLess(b *testing.B) { + nbf := Format_Default + rng := rand.New(rand.NewSource(0)) + + tuples := make([]Tuple, b.N+1) + for i := 0; i < len(tuples); i++ { + randf := rng.Float64() + v, err := NewTuple(nbf, Uint(1), Float(randf), Uint(2), Bool(i%2 == 0), Uint(3), String(uuid.New().String()), Uint(4), Timestamp(time.Now()), Uint(6), Int(-100), Uint(7), Int(-1000), Uint(8), Int(-10000), Uint(9), Int(-1000000)) + require.NoError(b, err) + + tuples[i] = v + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + tuples[i].Less(nbf, tuples[i+1]) + } +} diff --git a/go/store/types/value.go b/go/store/types/value.go index d4bc0d1a8f..111c584eba 100644 --- a/go/store/types/value.go +++ b/go/store/types/value.go @@ -176,6 +176,56 @@ type valueReadWriter interface { valueReadWriter() ValueReadWriter } +type TupleSlice []Tuple + +func (vs TupleSlice) Equals(other TupleSlice) bool { + if len(vs) != len(other) { + return false + } + + for i, v := range vs { + if !v.Equals(other[i]) { + return false + } + } + + return true +} + +func (vs TupleSlice) Contains(nbf *NomsBinFormat, v Tuple) bool { + for _, t := range vs { + if t.Equals(v) { + return true + } + } + return false +} + +type TupleSort struct { + Tuples []Tuple + Nbf *NomsBinFormat +} + +func (vs TupleSort) Len() int { + return len(vs.Tuples) +} + +func (vs TupleSort) Swap(i, j int) { + vs.Tuples[i], vs.Tuples[j] = vs.Tuples[j], vs.Tuples[i] 
+} + +func (vs TupleSort) Less(i, j int) (bool, error) { + return vs.Tuples[i].TupleLess(vs.Nbf, vs.Tuples[j]) +} + +func (vs TupleSort) Equals(other TupleSort) bool { + return TupleSlice(vs.Tuples).Equals(other.Tuples) +} + +func (vs TupleSort) Contains(v Tuple) bool { + return TupleSlice(vs.Tuples).Contains(vs.Nbf, v) +} + type valueImpl struct { vrw ValueReadWriter nbf *NomsBinFormat