mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-27 00:51:39 -06:00
Add support for @@binlog_row_metadata = FULL
This commit is contained in:
@@ -348,12 +348,24 @@ func (b *binlogProducer) createTableMapEvents(ctx *sql.Context, databaseName str
|
||||
if tableName.Name == "" {
|
||||
tableName = tableDelta.FromName
|
||||
}
|
||||
tablesToId[tableName.Name] = tableId
|
||||
tableMap, err := createTableMapFromDoltTable(ctx, databaseName, tableName.Name, tableDelta.ToTable)
|
||||
|
||||
isFullRowMetadata, err := isFullRowMetadata(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
events = append(events, b.newTableMapEvent(tableId, tableMap))
|
||||
|
||||
tablesToId[tableName.Name] = tableId
|
||||
tableMap, err := createTableMapFromDoltTable(ctx, databaseName,
|
||||
tableName.Name, tableDelta.ToTable, isFullRowMetadata)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
tableMapEvent, err := b.newTableMapEvent(tableId, tableMap)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
events = append(events, tableMapEvent)
|
||||
}
|
||||
|
||||
return events, tablesToId, nil
|
||||
@@ -548,12 +560,8 @@ func (b *binlogProducer) newXIDEvent() mysql.BinlogEvent {
|
||||
|
||||
// newTableMapEvent returns a new TableMap BinlogEvent for the specified |tableId| and |tableMap|, and updates the
|
||||
// stream's log position.
|
||||
func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) mysql.BinlogEvent {
|
||||
// TODO: The new error introduced in this function signature is only used when optional table metadata
|
||||
// is specified (e.g. column names). Dolt doesn't support populating this yet, so there's no
|
||||
// need to look at the return error yet. That will be added in an upcoming PR.
|
||||
event, _ := mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap)
|
||||
return event
|
||||
func (b *binlogProducer) newTableMapEvent(tableId uint64, tableMap *mysql.TableMap) (mysql.BinlogEvent, error) {
|
||||
return mysql.NewTableMapEvent(*b.binlogFormat, b.binlogEventMeta, tableId, tableMap)
|
||||
}
|
||||
|
||||
// newWriteRowsEvent returns a new WriteRows BinlogEvent for the specified |tableId| and |rows|, and updates the
|
||||
@@ -594,6 +602,22 @@ func isDatabaseFilteredOut(ctx *sql.Context, dbName string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// isFullRowMetadata returns true if the system is configured to return full metadata as part of
|
||||
// binlog table descriptions. Full row metadata is configured by setting @@binlog_row_metadata
|
||||
// to FULL.
|
||||
func isFullRowMetadata(ctx *sql.Context) (bool, error) {
|
||||
val, err := ctx.GetSessionVariable(ctx, "binlog_row_metadata")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if stringVal, ok := val.(string); ok {
|
||||
return strings.EqualFold(stringVal, "FULL"), nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
// extractRowCountAndDiffType uses |sch| and |diff| to determine how many changed rows this
|
||||
// diff represents (returned as the |rowCount| param) and what type of change it represents
|
||||
// (returned as the |diffType| param). For tables with primary keys, this function will always
|
||||
@@ -659,19 +683,47 @@ func extractRowCountAndDiffType(sch schema.Schema, diff tree.Diff) (rowCount uin
|
||||
}
|
||||
}
|
||||
|
||||
// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table.
|
||||
func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table) (*mysql.TableMap, error) {
|
||||
schema, err := table.GetSchema(ctx)
|
||||
// createTableMapFromDoltTable creates a binlog TableMap for the given Dolt table. If
|
||||
// |includeOptionalMetadata| is set to true, then additional, optional metadata such as
|
||||
// column names and column collations will also be included in the TableMap.
|
||||
func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName string, table *doltdb.Table, includeOptionalMetadata bool) (*mysql.TableMap, error) {
|
||||
sch, err := table.GetSchema(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
columns := schema.GetAllCols().GetColumns()
|
||||
columns := sch.GetAllCols().GetColumns()
|
||||
types := make([]byte, len(columns))
|
||||
metadata := make([]uint16, len(columns))
|
||||
canBeNullMap := mysql.NewServerBitmap(len(columns))
|
||||
|
||||
var columnNames []string
|
||||
var columnCollationIds []uint64
|
||||
var enumValues [][]string
|
||||
var setValues [][]string
|
||||
var enumAndSetCollationIds []uint64
|
||||
if includeOptionalMetadata {
|
||||
columnNames = make([]string, len(columns))
|
||||
columnCollationIds = make([]uint64, len(columns))
|
||||
}
|
||||
|
||||
for i, col := range columns {
|
||||
if includeOptionalMetadata {
|
||||
columnNames[i] = col.Name
|
||||
columnCollationIds[i] = uint64(schema.Collation_Unspecified)
|
||||
if stringType, ok := col.TypeInfo.ToSqlType().(sql.StringType); ok {
|
||||
columnCollationIds[i] = uint64(stringType.Collation())
|
||||
}
|
||||
|
||||
if enumType, ok := col.TypeInfo.ToSqlType().(sql.EnumType); ok {
|
||||
enumValues = append(enumValues, enumType.Values())
|
||||
enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(enumType.Collation()))
|
||||
} else if setType, ok := col.TypeInfo.ToSqlType().(sql.SetType); ok {
|
||||
setValues = append(setValues, setType.Values())
|
||||
enumAndSetCollationIds = append(enumAndSetCollationIds, uint64(setType.Collation()))
|
||||
}
|
||||
}
|
||||
|
||||
metadata[i] = 0
|
||||
typ := col.TypeInfo.ToSqlType()
|
||||
|
||||
@@ -687,14 +739,24 @@ func createTableMapFromDoltTable(ctx *sql.Context, databaseName, tableName strin
|
||||
}
|
||||
}
|
||||
|
||||
return &mysql.TableMap{
|
||||
tableMap := &mysql.TableMap{
|
||||
Flags: 0x0000,
|
||||
Database: databaseName,
|
||||
Name: tableName,
|
||||
Types: types,
|
||||
CanBeNull: canBeNullMap,
|
||||
Metadata: metadata,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if includeOptionalMetadata {
|
||||
tableMap.OptionalColumnNames = columnNames
|
||||
tableMap.OptionalEnumValues = enumValues
|
||||
tableMap.OptionalSetValues = setValues
|
||||
tableMap.OptionalColumnCollations = columnCollationIds
|
||||
tableMap.OptionalEnumAndSetCollations = enumAndSetCollationIds
|
||||
}
|
||||
|
||||
return tableMap, nil
|
||||
}
|
||||
|
||||
// createBinlogFormat returns a new BinlogFormat that describes the format of this binlog stream, which will always
|
||||
|
||||
Reference in New Issue
Block a user