Merge pull request #5892 from dolthub/fulghum/binlog-replication

Fix binlog connection re-establishment when connection closes between messages
This commit is contained in:
Jason Fulghum
2023-05-08 17:43:38 -07:00
committed by GitHub
3 changed files with 25 additions and 12 deletions
@@ -289,12 +289,11 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
case err := <-eventProducer.ErrorChan():
if sqlError, isSqlError := err.(*mysql.SQLError); isSqlError {
if sqlError.Message == io.EOF.Error() {
ctx.GetLogger().Trace("No more binlog messages; retrying in 1s...")
time.Sleep(1 * time.Second)
} else if strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error()) {
badConnection := sqlError.Message == io.EOF.Error() ||
strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error())
if badConnection {
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoError = io.ErrUnexpectedEOF.Error()
status.LastIoError = sqlError.Message
status.LastIoErrNumber = ERNetReadError
currentTime := time.Now()
status.LastIoErrorTimestamp = &currentTime
@@ -17,6 +17,7 @@ package binlogreplication
import (
"fmt"
"strconv"
"strings"
"testing"
"time"
@@ -40,15 +41,20 @@ func TestBinlogReplicationAutoReconnect(t *testing.T) {
configureFastConnectionRetry(t)
startReplication(t, proxyPort)
// Get the replica started up and ensure it's in sync with the primary before turning on the limit_data toxic
testInitialReplicaStatus(t)
primaryDatabase.MustExec("create table reconnect_test(pk int primary key, c1 varchar(255));")
waitForReplicaToCatchUp(t)
turnOnLimitDataToxic(t)
for i := 0; i < 1000; i++ {
value := "foobarbazbashfoobarbazbashfoobarbazbashfoobarbazbashfoobarbazbash"
primaryDatabase.MustExec(fmt.Sprintf("insert into reconnect_test values (%v, %q)", i, value))
}
// Remove the limit_data toxic so that a connection can be reestablished
mysqlProxy.RemoveToxic("limit_data")
err := mysqlProxy.RemoveToxic("limit_data")
require.NoError(t, err)
t.Logf("Toxiproxy proxy limit_data toxic removed")
// Assert that all records get written to the table
waitForReplicaToCatchUp(t)
@@ -65,7 +71,7 @@ func TestBinlogReplicationAutoReconnect(t *testing.T) {
// Assert that show replica status show reconnection IO error
status := showReplicaStatus(t)
require.Equal(t, "1158", status["Last_IO_Errno"])
require.Equal(t, "unexpected EOF", status["Last_IO_Error"])
require.True(t, strings.Contains(status["Last_IO_Error"].(string), "EOF"))
requireRecentTimeString(t, status["Last_IO_Error_Timestamp"])
}
@@ -151,7 +157,7 @@ func configureToxiProxy(t *testing.T) {
toxiproxyServer.Listen("localhost", strconv.Itoa(toxiproxyPort))
}()
time.Sleep(500 * time.Millisecond)
fmt.Printf("Toxiproxy server running on port %d \n", toxiproxyPort)
t.Logf("Toxiproxy control plane running on port %d", toxiproxyPort)
toxiClient = toxiproxyclient.NewClient(fmt.Sprintf("localhost:%d", toxiproxyPort))
@@ -163,11 +169,19 @@ func configureToxiProxy(t *testing.T) {
if err != nil {
panic(fmt.Sprintf("unable to create toxiproxy: %v", err.Error()))
}
t.Logf("Toxiproxy proxy started on port %d", proxyPort)
}
mysqlProxy.AddToxic("limit_data", "limit_data", "downstream", 1.0, toxiproxyclient.Attributes{
// turnOnLimitDataToxic adds a limit_data toxic to the active Toxiproxy, which prevents more than 1KB of data
// from being sent from the primary through the proxy to the replica. Callers MUST call configureToxiProxy
// before calling this function.
func turnOnLimitDataToxic(t *testing.T) {
require.NotNil(t, mysqlProxy)
_, err := mysqlProxy.AddToxic("limit_data", "limit_data", "downstream", 1.0, toxiproxyclient.Attributes{
"bytes": 1_000,
})
fmt.Printf("Toxiproxy proxy with limit_data toxic (1KB) started on port %d \n", proxyPort)
require.NoError(t, err)
t.Logf("Toxiproxy proxy with limit_data toxic (1KB) started on port %d", proxyPort)
}
// convertByteArraysToStrings converts each []byte value in the specified map |m| into a string.
@@ -476,7 +476,7 @@ func TestCharsetsAndCollations(t *testing.T) {
// Test Helper Functions
//
// waitForReplicaToCatchUp waits (up to 20s) for the replica to catch up with the primary database. The
// waitForReplicaToCatchUp waits (up to 60s) for the replica to catch up with the primary database. The
// lag is measured by checking that gtid_executed is the same on the primary and replica.
func waitForReplicaToCatchUp(t *testing.T) {
timeLimit := 60 * time.Second