Cleaning up flag handling for row events

This commit is contained in:
Jason Fulghum
2023-01-17 16:43:26 -08:00
parent acad86872c
commit 0a65502cd7

View File

@@ -351,11 +351,15 @@ func (d *doltBinlogReplicaController) setSqlError(errno uint, message string) {
}
// Row Flags https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
const endOfStatementRowFlag = 0x0001
const noForeignKeyChecksRowFlag = 0x0002
const noUniqueKeyChecksRowFlag = 0x0004
const rowsAreCompleteRowFlag = 0x0008
const noCheckConstraintsRowFlag = 0x0010
// rowFlag_endOfStatement indicates that a row event with this flag set is the last event in a statement.
const rowFlag_endOfStatement = 0x0001
const rowFlag_noForeignKeyChecks = 0x0002
const rowFlag_noUniqueKeyChecks = 0x0004
const rowFlag_noCheckConstraints = 0x0010
// rowFlag_rowsAreComplete indicates that rows in this event are complete, and contain values for all columns of the table.
const rowFlag_rowsAreComplete = 0x0008
// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
// and retrying if errors are encountered.
@@ -613,12 +617,17 @@ func (d *doltBinlogReplicaController) processBinlogEvent(ctx *sql.Context, engin
}).Debug("Received binlog event: TableMap")
if tableId == 0xFFFFFF {
// TODO: Handle special Table ID value 0xFFFFFF:
// TODO: Handle special Table ID value 0xFFFFFF. We have yet to see this specific message.
// Table id refers to a table defined by TABLE_MAP_EVENT. The special value 0xFFFFFF should have
// "end of statement flag" (0x0001) set and indicates that table maps can be freed.
logger.Errorf("unsupported binlog protocol message: TableMap event with table ID '0xFFFFFF'")
}
if tableMap.Flags != 0 {
flags := tableMap.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag
flags = flags ^ rowFlag_endOfStatement
}
if flags != 0 {
logger.Errorf("unsupported binlog protocol message: TableMap event with flags '%x'", tableMap.Flags)
}
tableMapsById[tableId] = tableMap
@@ -642,7 +651,13 @@ func (d *doltBinlogReplicaController) processBinlogEvent(ctx *sql.Context, engin
if err != nil {
return err
}
if rows.Flags != 0 {
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags ^ rowFlag_endOfStatement
}
if flags != 0 {
logger.Errorf("unsupported binlog protocol message: DeleteRows event with flags '%x'", tableMap.Flags)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
@@ -693,7 +708,13 @@ func (d *doltBinlogReplicaController) processBinlogEvent(ctx *sql.Context, engin
if err != nil {
return err
}
if rows.Flags != 0 {
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags ^ rowFlag_endOfStatement
}
if flags != 0 {
logger.Errorf("unsupported binlog protocol message: WriteRows event with flags '%x'", tableMap.Flags)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
@@ -755,7 +776,13 @@ func (d *doltBinlogReplicaController) processBinlogEvent(ctx *sql.Context, engin
if err != nil {
return err
}
if rows.Flags != 0 {
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags ^ rowFlag_endOfStatement
}
if flags != 0 {
logger.Errorf("unsupported binlog protocol message: UpdateRows event with flags '%x'", tableMap.Flags)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)