Merge pull request #5501 from dolthub/fulghum/binlog-replication

Minor bug fixes for MySQL binlog replication
This commit is contained in:
Jason Fulghum
2023-03-07 16:28:28 -08:00
committed by GitHub
3 changed files with 25 additions and 4 deletions
@@ -304,7 +304,7 @@ func (d *doltBinlogReplicaController) ResetReplica(ctx *sql.Context, resetAll bo
return err
}
d.filters = nil
d.filters = newFilterConfiguration()
}
return nil
@@ -137,8 +137,8 @@ func requireRecentTimeString(t *testing.T, datetime interface{}) {
// name of each column.
func showReplicaStatus(t *testing.T) map[string]interface{} {
rows, err := replicaDatabase.Queryx("show replica status;")
defer rows.Close()
require.NoError(t, err)
defer rows.Close()
return convertByteArraysToStrings(readNextRow(t, rows))
}
@@ -33,9 +33,10 @@ import (
"time"
"github.com/jmoiron/sqlx"
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/require"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
_ "github.com/go-sql-driver/mysql"
)
var mySqlPort, doltPort int
@@ -172,6 +173,14 @@ func TestResetReplica(t *testing.T) {
require.NoError(t, err)
require.False(t, rows.Next())
require.NoError(t, rows.Close())
// Start replication again and verify that we can still query replica status
startReplication(t, mySqlPort)
replicaStatus := showReplicaStatus(t)
require.Equal(t, "0", replicaStatus["Last_Errno"])
require.Equal(t, "", replicaStatus["Last_Error"])
require.True(t, replicaStatus["Replica_IO_Running"] == binlogreplication.ReplicaIoRunning ||
replicaStatus["Replica_IO_Running"] == binlogreplication.ReplicaIoConnecting)
}
// TestStartReplicaErrors tests that the "START REPLICA" command returns appropriate responses
@@ -205,6 +214,18 @@ func TestStartReplicaErrors(t *testing.T) {
assertWarning(t, replicaDatabase, 3083, "Replication thread(s) for channel '' are already running.")
}
// TestShowReplicaStatus tests various cases "SHOW REPLICA STATUS" that aren't covered by other tests.
func TestShowReplicaStatus(t *testing.T) {
defer teardown(t)
startSqlServers(t)
// Assert that very long hostnames are handled correctly
longHostname := "really.really.really.really.long.host.name.012345678901234567890123456789012345678901234567890123456789.com"
replicaDatabase.MustExec(fmt.Sprintf("CHANGE REPLICATION SOURCE TO SOURCE_HOST='%s';", longHostname))
status := showReplicaStatus(t)
require.Equal(t, longHostname, status["Source_Host"])
}
// TestStopReplica tests that STOP REPLICA correctly stops the replication process, and that
// warnings are logged when STOP REPLICA is invoked when replication is not running.
func TestStopReplica(t *testing.T) {