diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go index 6fafcbeafd..c55b9d1f2b 100644 --- a/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go +++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_replica_applier.go @@ -15,7 +15,6 @@ package binlogreplication import ( - "errors" "fmt" "io" "strconv" @@ -39,7 +38,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/sqle/globalstate" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/writer" "github.com/dolthub/dolt/go/libraries/doltcore/sqlserver" - "github.com/dolthub/dolt/go/store/datas" ) // positionStore is a singleton instance for loading/saving binlog position state to disk for durable storage. @@ -439,214 +437,14 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms. a.tableMapsById[tableId] = tableMap } - case event.IsDeleteRows(): + case event.IsDeleteRows(), event.IsWriteRows(), event.IsUpdateRows(): // A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated. // For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/ - ctx.GetLogger().Debug("Received binlog event: DeleteRows") - tableId := event.TableID(a.format) - tableMap, ok := a.tableMapsById[tableId] - if !ok { - return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId) - } - - if a.filters.isTableFilteredOut(ctx, tableMap) { - return nil - } - a.modifiedDatabases[tableMap.Database] = struct{}{} - - rows, err := event.Rows(a.format, tableMap) + err = a.processRowEvent(ctx, event, engine) if err != nil { return err } - flags := rows.Flags - if flags&rowFlag_endOfStatement == rowFlag_endOfStatement { - // nothing to be done for end of statement; just clear the flag and move on - flags = flags &^ rowFlag_endOfStatement - } - if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks { - flags = flags &^ rowFlag_noForeignKeyChecks - } - if flags != 0 { - msg := fmt.Sprintf("unsupported binlog protocol message: DeleteRows event with unsupported flags '%x'", flags) - ctx.GetLogger().Errorf(msg) - DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg) - } - schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) - if err != nil { - return err - } - - ctx.GetLogger().Debugf(" - Deleted Rows (table: %s)", tableMap.Name) - for _, row := range rows.Rows { - deletedRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify) - if err != nil { - return err - } - ctx.GetLogger().Debugf(" - Identify: %v ", sql.FormatRow(deletedRow)) - - foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0 - writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled) - if err != nil { - return err - } - - err = tableWriter.Delete(ctx, deletedRow) - if err != nil { - return err - } - - err = closeWriteSession(ctx, engine, tableMap.Database, writeSession) - if err != nil { - return err - } - } - - case event.IsWriteRows(): - // A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated. - // For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/ - ctx.GetLogger().Debug("Received binlog event: WriteRows") - tableId := event.TableID(a.format) - tableMap, ok := a.tableMapsById[tableId] - if !ok { - return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId) - } - - if a.filters.isTableFilteredOut(ctx, tableMap) { - return nil - } - a.modifiedDatabases[tableMap.Database] = struct{}{} - - rows, err := event.Rows(a.format, tableMap) - if err != nil { - return err - } - - flags := rows.Flags - if flags&rowFlag_endOfStatement == rowFlag_endOfStatement { - // nothing to be done for end of statement; just clear the flag and move on - flags = flags &^ rowFlag_endOfStatement - } - if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks { - // nothing to be done for end of statement; just clear the flag and move on - flags = flags &^ rowFlag_noForeignKeyChecks - } - if flags != 0 { - msg := fmt.Sprintf("unsupported binlog protocol message: WriteRows event with unsupported flags '%x'", flags) - ctx.GetLogger().Errorf(msg) - DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg) - } - schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) - if err != nil { - return err - } - - ctx.GetLogger().Debugf(" - New Rows (table: %s)", tableMap.Name) - // TODO: batch writes to the same table - for _, row := range rows.Rows { - newRow, err := parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data) - if err != nil { - return err - } - ctx.GetLogger().Debugf(" - Data: %v ", sql.FormatRow(newRow)) - - // TODO: Make this retry logic generic for insert/update/delete ? - // TODO: LOTS of duplication here! This would be super helpful to clean up - retryCount := 0 - for { - foreignKeyChecksDisabled := rows.Flags&rowFlag_noForeignKeyChecks > 0 - writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled) - if err != nil { - return err - } - - err = tableWriter.Insert(ctx, newRow) - if err != nil { - return err - } - - err = closeWriteSession(ctx, engine, tableMap.Database, writeSession) - if err != nil { - if errors.Is(datas.ErrOptimisticLockFailed, err) && retryCount < 3 { - ctx.GetLogger().Errorf("Retrying after error writing table updates: %s", err) - retryCount++ - continue - } else { - return err - } - } else { - break - } - } - } - - case event.IsUpdateRows(): - // A ROWS_EVENT is written for row based replication if data is inserted, deleted or updated. - // For more details, see: https://mariadb.com/kb/en/rows_event_v1v2-rows_compressed_event_v1/ - ctx.GetLogger().Debug("Received binlog event: UpdateRows") - tableId := event.TableID(a.format) - tableMap, ok := a.tableMapsById[tableId] - if !ok { - return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId) - } - - if a.filters.isTableFilteredOut(ctx, tableMap) { - return nil - } - a.modifiedDatabases[tableMap.Database] = struct{}{} - - rows, err := event.Rows(a.format, tableMap) - if err != nil { - return err - } - - flags := rows.Flags - if flags&rowFlag_endOfStatement == rowFlag_endOfStatement { - // nothing to be done for end of statement; just clear the flag and move on - flags = flags &^ rowFlag_endOfStatement - } - if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks { - flags = flags &^ rowFlag_noForeignKeyChecks - } - if flags != 0 { - msg := fmt.Sprintf("unsupported binlog protocol message: UpdateRows event with unsupported flags '%x'", flags) - ctx.GetLogger().Errorf(msg) - DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg) - } - schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) - if err != nil { - return err - } - - ctx.GetLogger().Debugf(" - Updated Rows (table: %s)", tableMap.Name) - for _, row := range rows.Rows { - identifyRow, err := parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify) - if err != nil { - return err - } - updatedRow, err := parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data) - if err != nil { - return err - } - ctx.GetLogger().Debugf(" - Identify: %v Data: %v ", sql.FormatRow(identifyRow), sql.FormatRow(updatedRow)) - - foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0 - writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled) - if err != nil { - return err - } - - err = tableWriter.Update(ctx, identifyRow, updatedRow) - if err != nil { - return err - } - - err = closeWriteSession(ctx, engine, tableMap.Database, writeSession) - if err != nil { - return err - } - } - default: // We can't access the bytes directly because these non-interface types in Vitess are not exposed. // Having a Bytes() or Type() method on the Vitess interface would let us clean this up. @@ -699,6 +497,108 @@ func (a *binlogReplicaApplier) processBinlogEvent(ctx *sql.Context, engine *gms. return nil } +// processRowEvent processes a WriteRows, DeleteRows, or UpdateRows binlog event and returns an error if any problems +// were encountered. +func (a *binlogReplicaApplier) processRowEvent(ctx *sql.Context, event mysql.BinlogEvent, engine *gms.Engine) error { + switch { + case event.IsDeleteRows(): + ctx.GetLogger().Debug("Received binlog event: DeleteRows") + case event.IsWriteRows(): + ctx.GetLogger().Debug("Received binlog event: WriteRows") + case event.IsUpdateRows(): + ctx.GetLogger().Debug("Received binlog event: UpdateRows") + default: + return fmt.Errorf("unsupported event type: %v", event) + } + + tableId := event.TableID(a.format) + tableMap, ok := a.tableMapsById[tableId] + if !ok { + return fmt.Errorf("unable to find replication metadata for table ID: %d", tableId) + } + + if a.filters.isTableFilteredOut(ctx, tableMap) { + return nil + } + a.modifiedDatabases[tableMap.Database] = struct{}{} + + rows, err := event.Rows(a.format, tableMap) + if err != nil { + return err + } + + flags := rows.Flags + if flags&rowFlag_endOfStatement == rowFlag_endOfStatement { + // nothing to be done for end of statement; just clear the flag and move on + flags = flags &^ rowFlag_endOfStatement + } + if flags&rowFlag_noForeignKeyChecks == rowFlag_noForeignKeyChecks { + flags = flags &^ rowFlag_noForeignKeyChecks + } + if flags != 0 { + msg := fmt.Sprintf("unsupported binlog protocol message: DeleteRows event with unsupported flags '%x'", flags) + ctx.GetLogger().Errorf(msg) + DoltBinlogReplicaController.setSqlError(mysql.ERUnknownError, msg) + } + schema, err := getTableSchema(ctx, engine, tableMap.Name, tableMap.Database) + if err != nil { + return err + } + + switch { + case event.IsDeleteRows(): + ctx.GetLogger().Debugf(" - Deleted Rows (table: %s)", tableMap.Name) + case event.IsUpdateRows(): + ctx.GetLogger().Debugf(" - Updated Rows (table: %s)", tableMap.Name) + case event.IsWriteRows(): + ctx.GetLogger().Debugf(" - New Rows (table: %s)", tableMap.Name) + } + + for _, row := range rows.Rows { + var identityRow, dataRow sql.Row + if len(row.Identify) > 0 { + identityRow, err = parseRow(ctx, tableMap, schema, rows.IdentifyColumns, row.NullIdentifyColumns, row.Identify) + if err != nil { + return err + } + ctx.GetLogger().Debugf(" - Identity: %v ", sql.FormatRow(identityRow)) + } + + if len(row.Data) > 0 { + dataRow, err = parseRow(ctx, tableMap, schema, rows.DataColumns, row.NullColumns, row.Data) + if err != nil { + return err + } + ctx.GetLogger().Debugf(" - Data: %v ", sql.FormatRow(dataRow)) + } + + foreignKeyChecksDisabled := tableMap.Flags&rowFlag_noForeignKeyChecks > 0 + writeSession, tableWriter, err := getTableWriter(ctx, engine, tableMap.Name, tableMap.Database, foreignKeyChecksDisabled) + if err != nil { + return err + } + + switch { + case event.IsDeleteRows(): + err = tableWriter.Delete(ctx, identityRow) + case event.IsWriteRows(): + err = tableWriter.Insert(ctx, dataRow) + case event.IsUpdateRows(): + err = tableWriter.Update(ctx, identityRow, dataRow) + } + if err != nil { + return err + } + + err = closeWriteSession(ctx, engine, tableMap.Database, writeSession) + if err != nil { + return err + } + } + + return nil +} + // // Helper functions //