Adding support for connecting to a primary with binlog checksums configured.

This commit is contained in:
Jason Fulghum
2023-02-22 16:43:00 -08:00
parent cca6cb7a53
commit 1d044a81e4
2 changed files with 27 additions and 5 deletions

View File

@@ -228,6 +228,20 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
a.currentPosition = position
// Clear out the format description in case we're reconnecting, so that we don't use the old format description
// to interpret any event messages before we receive the new format description from the new stream.
a.format = mysql.BinlogFormat{}
// If the source server has binlog checksums enabled (@@global.binlog_checksum), then the replica MUST
// set @master_binlog_checksum to handshake with the server to acknowledge that it knows that checksums
// are in use. Without this step, the server will just send back error messages saying that the replica
// does not support the binlog checksum algorithm in use on the primary.
// For more details, see: https://dev.mysql.com/worklog/task/?id=2540
_, err = conn.ExecuteFetch("set @master_binlog_checksum=@@global.binlog_checksum;", 0, false)
if err != nil {
return err
}
return conn.SendBinlogDumpCommand(serverId, *position)
}
@@ -271,10 +285,6 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
return err
}
continue
} else if strings.Contains(sqlError.Message, "can not handle replication events with the checksum") {
// Ignore any errors about checksums
ctx.GetLogger().Debug("ignoring binlog checksum error message")
continue
}
}
@@ -285,6 +295,19 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
continue
}
// We don't support checksum validation, so we must strip off any checksum data if present, otherwise
// it could get interpreted as part of the data fields and corrupt the fields we pull out. There is not
// a future-proof guarantee on the checksum size, so we can't strip a checksum until we've seen the
// Format binlog event that definitively tells us if checksums are enabled and what algorithm they use.
if a.format.IsZero() == false {
event, _, err = event.StripChecksum(a.format)
if err != nil {
msg := fmt.Sprintf("unable to strip checksum from binlog event: '%v'", err.Error())
ctx.GetLogger().Error(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
}
err = a.processBinlogEvent(ctx, engine, event)
if err != nil {
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())

View File

@@ -592,7 +592,6 @@ func startMySqlServer(dir string) (int, *os.Process, error) {
fmt.Sprintf("--port=%v", mySqlPort),
"--server-id=11223344",
fmt.Sprintf("--socket=mysql-%v.sock", mySqlPort),
"--binlog-checksum=NONE",
"--general_log_file="+dir+"general_log",
"--log-bin="+dir+"log_bin",
"--slow_query_log_file="+dir+"slow_query_log",