Merge branch 'main' into james/ref-string-index

James Cor
2022-11-21 16:17:43 -08:00
10 changed files with 703 additions and 356 deletions

View File

@@ -243,11 +243,13 @@ func translateStringField(ctx context.Context, ns tree.NodeStore, value types.St
case val.StringAddrEnc:
// note: previously, TEXT fields were serialized as types.String
rd := strings.NewReader(string(value))
- t, err := tree.NewImmutableTreeFromReader(ctx, rd, ns, tree.DefaultFixedChunkLength)
+ bb := ns.BlobBuilder()
+ bb.Init(len(value))
+ _, addr, err := bb.Chunk(ctx, rd)
if err != nil {
return err
}
- b.PutStringAddr(idx, t.Addr)
+ b.PutStringAddr(idx, addr)
default:
panic(fmt.Sprintf("unexpected encoding for string (%d)", typ.Enc))
@@ -310,11 +312,13 @@ func translateJSONField(ctx context.Context, ns tree.NodeStore, value types.JSON
}
buf := bytes.NewBuffer([]byte(s))
- t, err := tree.NewImmutableTreeFromReader(ctx, buf, ns, tree.DefaultFixedChunkLength)
+ bb := ns.BlobBuilder()
+ bb.Init(len(s))
+ _, addr, err := bb.Chunk(ctx, buf)
if err != nil {
return err
}
- b.PutJSONAddr(idx, t.Addr)
+ b.PutJSONAddr(idx, addr)
return nil
}
@@ -338,7 +342,9 @@ func translateBlobField(ctx context.Context, ns tree.NodeStore, value types.Blob
return err
}
- t, err := tree.NewImmutableTreeFromReader(ctx, bytes.NewReader(buf), ns, tree.DefaultFixedChunkLength)
+ bb := ns.BlobBuilder()
+ bb.Init(int(value.Len()))
+ _, addr, err := bb.Chunk(ctx, bytes.NewReader(buf))
if err != nil {
return err
}
@@ -346,9 +352,9 @@ func translateBlobField(ctx context.Context, ns tree.NodeStore, value types.Blob
typ := b.Desc.Types[idx]
switch typ.Enc {
case val.BytesAddrEnc:
- b.PutBytesAddr(idx, t.Addr)
+ b.PutBytesAddr(idx, addr)
case val.StringAddrEnc:
- b.PutStringAddr(idx, t.Addr)
+ b.PutStringAddr(idx, addr)
}
return nil
}
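
Each of the call sites above follows the same new pattern: borrow a pooled BlobBuilder from the NodeStore, size it with Init, chunk the value, and keep only the root address. A minimal standalone sketch of that pattern (the helper name writeBlob and the prolly/tree import path are illustrative assumptions; BlobBuilder, Init, and Chunk are the APIs introduced later in this commit):

    package example

    import (
        "bytes"
        "context"

        "github.com/dolthub/dolt/go/store/hash"
        "github.com/dolthub/dolt/go/store/prolly/tree"
    )

    // writeBlob chunks a byte slice into a blob tree and returns the root address.
    func writeBlob(ctx context.Context, ns tree.NodeStore, data []byte) (hash.Hash, error) {
        bb := ns.BlobBuilder()                                // pooled builder bound to ns
        bb.Init(len(data))                                    // pre-compute tree dimensions for this size
        _, addr, err := bb.Chunk(ctx, bytes.NewReader(data))  // write leaves and internal nodes
        if err != nil {
            return hash.Hash{}, err
        }
        return addr, nil
    }

The tuple-building callers then store addr with PutBytesAddr, PutStringAddr, or PutJSONAddr, exactly as in the hunks above.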

View File

@@ -194,20 +194,20 @@ func PutField(ctx context.Context, ns tree.NodeStore, tb *val.TupleBuilder, i in
if err != nil {
return err
}
- h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(buf))
+ h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(buf), len(buf))
if err != nil {
return err
}
tb.PutJSONAddr(i, h)
case val.BytesAddrEnc:
- h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte)))
+ h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte)), len(v.([]byte)))
if err != nil {
return err
}
tb.PutBytesAddr(i, h)
case val.StringAddrEnc:
//todo: v will be []byte after daylon's changes
- h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader([]byte(v.(string))))
+ h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader([]byte(v.(string))), len(v.(string)))
if err != nil {
return err
}
@@ -305,12 +305,14 @@ func serializeGeometry(v interface{}) []byte {
}
}
- func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader) (hash.Hash, error) {
- tree, err := tree.NewImmutableTreeFromReader(ctx, r, ns, tree.DefaultFixedChunkLength)
+ func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader, dataSize int) (hash.Hash, error) {
+ bb := ns.BlobBuilder()
+ bb.Init(dataSize)
+ _, addr, err := bb.Chunk(ctx, r)
if err != nil {
return hash.Hash{}, err
}
- return tree.Addr, nil
+ return addr, nil
}
func convJson(v interface{}) (buf []byte, err error) {
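
The extra dataSize argument exists because BlobBuilder.Init must know the value's byte length before any data is read: it uses the size to decide whether a single leaf chunk suffices and, if not, how many levels of internal writers to pre-allocate (see blob.go later in this commit).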

View File

@@ -0,0 +1,152 @@
opts:
seed: 0
tests:
- name: "sql"
repos:
- name: dolt
server:
port: 3308
args: [ "--port", "3308" ]
# - name: mysql # mysqld --port 3308 --local-infile=1 --socket=/tmp/mysqld2.sock
# external-server:
# name: test
# host: 127.0.0.1
# user: root
# password:
# port: 3309
tables:
- name: "1 blob"
rows: 1
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 10
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 20
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 40
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 50
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 60
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 80
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 100
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 200
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 400
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 600
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 800
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 1000
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 10000
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);
- name: "1 blob"
rows: 100000
schema: |
create table xy (
x int primary key,
y blob,
z varchar(30),
w varchar(30)
);

View File

@@ -0,0 +1,59 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tree
import (
"bytes"
"context"
"fmt"
"math"
"testing"
"github.com/dolthub/dolt/go/store/hash"
)
var result hash.Hash
func BenchmarkBlobBuilder(b *testing.B) {
var r hash.Hash
var err error
dataSizes := []int{1e3, 1e4, 1e5, 1e6}
for _, d := range dataSizes {
b.Run(fmt.Sprintf("datasize: %.0f", math.Log10(float64(d))), func(b *testing.B) {
ns := NewTestNodeStore()
bb := mustNewBlobBuilder(DefaultFixedChunkLength)
bb.SetNodeStore(ns)
buf := make([]byte, d)
for i := 0; i < d; i++ {
buf[i] = uint8(i)
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
// always record the result to prevent
// the compiler eliminating the function call.
bb.Init(d)
_, r, err = bb.Chunk(context.Background(), bytes.NewReader(buf))
if err != nil {
b.Fatal(err)
}
bb.Reset()
}
// always store the result to a package level variable
// so the compiler cannot eliminate the Benchmark itself.
result = r
})
}
}
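
These cases run with the standard Go tooling, e.g. go test -bench BenchmarkBlobBuilder from the package directory; adding -benchmem is useful here, since the point of the Init/Chunk/Reset cycle is to reuse one builder rather than reallocate per blob.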

View File

@@ -0,0 +1,364 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tree
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/message"
)
const DefaultFixedChunkLength = 4000
var ErrInvalidChunkSize = errors.New("invalid chunkSize; value must be a multiple of 20")
func mustNewBlobBuilder(chunkSize int) *BlobBuilder {
b, _ := NewBlobBuilder(chunkSize)
return b
}
// NewBlobBuilder returns a BlobBuilder that writes blobs as append-only
// trees of fixed-size chunks. |chunkSize| fixes the split size of leaf and
// intermediate node chunks and must be a multiple of hash.ByteLen.
func NewBlobBuilder(chunkSize int) (*BlobBuilder, error) {
if chunkSize%hash.ByteLen != 0 {
return nil, ErrInvalidChunkSize
}
keys := make([][]byte, chunkSize/hash.ByteLen)
for i := range keys {
keys[i] = zeroKey
}
return &BlobBuilder{
chunkSize: chunkSize,
keys: keys,
}, nil
}
type blobNodeWriter interface {
Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error)
}
type BlobBuilder struct {
ns NodeStore
S message.Serializer
chunkSize int
keys [][]byte
wr blobNodeWriter
lastN Node
topLevel int
levelCap int
buf []byte
vals [][]byte
subtrees []uint64
}
func (b *BlobBuilder) SetNodeStore(ns NodeStore) {
b.ns = ns
b.S = message.NewBlobSerializer(ns.Pool())
}
// Reset clears the BlobBuilder for re-use.
func (b *BlobBuilder) Reset() {
b.wr = nil
b.topLevel = 0
}
// Init calculates tree dimensions for a given blob.
func (b *BlobBuilder) Init(dataSize int) {
b.Reset()
if dataSize == 0 {
return
}
if dataSize <= b.chunkSize {
b.wr = &blobLeafWriter{
bb: b,
buf: make([]byte, dataSize),
}
return
}
b.wr = &blobLeafWriter{
bb: b,
buf: make([]byte, b.chunkSize),
}
numAddrs := b.chunkSize / hash.ByteLen
dataSize = dataSize / b.chunkSize
for dataSize > 0 {
dataSize = dataSize / numAddrs
b.topLevel += 1
}
// Allocate everything we need in one batch and slice it up below.
if b.levelCap < b.topLevel {
b.expand(numAddrs)
b.levelCap = b.topLevel
}
writers := make([]blobLevelWriter, b.topLevel)
for i, addrs := 0, 0; i < b.topLevel; i, addrs = i+1, addrs+numAddrs {
wr := &writers[i]
wr.bb = b
wr.child = b.wr
wr.buf = b.buf[addrs*hash.ByteLen : (addrs+numAddrs)*hash.ByteLen]
wr.vals = b.vals[addrs : addrs+numAddrs]
wr.subtrees = b.subtrees[addrs : addrs+numAddrs]
wr.level = i + 1
wr.sz = numAddrs
b.wr = wr
}
}
func (b *BlobBuilder) expand(numAddrs int) {
b.buf = make([]byte, b.topLevel*numAddrs*hash.ByteLen)
b.vals = make([][]byte, numAddrs*b.topLevel)
b.subtrees = make([]uint64, numAddrs*b.topLevel)
}
// Chunk builds the blob tree by passing the Reader to the chain of level
// writers, terminated in a leaf writer. The leaf writer reads chunks from the
// Reader and writes them, returning their hashes to its parent level writer.
// When the parent level writer fills up with addresses, it writes a chunk and
// returns that address to its parent. This continues until the Reader returns
// io.EOF, when every writer in the chain completes its chunk and we return the
// root node.
func (b *BlobBuilder) Chunk(ctx context.Context, r io.Reader) (Node, hash.Hash, error) {
if b.wr == nil {
return Node{}, hash.Hash{}, nil
}
h, _, err := b.wr.Write(ctx, r)
if err != nil && err != io.EOF {
return Node{}, hash.Hash{}, err
}
return b.lastN, h, nil
}
// blobLeafWriter writes leaf chunks of the blob, with max capacity len(buf),
// for every call to Write().
type blobLeafWriter struct {
bb *BlobBuilder
buf []byte
}
var zeroKey = []byte{0}
var zeroKeys = [][]byte{zeroKey}
var leafSubtrees = []uint64{1}
func (lw *blobLeafWriter) Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error) {
n, err := r.Read(lw.buf)
if err != nil {
return hash.Hash{}, 0, err
}
h, err := lw.bb.write(ctx, zeroKeys, [][]byte{lw.buf[:n]}, leafSubtrees, 0)
return h, 1, err
}
// blobLevelWriter writes the internal chunks of a blob, using its |child| to
// write the level below it. On a call to |Write|, it repeatedly calls
// |child.Write|, accumulating the addresses of its children, until it fills
// up or the Reader is exhausted. In either case, it then writes its node and
// returns.
type blobLevelWriter struct {
bb *BlobBuilder
child blobNodeWriter
buf []byte
vals [][]byte
subtrees []uint64
sz int
level int
}
func (lw *blobLevelWriter) Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error) {
i, off, totalCount := 0, 0, uint64(0)
for {
// Sketchy hack to elide a copy here...
//h := (*hash.Hash)(unsafe.Pointer(&lw.buf[off]))
//var n uint64
//var err error
h, n, err := lw.child.Write(ctx, r)
if err != nil && err != io.EOF {
return hash.Hash{}, 0, err
}
if n != 0 {
totalCount += n
copy(lw.buf[off:], h[:])
lw.subtrees[i] = n
lw.vals[i] = lw.buf[off : off+hash.ByteLen]
i += 1
off += hash.ByteLen
}
if i >= lw.sz || err == io.EOF {
h, nerr := lw.bb.write(ctx, lw.bb.keys[:i], lw.vals[:i], lw.subtrees[:i], lw.level)
if nerr != nil {
return hash.Hash{}, 0, nerr
}
return h, totalCount, err
}
}
}
// Write the blob node. Called by level and leaf writers. Will store lastN if
// the level corresponds to our root level.
func (b *BlobBuilder) write(ctx context.Context, keys, vals [][]byte, subtrees []uint64, level int) (hash.Hash, error) {
msg := b.S.Serialize(keys, vals, subtrees, level)
node, err := NodeFromBytes(msg)
if err != nil {
return hash.Hash{}, err
}
h, err := b.ns.Write(ctx, node)
if err != nil {
return hash.Hash{}, err
}
if level == b.topLevel {
b.lastN = node
}
return h, nil
}
const bytePeekLength = 128
type ByteArray struct {
ImmutableTree
}
func NewByteArray(addr hash.Hash, ns NodeStore) *ByteArray {
return &ByteArray{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *ByteArray) ToBytes(ctx context.Context) ([]byte, error) {
return b.bytes(ctx)
}
func (b *ByteArray) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
toShow := bytePeekLength
if len(buf) < toShow {
toShow = len(buf)
}
return string(buf[:toShow]), nil
}
type JSONDoc struct {
ImmutableTree
}
func NewJSONDoc(addr hash.Hash, ns NodeStore) *JSONDoc {
return &JSONDoc{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *JSONDoc) ToJSONDocument(ctx context.Context) (sql.JSONDocument, error) {
buf, err := b.bytes(ctx)
if err != nil {
return sql.JSONDocument{}, err
}
var doc sql.JSONDocument
err = json.Unmarshal(buf, &doc.Val)
if err != nil {
return sql.JSONDocument{}, err
}
return doc, err
}
func (b *JSONDoc) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
toShow := bytePeekLength
if len(buf) < toShow {
toShow = len(buf)
}
return string(buf[:toShow]), nil
}
type TextStorage struct {
ImmutableTree
}
func NewTextStorage(addr hash.Hash, ns NodeStore) *TextStorage {
return &TextStorage{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *TextStorage) ToBytes(ctx context.Context) ([]byte, error) {
return b.bytes(ctx)
}
func (b *TextStorage) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
return string(buf), nil
}
type ImmutableTree struct {
Addr hash.Hash
buf []byte
ns NodeStore
}
func (t *ImmutableTree) load(ctx context.Context) error {
if t.Addr.IsEmpty() {
t.buf = []byte{}
return nil
}
n, err := t.ns.Read(ctx, t.Addr)
if err != nil {
return err
}
return WalkNodes(ctx, n, t.ns, func(ctx context.Context, n Node) error {
if n.IsLeaf() {
t.buf = append(t.buf, n.GetValue(0)...)
}
return nil
})
}
func (t *ImmutableTree) bytes(ctx context.Context) ([]byte, error) {
if t.buf == nil {
err := t.load(ctx)
if err != nil {
return nil, err
}
}
return t.buf[:], nil
}
func (t *ImmutableTree) next() (Node, error) {
panic("not implemented")
}
func (t *ImmutableTree) close() error {
panic("not implemented")
}
func (t *ImmutableTree) Read(_ bytes.Buffer) (int, error) {
panic("not implemented")
}
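
Init's sizing pass above fixes the number of internal levels before any bytes are read. A small self-contained sketch of that arithmetic, assuming hash.ByteLen is 20 bytes as elsewhere in this codebase; with the default 4000-byte chunks, a 1 MB blob yields 250 leaf chunks, two level-1 nodes, and a single level-2 root:

    package main

    import "fmt"

    // blobTreeLevels mirrors the loop in BlobBuilder.Init: it returns the number
    // of internal levels needed above the leaf chunks for a blob of dataSize bytes.
    func blobTreeLevels(dataSize, chunkSize int) int {
        const hashByteLen = 20 // hash.ByteLen
        if dataSize <= chunkSize {
            return 0 // a single leaf, no internal nodes
        }
        numAddrs := chunkSize / hashByteLen // child addresses per internal chunk
        levels := 0
        n := dataSize / chunkSize // full leaf chunks
        for n > 0 {
            n /= numAddrs
            levels++
        }
        return levels
    }

    func main() {
        fmt.Println(blobTreeLevels(1_000_000, 4000)) // 2
        fmt.Println(blobTreeLevels(1_000, 4000))     // 0 (single leaf)
    }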

View File

@@ -35,7 +35,8 @@ func TestWriteImmutableTree(t *testing.T) {
tests := []struct {
inputSize int
chunkSize int
- err error
+ execErr error
+ initErr error
checkSum bool
}{
{
@@ -48,7 +49,7 @@ func TestWriteImmutableTree(t *testing.T) {
},
{
inputSize: 100,
- chunkSize: 101,
+ chunkSize: 100,
},
{
inputSize: 255,
@@ -80,22 +81,22 @@ func TestWriteImmutableTree(t *testing.T) {
},
{
inputSize: 1_000,
- chunkSize: 47,
+ chunkSize: 40,
checkSum: false,
},
{
inputSize: 1_000,
- chunkSize: 53,
+ chunkSize: 60,
checkSum: false,
},
{
inputSize: 1_000,
- chunkSize: 67,
+ chunkSize: 80,
checkSum: false,
},
{
inputSize: 10_000,
- chunkSize: 89,
+ chunkSize: 100,
checkSum: false,
},
{
@@ -109,24 +110,13 @@ func TestWriteImmutableTree(t *testing.T) {
checkSum: false,
},
{
- inputSize: 50_000_000,
- chunkSize: 33_000,
- err: ErrInvalidChunkSize,
+ inputSize: 0,
+ chunkSize: 40,
},
{
- inputSize: 10,
- chunkSize: 1,
- err: ErrInvalidChunkSize,
- },
- {
- inputSize: 10,
- chunkSize: -1,
- err: ErrInvalidChunkSize,
- },
- {
- inputSize: 10,
- chunkSize: 39,
- err: ErrInvalidChunkSize,
+ inputSize: 100,
+ chunkSize: 41,
+ initErr: ErrInvalidChunkSize,
},
}
@@ -139,10 +129,19 @@ func TestWriteImmutableTree(t *testing.T) {
ctx := context.Background()
r := bytes.NewReader(buf)
ns := NewTestNodeStore()
- serializer := message.NewBlobSerializer(ns.Pool())
- root, err := buildImmutableTree(ctx, r, ns, serializer, tt.chunkSize)
- if tt.err != nil {
- require.True(t, errors.Is(err, tt.err))
+ //serializer := message.NewBlobSerializer(ns.Pool())
+ b, err := NewBlobBuilder(tt.chunkSize)
+ if tt.initErr != nil {
+ require.True(t, errors.Is(err, tt.initErr))
return
}
+ b.SetNodeStore(ns)
+ b.Init(tt.inputSize)
+ root, _, err := b.Chunk(ctx, r)
+ if tt.execErr != nil {
+ require.True(t, errors.Is(err, tt.execErr))
+ return
+ }
require.NoError(t, err)
@@ -158,6 +157,9 @@ func TestWriteImmutableTree(t *testing.T) {
sum := 0
byteCnt := 0
WalkNodes(ctx, root, ns, func(ctx context.Context, n Node) error {
if n.empty() {
return nil
}
var keyCnt int
leaf := n.IsLeaf()
if leaf {
@@ -242,7 +244,7 @@ func expectedSum(size int) int {
}
func expectedUnfilled(size, chunk int) int {
- if size == chunk {
+ if size == chunk || size == 0 {
return 0
} else if size < chunk {
return 1
@@ -276,7 +278,7 @@ func TestImmutableTreeWalk(t *testing.T) {
}{
{
blobLen: 250,
- chunkSize: 41,
+ chunkSize: 60,
keyCnt: 4,
},
{
@@ -286,7 +288,7 @@ func TestImmutableTreeWalk(t *testing.T) {
},
{
blobLen: 378,
- chunkSize: 43,
+ chunkSize: 60,
keyCnt: 12,
},
{
@@ -299,11 +301,6 @@ func TestImmutableTreeWalk(t *testing.T) {
chunkSize: 40,
keyCnt: 6,
},
- {
- blobLen: 0,
- chunkSize: 40,
- keyCnt: 6,
- },
{
blobLen: 50_000_000,
chunkSize: 4000,
@@ -311,7 +308,7 @@ func TestImmutableTreeWalk(t *testing.T) {
},
{
blobLen: 10_000,
- chunkSize: 83,
+ chunkSize: 80,
keyCnt: 6,
},
}
@@ -362,8 +359,8 @@ func newTree(t *testing.T, ns NodeStore, keyCnt, blobLen, chunkSize int) Node {
keyBld.PutUint32(0, uint32(i))
tuples[i][0] = keyBld.Build(sharedPool)
- b := mustNewBlob(ctx, ns, blobLen, chunkSize)
- valBld.PutBytesAddr(0, b.Addr)
+ addr := mustNewBlob(ctx, ns, blobLen, chunkSize)
+ valBld.PutBytesAddr(0, addr)
tuples[i][1] = valBld.Build(sharedPool)
}
@@ -379,17 +376,23 @@ func newTree(t *testing.T, ns NodeStore, keyCnt, blobLen, chunkSize int) Node {
return root
}
- func mustNewBlob(ctx context.Context, ns NodeStore, len, chunkSize int) *ImmutableTree {
+ func mustNewBlob(ctx context.Context, ns NodeStore, len, chunkSize int) hash.Hash {
buf := make([]byte, len)
for i := range buf {
buf[i] = byte(i)
}
r := bytes.NewReader(buf)
- root, err := NewImmutableTreeFromReader(ctx, r, ns, chunkSize)
+ b, err := NewBlobBuilder(chunkSize)
if err != nil {
panic(err)
}
- return root
+ b.SetNodeStore(ns)
+ b.Init(len)
+ _, addr, err := b.Chunk(ctx, r)
+ if err != nil {
+ panic(err)
+ }
+ return addr
}
func getBlobValues(msg serial.Message) []byte {

View File

@@ -1,299 +0,0 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package tree
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/message"
)
const DefaultFixedChunkLength = 4000
var ErrInvalidChunkSize = errors.New("invalid chunkSize; value must be > 1")
// buildImmutableTree writes the contents of |reader| as an append-only
// tree, returning the root node or an error if applicable. |chunkSize|
// fixes the split size of leaf and intermediate node chunks.
func buildImmutableTree(ctx context.Context, r io.Reader, ns NodeStore, S message.Serializer, chunkSize int) (Node, error) {
if chunkSize < hash.ByteLen*2 || chunkSize > int(message.MaxVectorOffset)/2 {
// internal nodes must fit at least two 20-byte hashes
return Node{}, ErrInvalidChunkSize
}
var levels [][]novelNode
var levelCnts []int
var finalize bool
// We use lookahead to check whether the reader has
// more bytes. The reader will only EOF when reading
// zero bytes into the lookahead buffer, but we want
// to know at the beginning of a loop whether we are
// finished.
lookahead := make([]byte, chunkSize)
lookaheadN, err := r.Read(lookahead)
if err != nil {
return Node{}, err
}
buf := make([]byte, chunkSize)
for {
copy(buf, lookahead)
curN := lookaheadN
lookaheadN, err = r.Read(lookahead)
if err == io.EOF {
finalize = true
} else if err != nil {
return Node{}, err
}
novel, err := _newLeaf(ctx, ns, S, buf[:curN])
if err != nil {
return Node{}, err
}
i := 0
for {
// Three cases for building tree
// 1) reached new level => create new level
// 2) add novel node to current level
// 3) we didn't fill the current level => break
// 4) we filled current level, chunk and recurse into parent
//
// Two cases for finalizing tree
// 1) we haven't hit root, so we add the final chunk, finalize level, and continue upwards
// 2) we overshot root finalizing chunks, and we return the single root in the lower level
if i > len(levels)-1 {
levels = append(levels, make([]novelNode, chunkSize))
levelCnts = append(levelCnts, 0)
}
levels[i][levelCnts[i]] = novel
levelCnts[i]++
// note: the size of an internal node will be the key count times key length (hash)
if levelCnts[i]*hash.ByteLen < chunkSize {
// current level is not full
if !finalize {
// only continue and chunk this level if finalizing all in-progress nodes
break
}
}
nodes := levels[i][:levelCnts[i]]
if len(nodes) == 1 && i == len(levels)-1 {
// this is necessary and only possible if we're finalizing
// note: this is the only non-error return
return nodes[0].node, nil
}
// chunk the current level
novel, err = _newInternal(ctx, ns, S, nodes, i+1, chunkSize)
if err != nil {
return Node{}, err
}
levelCnts[i] = 0
i++
}
}
}
func _newInternal(ctx context.Context, ns NodeStore, s message.Serializer, nodes []novelNode, level int, chunkSize int) (novelNode, error) {
keys := make([][]byte, len(nodes))
vals := make([][]byte, len(nodes))
subtrees := make([]uint64, len(nodes))
treeCnt := uint64(0)
for i := range nodes {
keys[i] = []byte{0}
vals[i] = nodes[i].addr[:]
subtrees[i] = nodes[i].treeCount
treeCnt += nodes[i].treeCount
}
msg := s.Serialize(keys, vals, subtrees, level)
node, err := NodeFromBytes(msg)
if err != nil {
return novelNode{}, err
}
addr, err := ns.Write(ctx, node)
if err != nil {
return novelNode{}, err
}
return novelNode{
addr: addr,
node: node,
lastKey: []byte{0},
treeCount: treeCnt,
}, nil
}
func _newLeaf(ctx context.Context, ns NodeStore, s message.Serializer, buf []byte) (novelNode, error) {
msg := s.Serialize([][]byte{{0}}, [][]byte{buf}, []uint64{1}, 0)
node, err := NodeFromBytes(msg)
if err != nil {
return novelNode{}, err
}
addr, err := ns.Write(ctx, node)
if err != nil {
return novelNode{}, err
}
return novelNode{
addr: addr,
node: node,
lastKey: []byte{0},
treeCount: 1,
}, nil
}
const bytePeekLength = 128
type ByteArray struct {
ImmutableTree
}
func NewByteArray(addr hash.Hash, ns NodeStore) *ByteArray {
return &ByteArray{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *ByteArray) ToBytes(ctx context.Context) ([]byte, error) {
return b.bytes(ctx)
}
func (b *ByteArray) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
toShow := bytePeekLength
if len(buf) < toShow {
toShow = len(buf)
}
return string(buf[:toShow]), nil
}
type JSONDoc struct {
ImmutableTree
}
func NewJSONDoc(addr hash.Hash, ns NodeStore) *JSONDoc {
return &JSONDoc{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *JSONDoc) ToJSONDocument(ctx context.Context) (sql.JSONDocument, error) {
buf, err := b.bytes(ctx)
if err != nil {
return sql.JSONDocument{}, err
}
var doc sql.JSONDocument
err = json.Unmarshal(buf, &doc.Val)
if err != nil {
return sql.JSONDocument{}, err
}
return doc, err
}
func (b *JSONDoc) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
toShow := bytePeekLength
if len(buf) < toShow {
toShow = len(buf)
}
return string(buf[:toShow]), nil
}
type TextStorage struct {
ImmutableTree
}
func NewTextStorage(addr hash.Hash, ns NodeStore) *TextStorage {
return &TextStorage{ImmutableTree{Addr: addr, ns: ns}}
}
func (b *TextStorage) ToBytes(ctx context.Context) ([]byte, error) {
return b.bytes(ctx)
}
func (b *TextStorage) ToString(ctx context.Context) (string, error) {
buf, err := b.bytes(ctx)
if err != nil {
return "", err
}
return string(buf), nil
}
type ImmutableTree struct {
Addr hash.Hash
buf []byte
ns NodeStore
}
func NewImmutableTreeFromReader(ctx context.Context, r io.Reader, ns NodeStore, chunkSize int) (*ImmutableTree, error) {
s := message.NewBlobSerializer(ns.Pool())
root, err := buildImmutableTree(ctx, r, ns, s, chunkSize)
if errors.Is(err, io.EOF) {
return &ImmutableTree{Addr: hash.Hash{}}, nil
} else if err != nil {
return nil, err
}
return &ImmutableTree{Addr: root.HashOf()}, nil
}
func (t *ImmutableTree) load(ctx context.Context) error {
if t.Addr.IsEmpty() {
t.buf = []byte{}
return nil
}
n, err := t.ns.Read(ctx, t.Addr)
if err != nil {
return err
}
return WalkNodes(ctx, n, t.ns, func(ctx context.Context, n Node) error {
if n.IsLeaf() {
t.buf = append(t.buf, n.GetValue(0)...)
}
return nil
})
}
func (t *ImmutableTree) bytes(ctx context.Context) ([]byte, error) {
if t.buf == nil {
err := t.load(ctx)
if err != nil {
return nil, err
}
}
return t.buf[:], nil
}
func (t *ImmutableTree) next() (Node, error) {
panic("not implemented")
}
func (t *ImmutableTree) close() error {
panic("not implemented")
}
func (t *ImmutableTree) Read(buf bytes.Buffer) (int, error) {
panic("not implemented")
}

View File

@@ -113,6 +113,33 @@ func walkOpaqueNodes(ctx context.Context, nd Node, ns NodeStore, cb NodeCb) erro
})
}
type nodeArena []Node
const nodeArenaSize = 10000
func (a *nodeArena) Get() Node {
if len(*a) == 0 {
*a = make([]Node, nodeArenaSize)
}
n := (*a)[len(*a)-1]
*a = (*a)[:len(*a)-1]
return n
}
func (a *nodeArena) NodeFromBytes(msg []byte) (Node, error) {
keys, values, level, count, err := message.UnpackFields(msg)
if err != nil {
return Node{}, err
}
n := a.Get()
n.keys = keys
n.values = values
n.count = count
n.level = level
n.msg = msg
return n, nil
}
func NodeFromBytes(msg []byte) (Node, error) {
keys, values, level, count, err := message.UnpackFields(msg)
return Node{

View File

@@ -44,12 +44,15 @@ type NodeStore interface {
// Format returns the types.NomsBinFormat of this NodeStore.
Format() *types.NomsBinFormat
BlobBuilder() *BlobBuilder
}
type nodeStore struct {
store chunks.ChunkStore
cache nodeCache
bp pool.BuffPool
bbp *sync.Pool
}
var _ NodeStore = nodeStore{}
@@ -58,12 +61,19 @@ var sharedCache = newChunkCache(cacheSize)
var sharedPool = pool.NewBuffPool()
var blobBuilderPool = sync.Pool{
New: func() any {
return mustNewBlobBuilder(DefaultFixedChunkLength)
},
}
// NewNodeStore makes a new NodeStore.
func NewNodeStore(cs chunks.ChunkStore) NodeStore {
return nodeStore{
store: cs,
cache: sharedCache,
bp: sharedPool,
bbp: &blobBuilderPool,
}
}
@@ -149,6 +159,15 @@ func (ns nodeStore) Pool() pool.BuffPool {
return ns.bp
}
// BlobBuilder implements NodeStore.
func (ns nodeStore) BlobBuilder() *BlobBuilder {
bb := ns.bbp.Get().(*BlobBuilder)
if bb.ns == nil {
bb.SetNodeStore(ns)
}
return bb
}
func (ns nodeStore) Format() *types.NomsBinFormat {
nbf, err := types.GetFormatForVersionString(ns.store.Version())
if err != nil {
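
Two details make this pooling scheme safe: BlobBuilder() binds a pooled builder to a store only on first use (SetNodeStore when bb.ns is nil), and every caller in this commit runs Init before Chunk, which calls Reset, so a recycled builder always starts from a clean state.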

View File

@@ -21,6 +21,7 @@ import (
"math"
"math/rand"
"sort"
"sync"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
@@ -240,13 +241,16 @@ func randomField(tb *val.TupleBuilder, idx int, typ val.Type, ns NodeStore) {
testRand.Read(buf)
tb.PutCommitAddr(idx, hash.New(buf))
case val.BytesAddrEnc, val.StringAddrEnc, val.JSONAddrEnc:
- buf := make([]byte, (testRand.Int63()%40)+10)
+ len := (testRand.Int63() % 40) + 10
+ buf := make([]byte, len)
testRand.Read(buf)
- tree, err := NewImmutableTreeFromReader(context.Background(), bytes.NewReader(buf), ns, DefaultFixedChunkLength)
+ bb := ns.BlobBuilder()
+ bb.Init(int(len))
+ _, addr, err := bb.Chunk(context.Background(), bytes.NewReader(buf))
if err != nil {
panic("failed to write bytes tree")
}
- tb.PutBytesAddr(idx, tree.Addr)
+ tb.PutBytesAddr(idx, addr)
default:
panic("unknown encoding")
}
@@ -255,11 +259,13 @@ func randomField(tb *val.TupleBuilder, idx int, typ val.Type, ns NodeStore) {
func NewTestNodeStore() NodeStore {
ts := &chunks.TestStorage{}
ns := NewNodeStore(ts.NewViewWithFormat(types.Format_DOLT.VersionString()))
- return nodeStoreValidator{ns: ns}
+ bb := &blobBuilderPool
+ return nodeStoreValidator{ns: ns, bb: bb}
}
type nodeStoreValidator struct {
ns NodeStore
+ bb *sync.Pool
}
func (v nodeStoreValidator) Read(ctx context.Context, ref hash.Hash) (Node, error) {
@@ -309,6 +315,14 @@ func (v nodeStoreValidator) Pool() pool.BuffPool {
return v.ns.Pool()
}
func (v nodeStoreValidator) BlobBuilder() *BlobBuilder {
bb := v.bb.Get().(*BlobBuilder)
if bb.ns == nil {
bb.SetNodeStore(v)
}
return bb
}
func (v nodeStoreValidator) Format() *types.NomsBinFormat {
return v.ns.Format()
}