Cleaning up duplication in processing row events

This commit is contained in:
Jason Fulghum
2023-02-14 09:41:25 -08:00
parent 38f00ad568
commit df1ae28ed2
@@ -15,7 +15,6 @@
package binlogreplication
import (
"errors"
"fmt"
"io"
"strconv"
@@ -39,7 +38,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer"
"github.com/dolthub/dolt/go/libraries/doltcore/sqlserver"
"github.com/dolthub/dolt/go/store/datas"
)
// positionStore is a singleton instance for loading/saving binlog position state to disk for durable storage.
@@ -439,214 +437,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
a.tableMapsById[tableId] = tableMap
}
case event.IsDeleteRows():
case event.IsDeleteRows(), event.IsWriteRows(), event.IsUpdateRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
ctx.GetLogger().Debug("Received binlog event: DeleteRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
rows, err := event.Rows(a.format, tableMap)
err = a.processRowEvent(ctx, event, engine)
if err != nil {
return err
}
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags &^ rowFlag_endOfStatement
}
if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks {
flags = flags &^ rowFlag_noForeignKeyChecks
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: DeleteRows event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Deleted Rows (table: %s)", tableMap.Name)
for _, row := range rows.Rows {
deletedRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Identify: %v ", sql.FormatRow(deletedRow))
foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
if err != nil {
return err
}
err = tableWriter.Delete(ctx, deletedRow)
if err != nil {
return err
}
err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
return err
}
}
case event.IsWriteRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
ctx.GetLogger().Debug("Received binlog event: WriteRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
rows, err := event.Rows(a.format, tableMap)
if err != nil {
return err
}
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags &^ rowFlag_endOfStatement
}
if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags &^ rowFlag_noForeignKeyChecks
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: WriteRows event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - New Rows (table: %s)", tableMap.Name)
// TODO: batch writes to the same table
for _, row := range rows.Rows {
newRow, err := parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Data: %v ", sql.FormatRow(newRow))
// TODO: Make this retry logic generic for insert/update/delete ?
// TODO: LOTS of duplication here! This would be super helpful to clean up
retryCount := 0
for {
foreignKeyChecksDisabled := rows.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
if err != nil {
return err
}
err = tableWriter.Insert(ctx, newRow)
if err != nil {
return err
}
err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
if errors.Is(datas.ErrOptimisticLockFailed, err) && retryCount < 3 {
ctx.GetLogger().Errorf("Retrying after error writing table updates: %s", err)
retryCount++
continue
} else {
return err
}
} else {
break
}
}
}
case event.IsUpdateRows():
// A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated.
// For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/
ctx.GetLogger().Debug("Received binlog event: UpdateRows")
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
rows, err := event.Rows(a.format, tableMap)
if err != nil {
return err
}
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags &^ rowFlag_endOfStatement
}
if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks {
flags = flags &^ rowFlag_noForeignKeyChecks
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: UpdateRows event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Updated Rows (table: %s)", tableMap.Name)
for _, row := range rows.Rows {
identifyRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify)
if err != nil {
return err
}
updatedRow, err := parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Identify: %v Data: %v ", sql.FormatRow(identifyRow), sql.FormatRow(updatedRow))
foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
if err != nil {
return err
}
err = tableWriter.Update(ctx, identifyRow, updatedRow)
if err != nil {
return err
}
err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
return err
}
}
default:
// We can't access the bytes directly because these non-interface types in Vitess are not exposed.
// Having a Bytes() or Type() method on the Vitess interface would let us clean this up.
@@ -699,6 +497,108 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms.
return nil
}
// processRowEvent processes a WriteRows, DeleteRows, or UpdateRows binlog event and returns an error if any problems
// were encountered.
func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.BinlogEvent, engine *gms.Engine) error {
switch {
case event.IsDeleteRows():
ctx.GetLogger().Debug("Received binlog event: DeleteRows")
case event.IsWriteRows():
ctx.GetLogger().Debug("Received binlog event: WriteRows")
case event.IsUpdateRows():
ctx.GetLogger().Debug("Received binlog event: UpdateRows")
default:
return fmt.Errorf("unsupported event type: %v", event)
}
tableId := event.TableID(a.format)
tableMap, ok := a.tableMapsById[tableId]
if !ok {
return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId)
}
if a.filters.isTableFilteredOut(ctx, tableMap) {
return nil
}
a.modifiedDatabases[tableMap.Database] = struct{}{}
rows, err := event.Rows(a.format, tableMap)
if err != nil {
return err
}
flags := rows.Flags
if flags&rowFlag_endOfStatement == rowFlag_endOfStatement {
// nothing to be done for end of statement; just clear the flag and move on
flags = flags &^ rowFlag_endOfStatement
}
if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks {
flags = flags &^ rowFlag_noForeignKeyChecks
}
if flags != 0 {
msg := fmt.Sprintf("unsupported binlog protocol message: DeleteRows event with unsupported flags '%x'", flags)
ctx.GetLogger().Errorf(msg)
DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg)
}
schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database)
if err != nil {
return err
}
switch {
case event.IsDeleteRows():
ctx.GetLogger().Debugf(" - Deleted Rows (table: %s)", tableMap.Name)
case event.IsUpdateRows():
ctx.GetLogger().Debugf(" - Updated Rows (table: %s)", tableMap.Name)
case event.IsWriteRows():
ctx.GetLogger().Debugf(" - New Rows (table: %s)", tableMap.Name)
}
for _, row := range rows.Rows {
var identityRow, dataRow sql.Row
if len(row.Identify) > 0 {
identityRow, err = parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Identity: %v ", sql.FormatRow(identityRow))
}
if len(row.Data) > 0 {
dataRow, err = parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data)
if err != nil {
return err
}
ctx.GetLogger().Debugf(" - Data: %v ", sql.FormatRow(dataRow))
}
foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0
writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled)
if err != nil {
return err
}
switch {
case event.IsDeleteRows():
err = tableWriter.Delete(ctx, identityRow)
case event.IsWriteRows():
err = tableWriter.Insert(ctx, dataRow)
case event.IsUpdateRows():
err = tableWriter.Update(ctx, identityRow, dataRow)
}
if err != nil {
return err
}
err = closeWriteSession(ctx, engine, tableMap.Database, writeSession)
if err != nil {
return err
}
}
return nil
}
//
// Helper functions
//