mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-09 11:19:01 -05:00
[no-release-notes] text and json written as byte trees (#3690)
* persistable vars * [no-release-notes] text and json types written as byte trees * fmt * more tests, fix other failing tests * GMS bump * fix remaining tests * small fixes * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * bats failures * revert * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * tidy * update plans * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * more plan changes * delete todo * better docstring * zach comments Co-authored-by: max-hoffman <max-hoffman@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
e386f4f539
commit
91690bee18
@@ -44,6 +44,8 @@ const (
|
||||
EncodingSet Encoding = 20
|
||||
EncodingBytesAddr Encoding = 21
|
||||
EncodingCommitAddr Encoding = 22
|
||||
EncodingStringAddr Encoding = 23
|
||||
EncodingJSONAddr Encoding = 24
|
||||
EncodingString Encoding = 128
|
||||
EncodingBytes Encoding = 129
|
||||
EncodingDecimal Encoding = 130
|
||||
@@ -73,6 +75,8 @@ var EnumNamesEncoding = map[Encoding]string{
|
||||
EncodingSet: "Set",
|
||||
EncodingBytesAddr: "BytesAddr",
|
||||
EncodingCommitAddr: "CommitAddr",
|
||||
EncodingStringAddr: "StringAddr",
|
||||
EncodingJSONAddr: "JSONAddr",
|
||||
EncodingString: "String",
|
||||
EncodingBytes: "Bytes",
|
||||
EncodingDecimal: "Decimal",
|
||||
@@ -102,6 +106,8 @@ var EnumValuesEncoding = map[string]Encoding{
|
||||
"Set": EncodingSet,
|
||||
"BytesAddr": EncodingBytesAddr,
|
||||
"CommitAddr": EncodingCommitAddr,
|
||||
"StringAddr": EncodingStringAddr,
|
||||
"JSONAddr": EncodingJSONAddr,
|
||||
"String": EncodingString,
|
||||
"Bytes": EncodingBytes,
|
||||
"Decimal": EncodingDecimal,
|
||||
|
||||
@@ -110,6 +110,9 @@ func (t *Table) ValueReadWriter() types.ValueReadWriter {
|
||||
|
||||
// NodeStore returns the NodeStore for this table.
|
||||
func (t *Table) NodeStore() tree.NodeStore {
|
||||
if t == nil {
|
||||
return nil
|
||||
}
|
||||
return tree.NewNodeStore(shim.ChunkStoreFromVRW(t.ValueReadWriter()))
|
||||
}
|
||||
|
||||
|
||||
@@ -75,14 +75,14 @@ func EncodingFromSqlType(typ query.Type) serial.Encoding {
|
||||
return serial.EncodingString
|
||||
case query.Type_VARCHAR:
|
||||
return serial.EncodingString
|
||||
case query.Type_JSON:
|
||||
return serial.EncodingJSON
|
||||
case query.Type_GEOMETRY:
|
||||
return serial.EncodingGeometry
|
||||
case query.Type_JSON:
|
||||
return serial.EncodingJSONAddr
|
||||
case query.Type_BLOB:
|
||||
return serial.EncodingBytesAddr
|
||||
case query.Type_TEXT:
|
||||
return serial.EncodingString
|
||||
return serial.EncodingStringAddr
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown encoding %v", typ))
|
||||
}
|
||||
|
||||
@@ -256,7 +256,7 @@ func newProllyDiffIter(ctx *sql.Context, dp DiffPartition, ddb *doltdb.DoltDB, t
|
||||
}
|
||||
to := durable.ProllyMapFromIndex(t)
|
||||
|
||||
fromConverter, err := NewProllyRowConverter(fSch, targetFromSchema, ctx.Warn, nil)
|
||||
fromConverter, err := NewProllyRowConverter(fSch, targetFromSchema, ctx.Warn, dp.from.NodeStore())
|
||||
if err != nil {
|
||||
return prollyDiffIter{}, err
|
||||
}
|
||||
|
||||
@@ -100,10 +100,22 @@ func GetField(ctx context.Context, td val.TupleDesc, i int, tup val.Tuple, ns tr
|
||||
case val.Hash128Enc:
|
||||
v, ok = td.GetHash128(i, tup)
|
||||
case val.BytesAddrEnc:
|
||||
var b val.BytesAddr
|
||||
b, ok = td.GetBlob(i, tup)
|
||||
var h hash.Hash
|
||||
h, ok = td.GetBytesAddr(i, tup)
|
||||
if ok {
|
||||
v, err = tree.NewByteArray(b.Addr, ns).ToBytes(ctx)
|
||||
v, err = tree.NewByteArray(h, ns).ToBytes(ctx)
|
||||
}
|
||||
case val.JSONAddrEnc:
|
||||
var h hash.Hash
|
||||
h, ok = td.GetJSONAddr(i, tup)
|
||||
if ok {
|
||||
v, err = tree.NewJSONDoc(h, ns).ToJSONDocument(ctx)
|
||||
}
|
||||
case val.StringAddrEnc:
|
||||
var h hash.Hash
|
||||
h, ok = td.GetStringAddr(i, tup)
|
||||
if ok {
|
||||
v, err = tree.NewTextStorage(h, ns).ToString(ctx)
|
||||
}
|
||||
case val.CommitAddrEnc:
|
||||
v, ok = td.GetCommitAddr(i, tup)
|
||||
@@ -170,29 +182,37 @@ func PutField(ctx context.Context, ns tree.NodeStore, tb *val.TupleBuilder, i in
|
||||
v = []byte(s)
|
||||
}
|
||||
tb.PutByteString(i, v.([]byte))
|
||||
case val.Hash128Enc:
|
||||
tb.PutHash128(i, v.([]byte))
|
||||
case val.GeometryEnc:
|
||||
geo := serializeGeometry(v)
|
||||
if len(geo) > math.MaxUint16 {
|
||||
return ErrValueExceededMaxFieldSize
|
||||
}
|
||||
tb.PutGeometry(i, serializeGeometry(v))
|
||||
case val.JSONEnc:
|
||||
case val.JSONAddrEnc:
|
||||
buf, err := convJson(v)
|
||||
if len(buf) > math.MaxUint16 {
|
||||
return ErrValueExceededMaxFieldSize
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tb.PutJSON(i, buf)
|
||||
case val.Hash128Enc:
|
||||
tb.PutHash128(i, v.([]byte))
|
||||
h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(buf))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tb.PutJSONAddr(i, h)
|
||||
case val.BytesAddrEnc:
|
||||
b, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte)))
|
||||
h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte)))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tb.PutBytesAddr(i, b.Addr)
|
||||
tb.PutBytesAddr(i, h)
|
||||
case val.StringAddrEnc:
|
||||
//todo: v will be []byte after daylon's changes
|
||||
h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader([]byte(v.(string))))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
tb.PutStringAddr(i, h)
|
||||
case val.CommitAddrEnc:
|
||||
tb.PutCommitAddr(i, v.(hash.Hash))
|
||||
default:
|
||||
@@ -282,12 +302,12 @@ func serializeGeometry(v interface{}) []byte {
|
||||
}
|
||||
}
|
||||
|
||||
func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader) (val.BytesAddr, error) {
|
||||
func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader) (hash.Hash, error) {
|
||||
tree, err := tree.NewImmutableTreeFromReader(ctx, r, ns, tree.DefaultFixedChunkLength)
|
||||
if err != nil {
|
||||
return val.BytesAddr{}, err
|
||||
return hash.Hash{}, err
|
||||
}
|
||||
return val.NewBytesAddr(tree.Addr), nil
|
||||
return tree.Addr, nil
|
||||
}
|
||||
|
||||
func convJson(v interface{}) (buf []byte, err error) {
|
||||
|
||||
@@ -113,6 +113,11 @@ func TestRoundTripProllyFields(t *testing.T) {
|
||||
typ: val.Type{Enc: val.StringEnc},
|
||||
value: "lorem ipsum",
|
||||
},
|
||||
{
|
||||
name: "string",
|
||||
typ: val.Type{Enc: val.StringAddrEnc},
|
||||
value: "lorem ipsum",
|
||||
},
|
||||
{
|
||||
name: "bytes",
|
||||
typ: val.Type{Enc: val.ByteStringEnc},
|
||||
@@ -145,7 +150,7 @@ func TestRoundTripProllyFields(t *testing.T) {
|
||||
},
|
||||
{
|
||||
name: "json",
|
||||
typ: val.Type{Enc: val.JSONEnc},
|
||||
typ: val.Type{Enc: val.JSONAddrEnc},
|
||||
value: mustParseJson(t, `{"a": 1, "b": false}`),
|
||||
},
|
||||
{
|
||||
|
||||
@@ -35,8 +35,10 @@ enum Encoding : uint8 {
|
||||
Datetime = 18,
|
||||
Enum = 19,
|
||||
Set = 20,
|
||||
BytesAddr = 21,
|
||||
CommitAddr = 22,
|
||||
BytesAddr = 21,
|
||||
CommitAddr = 22,
|
||||
StringAddr = 23,
|
||||
JSONAddr = 24,
|
||||
|
||||
// variable width
|
||||
String = 128,
|
||||
|
||||
@@ -17,9 +17,12 @@ package tree
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
@@ -152,6 +155,8 @@ func _newLeaf(ctx context.Context, ns NodeStore, s message.Serializer, buf []byt
|
||||
}, nil
|
||||
}
|
||||
|
||||
const bytePeekLength = 128
|
||||
|
||||
type ByteArray struct {
|
||||
ImmutableTree
|
||||
}
|
||||
@@ -161,27 +166,72 @@ func NewByteArray(addr hash.Hash, ns NodeStore) *ByteArray {
|
||||
}
|
||||
|
||||
func (b *ByteArray) ToBytes(ctx context.Context) ([]byte, error) {
|
||||
if b.buf == nil {
|
||||
err := b.load(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return b.buf[:], nil
|
||||
return b.bytes(ctx)
|
||||
}
|
||||
|
||||
func (b *ByteArray) ToString(ctx context.Context) (string, error) {
|
||||
if b.buf == nil {
|
||||
err := b.load(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
buf, err := b.bytes(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
toShow := 128
|
||||
if len(b.buf) < toShow {
|
||||
toShow = len(b.buf)
|
||||
toShow := bytePeekLength
|
||||
if len(buf) < toShow {
|
||||
toShow = len(buf)
|
||||
}
|
||||
return string(b.buf[:toShow]), nil
|
||||
return string(buf[:toShow]), nil
|
||||
}
|
||||
|
||||
type JSONDoc struct {
|
||||
ImmutableTree
|
||||
}
|
||||
|
||||
func NewJSONDoc(addr hash.Hash, ns NodeStore) *JSONDoc {
|
||||
return &JSONDoc{ImmutableTree{Addr: addr, ns: ns}}
|
||||
}
|
||||
|
||||
func (b *JSONDoc) ToJSONDocument(ctx context.Context) (sql.JSONDocument, error) {
|
||||
buf, err := b.bytes(ctx)
|
||||
if err != nil {
|
||||
return sql.JSONDocument{}, err
|
||||
}
|
||||
var doc sql.JSONDocument
|
||||
err = json.Unmarshal(buf, &doc.Val)
|
||||
if err != nil {
|
||||
return sql.JSONDocument{}, err
|
||||
}
|
||||
return doc, err
|
||||
}
|
||||
|
||||
func (b *JSONDoc) ToString(ctx context.Context) (string, error) {
|
||||
buf, err := b.bytes(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
toShow := bytePeekLength
|
||||
if len(buf) < toShow {
|
||||
toShow = len(buf)
|
||||
}
|
||||
return string(buf[:toShow]), nil
|
||||
}
|
||||
|
||||
type TextStorage struct {
|
||||
ImmutableTree
|
||||
}
|
||||
|
||||
func NewTextStorage(addr hash.Hash, ns NodeStore) *TextStorage {
|
||||
return &TextStorage{ImmutableTree{Addr: addr, ns: ns}}
|
||||
}
|
||||
|
||||
func (b *TextStorage) ToBytes(ctx context.Context) ([]byte, error) {
|
||||
return b.bytes(ctx)
|
||||
}
|
||||
|
||||
func (b *TextStorage) ToString(ctx context.Context) (string, error) {
|
||||
buf, err := b.bytes(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(buf), nil
|
||||
}
|
||||
|
||||
type ImmutableTree struct {
|
||||
@@ -220,6 +270,16 @@ func (t *ImmutableTree) load(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *ImmutableTree) bytes(ctx context.Context) ([]byte, error) {
|
||||
if t.buf == nil {
|
||||
err := t.load(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return t.buf[:], nil
|
||||
}
|
||||
|
||||
func (t *ImmutableTree) next() (Node, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
@@ -235,15 +235,14 @@ func randomField(tb *val.TupleBuilder, idx int, typ val.Type, ns NodeStore) {
|
||||
buf := make([]byte, 20)
|
||||
testRand.Read(buf)
|
||||
tb.PutCommitAddr(idx, hash.New(buf))
|
||||
case val.BytesAddrEnc:
|
||||
case val.BytesAddrEnc, val.StringAddrEnc, val.JSONAddrEnc:
|
||||
buf := make([]byte, (testRand.Int63()%40)+10)
|
||||
testRand.Read(buf)
|
||||
tree, err := NewImmutableTreeFromReader(context.Background(), bytes.NewReader(buf), ns, DefaultFixedChunkLength)
|
||||
if err != nil {
|
||||
panic("failed to write blob tree")
|
||||
panic("failed to write bytes tree")
|
||||
}
|
||||
tb.PutBytesAddr(idx, tree.Addr)
|
||||
|
||||
default:
|
||||
panic("unknown encoding")
|
||||
}
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
// Copyright 2022 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package val
|
||||
|
||||
import "github.com/dolthub/dolt/go/store/hash"
|
||||
|
||||
type BytesAddr struct {
|
||||
Addr hash.Hash
|
||||
}
|
||||
|
||||
func NewBytesAddr(addr hash.Hash) BytesAddr {
|
||||
return BytesAddr{Addr: addr}
|
||||
}
|
||||
@@ -87,6 +87,8 @@ const (
|
||||
SetEnc = Encoding(serial.EncodingSet)
|
||||
BytesAddrEnc = Encoding(serial.EncodingBytesAddr)
|
||||
CommitAddrEnc = Encoding(serial.EncodingCommitAddr)
|
||||
StringAddrEnc = Encoding(serial.EncodingStringAddr)
|
||||
JSONAddrEnc = Encoding(serial.EncodingJSONAddr)
|
||||
|
||||
sentinel Encoding = 127
|
||||
)
|
||||
@@ -564,12 +566,7 @@ func compareHash128(l, r []byte) int {
|
||||
return bytes.Compare(l, r)
|
||||
}
|
||||
|
||||
func compareBytesAddr(l, r hash.Hash) int {
|
||||
// TODO sort
|
||||
return l.Compare(r)
|
||||
}
|
||||
|
||||
func compareCommitAddr(l, r hash.Hash) int {
|
||||
func compareAddr(l, r hash.Hash) int {
|
||||
return l.Compare(r)
|
||||
}
|
||||
|
||||
|
||||
@@ -263,13 +263,6 @@ func (tb *TupleBuilder) PutHash128(i int, v []byte) {
|
||||
tb.pos += hash128Size
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutCommitAddr(i int, v hash.Hash) {
|
||||
tb.Desc.expectEncoding(i, CommitAddrEnc)
|
||||
tb.fields[i] = tb.buf[tb.pos : tb.pos+addrSize]
|
||||
writeAddr(tb.fields[i], v[:])
|
||||
tb.pos += addrSize
|
||||
}
|
||||
|
||||
// PutRaw writes a []byte to the ith field of the Tuple being built.
|
||||
func (tb *TupleBuilder) PutRaw(i int, buf []byte) {
|
||||
if buf == nil {
|
||||
@@ -283,9 +276,35 @@ func (tb *TupleBuilder) PutRaw(i int, buf []byte) {
|
||||
tb.pos += sz
|
||||
}
|
||||
|
||||
// PutBytesAddr writes an out of band []byte to the ith field of the Tuple being built.
|
||||
// PutCommitAddr writes a commit's address ref to the ith field
|
||||
// of the Tuple being built.
|
||||
func (tb *TupleBuilder) PutCommitAddr(i int, v hash.Hash) {
|
||||
tb.Desc.expectEncoding(i, CommitAddrEnc)
|
||||
tb.putAddr(i, v)
|
||||
}
|
||||
|
||||
// PutBytesAddr writes a blob's address ref to the ith field
|
||||
// of the Tuple being built.
|
||||
func (tb *TupleBuilder) PutBytesAddr(i int, v hash.Hash) {
|
||||
tb.Desc.expectEncoding(i, BytesAddrEnc)
|
||||
tb.putAddr(i, v)
|
||||
}
|
||||
|
||||
// PutStringAddr writes a string's address ref to the ith field
|
||||
// of the Tuple being built.
|
||||
func (tb *TupleBuilder) PutStringAddr(i int, v hash.Hash) {
|
||||
tb.Desc.expectEncoding(i, StringAddrEnc)
|
||||
tb.putAddr(i, v)
|
||||
}
|
||||
|
||||
// PutJSONAddr writes a JSON string's address ref to the ith field
|
||||
// of the Tuple being built.
|
||||
func (tb *TupleBuilder) PutJSONAddr(i int, v hash.Hash) {
|
||||
tb.Desc.expectEncoding(i, JSONAddrEnc)
|
||||
tb.putAddr(i, v)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) putAddr(i int, v hash.Hash) {
|
||||
tb.fields[i] = tb.buf[tb.pos : tb.pos+addrSize]
|
||||
writeAddr(tb.fields[i], v[:])
|
||||
tb.pos += addrSize
|
||||
|
||||
@@ -113,9 +113,13 @@ func compare(typ Type, left, right []byte) int {
|
||||
case Hash128Enc:
|
||||
return compareHash128(readHash128(left), readHash128(right))
|
||||
case BytesAddrEnc:
|
||||
return compareBytesAddr(readAddr(left), readAddr(right))
|
||||
return compareAddr(readAddr(left), readAddr(right))
|
||||
case CommitAddrEnc:
|
||||
return compareCommitAddr(readAddr(left), readAddr(right))
|
||||
return compareAddr(readAddr(left), readAddr(right))
|
||||
case JSONAddrEnc:
|
||||
return compareAddr(readAddr(left), readAddr(right))
|
||||
case StringAddrEnc:
|
||||
return compareAddr(readAddr(left), readAddr(right))
|
||||
default:
|
||||
panic("unknown encoding")
|
||||
}
|
||||
|
||||
@@ -54,7 +54,8 @@ func NewTupleDescriptorWithComparator(cmp TupleComparator, types ...Type) (td Tu
|
||||
|
||||
var addrIdxs []int
|
||||
for i, t := range types {
|
||||
if t.Enc == BytesAddrEnc {
|
||||
switch t.Enc {
|
||||
case BytesAddrEnc, StringAddrEnc, JSONAddrEnc:
|
||||
addrIdxs = append(addrIdxs, i)
|
||||
}
|
||||
}
|
||||
@@ -408,23 +409,32 @@ func (td TupleDesc) GetHash128(i int, tup Tuple) (v []byte, ok bool) {
|
||||
return
|
||||
}
|
||||
|
||||
func (td TupleDesc) GetBlob(i int, tup Tuple) (BytesAddr, bool) {
|
||||
func (td TupleDesc) GetJSONAddr(i int, tup Tuple) (hash.Hash, bool) {
|
||||
td.expectEncoding(i, JSONAddrEnc)
|
||||
return td.getAddr(i, tup)
|
||||
}
|
||||
|
||||
func (td TupleDesc) GetStringAddr(i int, tup Tuple) (hash.Hash, bool) {
|
||||
td.expectEncoding(i, StringAddrEnc)
|
||||
return td.getAddr(i, tup)
|
||||
}
|
||||
|
||||
func (td TupleDesc) GetBytesAddr(i int, tup Tuple) (hash.Hash, bool) {
|
||||
td.expectEncoding(i, BytesAddrEnc)
|
||||
b := td.GetField(i, tup)
|
||||
if len(b) == 0 {
|
||||
return BytesAddr{}, false
|
||||
}
|
||||
return NewBytesAddr(hash.New(b)), true
|
||||
return td.getAddr(i, tup)
|
||||
}
|
||||
|
||||
func (td TupleDesc) GetCommitAddr(i int, tup Tuple) (v hash.Hash, ok bool) {
|
||||
td.expectEncoding(i, CommitAddrEnc)
|
||||
return td.getAddr(i, tup)
|
||||
}
|
||||
|
||||
func (td TupleDesc) getAddr(i int, tup Tuple) (hash.Hash, bool) {
|
||||
b := td.GetField(i, tup)
|
||||
if b != nil {
|
||||
v = hash.New(b)
|
||||
ok = true
|
||||
if b == nil {
|
||||
return hash.Hash{}, false
|
||||
}
|
||||
return
|
||||
return hash.New(b), true
|
||||
}
|
||||
|
||||
func (td TupleDesc) expectEncoding(i int, encodings ...Encoding) {
|
||||
|
||||
@@ -302,4 +302,4 @@ SQL
|
||||
[ "${#lines[@]}" -eq 3 ]
|
||||
[[ "${lines[1]}" =~ 'v1,int,YES,"",NULL,""' ]] || false
|
||||
[[ "${lines[2]}" =~ 'v2,int,YES,"",NULL,""' ]] || false
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user