Minor tweaks and exposing logged errors to SHOW REPLICA STATUS

This commit is contained in:
Jason Fulghum
2023-01-25 16:49:47 -08:00
parent 3dd93dcf50
commit 4b0e35bd51
2 changed files with 13 additions and 15 deletions

View File

@@ -161,7 +161,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
}
// Request binlog events to start
// TODO: This also needs retry logic
// TODO: This should also have retry logic
err = a.startReplicationEventStream(ctx, conn)
if err != nil {
return nil, err
@@ -233,13 +233,10 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
time.Sleep(1 * time.Second)
continue
} else if strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error()) {
// TODO: Do we have these errors defined in GMS anywhere yet?
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
// TODO: Are we also setting errors when we log them?
const ER_NET_READ_ERROR = 1158
const ERNetReadError = 1158
DoltBinlogReplicaController.updateStatus(func(status *binlogreplication.ReplicaStatus) {
status.LastIoError = io.ErrUnexpectedEOF.Error()
status.LastIoErrNumber = ER_NET_READ_ERROR
status.LastIoErrNumber = ERNetReadError
currentTime := time.Now()
status.LastIoErrorTimestamp = &currentTime
})
@@ -249,25 +246,24 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
}
continue
} else if strings.Contains(sqlError.Message, "can not handle replication events with the checksum") {
// For now, just ignore any errors about checksums
logger.Debug("received binlog checksum error message")
// Ignore any errors about checksums
logger.Debug("ignoring binlog checksum error message")
continue
}
}
// otherwise, log the error if it's something we don't expect and continue
logger.Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
continue
}
err = a.processBinlogEvent(ctx, engine, event)
if err != nil {
logger.Errorf("unexpected error of type %T: '%v'", err, err.Error())
err = nil
// TODO: Need a general pass on error handling; make sure handling is sane and logged and status updated
// TODO: We don't currently return an error from this function yet.
// Need to refine error handling so that all errors are always logged, but not all errors
// stop the replication processor and ensure proper retry is in place for various operations.
const UNKNOWN = 0
DoltBinlogReplicaController.setSqlError(UNKNOWN, err.Error())
}
}
}

View File

@@ -66,7 +66,7 @@ func newDoltBinlogReplicaController() *doltBinlogReplicaController {
// StartReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
if false {
// TODO: If the database is already configured for Dolt replication/clustering, then error out
// TODO: If the database is already configured for Dolt replication/clustering, then error out.
// Add a (BATS?) test to cover this case
return fmt.Errorf("dolt replication already enabled; unable to use binlog replication with other replication modes. " +
"Disable Dolt replication first before starting binlog replication")
@@ -291,7 +291,9 @@ func (d *doltBinlogReplicaController) setSqlError(errno uint, message string) {
d.status.LastSqlError = message
}
// ------------------------------------------------
//
// Helper functions
//
func getOptionValueAsString(option binlogreplication.ReplicationOption) (string, error) {
stringOptionValue, ok := option.Value.(binlogreplication.StringReplicationOptionValue)