Merge pull request #5450 from dolthub/fulghum/binlog-replication

MySQL replication fixes for `START/STOP REPLICA`
This commit is contained in:
Jason Fulghum
2023-03-06 09:02:37 -08:00
committed by GitHub
5 changed files with 376 additions and 134 deletions

View File

@@ -19,6 +19,7 @@ import (
"io"
"strconv"
"strings"
"sync/atomic"
"time"
gms "github.com/dolthub/go-mysql-server"
@@ -53,16 +54,14 @@ const (
// This type is NOT used concurrently there is currently only one single applier process running to process binlog
// events, so the state in this type is NOT protected with a mutex.
type binlogReplicaApplier struct {
format mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
// currentGtid is the current GTID being processed, but not yet committed
currentGtid mysql.GTID
// replicationSourceUuid holds the UUID of the source server
format *mysql.BinlogFormat
tableMapsById map[uint64]*mysql.TableMap
stopReplicationChan chan struct{}
currentGtid mysql.GTID
replicationSourceUuid string
// currentPosition records which GTIDs have been successfully executed
currentPosition *mysql.Position
filters *filterConfiguration
currentPosition *mysql.Position // successfully executed GTIDs
filters *filterConfiguration
running atomic.Bool
}
func newBinlogReplicaApplier(filters *filterConfiguration) *binlogReplicaApplier {
@@ -87,7 +86,9 @@ const rowFlag_rowsAreComplete = 0x0008
// Go spawns a new goroutine to run the applier's binlog event handler.
func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
go func() {
a.running.Store(true)
err := a.replicaBinlogEventHandler(ctx)
a.running.Store(false)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
@@ -97,16 +98,6 @@ func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
//
// NOTE: Our fork of Vitess currently only supports mysql_native_password auth. The latest code in the main
//
// Vitess repo supports the current MySQL default auth plugin, caching_sha2_password.
// https://dev.mysql.com/blog-archive/upgrading-to-mysql-8-0-default-authentication-plugin-considerations/
// To work around this limitation, add the following to your /etc/my.cnf:
// [mysqld]
// default-authentication-plugin=mysql_native_password
// or start mysqld with:
// --default-authentication-plugin=mysql_native_password
func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Context) (*mysql.Conn, error) {
var maxConnectionAttempts uint64
var connectRetryDelay uint32
@@ -131,15 +122,11 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
}
if replicaSourceInfo.Host == "" {
err = fmt.Errorf("fatal error: Invalid (empty) hostname when attempting to connect " +
"to the source server. Connection attempt terminated")
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, err.Error())
return nil, err
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyHostname.Error())
return nil, ErrEmptyHostname
} else if replicaSourceInfo.User == "" {
err = fmt.Errorf("fatal error: Invalid (empty) username when attempting to connect " +
"to the source server. Connection attempt terminated")
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, err.Error())
return nil, err
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyUsername.Error())
return nil, ErrEmptyUsername
}
connParams := mysql.ConnParams{
@@ -151,10 +138,24 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
}
conn, err = mysql.Connect(ctx, &connParams)
if err != nil && connectionAttempts >= maxConnectionAttempts {
return nil, err
} else if err != nil {
time.Sleep(time.Duration(connectRetryDelay) * time.Second)
if err != nil {
if connectionAttempts >= maxConnectionAttempts {
ctx.GetLogger().Errorf("Exceeded max connection attempts (%d) to source server", maxConnectionAttempts)
return nil, err
}
// If there was an error connecting (and we haven't used up all our retry attempts), listen for a
// STOP REPLICA signal or for the retry delay timer to fire. We need to use select here so that we don't
// block on our retry backoff and ignore the STOP REPLICA signal for a long time.
for {
select {
case <-a.stopReplicationChan:
ctx.GetLogger().Debugf("Received stop replication signal while trying to connect")
return nil, ErrReplicationStopped
case <-time.After(time.Duration(connectRetryDelay) * time.Second):
// Nothing to do here if our timer completes; just fall through
break
}
}
} else {
break
}
@@ -230,7 +231,7 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
// Clear out the format description in case we're reconnecting, so that we don't use the old format description
// to interpret any event messages before we receive the new format description from the new stream.
a.format = mysql.BinlogFormat{}
a.format = nil
// If the source server has binlog checksums enabled (@@global.binlog_checksum), then the replica MUST
// set @master_binlog_checksum to handshake with the server to acknowledge that it knows that checksums
@@ -254,65 +255,62 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
}
engine := server.Engine
conn, err := a.connectAndStartReplicationEventStream(ctx)
if err != nil {
return err
}
var conn *mysql.Conn
var eventProducer *binlogEventProducer
// Process binlog events
for {
if conn == nil {
ctx.GetLogger().Debug("no binlog connection to source, attempting to establish one")
if eventProducer != nil {
eventProducer.Stop()
}
var err error
if conn, err = a.connectAndStartReplicationEventStream(ctx); err == ErrReplicationStopped {
return nil
} else if err != nil {
return err
}
eventProducer = newBinlogEventProducer(conn)
eventProducer.Go(ctx)
}
select {
case <-a.stopReplicationChan:
ctx.GetLogger().Trace("received signal to stop replication routine")
return nil
default:
event, err := conn.ReadBinlogEvent()
if err != nil {
if sqlError, isSqlError := err.(*mysql.SQLError); isSqlError {
if sqlError.Message == io.EOF.Error() {
ctx.GetLogger().Trace("No more binlog messages; retrying in 1s...")
time.Sleep(1 * time.Second)
continue
} else if strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error()) {
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoError = io.ErrUnexpectedEOF.Error()
status.LastIoErrNumber = ERNetReadError
currentTime := time.Now()
status.LastIoErrorTimestamp = &currentTime
})
conn, err = a.connectAndStartReplicationEventStream(ctx)
if err != nil {
return err
}
continue
}
}
// otherwise, log the error if it's something we don't expect and continue
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
continue
}
// We don't support checksum validation, so we must strip off any checksum data if present, otherwise
// it could get interpreted as part of the data fields and corrupt the fields we pull out. There is not
// a future-proof guarantee on the checksum size, so we can't strip a checksum until we've seen the
// Format binlog event that definitively tells us if checksums are enabled and what algorithm they use.
if a.format.IsZero() == false {
event, _, err = event.StripChecksum(a.format)
if err != nil {
msg := fmt.Sprintf("unable to strip checksum from binlog event: '%v'", err.Error())
ctx.GetLogger().Error(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
}
err = a.processBinlogEvent(ctx, engine, event)
case event := <-eventProducer.EventChan():
err := a.processBinlogEvent(ctx, engine, event)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
}
case err := <-eventProducer.ErrorChan():
if sqlError, isSqlError := err.(*mysql.SQLError); isSqlError {
if sqlError.Message == io.EOF.Error() {
ctx.GetLogger().Trace("No more binlog messages; retrying in 1s...")
time.Sleep(1 * time.Second)
} else if strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error()) {
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoError = io.ErrUnexpectedEOF.Error()
status.LastIoErrNumber = ERNetReadError
currentTime := time.Now()
status.LastIoErrorTimestamp = &currentTime
})
eventProducer.Stop()
eventProducer = nil
conn.Close()
conn = nil
}
} else {
// otherwise, log the error if it's something we don't expect and continue
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
}
case <-a.stopReplicationChan:
ctx.GetLogger().Trace("received stop replication signal")
eventProducer.Stop()
return nil
}
}
}
@@ -324,6 +322,22 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
createCommit := false
commitToAllDatabases := false
// We don't support checksum validation, so we MUST strip off any checksum bytes if present, otherwise it gets
// interpreted as part of the payload and corrupts the data. Future checksum sizes, are not guaranteed to be the
// same size, so we can't strip the checksum until we've seen a valid Format binlog event that definitively
// tells us if checksums are enabled and what algorithm they use. We can NOT strip the checksum off of
// FormatDescription events, because FormatDescription always includes a CRC32 checksum, and Vitess depends on
// those bytes always being present when we parse the event into a FormatDescription type.
if a.format != nil && event.IsFormatDescription() == false {
var err error
event, _, err = event.StripChecksum(*a.format)
if err != nil {
msg := fmt.Sprintf("unable to strip checksum from binlog event: '%v'", err.Error())
ctx.GetLogger().Error(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
}
switch {
case event.IsRand():
// A RAND_EVENT contains two seed values that set the rand_seed1 and rand_seed2 system variables that are
@@ -343,7 +357,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// replica. Used for all statements with statement-based replication, DDL statements with row-based replication
// as well as COMMITs for non-transactional engines such as MyISAM.
// For more details, see: https://mariadb.com/kb/en/query_event/
query, err := event.Query(a.format)
query, err := event.Query(*a.format)
if err != nil {
return err
}
@@ -409,18 +423,22 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
case event.IsFormatDescription():
// This is a descriptor event that is written to the beginning of a binary log file, at position 4 (after
// the 4 magic number bytes). For more details, see: https://mariadb.com/kb/en/format_description_event/
a.format, err = event.Format()
format, err := event.Format()
if err != nil {
return err
}
a.format = &format
ctx.GetLogger().WithFields(logrus.Fields{
"format": a.format,
"format": a.format,
"formatVersion": a.format.FormatVersion,
"serverVersion": a.format.ServerVersion,
"checksum": a.format.ChecksumAlgorithm,
}).Debug("Received binlog event: FormatDescription")
case event.IsPreviousGTIDs():
// Logged in every binlog to record the current replication state. Consists of the last GTID seen for each
// replication domain. For more details, see: https://mariadb.com/kb/en/gtid_list_event/
position, err := event.PreviousGTIDs(a.format)
position, err := event.PreviousGTIDs(*a.format)
if err != nil {
return err
}
@@ -431,7 +449,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
case event.IsGTID():
// For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event,
// and also to mark stand-alone (ddl). For more details, see: https://mariadb.com/kb/en/gtid_event/
gtid, isBegin, err := event.GTID(a.format)
gtid, isBegin, err := event.GTID(*a.format)
if err != nil {
return err
}
@@ -458,8 +476,8 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// operation event and maps a table definition to a number, where the table definition consists of database
// and table names. For more details, see: https://mariadb.com/kb/en/table_map_event/
// Note: TableMap events are sent before each row event, so there is no need to persist them between restarts.
tableId := event.TableID(a.format)
tableMap, err := event.TableMap(a.format)
tableId := event.TableID(*a.format)
tableMap, err := event.TableMap(*a.format)
if err != nil {
return err
}
@@ -565,7 +583,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
}
ctx.GetLogger().Debugf("Received binlog event: %s", eventType)
tableId := event.TableID(a.format)
tableId := event.TableID(*a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
@@ -575,7 +593,7 @@ func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.Bin
return nil
}
rows, err := event.Rows(a.format, tableMap)
rows, err := event.Rows(*a.format, tableMap)
if err != nil {
return err
}

View File

@@ -31,6 +31,17 @@ var DoltBinlogReplicaController = newDoltBinlogReplicaController()
var ErrServerNotConfiguredAsReplica = fmt.Errorf(
"server is not configured as a replica; fix with CHANGE REPLICATION SOURCE TO")
// ErrEmptyHostname is returned when replication is started without a hostname configured.
var ErrEmptyHostname = fmt.Errorf("fatal error: Invalid (empty) hostname when attempting to connect " +
"to the source server. Connection attempt terminated")
// ErrEmptyUsername is returned when replication is started without a username configured.
var ErrEmptyUsername = fmt.Errorf("fatal error: Invalid (empty) username when attempting to connect " +
"to the source server. Connection attempt terminated")
// ErrReplicationStopped is an internal error that is not returned to users, and signals that STOP REPLICA was called.
var ErrReplicationStopped = fmt.Errorf("replication stop requested")
// doltBinlogReplicaController implements the BinlogReplicaController interface for a Dolt database in order to
// provide support for a Dolt server to be a replica of a MySQL primary.
//
@@ -41,7 +52,12 @@ type doltBinlogReplicaController struct {
filters *filterConfiguration
applier *binlogReplicaApplier
ctx *sql.Context
mu *sync.Mutex
// statusMutex blocks concurrent access to the ReplicaStatus struct
statusMutex *sync.Mutex
// operationMutex blocks concurrent access to the START/STOP/RESET REPLICA operations
operationMutex *sync.Mutex
}
var _ binlogreplication.BinlogReplicaController = (*doltBinlogReplicaController)(nil)
@@ -49,9 +65,12 @@ var _ binlogreplication.BinlogReplicaController = (*doltBinlogReplicaController)
// newDoltBinlogReplicaController creates a new doltBinlogReplicaController instance.
func newDoltBinlogReplicaController() *doltBinlogReplicaController {
controller := doltBinlogReplicaController{
mu: &sync.Mutex{},
filters: newFilterConfiguration(),
filters: newFilterConfiguration(),
statusMutex: &sync.Mutex{},
operationMutex: &sync.Mutex{},
}
controller.status.ConnectRetry = 60
controller.status.SourceRetryCount = 86400
controller.status.AutoPosition = true
controller.status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
controller.status.ReplicaSqlRunning = binlogreplication.ReplicaSqlNotRunning
@@ -61,6 +80,16 @@ func newDoltBinlogReplicaController() *doltBinlogReplicaController {
// StartReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
d.operationMutex.Lock()
defer d.operationMutex.Unlock()
// START REPLICA may be called multiple times, but if replication is already running,
// it will log a warning and not start up new threads.
if d.applier.IsRunning() {
ctx.Warn(3083, "Replication thread(s) for channel '' are already running.")
return nil
}
if false {
// TODO: If the database is already configured for Dolt replication/clustering, then error out.
// Add a (BATS?) test to cover this case
@@ -82,6 +111,12 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
return err
} else if configuration == nil {
return ErrServerNotConfiguredAsReplica
} else if configuration.Host == "" {
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyHostname.Error())
return ErrEmptyHostname
} else if configuration.User == "" {
DoltBinlogReplicaController.setIoError(ERFatalReplicaError, ErrEmptyUsername.Error())
return ErrEmptyUsername
}
if d.ctx == nil {
@@ -101,14 +136,18 @@ func (d *doltBinlogReplicaController) SetExecutionContext(ctx *sql.Context) {
}
// StopReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StopReplica(_ *sql.Context) error {
func (d *doltBinlogReplicaController) StopReplica(ctx *sql.Context) error {
if d.applier.IsRunning() == false {
ctx.Warn(3084, "Replication thread(s) for channel '' are already stopped.")
return nil
}
d.applier.stopReplicationChan <- struct{}{}
d.mu.Lock()
defer d.mu.Unlock()
d.status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
d.status.ReplicaSqlRunning = binlogreplication.ReplicaSqlNotRunning
d.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
status.ReplicaSqlRunning = binlogreplication.ReplicaSqlNotRunning
})
return nil
}
@@ -218,8 +257,9 @@ func (d *doltBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlo
return nil, nil
}
d.mu.Lock()
defer d.mu.Unlock()
// Lock to read status consistently
d.statusMutex.Lock()
defer d.statusMutex.Unlock()
var copy = d.status
copy.SourceUser = replicaSourceInfo.User
@@ -231,26 +271,32 @@ func (d *doltBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlo
copy.ReplicateDoTables = d.filters.getDoTables()
copy.ReplicateIgnoreTables = d.filters.getIgnoreTables()
if d.applier.currentPosition != nil {
copy.ExecutedGtidSet = d.applier.currentPosition.GTIDSet.String()
copy.RetrievedGtidSet = copy.ExecutedGtidSet
}
return &copy, nil
}
// ResetReplica implements the BinlogReplicaController interface
func (d *doltBinlogReplicaController) ResetReplica(ctx *sql.Context, resetAll bool) error {
d.mu.Lock()
defer d.mu.Unlock()
d.operationMutex.Lock()
defer d.operationMutex.Unlock()
if d.status.ReplicaIoRunning != binlogreplication.ReplicaIoNotRunning ||
d.status.ReplicaSqlRunning != binlogreplication.ReplicaSqlNotRunning {
if d.applier.IsRunning() {
return fmt.Errorf("unable to reset replica while replication is running; stop replication and try again")
}
// Reset error status
d.status.LastIoErrNumber = 0
d.status.LastSqlErrNumber = 0
d.status.LastIoErrorTimestamp = nil
d.status.LastSqlErrorTimestamp = nil
d.status.LastSqlError = ""
d.status.LastIoError = ""
d.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoErrNumber = 0
status.LastSqlErrNumber = 0
status.LastIoErrorTimestamp = nil
status.LastSqlErrorTimestamp = nil
status.LastSqlError = ""
status.LastIoError = ""
})
if resetAll {
err := deleteReplicationConfiguration(ctx)
@@ -268,15 +314,15 @@ func (d *doltBinlogReplicaController) ResetReplica(ctx *sql.Context, resetAll bo
// before the specified function |f| is called, and unlocks it after |f| is finished running. The current status is
// passed into the callback function |f| and the caller can safely update or copy any fields they need.
func (d *doltBinlogReplicaController) updateStatus(f func(status *binlogreplication.ReplicaStatus)) {
d.mu.Lock()
defer d.mu.Unlock()
d.statusMutex.Lock()
defer d.statusMutex.Unlock()
f(&d.status)
}
// setIoError updates the current replication status with the specific |errno| and |message| to describe an IO error.
func (d *doltBinlogReplicaController) setIoError(errno uint, message string) {
d.mu.Lock()
defer d.mu.Unlock()
d.statusMutex.Lock()
defer d.statusMutex.Unlock()
// truncate the message to avoid errors when reporting replica status
if len(message) > 256 {
@@ -291,8 +337,8 @@ func (d *doltBinlogReplicaController) setIoError(errno uint, message string) {
// setSqlError updates the current replication status with the specific |errno| and |message| to describe an SQL error.
func (d *doltBinlogReplicaController) setSqlError(errno uint, message string) {
d.mu.Lock()
defer d.mu.Unlock()
d.statusMutex.Lock()
defer d.statusMutex.Unlock()
// truncate the message to avoid errors when reporting replica status
if len(message) > 256 {

View File

@@ -0,0 +1,101 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package binlogreplication
import (
"sync/atomic"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
)
// binlogEventProducer is responsible for reading binlog events from an established connection and sending them back to
// a consumer over a channel. This is necessary because calls to conn.ReadBinlogEvent() block until a binlog event is
// received. If the source isn't sending more events, then the applier is blocked on reading events, and the user
// can't issue a call to STOP REPLICA. Reading binlog events in a thread and communicating with the applier via
// channels for events and errors decouples this.
type binlogEventProducer struct {
conn *mysql.Conn
errorChan chan error
eventChan chan mysql.BinlogEvent
running atomic.Bool
}
// newBinlogEventProducer creates a new binlog event producer that reads from the specified, established MySQL
// connection |conn|. The returned binlogEventProducer owns the communication channels
// and is responsible for closing them when the binlogEventProducer is stopped.
func newBinlogEventProducer(conn *mysql.Conn) *binlogEventProducer {
producer := &binlogEventProducer{
conn: conn,
eventChan: make(chan mysql.BinlogEvent),
errorChan: make(chan error),
}
producer.running.Store(true)
return producer
}
// EventChan returns the event channel through which this event
// producer sends binlog events.
func (p *binlogEventProducer) EventChan() <-chan mysql.BinlogEvent {
return p.eventChan
}
// ErrorChan returns the error channel through which this event
// producer sends any errors.
func (p *binlogEventProducer) ErrorChan() <-chan error {
return p.errorChan
}
// Go starts this binlogEventProducer in a new goroutine. Right before this routine exits, it will close the
// two communication channels it owns.
func (p *binlogEventProducer) Go(_ *sql.Context) {
go func() {
for p.IsRunning() {
// ReadBinlogEvent blocks until a binlog event can be read and returned, so this has to be done on a
// separate thread, otherwise the applier would be blocked and wouldn't be able to handle the STOP
// REPLICA signal.
event, err := p.conn.ReadBinlogEvent()
// If this binlogEventProducer has been stopped while we were blocked waiting to read the next
// binlog event, abort processing it and just return instead.
if p.IsRunning() == false {
break
}
if err != nil {
p.errorChan <- err
} else {
p.eventChan <- event
}
}
close(p.errorChan)
close(p.eventChan)
}()
}
// IsRunning returns true if this instance is processing binlog events and has not been stopped.
func (p *binlogEventProducer) IsRunning() bool {
return p.running.Load()
}
// Stop requests for this binlogEventProducer to stop processing events as soon as possible.
func (p *binlogEventProducer) Stop() {
p.running.Store(false)
}
// IsRunning returns true if this binlog applier is running and has not been stopped, otherwise returns false.
func (a *binlogReplicaApplier) IsRunning() bool {
return a.running.Load()
}

View File

@@ -31,9 +31,9 @@ var toxiClient *toxiproxyclient.Client
var mysqlProxy *toxiproxyclient.Proxy
var proxyPort int
// TestBinlogReplicationReconnection tests that the replica's connection to the primary is correctly
// TestBinlogReplicationAutoReconnect tests that the replica's connection to the primary is correctly
// reestablished if it drops.
func TestBinlogReplicationReconnection(t *testing.T) {
func TestBinlogReplicationAutoReconnect(t *testing.T) {
defer teardown(t)
startSqlServers(t)
configureToxiProxy(t)

View File

@@ -100,6 +100,33 @@ func TestBinlogReplicationSanityCheck(t *testing.T) {
assertRepoStateFileExists(t, "db01")
}
// TestFlushLogs tests that binary logs can be flushed on the primary, which forces a new binlog file to be written,
// including sending new Rotate and FormatDescription events to the replica. This is a simple sanity tests that we can
// process the events without errors.
func TestFlushLogs(t *testing.T) {
defer teardown(t)
startSqlServers(t)
startReplication(t, mySqlPort)
// Make changes on the primary and verify on the replica
primaryDatabase.MustExec("create table t (pk int primary key)")
waitForReplicaToCatchUp(t)
expectedStatement := "CREATE TABLE t ( pk int NOT NULL, PRIMARY KEY (pk)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_bin"
assertCreateTableStatement(t, replicaDatabase, "t", expectedStatement)
primaryDatabase.MustExec("flush binary logs;")
waitForReplicaToCatchUp(t)
primaryDatabase.MustExec("insert into t values (1), (2), (3);")
waitForReplicaToCatchUp(t)
rows, err := replicaDatabase.Queryx("select * from db01.t;")
require.NoError(t, err)
allRows := readAllRows(t, rows)
require.Equal(t, 3, len(allRows))
require.NoError(t, rows.Close())
}
// TestResetReplica tests that "RESET REPLICA" and "RESET REPLICA ALL" correctly clear out
// replication configuration and metadata.
func TestResetReplica(t *testing.T) {
@@ -165,22 +192,60 @@ func TestStartReplicaErrors(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, ErrServerNotConfiguredAsReplica.Error())
// For partial source configuration, START REPLICA doesn't throw an error, but an error will
// be populated in SHOW REPLICA STATUS after START REPLICA returns.
//START REPLICA doesn't return an error when replication source is only partially configured
// For an incomplete source configuration, throw an error as early as possible to make sure the user notices it.
replicaDatabase.MustExec("CHANGE REPLICATION SOURCE TO SOURCE_PORT=1234, SOURCE_HOST='localhost';")
replicaDatabase.MustExec("START REPLICA;")
rows, err := replicaDatabase.Queryx("SHOW REPLICA STATUS;")
require.NoError(t, err)
status := convertByteArraysToStrings(readNextRow(t, rows))
require.Equal(t, "13117", status["Last_IO_Errno"])
require.NotEmpty(t, status["Last_IO_Error"])
require.NotEmpty(t, status["Last_IO_Error_Timestamp"])
require.NoError(t, rows.Close())
rows, err := replicaDatabase.Queryx("START REPLICA;")
require.Error(t, err)
require.ErrorContains(t, err, "Invalid (empty) username")
require.Nil(t, rows)
// START REPLICA doesn't return an error if replication is already running
// START REPLICA logs a warning if replication is already running
startReplication(t, mySqlPort)
replicaDatabase.MustExec("START REPLICA;")
assertWarning(t, replicaDatabase, 3083, "Replication thread(s) for channel '' are already running.")
}
// TestStopReplica tests that STOP REPLICA correctly stops the replication process, and that
// warnings are logged when STOP REPLICA is invoked when replication is not running.
func TestStopReplica(t *testing.T) {
defer teardown(t)
startSqlServers(t)
// STOP REPLICA logs a warning if replication is not running
replicaDatabase.MustExec("STOP REPLICA;")
assertWarning(t, replicaDatabase, 3084, "Replication thread(s) for channel '' are already stopped.")
// Start replication with bad connection params
replicaDatabase.MustExec("SET @@GLOBAL.server_id=52;")
replicaDatabase.MustExec("CHANGE REPLICATION SOURCE TO SOURCE_HOST='doesnotexist', SOURCE_PORT=111, SOURCE_USER='nobody';")
replicaDatabase.MustExec("START REPLICA;")
time.Sleep(200 * time.Millisecond)
status := showReplicaStatus(t)
require.Equal(t, "Connecting", status["Replica_IO_Running"])
require.Equal(t, "Yes", status["Replica_SQL_Running"])
// STOP REPLICA works when replication cannot establish a connection
replicaDatabase.MustExec("STOP REPLICA;")
status = showReplicaStatus(t)
require.Equal(t, "No", status["Replica_IO_Running"])
require.Equal(t, "No", status["Replica_SQL_Running"])
// START REPLICA and verify status
startReplication(t, mySqlPort)
time.Sleep(100 * time.Millisecond)
status = showReplicaStatus(t)
require.True(t, status["Replica_IO_Running"] == "Connecting" || status["Replica_IO_Running"] == "Yes")
require.Equal(t, "Yes", status["Replica_SQL_Running"])
// STOP REPLICA stops replication when it is running and connected to the source
replicaDatabase.MustExec("STOP REPLICA;")
status = showReplicaStatus(t)
require.Equal(t, "No", status["Replica_IO_Running"])
require.Equal(t, "No", status["Replica_SQL_Running"])
// STOP REPLICA logs a warning if replication is not running
replicaDatabase.MustExec("STOP REPLICA;")
assertWarning(t, replicaDatabase, 3084, "Replication thread(s) for channel '' are already stopped.")
}
// TestDoltCommits tests that Dolt commits are created and use correct transaction boundaries.
@@ -415,6 +480,18 @@ func waitForReplicaToReachGtid(t *testing.T, target int) {
t.Fatal("replica did not reach target GTID within " + timeLimit.String())
}
// assertWarning asserts that the specified |database| has a warning with |code| and |message|,
// otherwise it will fail the current test.
func assertWarning(t *testing.T, database *sqlx.DB, code int, message string) {
rows, err := database.Queryx("SHOW WARNINGS;")
require.NoError(t, err)
warning := convertByteArraysToStrings(readNextRow(t, rows))
require.Equal(t, strconv.Itoa(code), warning["Code"])
require.Equal(t, message, warning["Message"])
require.False(t, rows.Next())
require.NoError(t, rows.Close())
}
func queryGtid(t *testing.T, database *sqlx.DB) string {
rows, err := database.Queryx("SELECT @@global.gtid_executed as gtid_executed;")
require.NoError(t, err)
@@ -455,7 +532,7 @@ func startSqlServers(t *testing.T) {
t.Skip("Skipping binlog replication integ tests in CI environment on Mac OS")
}
testDir = filepath.Join(os.TempDir(), t.Name()+"-"+time.Now().Format("12345"))
testDir = filepath.Join(os.TempDir(), fmt.Sprintf("%s-%v", t.Name(), time.Now().Unix()))
err := os.MkdirAll(testDir, 0777)
cmd := exec.Command("chmod", "777", testDir)