mirror of
https://github.com/dolthub/dolt.git
synced 2025-12-31 00:50:14 -06:00
Merge pull request #9855 from dolthub/nicktobey/adaptive
Preemptively store extended adaptive values out-of-band if they're too large to fit in a Tuple
This commit is contained in:
@@ -30,8 +30,7 @@ func EncodingFromSqlType(typ sql.Type) serial.Encoding {
|
||||
case sql.ExtendedTypeSerializedWidth_64K:
|
||||
return serial.EncodingExtended
|
||||
case sql.ExtendedTypeSerializedWidth_Unbounded:
|
||||
// TODO: should use serial.EncodingExtendedAdaptive, but it's currently broken
|
||||
return serial.EncodingExtendedAddr
|
||||
return serial.EncodingExtendedAdaptive
|
||||
default:
|
||||
panic(fmt.Errorf("unknown serialization width"))
|
||||
}
|
||||
|
||||
@@ -360,11 +360,30 @@ func PutField(ctx context.Context, ns NodeStore, tb *val.TupleBuilder, i int, v
|
||||
}
|
||||
}
|
||||
case val.ExtendedAdaptiveEnc:
|
||||
b, err := tb.Desc.Handlers[i].SerializeValue(ctx, v)
|
||||
if err != nil {
|
||||
return err
|
||||
switch value := v.(type) {
|
||||
case *val.ExtendedValueWrapper:
|
||||
if value.IsExactLength() {
|
||||
tb.PutAdaptiveExtendedFromOutline(i, value)
|
||||
} else {
|
||||
valueBytes, err := value.GetBytes(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tb.PutAdaptiveBytesFromInline(ctx, i, valueBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
default:
|
||||
valueBytes, err := tb.Desc.Handlers[i].SerializeValue(ctx, v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = tb.PutAdaptiveValue(ctx, ns, i, valueBytes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
tb.PutRaw(i, b)
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown encoding %v %v", enc, v))
|
||||
}
|
||||
|
||||
@@ -542,6 +542,21 @@ func (tb *TupleBuilder) PutCell(i int, v Cell) {
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveBytesFromInline(ctx context.Context, i int, v []byte) error {
|
||||
tb.Desc.expectEncoding(i, BytesAdaptiveEnc)
|
||||
return tb.PutAdaptiveFromInline(ctx, i, v)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveStringFromInline(ctx context.Context, i int, s string) error {
|
||||
tb.Desc.expectEncoding(i, StringAdaptiveEnc)
|
||||
return tb.PutAdaptiveFromInline(ctx, i, []byte(s))
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveExtendedFromInline(ctx context.Context, i int, v []byte) error {
|
||||
tb.Desc.expectEncoding(i, ExtendedAdaptiveEnc)
|
||||
return tb.PutAdaptiveFromInline(ctx, i, v)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveFromInline(ctx context.Context, i int, v []byte) error {
|
||||
|
||||
inlineSize := ByteSize(len(v)) + 1 // include extra header byte
|
||||
if int64(inlineSize) > tb.tupleLengthTarget {
|
||||
// Inline value is too large. We must store it out-of-band.
|
||||
@@ -574,67 +589,48 @@ func (tb *TupleBuilder) PutAdaptiveBytesFromInline(ctx context.Context, i int, v
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveStringFromInline(ctx context.Context, i int, v string) error {
|
||||
tb.Desc.expectEncoding(i, StringAdaptiveEnc)
|
||||
inlineSize := ByteSize(len(v)) + 1 // include extra header byte
|
||||
if int64(inlineSize) > tb.tupleLengthTarget {
|
||||
// Inline value is too large. We must store it out of line.
|
||||
maxLengthBytes := 9
|
||||
tb.ensureCapacity(ByteSize(hash.ByteLen + maxLengthBytes))
|
||||
blobLength := uint64(len(v))
|
||||
lengthSize, _ := makeVarInt(blobLength, tb.buf[tb.pos:])
|
||||
outOfBandSize := int64(lengthSize) + hash.ByteLen
|
||||
|
||||
blobHash, err := tb.vs.WriteBytes(ctx, []byte(v))
|
||||
func (tb *TupleBuilder) PutAdaptiveValue(ctx context.Context, vs ValueStore, i int, v AdaptiveValue) error {
|
||||
if v.getMessageLength() > tb.tupleLengthTarget {
|
||||
// The message won't fit inlined, so premptively store it out-of-band
|
||||
byteArray, err := v.convertToByteArray(ctx, vs, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
copy(tb.buf[tb.pos+int64(lengthSize):], blobHash[:])
|
||||
field := tb.buf[tb.pos : tb.pos+outOfBandSize]
|
||||
tb.fields[i] = field
|
||||
tb.pos += outOfBandSize
|
||||
tb.inlineSize += int64(inlineSize)
|
||||
tb.outOfBandSize += outOfBandSize
|
||||
tb.PutAdaptiveFromOutline(i, byteArray.maxByteLength, byteArray.Addr)
|
||||
return nil
|
||||
} else {
|
||||
bytes, err := v.getUnderlyingBytes(ctx, vs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tb.PutAdaptiveFromInline(ctx, i, bytes)
|
||||
}
|
||||
tb.ensureCapacity(inlineSize)
|
||||
field := AdaptiveValue(tb.buf[tb.pos : tb.pos+int64(inlineSize)])
|
||||
tb.fields[i] = field
|
||||
field[0] = 0 // Mark this as inline
|
||||
copy(field[1:], v)
|
||||
tb.pos += int64(inlineSize)
|
||||
tb.inlineSize += int64(inlineSize)
|
||||
tb.outOfBandSize += field.outOfBandSize()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveExtendedFromOutline(i int, v *ExtendedValueWrapper) {
|
||||
tb.Desc.expectEncoding(i, ExtendedAdaptiveEnc)
|
||||
tb.PutAdaptiveFromOutline(i, v.outOfBandLength, v.Addr)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveBytesFromOutline(i int, v *ByteArray) {
|
||||
tb.Desc.expectEncoding(i, BytesAdaptiveEnc)
|
||||
|
||||
maxLengthBytes := 9
|
||||
tb.ensureCapacity(ByteSize(hash.ByteLen + maxLengthBytes))
|
||||
blobLength := uint64(v.MaxByteLength())
|
||||
lengthSize, _ := makeVarInt(blobLength, tb.buf[tb.pos:])
|
||||
outOfBandSize := int64(lengthSize) + hash.ByteLen
|
||||
|
||||
copy(tb.buf[tb.pos+int64(lengthSize):], v.Addr[:])
|
||||
field := tb.buf[tb.pos : tb.pos+outOfBandSize]
|
||||
tb.fields[i] = field
|
||||
tb.pos += outOfBandSize
|
||||
tb.inlineSize += int64(blobLength) + 1
|
||||
tb.outOfBandSize += outOfBandSize
|
||||
tb.PutAdaptiveFromOutline(i, v.maxByteLength, v.Addr)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveStringFromOutline(i int, v *TextStorage) {
|
||||
tb.Desc.expectEncoding(i, StringAdaptiveEnc)
|
||||
tb.PutAdaptiveFromOutline(i, v.maxByteLength, v.Addr)
|
||||
}
|
||||
|
||||
func (tb *TupleBuilder) PutAdaptiveFromOutline(i int, maxByteLength int64, addr hash.Hash) {
|
||||
|
||||
maxLengthBytes := 9
|
||||
tb.ensureCapacity(ByteSize(hash.ByteLen + maxLengthBytes))
|
||||
blobLength := uint64(v.MaxByteLength())
|
||||
blobLength := uint64(maxByteLength)
|
||||
lengthSize, _ := makeVarInt(blobLength, tb.buf[tb.pos:])
|
||||
outOfBandSize := int64(lengthSize) + hash.ByteLen
|
||||
|
||||
copy(tb.buf[tb.pos+int64(lengthSize):], v.Addr[:])
|
||||
copy(tb.buf[tb.pos+int64(lengthSize):], addr[:])
|
||||
field := tb.buf[tb.pos : tb.pos+outOfBandSize]
|
||||
tb.fields[i] = field
|
||||
tb.pos += outOfBandSize
|
||||
|
||||
Reference in New Issue
Block a user