chore:reva bump v.2.34 (#1139)

This commit is contained in:
Viktor Scharf
2025-06-30 10:00:47 +02:00
committed by GitHub
parent bb36239433
commit b30bb19ad7
97 changed files with 1479 additions and 829 deletions

View File

@@ -25,6 +25,7 @@ Asta Xie <xiemengjun at gmail.com>
B Lamarche <blam413 at gmail.com>
Bes Dollma <bdollma@thousandeyes.com>
Bogdan Constantinescu <bog.con.bc at gmail.com>
Brad Higgins <brad at defined.net>
Brian Hendriks <brian at dolthub.com>
Bulat Gaifullin <gaifullinbf at gmail.com>
Caine Jette <jette at alum.mit.edu>
@@ -37,6 +38,7 @@ Daniel Montoya <dsmontoyam at gmail.com>
Daniel Nichter <nil at codenode.com>
Daniël van Eeden <git at myname.nl>
Dave Protasowski <dprotaso at gmail.com>
Diego Dupin <diego.dupin at gmail.com>
Dirkjan Bussink <d.bussink at gmail.com>
DisposaBoy <disposaboy at dby.me>
Egor Smolyakov <egorsmkv at gmail.com>
@@ -133,6 +135,7 @@ Ziheng Lyu <zihenglv at gmail.com>
Barracuda Networks, Inc.
Counting Ltd.
Defined Networking Inc.
DigitalOcean Inc.
Dolthub Inc.
dyves labs AG

View File

@@ -1,5 +1,16 @@
# Changelog
## v1.9.3 (2025-06-13)
* `tx.Commit()` and `tx.Rollback()` returned `ErrInvalidConn` always.
Now they return cached real error if present. (#1690)
* Optimize reading small resultsets to fix performance regression
introduced by compression protocol support. (#1707)
* Fix `db.Ping()` on compressed connection. (#1723)
## v1.9.2 (2025-04-07)
v1.9.2 is a re-release of v1.9.1 due to a release process issue; no changes were made to the content.

View File

@@ -42,6 +42,11 @@ func (b *buffer) busy() bool {
return len(b.buf) > 0
}
// len returns how many bytes in the read buffer.
func (b *buffer) len() int {
return len(b.buf)
}
// fill reads into the read buffer until at least _need_ bytes are in it.
func (b *buffer) fill(need int, r readerFunc) error {
// we'll move the contents of the current buffer to dest before filling it.
@@ -86,17 +91,10 @@ func (b *buffer) fill(need int, r readerFunc) error {
// returns next N bytes from buffer.
// The returned slice is only guaranteed to be valid until the next read
func (b *buffer) readNext(need int, r readerFunc) ([]byte, error) {
if len(b.buf) < need {
// refill
if err := b.fill(need, r); err != nil {
return nil, err
}
}
data := b.buf[:need]
func (b *buffer) readNext(need int) []byte {
data := b.buf[:need:need]
b.buf = b.buf[need:]
return data, nil
return data
}
// takeBuffer returns a buffer with the requested size.

View File

@@ -84,9 +84,9 @@ func (c *compIO) reset() {
c.buff.Reset()
}
func (c *compIO) readNext(need int, r readerFunc) ([]byte, error) {
func (c *compIO) readNext(need int) ([]byte, error) {
for c.buff.Len() < need {
if err := c.readCompressedPacket(r); err != nil {
if err := c.readCompressedPacket(); err != nil {
return nil, err
}
}
@@ -94,8 +94,8 @@ func (c *compIO) readNext(need int, r readerFunc) ([]byte, error) {
return data[:need:need], nil // prevent caller writes into c.buff
}
func (c *compIO) readCompressedPacket(r readerFunc) error {
header, err := c.mc.buf.readNext(7, r) // size of compressed header
func (c *compIO) readCompressedPacket() error {
header, err := c.mc.readNext(7)
if err != nil {
return err
}
@@ -103,7 +103,7 @@ func (c *compIO) readCompressedPacket(r readerFunc) error {
// compressed header structure
comprLength := getUint24(header[0:3])
compressionSequence := uint8(header[3])
compressionSequence := header[3]
uncompressedLength := getUint24(header[4:7])
if debug {
fmt.Printf("uncompress cmplen=%v uncomplen=%v pkt_cmp_seq=%v expected_cmp_seq=%v\n",
@@ -113,14 +113,13 @@ func (c *compIO) readCompressedPacket(r readerFunc) error {
// Server may return error packet (e.g. 1153 Got a packet bigger than 'max_allowed_packet' bytes)
// before receiving all packets from client. In this case, seqnr is younger than expected.
// NOTE: Both of mariadbclient and mysqlclient do not check seqnr. Only server checks it.
if debug && compressionSequence != c.mc.sequence {
if debug && compressionSequence != c.mc.compressSequence {
fmt.Printf("WARN: unexpected cmpress seq nr: expected %v, got %v",
c.mc.sequence, compressionSequence)
c.mc.compressSequence, compressionSequence)
}
c.mc.sequence = compressionSequence + 1
c.mc.compressSequence = c.mc.sequence
c.mc.compressSequence = compressionSequence + 1
comprData, err := c.mc.buf.readNext(comprLength, r)
comprData, err := c.mc.readNext(comprLength)
if err != nil {
return err
}
@@ -200,7 +199,7 @@ func (c *compIO) writeCompressedPacket(data []byte, uncompressedLen int) (int, e
comprLength := len(data) - 7
if debug {
fmt.Printf(
"writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v",
"writeCompressedPacket: comprLength=%v, uncompressedLen=%v, seq=%v\n",
comprLength, uncompressedLen, mc.compressSequence)
}

View File

@@ -17,6 +17,7 @@ import (
"fmt"
"io"
"math"
"os"
"strconv"
"time"
)
@@ -25,19 +26,30 @@ import (
// https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html
// https://mariadb.com/kb/en/clientserver-protocol/
// read n bytes from mc.buf
func (mc *mysqlConn) readNext(n int) ([]byte, error) {
if mc.buf.len() < n {
err := mc.buf.fill(n, mc.readWithTimeout)
if err != nil {
return nil, err
}
}
return mc.buf.readNext(n), nil
}
// Read packet to buffer 'data'
func (mc *mysqlConn) readPacket() ([]byte, error) {
var prevData []byte
invalidSequence := false
readNext := mc.buf.readNext
readNext := mc.readNext
if mc.compress {
readNext = mc.compIO.readNext
}
for {
// read packet header
data, err := readNext(4, mc.readWithTimeout)
data, err := readNext(4)
if err != nil {
mc.close()
if cerr := mc.canceled.Value(); cerr != nil {
@@ -51,17 +63,11 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
pktLen := getUint24(data[:3])
seq := data[3]
if mc.compress {
// check packet sync [8 bit]
if seq != mc.sequence {
mc.log(fmt.Sprintf("[warn] unexpected sequence nr: expected %v, got %v", mc.sequence, seq))
// MySQL and MariaDB doesn't check packet nr in compressed packet.
if debug && seq != mc.compressSequence {
fmt.Printf("[debug] mismatched compression sequence nr: expected: %v, got %v",
mc.compressSequence, seq)
}
mc.compressSequence = seq + 1
} else {
// check packet sync [8 bit]
if seq != mc.sequence {
mc.log(fmt.Sprintf("[warn] unexpected seq nr: expected %v, got %v", mc.sequence, seq))
if !mc.compress {
// For large packets, we stop reading as soon as sync error.
if len(prevData) > 0 {
mc.close()
@@ -69,8 +75,8 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
}
invalidSequence = true
}
mc.sequence++
}
mc.sequence = seq + 1
// packets with length 0 terminate a previous packet which is a
// multiple of (2^24)-1 bytes long
@@ -85,7 +91,7 @@ func (mc *mysqlConn) readPacket() ([]byte, error) {
}
// read packet body [pktLen bytes]
data, err = readNext(pktLen, mc.readWithTimeout)
data, err = readNext(pktLen)
if err != nil {
mc.close()
if cerr := mc.canceled.Value(); cerr != nil {
@@ -135,7 +141,7 @@ func (mc *mysqlConn) writePacket(data []byte) error {
// Write packet
if debug {
fmt.Printf("writePacket: size=%v seq=%v", size, mc.sequence)
fmt.Fprintf(os.Stderr, "writePacket: size=%v seq=%v\n", size, mc.sequence)
}
n, err := writeFunc(data[:4+size])
@@ -434,7 +440,9 @@ func (mc *mysqlConn) writeCommandPacket(command byte) error {
data[4] = command
// Send CMD packet
return mc.writePacket(data)
err = mc.writePacket(data)
mc.syncSequence()
return err
}
func (mc *mysqlConn) writeCommandPacketStr(command byte, arg string) error {
@@ -475,7 +483,9 @@ func (mc *mysqlConn) writeCommandPacketUint32(command byte, arg uint32) error {
binary.LittleEndian.PutUint32(data[5:], arg)
// Send CMD packet
return mc.writePacket(data)
err = mc.writePacket(data)
mc.syncSequence()
return err
}
/******************************************************************************
@@ -945,7 +955,6 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error {
pktLen = dataOffset + argLen
}
stmt.mc.resetSequence()
// Add command byte [1 byte]
data[4] = comStmtSendLongData
@@ -957,6 +966,8 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error {
// Send CMD packet
err := stmt.mc.writePacket(data[:4+pktLen])
// Every COM_LONG_DATA packet reset Packet Sequence
stmt.mc.resetSequence()
if err == nil {
data = data[pktLen-dataOffset:]
continue
@@ -964,8 +975,6 @@ func (stmt *mysqlStmt) writeCommandLongData(paramID int, arg []byte) error {
return err
}
// Reset Packet Sequence
stmt.mc.resetSequence()
return nil
}

View File

@@ -13,18 +13,32 @@ type mysqlTx struct {
}
func (tx *mysqlTx) Commit() (err error) {
if tx.mc == nil || tx.mc.closed.Load() {
if tx.mc == nil {
return ErrInvalidConn
}
if tx.mc.closed.Load() {
err = tx.mc.error()
if err == nil {
err = ErrInvalidConn
}
return
}
err = tx.mc.exec("COMMIT")
tx.mc = nil
return
}
func (tx *mysqlTx) Rollback() (err error) {
if tx.mc == nil || tx.mc.closed.Load() {
if tx.mc == nil {
return ErrInvalidConn
}
if tx.mc.closed.Load() {
err = tx.mc.error()
if err == nil {
err = ErrInvalidConn
}
return
}
err = tx.mc.exec("ROLLBACK")
tx.mc = nil
return