From fc46082dcf9e574db5cc106d3b23d02b69676271 Mon Sep 17 00:00:00 2001 From: Maximilian Hoffman Date: Mon, 21 Nov 2022 15:46:17 -0800 Subject: [PATCH] Blob chunker rewrite (#4825) * Fix mising projection on indexed table * add test * rewrite * cleanup * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * blob test * Fix tests * node arena * delete stale * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * docs * more cleanup * add newline * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * aaron's rewrite * benchmarks * [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh * concurrency safe blob builder * fmt * pointer to pool * test nodeStore edits Co-authored-by: max-hoffman --- go/libraries/doltcore/migrate/tuples.go | 20 +- .../doltcore/sqle/index/prolly_fields.go | 14 +- .../import_benchmarker/testdata/blob.yaml | 152 ++++++++ go/store/prolly/tree/blob_bench_test.go | 59 +++ go/store/prolly/tree/blob_builder.go | 364 ++++++++++++++++++ ...able_tree_test.go => blob_builder_test.go} | 83 ++-- go/store/prolly/tree/immutable_tree.go | 299 -------------- go/store/prolly/tree/node.go | 27 ++ go/store/prolly/tree/node_store.go | 19 + go/store/prolly/tree/testutils.go | 22 +- 10 files changed, 703 insertions(+), 356 deletions(-) create mode 100644 go/performance/import_benchmarker/testdata/blob.yaml create mode 100644 go/store/prolly/tree/blob_bench_test.go create mode 100644 go/store/prolly/tree/blob_builder.go rename go/store/prolly/tree/{immutable_tree_test.go => blob_builder_test.go} (89%) delete mode 100644 go/store/prolly/tree/immutable_tree.go diff --git a/go/libraries/doltcore/migrate/tuples.go b/go/libraries/doltcore/migrate/tuples.go index 16b305257e..87f633b744 100644 --- a/go/libraries/doltcore/migrate/tuples.go +++ b/go/libraries/doltcore/migrate/tuples.go @@ -243,11 +243,13 @@ func translateStringField(ctx context.Context, ns tree.NodeStore, value types.St case val.StringAddrEnc: // note: previously, TEXT fields were serialized as types.String rd := strings.NewReader(string(value)) - t, err := tree.NewImmutableTreeFromReader(ctx, rd, ns, tree.DefaultFixedChunkLength) + bb := ns.BlobBuilder() + bb.Init(len(value)) + _, addr, err := bb.Chunk(ctx, rd) if err != nil { return err } - b.PutStringAddr(idx, t.Addr) + b.PutStringAddr(idx, addr) default: panic(fmt.Sprintf("unexpected encoding for string (%d)", typ.Enc)) @@ -310,11 +312,13 @@ func translateJSONField(ctx context.Context, ns tree.NodeStore, value types.JSON } buf := bytes.NewBuffer([]byte(s)) - t, err := tree.NewImmutableTreeFromReader(ctx, buf, ns, tree.DefaultFixedChunkLength) + bb := ns.BlobBuilder() + bb.Init(len(s)) + _, addr, err := bb.Chunk(ctx, buf) if err != nil { return err } - b.PutJSONAddr(idx, t.Addr) + b.PutJSONAddr(idx, addr) return nil } @@ -338,7 +342,9 @@ func translateBlobField(ctx context.Context, ns tree.NodeStore, value types.Blob return err } - t, err := tree.NewImmutableTreeFromReader(ctx, bytes.NewReader(buf), ns, tree.DefaultFixedChunkLength) + bb := ns.BlobBuilder() + bb.Init(int(value.Len())) + _, addr, err := bb.Chunk(ctx, bytes.NewReader(buf)) if err != nil { return err } @@ -346,9 +352,9 @@ func translateBlobField(ctx context.Context, ns tree.NodeStore, value types.Blob typ := b.Desc.Types[idx] switch typ.Enc { case val.BytesAddrEnc: - b.PutBytesAddr(idx, t.Addr) + b.PutBytesAddr(idx, addr) case val.StringAddrEnc: - b.PutStringAddr(idx, t.Addr) + b.PutStringAddr(idx, addr) } return nil } diff --git a/go/libraries/doltcore/sqle/index/prolly_fields.go b/go/libraries/doltcore/sqle/index/prolly_fields.go index 63a0bead3f..6decc3ad80 100644 --- a/go/libraries/doltcore/sqle/index/prolly_fields.go +++ b/go/libraries/doltcore/sqle/index/prolly_fields.go @@ -194,20 +194,20 @@ func PutField(ctx context.Context, ns tree.NodeStore, tb *val.TupleBuilder, i in if err != nil { return err } - h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(buf)) + h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(buf), len(buf)) if err != nil { return err } tb.PutJSONAddr(i, h) case val.BytesAddrEnc: - h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte))) + h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader(v.([]byte)), len(v.([]byte))) if err != nil { return err } tb.PutBytesAddr(i, h) case val.StringAddrEnc: //todo: v will be []byte after daylon's changes - h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader([]byte(v.(string)))) + h, err := serializeBytesToAddr(ctx, ns, bytes.NewReader([]byte(v.(string))), len(v.(string))) if err != nil { return err } @@ -305,12 +305,14 @@ func serializeGeometry(v interface{}) []byte { } } -func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader) (hash.Hash, error) { - tree, err := tree.NewImmutableTreeFromReader(ctx, r, ns, tree.DefaultFixedChunkLength) +func serializeBytesToAddr(ctx context.Context, ns tree.NodeStore, r io.Reader, dataSize int) (hash.Hash, error) { + bb := ns.BlobBuilder() + bb.Init(dataSize) + _, addr, err := bb.Chunk(ctx, r) if err != nil { return hash.Hash{}, err } - return tree.Addr, nil + return addr, nil } func convJson(v interface{}) (buf []byte, err error) { diff --git a/go/performance/import_benchmarker/testdata/blob.yaml b/go/performance/import_benchmarker/testdata/blob.yaml new file mode 100644 index 0000000000..ad55a9d48b --- /dev/null +++ b/go/performance/import_benchmarker/testdata/blob.yaml @@ -0,0 +1,152 @@ +opts: + seed: 0 +tests: +- name: "sql" + repos: + - name: dolt + server: + port: 3308 + args: [ "--port", "3308" ] +# - name: mysql # mysqld --port 3308 --local-infile=1 --socket=/tmp/mysqld2.sock +# external-server: +# name: test +# host: 127.0.0.1 +# user: root +# password: +# port: 3309 + tables: + - name: "1 blob" + rows: 1 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 10 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 20 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 40 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 50 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 60 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 80 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 100 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 200 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 400 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 600 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 800 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 1000 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 10000 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); + - name: "1 blob" + rows: 100000 + schema: | + create table xy ( + x int primary key, + y blob, + z varchar(30), + w varchar(30) + ); diff --git a/go/store/prolly/tree/blob_bench_test.go b/go/store/prolly/tree/blob_bench_test.go new file mode 100644 index 0000000000..6037d9e958 --- /dev/null +++ b/go/store/prolly/tree/blob_bench_test.go @@ -0,0 +1,59 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tree + +import ( + "bytes" + "context" + "fmt" + "math" + "testing" + + "github.com/dolthub/dolt/go/store/hash" +) + +var result hash.Hash + +func BenchmarkBlobBuilder(b *testing.B) { + var r hash.Hash + var err error + dataSizes := []int{1e3, 1e4, 1e5, 1e6} + for _, d := range dataSizes { + b.Run(fmt.Sprintf("datasize: %.0f", math.Log10(float64(d))), func(b *testing.B) { + ns := NewTestNodeStore() + bb := mustNewBlobBuilder(DefaultFixedChunkLength) + bb.SetNodeStore(ns) + buf := make([]byte, d) + for i := 0; i < d; i++ { + buf[i] = uint8(i) + } + + b.ResetTimer() + for n := 0; n < b.N; n++ { + // always record the result to prevent + // the compiler eliminating the function call. + bb.Init(d) + _, r, err = bb.Chunk(context.Background(), bytes.NewReader(buf)) + if err != nil { + b.Fatal(err) + } + bb.Reset() + } + // always store the result to a package level variable + // so the compiler cannot eliminate the Benchmark itself. + result = r + }) + } +} diff --git a/go/store/prolly/tree/blob_builder.go b/go/store/prolly/tree/blob_builder.go new file mode 100644 index 0000000000..21ed38fb6a --- /dev/null +++ b/go/store/prolly/tree/blob_builder.go @@ -0,0 +1,364 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tree + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + + "github.com/dolthub/go-mysql-server/sql" + + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/prolly/message" +) + +const DefaultFixedChunkLength = 4000 + +var ErrInvalidChunkSize = errors.New("invalid chunkSize; value must be a multiple of 20") + +func mustNewBlobBuilder(chunkSize int) *BlobBuilder { + b, _ := NewBlobBuilder(chunkSize) + return b +} + +// NewBlobBuilder writes the contents of |reader| as an append-only +// tree, returning the root node or an error if applicable. |chunkSize| +// fixes the split size of leaf and intermediate node chunks. +func NewBlobBuilder(chunkSize int) (*BlobBuilder, error) { + if chunkSize%hash.ByteLen != 0 { + return nil, ErrInvalidChunkSize + } + + keys := make([][]byte, chunkSize/hash.ByteLen) + for i := range keys { + keys[i] = zeroKey + } + return &BlobBuilder{ + chunkSize: chunkSize, + keys: keys, + }, nil +} + +type blobNodeWriter interface { + Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error) +} + +type BlobBuilder struct { + ns NodeStore + S message.Serializer + chunkSize int + keys [][]byte + wr blobNodeWriter + lastN Node + topLevel int + + levelCap int + buf []byte + vals [][]byte + subtrees []uint64 +} + +func (b *BlobBuilder) SetNodeStore(ns NodeStore) { + b.ns = ns + b.S = message.NewBlobSerializer(ns.Pool()) +} + +// Reset clears the BlobBuilder for re-use. +func (b *BlobBuilder) Reset() { + b.wr = nil + b.topLevel = 0 +} + +// Init calculates tree dimensions for a given blob. +func (b *BlobBuilder) Init(dataSize int) { + b.Reset() + + if dataSize == 0 { + return + } + + if dataSize <= b.chunkSize { + b.wr = &blobLeafWriter{ + bb: b, + buf: make([]byte, dataSize), + } + return + } + + b.wr = &blobLeafWriter{ + bb: b, + buf: make([]byte, b.chunkSize), + } + + numAddrs := b.chunkSize / hash.ByteLen + dataSize = dataSize / b.chunkSize + for dataSize > 0 { + dataSize = dataSize / numAddrs + b.topLevel += 1 + } + + // Allocate everything we need in batch, slice them up down below. + if b.levelCap < b.topLevel { + b.expand(numAddrs) + b.levelCap = b.topLevel + } + + writers := make([]blobLevelWriter, b.topLevel) + for i, addrs := 0, 0; i < b.topLevel; i, addrs = i+1, addrs+numAddrs { + wr := &writers[i] + wr.bb = b + wr.child = b.wr + wr.buf = b.buf[addrs*hash.ByteLen : (addrs+numAddrs)*hash.ByteLen] + wr.vals = b.vals[addrs : addrs+numAddrs] + wr.subtrees = b.subtrees[addrs : addrs+numAddrs] + wr.level = i + 1 + wr.sz = numAddrs + b.wr = wr + } +} + +func (b *BlobBuilder) expand(numAddrs int) { + b.buf = make([]byte, b.topLevel*numAddrs*hash.ByteLen) + b.vals = make([][]byte, numAddrs*b.topLevel) + b.subtrees = make([]uint64, numAddrs*b.topLevel) +} + +// Chunk builds the blob tree by passing the Reader to the chain of level +// writers, terminated in a leaf writer. The leaf writer reads chunks from the +// Reader and writes them, returning their hashes to its parent level writer. +// When the parent level writer fills up with addresses, it writes a chunk and +// returns that address to its parent. This continues until the Reader returns +// io.EOF, when every writer in the chain completes its chunk and we return the +// root node. +func (b *BlobBuilder) Chunk(ctx context.Context, r io.Reader) (Node, hash.Hash, error) { + if b.wr == nil { + return Node{}, hash.Hash{}, nil + } + h, _, err := b.wr.Write(ctx, r) + if err != nil && err != io.EOF { + return Node{}, hash.Hash{}, err + } + return b.lastN, h, nil +} + +// blobLeafWriter writes leaf chunks of the blob, with max capacity len(buf), +// for every call to Write(). +type blobLeafWriter struct { + bb *BlobBuilder + buf []byte +} + +var zeroKey = []byte{0} +var zeroKeys = [][]byte{zeroKey} +var leafSubtrees = []uint64{1} + +func (lw *blobLeafWriter) Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error) { + n, err := r.Read(lw.buf) + if err != nil { + return hash.Hash{}, 0, err + } + h, err := lw.bb.write(ctx, zeroKeys, [][]byte{lw.buf[:n]}, leafSubtrees, 0) + return h, 1, err +} + +// blobLevelWriters writes internal chunks of a blob, using its |child| to +// write the level below it. On a call to |Write|, it repeatedly calls +// |child.Write|, accumulating addresses to its children, until it fills up or +// the Reader is exhausted. In either case, it then writes its node and +// returns. +type blobLevelWriter struct { + bb *BlobBuilder + child blobNodeWriter + buf []byte + vals [][]byte + subtrees []uint64 + sz int + level int +} + +func (lw *blobLevelWriter) Write(ctx context.Context, r io.Reader) (hash.Hash, uint64, error) { + i, off, totalCount := 0, 0, uint64(0) + for { + // Sketchy hack to elide a copy here... + //h := (*hash.Hash)(unsafe.Pointer(&lw.buf[off])) + //var n uint64 + //var err error + h, n, err := lw.child.Write(ctx, r) + if err != nil && err != io.EOF { + return hash.Hash{}, 0, err + } + if n != 0 { + totalCount += n + copy(lw.buf[off:], h[:]) + lw.subtrees[i] = n + lw.vals[i] = lw.buf[off : off+hash.ByteLen] + i += 1 + off += hash.ByteLen + } + if i >= lw.sz || err == io.EOF { + h, nerr := lw.bb.write(ctx, lw.bb.keys[:i], lw.vals[:i], lw.subtrees[:i], lw.level) + if nerr != nil { + return hash.Hash{}, 0, nerr + } + return h, totalCount, err + } + } +} + +// Write the blob node. Called by level and leaf writers. Will store lastN if +// the level corresponds to our root level. +func (b *BlobBuilder) write(ctx context.Context, keys, vals [][]byte, subtrees []uint64, level int) (hash.Hash, error) { + msg := b.S.Serialize(keys, vals, subtrees, level) + node, err := NodeFromBytes(msg) + if err != nil { + return hash.Hash{}, err + } + h, err := b.ns.Write(ctx, node) + if err != nil { + return hash.Hash{}, err + } + if level == b.topLevel { + b.lastN = node + } + return h, nil +} + +const bytePeekLength = 128 + +type ByteArray struct { + ImmutableTree +} + +func NewByteArray(addr hash.Hash, ns NodeStore) *ByteArray { + return &ByteArray{ImmutableTree{Addr: addr, ns: ns}} +} + +func (b *ByteArray) ToBytes(ctx context.Context) ([]byte, error) { + return b.bytes(ctx) +} + +func (b *ByteArray) ToString(ctx context.Context) (string, error) { + buf, err := b.bytes(ctx) + if err != nil { + return "", err + } + toShow := bytePeekLength + if len(buf) < toShow { + toShow = len(buf) + } + return string(buf[:toShow]), nil +} + +type JSONDoc struct { + ImmutableTree +} + +func NewJSONDoc(addr hash.Hash, ns NodeStore) *JSONDoc { + return &JSONDoc{ImmutableTree{Addr: addr, ns: ns}} +} + +func (b *JSONDoc) ToJSONDocument(ctx context.Context) (sql.JSONDocument, error) { + buf, err := b.bytes(ctx) + if err != nil { + return sql.JSONDocument{}, err + } + var doc sql.JSONDocument + err = json.Unmarshal(buf, &doc.Val) + if err != nil { + return sql.JSONDocument{}, err + } + return doc, err +} + +func (b *JSONDoc) ToString(ctx context.Context) (string, error) { + buf, err := b.bytes(ctx) + if err != nil { + return "", err + } + toShow := bytePeekLength + if len(buf) < toShow { + toShow = len(buf) + } + return string(buf[:toShow]), nil +} + +type TextStorage struct { + ImmutableTree +} + +func NewTextStorage(addr hash.Hash, ns NodeStore) *TextStorage { + return &TextStorage{ImmutableTree{Addr: addr, ns: ns}} +} + +func (b *TextStorage) ToBytes(ctx context.Context) ([]byte, error) { + return b.bytes(ctx) +} + +func (b *TextStorage) ToString(ctx context.Context) (string, error) { + buf, err := b.bytes(ctx) + if err != nil { + return "", err + } + return string(buf), nil +} + +type ImmutableTree struct { + Addr hash.Hash + buf []byte + ns NodeStore +} + +func (t *ImmutableTree) load(ctx context.Context) error { + if t.Addr.IsEmpty() { + t.buf = []byte{} + return nil + } + n, err := t.ns.Read(ctx, t.Addr) + if err != nil { + return err + } + + return WalkNodes(ctx, n, t.ns, func(ctx context.Context, n Node) error { + if n.IsLeaf() { + t.buf = append(t.buf, n.GetValue(0)...) + } + return nil + }) +} + +func (t *ImmutableTree) bytes(ctx context.Context) ([]byte, error) { + if t.buf == nil { + err := t.load(ctx) + if err != nil { + return nil, err + } + } + return t.buf[:], nil +} + +func (t *ImmutableTree) next() (Node, error) { + panic("not implemented") +} + +func (t *ImmutableTree) close() error { + panic("not implemented") +} + +func (t *ImmutableTree) Read(_ bytes.Buffer) (int, error) { + panic("not implemented") +} diff --git a/go/store/prolly/tree/immutable_tree_test.go b/go/store/prolly/tree/blob_builder_test.go similarity index 89% rename from go/store/prolly/tree/immutable_tree_test.go rename to go/store/prolly/tree/blob_builder_test.go index 9a829c3a6b..dc3b749fae 100644 --- a/go/store/prolly/tree/immutable_tree_test.go +++ b/go/store/prolly/tree/blob_builder_test.go @@ -35,7 +35,8 @@ func TestWriteImmutableTree(t *testing.T) { tests := []struct { inputSize int chunkSize int - err error + execErr error + initErr error checkSum bool }{ { @@ -48,7 +49,7 @@ func TestWriteImmutableTree(t *testing.T) { }, { inputSize: 100, - chunkSize: 101, + chunkSize: 100, }, { inputSize: 255, @@ -80,22 +81,22 @@ func TestWriteImmutableTree(t *testing.T) { }, { inputSize: 1_000, - chunkSize: 47, + chunkSize: 40, checkSum: false, }, { inputSize: 1_000, - chunkSize: 53, + chunkSize: 60, checkSum: false, }, { inputSize: 1_000, - chunkSize: 67, + chunkSize: 80, checkSum: false, }, { inputSize: 10_000, - chunkSize: 89, + chunkSize: 100, checkSum: false, }, { @@ -109,24 +110,13 @@ func TestWriteImmutableTree(t *testing.T) { checkSum: false, }, { - inputSize: 50_000_000, - chunkSize: 33_000, - err: ErrInvalidChunkSize, + inputSize: 0, + chunkSize: 40, }, { - inputSize: 10, - chunkSize: 1, - err: ErrInvalidChunkSize, - }, - { - inputSize: 10, - chunkSize: -1, - err: ErrInvalidChunkSize, - }, - { - inputSize: 10, - chunkSize: 39, - err: ErrInvalidChunkSize, + inputSize: 100, + chunkSize: 41, + initErr: ErrInvalidChunkSize, }, } @@ -139,10 +129,19 @@ func TestWriteImmutableTree(t *testing.T) { ctx := context.Background() r := bytes.NewReader(buf) ns := NewTestNodeStore() - serializer := message.NewBlobSerializer(ns.Pool()) - root, err := buildImmutableTree(ctx, r, ns, serializer, tt.chunkSize) - if tt.err != nil { - require.True(t, errors.Is(err, tt.err)) + //serializer := message.NewBlobSerializer(ns.Pool()) + + b, err := NewBlobBuilder(tt.chunkSize) + if tt.initErr != nil { + require.True(t, errors.Is(err, tt.initErr)) + return + } + b.SetNodeStore(ns) + b.Init(tt.inputSize) + root, _, err := b.Chunk(ctx, r) + + if tt.execErr != nil { + require.True(t, errors.Is(err, tt.execErr)) return } require.NoError(t, err) @@ -158,6 +157,9 @@ func TestWriteImmutableTree(t *testing.T) { sum := 0 byteCnt := 0 WalkNodes(ctx, root, ns, func(ctx context.Context, n Node) error { + if n.empty() { + return nil + } var keyCnt int leaf := n.IsLeaf() if leaf { @@ -242,7 +244,7 @@ func expectedSum(size int) int { } func expectedUnfilled(size, chunk int) int { - if size == chunk { + if size == chunk || size == 0 { return 0 } else if size < chunk { return 1 @@ -276,7 +278,7 @@ func TestImmutableTreeWalk(t *testing.T) { }{ { blobLen: 250, - chunkSize: 41, + chunkSize: 60, keyCnt: 4, }, { @@ -286,7 +288,7 @@ func TestImmutableTreeWalk(t *testing.T) { }, { blobLen: 378, - chunkSize: 43, + chunkSize: 60, keyCnt: 12, }, { @@ -299,11 +301,6 @@ func TestImmutableTreeWalk(t *testing.T) { chunkSize: 40, keyCnt: 6, }, - { - blobLen: 0, - chunkSize: 40, - keyCnt: 6, - }, { blobLen: 50_000_000, chunkSize: 4000, @@ -311,7 +308,7 @@ func TestImmutableTreeWalk(t *testing.T) { }, { blobLen: 10_000, - chunkSize: 83, + chunkSize: 80, keyCnt: 6, }, } @@ -362,8 +359,8 @@ func newTree(t *testing.T, ns NodeStore, keyCnt, blobLen, chunkSize int) Node { keyBld.PutUint32(0, uint32(i)) tuples[i][0] = keyBld.Build(sharedPool) - b := mustNewBlob(ctx, ns, blobLen, chunkSize) - valBld.PutBytesAddr(0, b.Addr) + addr := mustNewBlob(ctx, ns, blobLen, chunkSize) + valBld.PutBytesAddr(0, addr) tuples[i][1] = valBld.Build(sharedPool) } @@ -379,17 +376,23 @@ func newTree(t *testing.T, ns NodeStore, keyCnt, blobLen, chunkSize int) Node { return root } -func mustNewBlob(ctx context.Context, ns NodeStore, len, chunkSize int) *ImmutableTree { +func mustNewBlob(ctx context.Context, ns NodeStore, len, chunkSize int) hash.Hash { buf := make([]byte, len) for i := range buf { buf[i] = byte(i) } r := bytes.NewReader(buf) - root, err := NewImmutableTreeFromReader(ctx, r, ns, chunkSize) + b, err := NewBlobBuilder(chunkSize) if err != nil { panic(err) } - return root + b.SetNodeStore(ns) + b.Init(len) + _, addr, err := b.Chunk(ctx, r) + if err != nil { + panic(err) + } + return addr } func getBlobValues(msg serial.Message) []byte { diff --git a/go/store/prolly/tree/immutable_tree.go b/go/store/prolly/tree/immutable_tree.go deleted file mode 100644 index bd5a33bee6..0000000000 --- a/go/store/prolly/tree/immutable_tree.go +++ /dev/null @@ -1,299 +0,0 @@ -// Copyright 2022 Dolthub, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tree - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "io" - - "github.com/dolthub/go-mysql-server/sql" - - "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/message" -) - -const DefaultFixedChunkLength = 4000 - -var ErrInvalidChunkSize = errors.New("invalid chunkSize; value must be > 1") - -// buildImmutableTree writes the contents of |reader| as an append-only -// tree, returning the root node or an error if applicable. |chunkSize| -// fixes the split size of leaf and intermediate node chunks. -func buildImmutableTree(ctx context.Context, r io.Reader, ns NodeStore, S message.Serializer, chunkSize int) (Node, error) { - if chunkSize < hash.ByteLen*2 || chunkSize > int(message.MaxVectorOffset)/2 { - // internal nodes must fit at least two 20-byte hashes - return Node{}, ErrInvalidChunkSize - } - - var levels [][]novelNode - var levelCnts []int - var finalize bool - - // We use lookahead to check whether the reader has - // more bytes. The reader will only EOF when reading - // zero bytes into the lookahead buffer, but we want - // to know at the beginning of a loop whether we are - // finished. - lookahead := make([]byte, chunkSize) - lookaheadN, err := r.Read(lookahead) - if err != nil { - return Node{}, err - } - - buf := make([]byte, chunkSize) - for { - copy(buf, lookahead) - curN := lookaheadN - lookaheadN, err = r.Read(lookahead) - if err == io.EOF { - finalize = true - } else if err != nil { - return Node{}, err - } - - novel, err := _newLeaf(ctx, ns, S, buf[:curN]) - if err != nil { - return Node{}, err - } - - i := 0 - for { - // Three cases for building tree - // 1) reached new level => create new level - // 2) add novel node to current level - // 3) we didn't fill the current level => break - // 4) we filled current level, chunk and recurse into parent - // - // Two cases for finalizing tree - // 1) we haven't hit root, so we add the final chunk, finalize level, and continue upwards - // 2) we overshot root finalizing chunks, and we return the single root in the lower level - if i > len(levels)-1 { - levels = append(levels, make([]novelNode, chunkSize)) - levelCnts = append(levelCnts, 0) - } - - levels[i][levelCnts[i]] = novel - levelCnts[i]++ - // note: the size of an internal node will be the key count times key length (hash) - if levelCnts[i]*hash.ByteLen < chunkSize { - // current level is not full - if !finalize { - // only continue and chunk this level if finalizing all in-progress nodes - break - } - } - - nodes := levels[i][:levelCnts[i]] - if len(nodes) == 1 && i == len(levels)-1 { - // this is necessary and only possible if we're finalizing - // note: this is the only non-error return - return nodes[0].node, nil - } - - // chunk the current level - novel, err = _newInternal(ctx, ns, S, nodes, i+1, chunkSize) - if err != nil { - return Node{}, err - } - levelCnts[i] = 0 - i++ - } - } -} - -func _newInternal(ctx context.Context, ns NodeStore, s message.Serializer, nodes []novelNode, level int, chunkSize int) (novelNode, error) { - keys := make([][]byte, len(nodes)) - vals := make([][]byte, len(nodes)) - subtrees := make([]uint64, len(nodes)) - treeCnt := uint64(0) - for i := range nodes { - keys[i] = []byte{0} - vals[i] = nodes[i].addr[:] - subtrees[i] = nodes[i].treeCount - treeCnt += nodes[i].treeCount - } - msg := s.Serialize(keys, vals, subtrees, level) - node, err := NodeFromBytes(msg) - if err != nil { - return novelNode{}, err - } - addr, err := ns.Write(ctx, node) - if err != nil { - return novelNode{}, err - } - return novelNode{ - addr: addr, - node: node, - lastKey: []byte{0}, - treeCount: treeCnt, - }, nil -} - -func _newLeaf(ctx context.Context, ns NodeStore, s message.Serializer, buf []byte) (novelNode, error) { - msg := s.Serialize([][]byte{{0}}, [][]byte{buf}, []uint64{1}, 0) - node, err := NodeFromBytes(msg) - if err != nil { - return novelNode{}, err - } - addr, err := ns.Write(ctx, node) - if err != nil { - return novelNode{}, err - } - return novelNode{ - addr: addr, - node: node, - lastKey: []byte{0}, - treeCount: 1, - }, nil -} - -const bytePeekLength = 128 - -type ByteArray struct { - ImmutableTree -} - -func NewByteArray(addr hash.Hash, ns NodeStore) *ByteArray { - return &ByteArray{ImmutableTree{Addr: addr, ns: ns}} -} - -func (b *ByteArray) ToBytes(ctx context.Context) ([]byte, error) { - return b.bytes(ctx) -} - -func (b *ByteArray) ToString(ctx context.Context) (string, error) { - buf, err := b.bytes(ctx) - if err != nil { - return "", err - } - toShow := bytePeekLength - if len(buf) < toShow { - toShow = len(buf) - } - return string(buf[:toShow]), nil -} - -type JSONDoc struct { - ImmutableTree -} - -func NewJSONDoc(addr hash.Hash, ns NodeStore) *JSONDoc { - return &JSONDoc{ImmutableTree{Addr: addr, ns: ns}} -} - -func (b *JSONDoc) ToJSONDocument(ctx context.Context) (sql.JSONDocument, error) { - buf, err := b.bytes(ctx) - if err != nil { - return sql.JSONDocument{}, err - } - var doc sql.JSONDocument - err = json.Unmarshal(buf, &doc.Val) - if err != nil { - return sql.JSONDocument{}, err - } - return doc, err -} - -func (b *JSONDoc) ToString(ctx context.Context) (string, error) { - buf, err := b.bytes(ctx) - if err != nil { - return "", err - } - toShow := bytePeekLength - if len(buf) < toShow { - toShow = len(buf) - } - return string(buf[:toShow]), nil -} - -type TextStorage struct { - ImmutableTree -} - -func NewTextStorage(addr hash.Hash, ns NodeStore) *TextStorage { - return &TextStorage{ImmutableTree{Addr: addr, ns: ns}} -} - -func (b *TextStorage) ToBytes(ctx context.Context) ([]byte, error) { - return b.bytes(ctx) -} - -func (b *TextStorage) ToString(ctx context.Context) (string, error) { - buf, err := b.bytes(ctx) - if err != nil { - return "", err - } - return string(buf), nil -} - -type ImmutableTree struct { - Addr hash.Hash - buf []byte - ns NodeStore -} - -func NewImmutableTreeFromReader(ctx context.Context, r io.Reader, ns NodeStore, chunkSize int) (*ImmutableTree, error) { - s := message.NewBlobSerializer(ns.Pool()) - root, err := buildImmutableTree(ctx, r, ns, s, chunkSize) - if errors.Is(err, io.EOF) { - return &ImmutableTree{Addr: hash.Hash{}}, nil - } else if err != nil { - return nil, err - } - return &ImmutableTree{Addr: root.HashOf()}, nil -} - -func (t *ImmutableTree) load(ctx context.Context) error { - if t.Addr.IsEmpty() { - t.buf = []byte{} - return nil - } - n, err := t.ns.Read(ctx, t.Addr) - if err != nil { - return err - } - - return WalkNodes(ctx, n, t.ns, func(ctx context.Context, n Node) error { - if n.IsLeaf() { - t.buf = append(t.buf, n.GetValue(0)...) - } - return nil - }) -} - -func (t *ImmutableTree) bytes(ctx context.Context) ([]byte, error) { - if t.buf == nil { - err := t.load(ctx) - if err != nil { - return nil, err - } - } - return t.buf[:], nil -} - -func (t *ImmutableTree) next() (Node, error) { - panic("not implemented") -} - -func (t *ImmutableTree) close() error { - panic("not implemented") -} - -func (t *ImmutableTree) Read(buf bytes.Buffer) (int, error) { - panic("not implemented") -} diff --git a/go/store/prolly/tree/node.go b/go/store/prolly/tree/node.go index 9eab687a4b..54b616c0b5 100644 --- a/go/store/prolly/tree/node.go +++ b/go/store/prolly/tree/node.go @@ -113,6 +113,33 @@ func walkOpaqueNodes(ctx context.Context, nd Node, ns NodeStore, cb NodeCb) erro }) } +type nodeArena []Node + +const nodeArenaSize = 10000 + +func (a *nodeArena) Get() Node { + if len(*a) == 0 { + *a = make([]Node, nodeArenaSize) + } + n := (*a)[len(*a)-1] + *a = (*a)[:len(*a)-1] + return n +} + +func (a *nodeArena) NodeFromBytes(msg []byte) (Node, error) { + keys, values, level, count, err := message.UnpackFields(msg) + if err != nil { + return Node{}, err + } + n := a.Get() + n.keys = keys + n.values = values + n.count = count + n.level = level + n.msg = msg + return n, nil +} + func NodeFromBytes(msg []byte) (Node, error) { keys, values, level, count, err := message.UnpackFields(msg) return Node{ diff --git a/go/store/prolly/tree/node_store.go b/go/store/prolly/tree/node_store.go index 3eea9709e6..88d89cfedd 100644 --- a/go/store/prolly/tree/node_store.go +++ b/go/store/prolly/tree/node_store.go @@ -44,12 +44,15 @@ type NodeStore interface { // Format returns the types.NomsBinFormat of this NodeStore. Format() *types.NomsBinFormat + + BlobBuilder() *BlobBuilder } type nodeStore struct { store chunks.ChunkStore cache nodeCache bp pool.BuffPool + bbp *sync.Pool } var _ NodeStore = nodeStore{} @@ -58,12 +61,19 @@ var sharedCache = newChunkCache(cacheSize) var sharedPool = pool.NewBuffPool() +var blobBuilderPool = sync.Pool{ + New: func() any { + return mustNewBlobBuilder(DefaultFixedChunkLength) + }, +} + // NewNodeStore makes a new NodeStore. func NewNodeStore(cs chunks.ChunkStore) NodeStore { return nodeStore{ store: cs, cache: sharedCache, bp: sharedPool, + bbp: &blobBuilderPool, } } @@ -149,6 +159,15 @@ func (ns nodeStore) Pool() pool.BuffPool { return ns.bp } +// BlobBuilder implements NodeStore. +func (ns nodeStore) BlobBuilder() *BlobBuilder { + bb := ns.bbp.Get().(*BlobBuilder) + if bb.ns == nil { + bb.SetNodeStore(ns) + } + return bb +} + func (ns nodeStore) Format() *types.NomsBinFormat { nbf, err := types.GetFormatForVersionString(ns.store.Version()) if err != nil { diff --git a/go/store/prolly/tree/testutils.go b/go/store/prolly/tree/testutils.go index c6171e4716..5674176abf 100644 --- a/go/store/prolly/tree/testutils.go +++ b/go/store/prolly/tree/testutils.go @@ -21,6 +21,7 @@ import ( "math" "math/rand" "sort" + "sync" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" @@ -240,13 +241,16 @@ func randomField(tb *val.TupleBuilder, idx int, typ val.Type, ns NodeStore) { testRand.Read(buf) tb.PutCommitAddr(idx, hash.New(buf)) case val.BytesAddrEnc, val.StringAddrEnc, val.JSONAddrEnc: - buf := make([]byte, (testRand.Int63()%40)+10) + len := (testRand.Int63() % 40) + 10 + buf := make([]byte, len) testRand.Read(buf) - tree, err := NewImmutableTreeFromReader(context.Background(), bytes.NewReader(buf), ns, DefaultFixedChunkLength) + bb := ns.BlobBuilder() + bb.Init(int(len)) + _, addr, err := bb.Chunk(context.Background(), bytes.NewReader(buf)) if err != nil { panic("failed to write bytes tree") } - tb.PutBytesAddr(idx, tree.Addr) + tb.PutBytesAddr(idx, addr) default: panic("unknown encoding") } @@ -255,11 +259,13 @@ func randomField(tb *val.TupleBuilder, idx int, typ val.Type, ns NodeStore) { func NewTestNodeStore() NodeStore { ts := &chunks.TestStorage{} ns := NewNodeStore(ts.NewViewWithFormat(types.Format_DOLT.VersionString())) - return nodeStoreValidator{ns: ns} + bb := &blobBuilderPool + return nodeStoreValidator{ns: ns, bb: bb} } type nodeStoreValidator struct { ns NodeStore + bb *sync.Pool } func (v nodeStoreValidator) Read(ctx context.Context, ref hash.Hash) (Node, error) { @@ -309,6 +315,14 @@ func (v nodeStoreValidator) Pool() pool.BuffPool { return v.ns.Pool() } +func (v nodeStoreValidator) BlobBuilder() *BlobBuilder { + bb := v.bb.Get().(*BlobBuilder) + if bb.ns == nil { + bb.SetNodeStore(v) + } + return bb +} + func (v nodeStoreValidator) Format() *types.NomsBinFormat { return v.ns.Format() }