decrease allocations made while reading values (#1212)

* decrease allocations made while reading values
* Read tuples from sequences directly without conversion to Values (#1213)
* fix unreleased date encoding bug
This commit is contained in:
Brian Hendriks
2021-01-13 15:46:54 -08:00
committed by GitHub
parent dfaf082fa2
commit bfed2aafda
16 changed files with 165 additions and 57 deletions

View File

@@ -63,6 +63,9 @@ func CreateDatetimeTypeFromParams(params map[string]string) (TypeInfo, error) {
// ConvertNomsValueToValue implements TypeInfo interface.
func (ti *datetimeType) ConvertNomsValueToValue(v types.Value) (interface{}, error) {
if val, ok := v.(types.Timestamp); ok {
if ti.Equals(DateType) {
return time.Time(val).Truncate(24 * time.Hour).UTC(), nil
}
return time.Time(val).UTC(), nil
}
if _, ok := v.(types.Null); ok || v == nil {
@@ -82,6 +85,9 @@ func (ti *datetimeType) ReadFrom(_ *types.NomsBinFormat, reader types.CodecReade
return nil, err
}
if ti.Equals(DateType) {
return t.Truncate(24 * time.Hour).UTC(), nil
}
return t.UTC(), nil
case types.NullKind:
return nil, nil

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"strconv"
"unsafe"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/sqltypes"
@@ -71,7 +72,7 @@ func CreateInlineBlobTypeFromParams(params map[string]string) (TypeInfo, error)
// ConvertNomsValueToValue implements TypeInfo interface.
func (ti *inlineBlobType) ConvertNomsValueToValue(v types.Value) (interface{}, error) {
if val, ok := v.(types.InlineBlob); ok {
return string(val), nil
return *(*string)(unsafe.Pointer(&val)), nil
}
if _, ok := v.(types.Null); ok || v == nil {
return nil, nil
@@ -85,7 +86,7 @@ func (ti *inlineBlobType) ReadFrom(_ *types.NomsBinFormat, reader types.CodecRea
switch k {
case types.InlineBlobKind:
bytes := reader.ReadInlineBlob()
return string(bytes), nil
return *(*string)(unsafe.Pointer(&bytes)), nil
case types.NullKind:
return nil, nil
}
@@ -104,7 +105,7 @@ func (ti *inlineBlobType) ConvertValueToNomsValue(ctx context.Context, vrw types
}
val, ok := strVal.(string)
if ok {
return types.InlineBlob(val), nil
return *(*types.InlineBlob)(unsafe.Pointer(&val)), nil
}
return nil, fmt.Errorf(`"%v" has unexpectedly encountered a value of type "%T" from embedded type`, ti.String(), v)
}
@@ -187,7 +188,7 @@ func (ti *inlineBlobType) ParseValue(ctx context.Context, vrw types.ValueReadWri
return nil, err
}
if val, ok := strVal.(string); ok {
return types.InlineBlob(val), nil
return *(*types.InlineBlob)(unsafe.Pointer(&val)), nil
}
return nil, fmt.Errorf(`"%v" cannot convert the string "%v" to a value`, ti.String(), str)
}

View File

@@ -105,6 +105,8 @@ func verifyTypeInfoArrays(t *testing.T, tiArrays [][]TypeInfo, vaArrays [][]type
// assuming valid data, verifies that the To-From interface{} functions can round trip
func testTypeInfoConvertRoundTrip(t *testing.T, tiArrays [][]TypeInfo, vaArrays [][]types.Value) {
nbf := types.Format_Default
for rowIndex, tiArray := range tiArrays {
t.Run(tiArray[0].GetTypeIdentifier().String(), func(t *testing.T) {
for _, ti := range tiArray {
@@ -125,6 +127,18 @@ func testTypeInfoConvertRoundTrip(t *testing.T, tiArrays [][]TypeInfo, vaArrays
} else if ti.GetTypeIdentifier() != DecimalTypeIdentifier { // Any Decimal's on-disk representation varies by precision/scale
require.True(t, val.Equals(outVal), "\"%v\"\n\"%v\"", val, outVal)
}
tup, err := types.NewTuple(nbf, outVal)
require.NoError(t, err)
itr, err := tup.Iterator()
require.NoError(t, err)
reader, n := itr.CodecReader()
require.Equal(t, uint64(1), n)
readVal, err := ti.ReadFrom(nbf, reader)
require.Equal(t, readVal, vInterface)
}
})
}

View File

@@ -36,8 +36,9 @@ func maxU64(x, y uint64) uint64 {
// KVToSqlRowConverter takes noms types.Value key-value pairs and converts them directly to a sql.Row. It
// can be configured to only process a portion of the columns and map columns to desired output columns.
type KVToSqlRowConverter struct {
tagToSqlColIdx map[uint64]int
nbf *types.NomsBinFormat
cols []schema.Column
tagToSqlColIdx map[uint64]int
// rowSize is the number of columns in the output row. This may be bigger than the number of columns being converted,
// but not less. When rowSize is bigger than the number of columns being processed that means that some of the columns
// in the output row will be filled with nils
@@ -47,12 +48,13 @@ type KVToSqlRowConverter struct {
maxValTag uint64
}
func NewKVToSqlRowConverter(tagToSqlColIdx map[uint64]int, cols []schema.Column, rowSize int) *KVToSqlRowConverter {
func NewKVToSqlRowConverter(nbf *types.NomsBinFormat, tagToSqlColIdx map[uint64]int, cols []schema.Column, rowSize int) *KVToSqlRowConverter {
valsFromKey, valsFromVal, maxValTag := getValLocations(tagToSqlColIdx, cols)
return &KVToSqlRowConverter{
tagToSqlColIdx: tagToSqlColIdx,
nbf: nbf,
cols: cols,
tagToSqlColIdx: tagToSqlColIdx,
rowSize: rowSize,
valsFromKey: valsFromKey,
valsFromVal: valsFromVal,
@@ -80,13 +82,13 @@ func getValLocations(tagToSqlColIdx map[uint64]int, cols []schema.Column) (int,
}
// NewKVToSqlRowConverterForCols returns a KVToSqlConverter instance based on the list of columns passed in
func NewKVToSqlRowConverterForCols(cols []schema.Column) *KVToSqlRowConverter {
func NewKVToSqlRowConverterForCols(nbf *types.NomsBinFormat, cols []schema.Column) *KVToSqlRowConverter {
tagToSqlColIdx := make(map[uint64]int)
for i, col := range cols {
tagToSqlColIdx[col.Tag] = i
}
return NewKVToSqlRowConverter(tagToSqlColIdx, cols, len(cols))
return NewKVToSqlRowConverter(nbf, tagToSqlColIdx, cols, len(cols))
}
// ConvertKVToSqlRow returns a sql.Row generated from the key and value provided.
@@ -104,15 +106,22 @@ func (conv *KVToSqlRowConverter) ConvertKVToSqlRow(k, v types.Value) (sql.Row, e
if !ok {
return nil, errors.New("invalid value is not a tuple")
}
} else {
valTup = types.EmptyTuple(conv.nbf)
}
return conv.ConvertKVTuplesToSqlRow(keyTup, valTup)
}
// ConvertKVToSqlRow returns a sql.Row generated from the key and value provided.
func (conv *KVToSqlRowConverter) ConvertKVTuplesToSqlRow(k, v types.Tuple) (sql.Row, error) {
tupItr := types.TupleItrPool.Get().(*types.TupleIterator)
defer types.TupleItrPool.Put(tupItr)
cols := make([]interface{}, conv.rowSize)
if conv.valsFromKey > 0 {
// keys are not in sorted order so cannot use max tag to early exit
err := conv.processTuple(cols, conv.valsFromKey, 0xFFFFFFFFFFFFFFFF, keyTup, tupItr)
err := conv.processTuple(cols, conv.valsFromKey, 0xFFFFFFFFFFFFFFFF, k, tupItr)
if err != nil {
return nil, err
@@ -120,7 +129,7 @@ func (conv *KVToSqlRowConverter) ConvertKVToSqlRow(k, v types.Value) (sql.Row, e
}
if conv.valsFromVal > 0 {
err := conv.processTuple(cols, conv.valsFromVal, conv.maxValTag, valTup, tupItr)
err := conv.processTuple(cols, conv.valsFromVal, conv.maxValTag, v, tupItr)
if err != nil {
return nil, err
@@ -178,19 +187,24 @@ func (conv *KVToSqlRowConverter) processTuple(cols []interface{}, valsToFill int
}
// KVGetFunc defines a function that returns a Key Value pair
type KVGetFunc func(ctx context.Context) (types.Value, types.Value, error)
type KVGetFunc func(ctx context.Context) (types.Tuple, types.Tuple, error)
func GetGetFuncForMapIter(mapItr types.MapIterator) func(ctx context.Context) (types.Value, types.Value, error) {
return func(ctx context.Context) (types.Value, types.Value, error) {
// GetGetFuncForMapIter adapts a types.MapIterator into a KVGetFunc that yields
// key/value pairs as Tuples. A nil key from the iterator signals exhaustion and
// is reported as io.EOF; a value that is not a Tuple (e.g. absent) is replaced
// by the empty tuple for the given format.
//
// NOTE(review): the key assertion below is unchecked and will panic if a key is
// not a Tuple — confirm all map keys reaching this path are tuples.
func GetGetFuncForMapIter(nbf *types.NomsBinFormat, mapItr types.MapIterator) func(ctx context.Context) (types.Tuple, types.Tuple, error) {
	return func(ctx context.Context) (types.Tuple, types.Tuple, error) {
		k, v, err := mapItr.Next(ctx)
		if err != nil {
			return types.Tuple{}, types.Tuple{}, err
		} else if k == nil {
			// nil key means the iterator is exhausted
			return types.Tuple{}, types.Tuple{}, io.EOF
		}

		valTup, ok := v.(types.Tuple)
		if !ok {
			valTup = types.EmptyTuple(nbf)
		}

		return k.(types.Tuple), valTup, nil
	}
}
@@ -221,7 +235,7 @@ func (dmi *DoltMapIter) Next() (sql.Row, error) {
return nil, err
}
return dmi.conv.ConvertKVToSqlRow(k, v)
return dmi.conv.ConvertKVTuplesToSqlRow(k, v)
}
func (dmi *DoltMapIter) Close() error {

View File

@@ -51,7 +51,7 @@ func NewIndexLookupRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nomsK
}
cols := idx.Schema().GetAllCols().GetColumns()
conv := NewKVToSqlRowConverterForCols(cols)
conv := NewKVToSqlRowConverterForCols(idx.IndexRowData().Format(), cols)
iter := &indexLookupRowIterAdapter{
idx: idx,
@@ -239,7 +239,7 @@ func NewCoveringIndexRowIterAdapter(ctx *sql.Context, idx DoltIndex, keyIter nom
return &coveringIndexRowIterAdapter{
idx: idx,
keyIter: keyIter,
conv: NewKVToSqlRowConverter(tagToSqlColIdx, cols, len(cols)),
conv: NewKVToSqlRowConverter(idx.IndexRowData().Format(), tagToSqlColIdx, cols, len(cols)),
ctx: ctx,
pkCols: sch.GetPKCols(),
nonPKCols: sch.GetNonPKCols(),

View File

@@ -79,9 +79,9 @@ func newKeyedRowIter(ctx context.Context, tbl *DoltTable, partition *doltTablePa
return nil, err
}
var mapIter types.MapIterator
var mapIter types.MapTupleIterator
if partition == nil {
mapIter, err = rowData.BufferedIterator(ctx)
mapIter, err = rowData.RangeIterator(ctx, 0, rowData.Len())
} else {
mapIter, err = partition.IteratorForPartition(ctx, rowData)
}
@@ -104,8 +104,8 @@ func newKeyedRowIter(ctx context.Context, tbl *DoltTable, partition *doltTablePa
}
}
conv := NewKVToSqlRowConverter(tagToSqlColIdx, cols, len(cols))
return NewDoltMapIter(ctx, GetGetFuncForMapIter(mapIter), nil, conv), nil
conv := NewKVToSqlRowConverter(tbl.table.Format(), tagToSqlColIdx, cols, len(cols))
return NewDoltMapIter(ctx, mapIter.Next, nil, conv), nil
}
// Next returns the next row in this row iterator, or an io.EOF error if there aren't any more.

View File

@@ -522,7 +522,7 @@ func (p doltTablePartition) Key() []byte {
// IteratorForPartition returns a types.MapIterator implementation which will iterate through the values
// for index = start; index < end. This iterator is not thread safe and should only be used from a single go routine
// unless paired with a mutex
func (p doltTablePartition) IteratorForPartition(ctx context.Context, m types.Map) (types.MapIterator, error) {
func (p doltTablePartition) IteratorForPartition(ctx context.Context, m types.Map) (types.MapTupleIterator, error) {
return m.RangeIterator(ctx, p.start, p.end)
}

View File

@@ -25,6 +25,7 @@ import (
"encoding/binary"
"math"
"time"
"unsafe"
"github.com/google/uuid"
"github.com/shopspring/decimal"
@@ -101,6 +102,12 @@ type binaryNomsReader struct {
}
// readBytes returns the next count bytes as a sub-slice that aliases the
// reader's underlying buffer (no copy is made), advancing the read offset.
// Callers that need an independent copy should use readCopyOfBytes instead.
func (b *binaryNomsReader) readBytes(count uint32) []byte {
	start := b.offset
	b.offset += count
	return b.buff[start : start+count]
}
func (b *binaryNomsReader) readCopyOfBytes(count uint32) []byte {
v := make([]byte, count)
copy(v, b.buff[b.offset:b.offset+count])
b.offset += count
@@ -296,10 +303,9 @@ func (b *binaryNomsReader) skipBool() {
// ReadString reads a length-prefixed string from the buffer without copying:
// the returned string aliases the reader's underlying byte buffer via an
// unsafe slice-header reinterpretation. Callers must treat the result as
// immutable and must not retain it past the lifetime of the buffer.
func (b *binaryNomsReader) ReadString() string {
	size := uint32(b.readCount())
	strBytes := b.buff[b.offset : b.offset+size]
	b.offset += size
	return *(*string)(unsafe.Pointer(&strBytes))
}
func (b *binaryNomsReader) ReadInlineBlob() []byte {

View File

@@ -87,6 +87,32 @@ func (seq leafSequence) valuesSlice(from, to uint64) ([]Value, error) {
return vs, nil
}
// kvTuples decodes the entries of this leaf sequence in the index range
// [from, to) directly into Tuples, skipping the intermediate conversion to
// Values. dest is reused as the output buffer when its capacity suffices;
// otherwise a fresh slice is allocated. Each index yields
// getValuesPerIdx(kind) tuples (presumably 2 for map sequences — key and
// value — confirm against getValuesPerIdx).
func (seq leafSequence) kvTuples(from, to uint64, dest []Tuple) ([]Tuple, error) {
	// Clamp the upper bound to the sequence length.
	if l := seq.Len(); to > l {
		to = l
	}
	// Position the decoder at the first requested index. Subsequent reads are
	// strictly sequential, so the loop below must consume tuples in order.
	dec := seq.decoderSkipToIndex(int(from))
	numTuples := (to - from) * uint64(getValuesPerIdx(seq.Kind()))
	if uint64(cap(dest)) < numTuples {
		dest = make([]Tuple, numTuples)
	}
	dest = dest[:numTuples]
	nbf := seq.format()
	for i := uint64(0); i < numTuples; i++ {
		var err error
		dest[i], err = dec.readTuple(nbf)
		if err != nil {
			return nil, err
		}
	}
	return dest, nil
}
func (seq leafSequence) getCompareFnHelper(other leafSequence) compareFn {
dec := seq.decoder()
otherDec := other.decoder()

View File

@@ -23,6 +23,7 @@ package types
import (
"context"
"io"
"sync/atomic"
"github.com/dolthub/dolt/go/store/atomicerr"
@@ -311,34 +312,38 @@ func iterAll(ctx context.Context, col Collection, f func(v Value, index uint64)
return ae.Get()
}
type collRangeIter struct {
type collTupleRangeIter struct {
leaves []Collection
currLeaf int
startIdx uint64
endIdx uint64
valsPerIdx uint64
currLeafValues []Value
currLeafValues []Tuple
leafValPos int
nbf *NomsBinFormat
tupleBuffer []Tuple
}
func (itr *collRangeIter) Next() (Value, error) {
func (itr *collTupleRangeIter) Next() (Tuple, error) {
var err error
if itr.currLeafValues == nil {
if itr.currLeaf >= len(itr.leaves) {
// reached the end
return nil, nil
return Tuple{}, io.EOF
}
currLeaf := itr.leaves[itr.currLeaf]
itr.currLeaf++
seq := currLeaf.asSequence()
itr.currLeafValues, err = seq.valuesSlice(itr.startIdx, itr.endIdx)
itr.leafValPos = 0
itr.tupleBuffer, err = seq.kvTuples(itr.startIdx, itr.endIdx, itr.tupleBuffer)
if err != nil {
return nil, err
return Tuple{}, err
}
itr.currLeafValues = itr.tupleBuffer
itr.leafValPos = 0
}
v := itr.currLeafValues[itr.leafValPos]
@@ -353,7 +358,7 @@ func (itr *collRangeIter) Next() (Value, error) {
return v, nil
}
func newCollRangeIter(ctx context.Context, col Collection, startIdx, endIdx uint64) (*collRangeIter, error) {
func newCollRangeIter(ctx context.Context, col Collection, startIdx, endIdx uint64) (*collTupleRangeIter, error) {
l := col.Len()
d.PanicIfTrue(startIdx > endIdx || endIdx > l)
if startIdx == endIdx {
@@ -370,11 +375,13 @@ func newCollRangeIter(ctx context.Context, col Collection, startIdx, endIdx uint
startIdx = localStart
valuesPerIdx := uint64(getValuesPerIdx(col.Kind()))
return &collRangeIter{
leaves: leaves,
startIdx: startIdx,
endIdx: endIdx,
valsPerIdx: valuesPerIdx,
return &collTupleRangeIter{
leaves: leaves,
startIdx: startIdx,
endIdx: endIdx,
valsPerIdx: valuesPerIdx,
tupleBuffer: make([]Tuple, 32),
nbf: col.asSequence().format(),
}, nil
}

View File

@@ -24,6 +24,7 @@ package types
import (
"context"
"errors"
"io"
)
// MapIterator is the interface used by iterators over Noms Maps.
@@ -31,6 +32,12 @@ type MapIterator interface {
Next(ctx context.Context) (k, v Value, err error)
}
// MapTupleIterator is an iterator that returns map keys and values as
// types.Tuple instances. It follows the standard Go convention of returning
// io.EOF once all the data has been read.
type MapTupleIterator interface {
	Next(ctx context.Context) (k, v Tuple, err error)
}
// mapIterator can efficiently iterate through a Noms Map.
type mapIterator struct {
sequenceIter sequenceIterator
@@ -62,34 +69,35 @@ func (mi *mapIterator) Next(ctx context.Context) (k, v Value, err error) {
return mi.currentKey, mi.currentValue, nil
}
// mapKeyValuePair pairs a single map entry's key with its value.
type mapKeyValuePair struct {
	k Value
	v Value
}
var errClosed = errors.New("closed")
type mapRangeIter struct {
collItr *collRangeIter
collItr *collTupleRangeIter
}
func (itr *mapRangeIter) Next(context.Context) (Value, Value, error) {
k, err := itr.collItr.Next()
if err != nil {
return nil, nil, err
func (itr *mapRangeIter) Next(ctx context.Context) (k, v Tuple, err error) {
if itr.collItr == nil {
// only happens if there is nothing to iterate over
return Tuple{}, Tuple{}, io.EOF
}
v, err := itr.collItr.Next()
k, err = itr.collItr.Next()
if err != nil {
return nil, nil, err
return Tuple{}, Tuple{}, err
}
v, err = itr.collItr.Next()
if err != nil {
return Tuple{}, Tuple{}, err
}
return k, v, nil
}
func (m Map) RangeIterator(ctx context.Context, startIdx, endIdx uint64) (MapIterator, error) {
func (m Map) RangeIterator(ctx context.Context, startIdx, endIdx uint64) (MapTupleIterator, error) {
// newCollRangeIter returns nil if the number of elements being iterated over is 0
collItr, err := newCollRangeIter(ctx, m, startIdx, endIdx)
if err != nil {

View File

@@ -362,6 +362,10 @@ func (ms metaSequence) valuesSlice(from, to uint64) ([]Value, error) {
panic("meta sequence")
}
// kvTuples implements sequence. A metaSequence holds refs to child sequences
// rather than leaf data, so tuples cannot be decoded from it directly;
// calling this is a programmer error. Receiver renamed to ms for consistency
// with the other metaSequence methods.
func (ms metaSequence) kvTuples(from, to uint64, dest []Tuple) ([]Tuple, error) {
	panic("meta sequence")
}
func (ms metaSequence) typeOf() (*Type, error) {
dec, count := ms.decoderSkipToValues()
ts := make(typeSlice, 0, count)
@@ -646,6 +650,10 @@ func (es emptySequence) valuesSlice(from, to uint64) ([]Value, error) {
panic("empty sequence")
}
// kvTuples implements sequence; an emptySequence contains no entries, so this
// is never expected to be reached.
func (es emptySequence) kvTuples(from, to uint64, dest []Tuple) ([]Tuple, error) {
	panic("empty sequence")
}
func (es emptySequence) writeTo(w nomsWriter, nbf *NomsBinFormat) error {
panic("empty sequence")
}

View File

@@ -53,6 +53,7 @@ type sequence interface {
typeOf() (*Type, error)
valueReadWriter() ValueReadWriter
valuesSlice(from, to uint64) ([]Value, error)
kvTuples(from, to uint64, dest []Tuple) ([]Tuple, error)
WalkRefs(nbf *NomsBinFormat, cb RefCallback) error
writeTo(nomsWriter, *NomsBinFormat) error
}

View File

@@ -103,6 +103,10 @@ func (ts testSequence) valuesSlice(from, to uint64) ([]Value, error) {
panic("not reached")
}
// kvTuples implements sequence; not exercised by the tests that use
// testSequence.
func (ts testSequence) kvTuples(from, to uint64, dest []Tuple) ([]Tuple, error) {
	panic("not reached")
}
func (ts testSequence) Less(nbf *NomsBinFormat, other LesserValuable) (bool, error) {
panic("not reached")
}

View File

@@ -24,6 +24,7 @@ package types
import (
"bytes"
"context"
"errors"
"fmt"
"strings"
"sync"
@@ -204,11 +205,23 @@ type Tuple struct {
// readTuple reads the data provided by a decoder and moves the decoder
// forward. A NullKind value decodes to the empty tuple; any other non-tuple
// kind is an error. The returned Tuple shares the decoder's underlying byte
// slice rather than copying it.
func readTuple(nbf *NomsBinFormat, dec *valueDecoder) (Tuple, error) {
	start := dec.pos()

	k := dec.PeekKind()
	if k == NullKind {
		dec.skipKind()
		return EmptyTuple(nbf), nil
	}

	if k != TupleKind {
		return Tuple{}, errors.New("current value is not a tuple")
	}

	if err := skipTuple(nbf, dec); err != nil {
		return Tuple{}, err
	}

	end := dec.pos()
	return Tuple{valueImpl{dec.vrw, nbf, dec.byteSlice(start, end), nil}}, nil
}

View File

@@ -560,7 +560,7 @@ func (r *valueDecoder) readStruct(nbf *NomsBinFormat) (Value, error) {
return readStruct(nbf, r)
}
func (r *valueDecoder) readTuple(nbf *NomsBinFormat) (Value, error) {
// readTuple decodes the next value as a Tuple, advancing the decoder past it.
func (r *valueDecoder) readTuple(nbf *NomsBinFormat) (Tuple, error) {
	return readTuple(nbf, r)
}