mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-20 19:31:56 -05:00
Merge pull request #3784 from dolthub/andy/chunker-panic
[no-release-notes] go/store/prolly/tree: Fix panic in chunker caused by stale `subtreeCounts` state in `Cursor`
This commit is contained in:
@@ -16,6 +16,9 @@ package prolly
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
@@ -134,3 +137,51 @@ func (mut MutableMap) IterRange(ctx context.Context, rng Range) (MapIter, error)
|
||||
func (mut MutableMap) HasEdits() bool {
|
||||
return mut.tuples.edits.Count() > 0
|
||||
}
|
||||
|
||||
func debugFormat(ctx context.Context, m MutableMap) (string, error) {
|
||||
kd, vd := m.keyDesc, m.valDesc
|
||||
|
||||
editIter := m.tuples.edits.IterAtStart()
|
||||
tupleIter, err := m.tuples.tree.iterAll(ctx)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString("Mutable Map {\n")
|
||||
|
||||
c := strconv.Itoa(m.tuples.edits.Count())
|
||||
sb.WriteString("\tedits (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v := editIter.Current()
|
||||
if k == nil {
|
||||
break
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
editIter.Advance()
|
||||
}
|
||||
sb.WriteString("\t},\n")
|
||||
|
||||
c = strconv.Itoa(m.tuples.tree.count())
|
||||
sb.WriteString("\ttree (count: " + c + ") {\n")
|
||||
for {
|
||||
k, v, err := tupleIter.Next(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
sb.WriteString("\t\t")
|
||||
sb.WriteString(kd.Format(k))
|
||||
sb.WriteString(": ")
|
||||
sb.WriteString(vd.Format(v))
|
||||
sb.WriteString(",\n")
|
||||
}
|
||||
sb.WriteString("\t}\n}\n")
|
||||
return sb.String(), nil
|
||||
}
|
||||
|
||||
@@ -104,6 +104,14 @@ func TestMutableMapReads(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestMutableMapFormat(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
mutableMap, _ := makeMutableMap(t, 100)
|
||||
s, err := debugFormat(ctx, mutableMap.(MutableMap))
|
||||
assert.NoError(t, err)
|
||||
assert.NotEmpty(t, s)
|
||||
}
|
||||
|
||||
func makeMutableMap(t *testing.T, count int) (testMap, [][2]val.Tuple) {
|
||||
ctx := context.Background()
|
||||
ns := tree.NewTestNodeStore()
|
||||
|
||||
@@ -23,8 +23,6 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
@@ -71,6 +69,9 @@ func TestMutableMapWrites(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
t.Run("test internal node splits", func(t *testing.T) {
|
||||
testInternalNodeSplits(t)
|
||||
})
|
||||
}
|
||||
|
||||
func testPointUpdates(t *testing.T, mapCount int) {
|
||||
@@ -425,32 +426,60 @@ func testBulkInserts(t *testing.T, size int) {
|
||||
}
|
||||
}
|
||||
|
||||
func testInternalNodeSplits(t *testing.T) {
|
||||
const n = 100_000
|
||||
var err error
|
||||
ctx := context.Background()
|
||||
|
||||
kd := val.NewTupleDescriptor(
|
||||
val.Type{Enc: val.Int32Enc},
|
||||
val.Type{Enc: val.Int32Enc},
|
||||
)
|
||||
vd := val.NewTupleDescriptor()
|
||||
bld := val.NewTupleBuilder(kd)
|
||||
|
||||
tuples := make([][2]val.Tuple, n)
|
||||
for i := range tuples {
|
||||
bld.PutInt32(0, int32(i))
|
||||
bld.PutInt32(1, int32(0))
|
||||
tuples[i][0] = bld.Build(sharedPool)
|
||||
tuples[i][1] = val.EmptyTuple
|
||||
}
|
||||
pm := prollyMapFromTuples(t, kd, vd, tuples).(Map)
|
||||
|
||||
// reproduces chunker panic (k = 10_600)
|
||||
repro := 20_000
|
||||
|
||||
for k := 100; k <= repro; k += 100 {
|
||||
mut := pm.Mutate()
|
||||
for j := 1; j <= k; j++ {
|
||||
bld.PutInt32(0, int32(j))
|
||||
bld.PutInt32(1, int32(j))
|
||||
key := bld.Build(sharedPool)
|
||||
value := val.EmptyTuple
|
||||
err = mut.Put(ctx, key, value)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
pm, err = mut.Map(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, n+k, pm.Count())
|
||||
}
|
||||
}
|
||||
|
||||
// utilities
|
||||
|
||||
func ascendingIntMap(t *testing.T, count int) Map {
|
||||
return ascendingIntMapWithStep(t, count, 1)
|
||||
}
|
||||
|
||||
func ascendingIntMapWithStep(t *testing.T, count, step int) Map {
|
||||
ctx := context.Background()
|
||||
ns := tree.NewTestNodeStore()
|
||||
|
||||
tuples := make([][2]val.Tuple, count)
|
||||
for i := range tuples {
|
||||
v := int64(i * step)
|
||||
tuples[i][0], tuples[i][1] = makePut(v, v)
|
||||
}
|
||||
|
||||
serializer := message.ProllyMapSerializer{Pool: ns.Pool()}
|
||||
chunker, err := tree.NewEmptyChunker(ctx, ns, serializer)
|
||||
require.NoError(t, err)
|
||||
|
||||
for _, pair := range tuples {
|
||||
err = chunker.AddPair(ctx, tree.Item(pair[0]), tree.Item(pair[1]))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
root, err := chunker.Done(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewMap(root, ns, mutKeyDesc, mutKeyDesc)
|
||||
pm := prollyMapFromTuples(t, mutKeyDesc, mutValDesc, tuples)
|
||||
return pm.(Map)
|
||||
}
|
||||
|
||||
var mutKeyDesc = val.NewTupleDescriptor(
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
@@ -175,13 +176,17 @@ func TestWriteImmutableTree(t *testing.T) {
|
||||
return nil
|
||||
})
|
||||
|
||||
require.Equal(t, expLevel, root.Level())
|
||||
assert.Equal(t, expLevel, root.Level())
|
||||
if tt.checkSum {
|
||||
require.Equal(t, expSum, sum)
|
||||
assert.Equal(t, expSum, sum)
|
||||
}
|
||||
assert.Equal(t, tt.inputSize, byteCnt)
|
||||
assert.Equal(t, expUnfilled, unfilledCnt)
|
||||
if expLevel > 0 {
|
||||
for i := range expSubtrees {
|
||||
assert.Equal(t, expSubtrees[i], root.getSubtreeCount(i))
|
||||
}
|
||||
}
|
||||
require.Equal(t, tt.inputSize, byteCnt)
|
||||
require.Equal(t, expUnfilled, unfilledCnt)
|
||||
require.Equal(t, expSubtrees, root.getSubtreeCounts())
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -200,9 +205,9 @@ func expectedLevel(size, chunk int) int {
|
||||
return l
|
||||
}
|
||||
|
||||
func expectedSubtrees(size, chunk int) SubtreeCounts {
|
||||
func expectedSubtrees(size, chunk int) subtreeCounts {
|
||||
if size <= chunk {
|
||||
return SubtreeCounts{0}
|
||||
return subtreeCounts{0}
|
||||
}
|
||||
l := expectedLevel(size, chunk)
|
||||
|
||||
@@ -211,7 +216,7 @@ func expectedSubtrees(size, chunk int) SubtreeCounts {
|
||||
|
||||
filledSubtree := int(math.Pow(float64(intChunk), float64(l-1)))
|
||||
|
||||
subtrees := make(SubtreeCounts, 0)
|
||||
subtrees := make(subtreeCounts, 0)
|
||||
for size > filledSubtree {
|
||||
subtrees = append(subtrees, uint64(filledSubtree))
|
||||
size -= filledSubtree
|
||||
|
||||
@@ -27,10 +27,13 @@ import (
|
||||
|
||||
type Item []byte
|
||||
|
||||
type subtreeCounts []uint64
|
||||
|
||||
type Node struct {
|
||||
// keys and values contain sub-slices of |msg|,
|
||||
// allowing faster lookups by avoiding the vtable
|
||||
keys, values val.SlicedBuffer
|
||||
subtrees subtreeCounts
|
||||
count uint16
|
||||
msg message.Message
|
||||
}
|
||||
@@ -140,16 +143,24 @@ func (nd Node) getValue(i int) Item {
|
||||
return nd.values.GetSlice(i)
|
||||
}
|
||||
|
||||
func (nd *Node) getSubtreeCount(i int) uint64 {
|
||||
if nd.IsLeaf() {
|
||||
return 1
|
||||
}
|
||||
if nd.subtrees == nil {
|
||||
// deserializing subtree counts requires a
|
||||
// malloc, so we lazily load them here
|
||||
nd.subtrees = message.GetSubtrees(nd.msg)
|
||||
}
|
||||
return nd.subtrees[i]
|
||||
}
|
||||
|
||||
// getAddress returns the |ith| address of this node.
|
||||
// This method assumes values are 20-byte address hashes.
|
||||
func (nd Node) getAddress(i int) hash.Hash {
|
||||
return hash.New(nd.getValue(i))
|
||||
}
|
||||
|
||||
func (nd Node) getSubtreeCounts() SubtreeCounts {
|
||||
return message.GetSubtrees(nd.msg)
|
||||
}
|
||||
|
||||
func (nd Node) empty() bool {
|
||||
return nd.bytes() == nil || nd.count == 0
|
||||
}
|
||||
|
||||
@@ -66,7 +66,7 @@ func newNodeBuilder[S message.Serializer](serializer S, level int) (nb *nodeBuil
|
||||
type nodeBuilder[S message.Serializer] struct {
|
||||
keys, values [][]byte
|
||||
size, level int
|
||||
subtrees SubtreeCounts
|
||||
subtrees subtreeCounts
|
||||
serializer S
|
||||
}
|
||||
|
||||
|
||||
@@ -30,20 +30,10 @@ import (
|
||||
|
||||
// Cursor explores a tree of Nodes.
|
||||
type Cursor struct {
|
||||
nd Node
|
||||
idx int
|
||||
parent *Cursor
|
||||
subtrees SubtreeCounts
|
||||
nrw NodeStore
|
||||
}
|
||||
|
||||
type SubtreeCounts []uint64
|
||||
|
||||
func (sc SubtreeCounts) Sum() (s uint64) {
|
||||
for _, count := range sc {
|
||||
s += count
|
||||
}
|
||||
return
|
||||
nd Node
|
||||
idx int
|
||||
parent *Cursor
|
||||
nrw NodeStore
|
||||
}
|
||||
|
||||
type CompareFn func(left, right Item) int
|
||||
@@ -112,11 +102,8 @@ func NewCursorAtOrdinal(ctx context.Context, ns NodeStore, nd Node, ord uint64)
|
||||
return int(distance)
|
||||
}
|
||||
|
||||
// |subtrees| contains cardinalities of each child tree in |nd|
|
||||
subtrees := nd.getSubtreeCounts()
|
||||
|
||||
for idx = range subtrees {
|
||||
card := int64(subtrees[idx])
|
||||
for idx = 0; idx < nd.Count(); idx++ {
|
||||
card := int64(nd.getSubtreeCount(idx))
|
||||
if (distance - card) < 0 {
|
||||
break
|
||||
}
|
||||
@@ -230,10 +217,7 @@ func (cur *Cursor) currentSubtreeSize() uint64 {
|
||||
if cur.isLeaf() {
|
||||
return 1
|
||||
}
|
||||
if cur.subtrees == nil { // lazy load
|
||||
cur.subtrees = cur.nd.getSubtreeCounts()
|
||||
}
|
||||
return cur.subtrees[cur.idx]
|
||||
return cur.nd.getSubtreeCount(cur.idx)
|
||||
}
|
||||
|
||||
func (cur *Cursor) firstKey() Item {
|
||||
@@ -403,8 +387,6 @@ func (cur *Cursor) Advance(ctx context.Context) error {
|
||||
}
|
||||
|
||||
cur.skipToNodeStart()
|
||||
cur.subtrees = nil // lazy load
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -440,8 +422,6 @@ func (cur *Cursor) Retreat(ctx context.Context) error {
|
||||
}
|
||||
|
||||
cur.skipToNodeEnd()
|
||||
cur.subtrees = nil // lazy load
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -67,7 +67,7 @@ func TestRoundTripNodeItems(t *testing.T) {
|
||||
|
||||
func TestNodeSize(t *testing.T) {
|
||||
sz := unsafe.Sizeof(Node{})
|
||||
assert.Equal(t, 128, int(sz))
|
||||
assert.Equal(t, 152, int(sz))
|
||||
}
|
||||
|
||||
func TestNodeHashValueCompatibility(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user