PR Feedback: switching to use ctx.GetLogger() instead of logrus.StandardLogger

This commit is contained in:
Jason Fulghum
2023-02-10 15:38:13 -08:00
parent df4f273890
commit 38f00ad568
3 changed files with 44 additions and 48 deletions
@@ -94,7 +94,7 @@ func (a *binlogReplicaApplier) Go(ctx *sql.Context) {
go func() {
err := a.replicaBinlogEventHandler(ctx)
if err != nil {
logger.Errorf("unexpected error of type %T: '%v'", err, err.Error())
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
}
}()
@@ -206,7 +206,7 @@ func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, con
// with a "+" prefix. For now, just ignore the "+" prefix if we see it.
// https://dev.mysql.com/doc/refman/8.0/en/replication-options-gtids.html#sysvar_gtid_purged
if strings.HasPrefix(gtidPurged, "+") {
logger.Warnf("Ignoring unsupported '+' prefix on @@GTID_PURGED value")
ctx.GetLogger().Warnf("Ignoring unsupported '+' prefix on @@GTID_PURGED value")
gtidPurged = gtidPurged[1:]
}
@@ -254,14 +254,14 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
for {
select {
case <-a.stopReplicationChan:
logger.Trace("received signal to stop replication routine")
ctx.GetLogger().Trace("received signal to stop replication routine")
return nil
default:
event, err := conn.ReadBinlogEvent()
if err != nil {
if sqlError, isSqlError := err.(*mysql.SQLError); isSqlError {
if sqlError.Message == io.EOF.Error() {
logger.Trace("No more binlog messages; retrying in 1s...")
ctx.GetLogger().Trace("No more binlog messages; retrying in 1s...")
time.Sleep(1 * time.Second)
continue
} else if strings.HasPrefix(sqlError.Message, io.ErrUnexpectedEOF.Error()) {
@@ -278,13 +278,13 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
continue
} else if strings.Contains(sqlError.Message, "can not handle replication events with the checksum") {
// Ignore any errors about checksums
logger.Debug("ignoring binlog checksum error message")
ctx.GetLogger().Debug("ignoring binlog checksum error message")
continue
}
}
// otherwise, log the error if it's something we don't expect and continue
logger.Errorf("unexpected error of type %T: '%v'", err, err.Error())
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setIoError(mysql.ERUnknownError, err.Error())
continue
@@ -292,7 +292,7 @@ func (a *binlogReplicaApplier) replicaBinlogEventHandler(ctx *sql.Context) error
err = a.processBinlogEvent(ctx, engine, event)
if err != nil {
logger.Errorf("unexpected error of type %T: '%v'", err, err.Error())
ctx.GetLogger().Errorf("unexpected error of type %T: '%v'", err, err.Error())
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, err.Error())
}
}
@@ -311,12 +311,12 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// A RAND_EVENT contains two seed values that set the rand_seed1 and rand_seed2 system variables that are
// used to compute the random number. For more details, see: https://mariadb.com/kb/en/rand_event/
// Note: it is written only before a QUERY_EVENT and is NOT used with row-based logging.
logger.Debug("Received binlog event: Rand")
ctx.GetLogger().Debug("Received binlog event: Rand")
case event.IsXID():
// An XID event is generated for a COMMIT of a transaction that modifies one or more tables of an
// XA-capable storage engine. For more details, see: https://mariadb.com/kb/en/xid_event/
logger.Debug("Received binlog event: XID")
ctx.GetLogger().Debug("Received binlog event: XID")
createCommit = true
commitToAllDatabases = true
@@ -329,7 +329,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return err
}
logger.WithFields(logrus.Fields{
ctx.GetLogger().WithFields(logrus.Fields{
"database": query.Database,
"charset": query.Charset,
"query": query.SQL,
@@ -350,7 +350,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// pointing to the next file in the sequence. ROTATE_EVENT is generated locally and written to the binary log
// on the source server and it's also written when a FLUSH LOGS statement occurs on the source server.
// For more details, see: https://mariadb.com/kb/en/rotate_event/
logger.Debug("Received binlog event: Rotate")
ctx.GetLogger().Debug("Received binlog event: Rotate")
case event.IsFormatDescription():
// This is a descriptor event that is written to the beginning of a binary log file, at position 4 (after
@@ -359,7 +359,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return err
}
logger.WithFields(logrus.Fields{
ctx.GetLogger().WithFields(logrus.Fields{
"format": a.format,
}).Debug("Received binlog event: FormatDescription")
@@ -370,7 +370,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return err
}
logger.WithFields(logrus.Fields{
ctx.GetLogger().WithFields(logrus.Fields{
"previousGtids": position.GTIDSet.String(),
}).Debug("Received binlog event: PreviousGTIDs")
@@ -382,9 +382,9 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
return err
}
if isBegin {
logger.Errorf("unsupported binlog protocol message: GTID event with 'isBegin' set to true")
ctx.GetLogger().Errorf("unsupported binlog protocol message: GTID event with 'isBegin' set to true")
}
logger.WithFields(logrus.Fields{
ctx.GetLogger().WithFields(logrus.Fields{
"gtid": gtid,
"isBegin": isBegin,
}).Debug("Received binlog event: GTID")
@@ -409,7 +409,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return err
}
logger.WithFields(logrus.Fields{
ctx.GetLogger().WithFields(logrus.Fields{
"id": tableId,
"tableName": tableMap.Name,
"database": tableMap.Database,
@@ -420,7 +420,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if tableId == 0xFFFFFF {
// Table ID 0xFFFFFF is a special value that indicates table maps can be freed.
logger.Infof("binlog protocol message: table ID '0xFFFFFF'; clearing table maps")
ctx.GetLogger().Infof("binlog protocol message: table ID '0xFFFFFF'; clearing table maps")
a.tableMapsById = make(map[uint64]*mysql.TableMap)
} else {
flags := tableMap.Flags
@@ -433,7 +433,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: TableMap event with unsupported flags '%x'", flags)
logger.Errorf(msg)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
a.tableMapsById[tableId] = tableMap
@@ -442,14 +442,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
case event.IsDeleteRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
logger.Debug("Received binlog event: DeleteRows")
ctx.GetLogger().Debug("Received binlog event: DeleteRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(tableMap) {
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
@@ -469,7 +469,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: DeleteRows event with unsupported flags '%x'", flags)
logger.Errorf(msg)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
@@ -477,13 +477,13 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
return err
}
logger.Debugf(" - Deleted Rows (table: %s)", tableMap.Name)
ctx.GetLogger().Debugf(" - Deleted Rows (table: %s)", tableMap.Name)
for _, row := range rows.Rows {
deletedRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify)
if err != nil {
return err
}
logger.Debugf(" - Identify: %v ", sql.FormatRow(deletedRow))
ctx.GetLogger().Debugf(" - Identify: %v ", sql.FormatRow(deletedRow))
foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
@@ -505,14 +505,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
case event.IsWriteRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
logger.Debug("Received binlog event: WriteRows")
ctx.GetLogger().Debug("Received binlog event: WriteRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(tableMap) {
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
@@ -533,7 +533,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: WriteRows event with unsupported flags '%x'", flags)
logger.Errorf(msg)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
@@ -541,14 +541,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
return err
}
logger.Debugf(" - New Rows (table: %s)", tableMap.Name)
ctx.GetLogger().Debugf(" - New Rows (table: %s)", tableMap.Name)
// TODO: batch writes to the same table
for _, row := range rows.Rows {
newRow, err := parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data)
if err != nil {
return err
}
logger.Debugf(" - Data: %v ", sql.FormatRow(newRow))
ctx.GetLogger().Debugf(" - Data: %v ", sql.FormatRow(newRow))
// TODO: Make this retry logic generic for insert/update/delete ?
// TODO: LOTS of duplication here! This would be super helpful to clean up
@@ -568,7 +568,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
if errors.Is(datas.ErrOptimisticLockFailed, err) && retryCount < 3 {
logger.Errorf("Retrying after error writing table updates: %s", err)
ctx.GetLogger().Errorf("Retrying after error writing table updates: %s", err)
retryCount++
continue
} else {
@@ -583,14 +583,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
case event.IsUpdateRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
logger.Debug("Received binlog event: UpdateRows")
ctx.GetLogger().Debug("Received binlog event: UpdateRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(tableMap) {
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
@@ -610,7 +610,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: UpdateRows event with unsupported flags '%x'", flags)
logger.Errorf(msg)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
@@ -618,7 +618,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
return err
}
logger.Debugf(" - Updated Rows (table: %s)", tableMap.Name)
ctx.GetLogger().Debugf(" - Updated Rows (table: %s)", tableMap.Name)
for _, row := range rows.Rows {
identifyRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify)
if err != nil {
@@ -628,7 +628,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
if err != nil {
return err
}
logger.Debugf(" - Identify: %v Data: %v ", sql.FormatRow(identifyRow), sql.FormatRow(updatedRow))
ctx.GetLogger().Debugf(" - Identify: %v Data: %v ", sql.FormatRow(identifyRow), sql.FormatRow(updatedRow))
foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
@@ -656,7 +656,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
// network by a primary to a replica to let it know that the primary is still alive, and is only sent
// when the primary has no binlog events to send to replica servers.
// For more details, see: https://mariadb.com/kb/en/heartbeat_log_event/
logger.Debug("Received binlog event: Heartbeat")
ctx.GetLogger().Debug("Received binlog event: Heartbeat")
} else {
return fmt.Errorf("received unknown event: %v", event)
}
@@ -676,7 +676,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
a.currentPosition.GTIDSet = a.currentPosition.GTIDSet.AddGTID(a.currentGtid)
err := sql.SystemVariables.AssignValues(map[string]interface{}{"gtid_executed": a.currentPosition.GTIDSet.String()})
if err != nil {
logger.Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
ctx.GetLogger().Errorf("unable to set @@GLOBAL.gtid_executed: %s", err.Error())
}
err = positionStore.Save(ctx, a.currentPosition)
if err != nil {
@@ -684,7 +684,7 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
}
// For now, create a Dolt commit from every data update. Eventually, we'll want to make this configurable.
logger.Trace("Creating Dolt commit(s)")
ctx.GetLogger().Trace("Creating Dolt commit(s)")
for _, database := range databasesToCommit {
executeQueryWithEngine(ctx, engine, "use `"+database+"`;")
executeQueryWithEngine(ctx, engine,
@@ -950,7 +950,7 @@ func loadReplicaServerId() (uint32, error) {
func executeQueryWithEngine(ctx *sql.Context, engine *gms.Engine, query string) {
if ctx.GetCurrentDatabase() == "" {
logger.Warn("No current database selected")
ctx.GetLogger().Warn("No current database selected")
}
_, iter, err := engine.Query(ctx, query)
@@ -958,7 +958,7 @@ func executeQueryWithEngine(ctx *sql.Context, engine *gms.Engine, query string)
// Log any errors, except for commits with "nothing to commit"
if err.Error() != "nothing to commit" {
msg := fmt.Sprintf("ERROR executing query: %v ", err.Error())
logger.Errorf(msg)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
return
@@ -967,7 +967,7 @@ func executeQueryWithEngine(ctx *sql.Context, engine *gms.Engine, query string)
_, err := iter.Next(ctx)
if err != nil {
if err != io.EOF {
logger.Errorf("ERROR reading query results: %v ", err.Error())
ctx.GetLogger().Errorf("ERROR reading query results: %v ", err.Error())
}
return
}
@@ -20,8 +20,6 @@ import (
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
@@ -29,8 +27,6 @@ import (
var DoltBinlogReplicaController = newDoltBinlogReplicaController()
var logger = logrus.StandardLogger()
// ErrServerNotConfiguredAsReplica is returned when replication is started without enough configuration provided.
var ErrServerNotConfiguredAsReplica = fmt.Errorf(
"server is not configured as a replica; fix with CHANGE REPLICATION SOURCE TO")
@@ -92,7 +88,7 @@ func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
return fmt.Errorf("no execution context set for the replica controller")
}
logger.Info("starting binlog replication...")
ctx.GetLogger().Info("starting binlog replication...")
d.applier.Go(d.ctx)
return nil
}
@@ -98,7 +98,7 @@ func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error
// isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and
// should not have any updates applied from binlog messages.
func (fc *filterConfiguration) isTableFilteredOut(tableMap *mysql.TableMap) bool {
func (fc *filterConfiguration) isTableFilteredOut(ctx *sql.Context, tableMap *mysql.TableMap) bool {
if fc == nil {
return false
}
@@ -116,7 +116,7 @@ func (fc *filterConfiguration) isTableFilteredOut(tableMap *mysql.TableMap) bool
if len(fc.doTables) > 0 {
if doTables, ok := fc.doTables[db]; ok {
if _, ok := doTables[table]; !ok {
logger.Tracef("skipping table %s.%s (not in doTables) ", tableMap.Database, tableMap.Name)
ctx.GetLogger().Tracef("skipping table %s.%s (not in doTables) ", tableMap.Database, tableMap.Name)
return true
}
}
@@ -126,7 +126,7 @@ func (fc *filterConfiguration) isTableFilteredOut(tableMap *mysql.TableMap) bool
if ignoredTables, ok := fc.ignoreTables[db]; ok {
if _, ok := ignoredTables[table]; ok {
// If this table is being ignored, don't process any further
logger.Tracef("skipping table %s.%s (in ignoreTables)", tableMap.Database, tableMap.Name)
ctx.GetLogger().Tracef("skipping table %s.%s (in ignoreTables)", tableMap.Database, tableMap.Name)
return true
}
}