Breaking out a replica applier type from the replica controller type

This commit is contained in:
Jason Fulghum
2023-01-25 11:40:04 -08:00
parent f55f8714d2
commit 2ef65c0fa4
5 changed files with 681 additions and 589 deletions

View File

@@ -142,7 +142,8 @@ func NewSqlEngine(
"authentication_dolt_jwt": NewAuthenticateDoltJWTPlugin(config.JwksConfig),
})
engine.BinlogReplicaController = config.BinlogReplicaController
// TODO: This could be a cleaner interface
engine.Analyzer.BinlogReplicaController = config.BinlogReplicaController
// Load MySQL Db information
if err = engine.Analyzer.Catalog.MySQLDb.LoadData(sql.NewEmptyContext(), data); err != nil {

View File

@@ -80,21 +80,13 @@ func deleteReplicationConfiguration(ctx *sql.Context) error {
return engine.Analyzer.Catalog.MySQLDb.Persist(ctx)
}
// persistSourceUuid saves the specified |sourceUuid| to a persistent storage location. If the source UUID has already
// been persisted, then no action is taken.
func persistSourceUuid(ctx *sql.Context, sourceUuid interface{}) error {
// If the source UUID is already set, then there's no need to persist it again, since it can't change
if replicationSourceUuid != "" {
return nil
}
// persistSourceUuid saves the specified |sourceUuid| to a persistent storage location.
func persistSourceUuid(ctx *sql.Context, sourceUuid string) error {
replicaSourceInfo, err := loadReplicationConfiguration(ctx)
if err != nil {
return err
}
replicaSourceInfo.Uuid = fmt.Sprintf("%v", sourceUuid)
replicationSourceUuid = replicaSourceInfo.Uuid
replicaSourceInfo.Uuid = sourceUuid
return persistReplicationConfiguration(ctx, replicaSourceInfo)
}

View File

@@ -0,0 +1,338 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package binlogreplication
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/binlogreplication"
"github.com/dolthub/go-mysql-server/sql/mysql_db"
)
var DoltBinlogReplicaController = newDoltBinlogReplicaController()
var logger = logrus.StandardLogger()
// ErrServerNotConfiguredAsReplica is returned when replication is started without enough configuration provided.
var ErrServerNotConfiguredAsReplica = fmt.Errorf(
"server is not configured as a replica; fix with CHANGE REPLICATION SOURCE TO")
// doltBinlogReplicaController implements the BinlogReplicaController interface for a Dolt database in order to
// provide support for a Dolt server to be a replica of a MySQL primary.
//
// This type is used concurrently multiple sessions on the DB can call this interface concurrently,
// so all state that the controller tracks MUST be protected with a mutex.
type doltBinlogReplicaController struct {
status binlogreplication.ReplicaStatus
filters *filterConfiguration
applier *binlogReplicaApplier
mu *sync.Mutex
}
var _ binlogreplication.BinlogReplicaController = (*doltBinlogReplicaController)(nil)
// newDoltBinlogReplicaController creates a new doltBinlogReplicaController instance.
func newDoltBinlogReplicaController() *doltBinlogReplicaController {
controller := doltBinlogReplicaController{
mu: &sync.Mutex{},
filters: newFilterConfiguration(),
}
controller.status.AutoPosition = true
controller.status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
controller.status.ReplicaSqlRunning = binlogreplication.ReplicaSqlNotRunning
controller.applier = newBinlogReplicaApplier(controller.filters)
// TODO: Set the log level in the tests; don't just hardcode it here
logger.SetLevel(logrus.TraceLevel)
return &controller
}
// StartReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StartReplica(ctx *sql.Context) error {
if false {
// TODO: If the database is already configured for Dolt replication/clustering, then error out
// Add a (BATS?) test to cover this case
return fmt.Errorf("dolt replication already enabled; unable to use binlog replication with other replication modes. " +
"Disable Dolt replication first before starting binlog replication")
}
// If we aren't running in a sql-server context, it would be nice to return a helpful, Dolt-specific
// error message. Currently, this case would trigger an error from the GMS layer, so we can't give
// a specific error message about needing to run Dolt in sql-server mode yet.
_, err := loadReplicaServerId()
if err != nil {
return fmt.Errorf("unable to start replication: %s", err.Error())
}
configuration, err := loadReplicationConfiguration(ctx)
if err != nil {
return err
} else if configuration == nil {
return ErrServerNotConfiguredAsReplica
}
logger.Info("starting binlog replication...")
// Create a new context to use, because otherwise the engine will cancel the original
// context after the 'start replica' statement has finished executing.
ctx = ctx.WithContext(context.Background()).WithQuery("")
d.applier.Go(ctx)
return nil
}
// StopReplica implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) StopReplica(_ *sql.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
d.applier.stopReplicationChan <- struct{}{}
d.status.ReplicaIoRunning = binlogreplication.ReplicaIoNotRunning
d.status.ReplicaSqlRunning = binlogreplication.ReplicaSqlNotRunning
return nil
}
// SetReplicationSourceOptions implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) SetReplicationSourceOptions(ctx *sql.Context, options []binlogreplication.ReplicationOption) error {
replicaSourceInfo, err := loadReplicationConfiguration(ctx)
if err != nil {
return err
}
if replicaSourceInfo == nil {
replicaSourceInfo = mysql_db.NewReplicaSourceInfo()
}
for _, option := range options {
switch strings.ToUpper(option.Name) {
case "SOURCE_HOST":
value, err := getOptionValueAsString(option)
if err != nil {
return err
}
replicaSourceInfo.Host = value
case "SOURCE_USER":
value, err := getOptionValueAsString(option)
if err != nil {
return err
}
replicaSourceInfo.User = value
case "SOURCE_PASSWORD":
value, err := getOptionValueAsString(option)
if err != nil {
return err
}
replicaSourceInfo.Password = value
case "SOURCE_PORT":
intValue, err := getOptionValueAsInt(option)
if err != nil {
return err
}
replicaSourceInfo.Port = uint16(intValue)
case "SOURCE_CONNECT_RETRY":
intValue, err := getOptionValueAsInt(option)
if err != nil {
return err
}
replicaSourceInfo.ConnectRetryInterval = uint32(intValue)
case "SOURCE_RETRY_COUNT":
intValue, err := getOptionValueAsInt(option)
if err != nil {
return err
}
replicaSourceInfo.ConnectRetryCount = uint64(intValue)
default:
return fmt.Errorf("unknown replication source option: %s", option.Name)
}
}
// Persist the updated replica source configuration to disk
return persistReplicationConfiguration(ctx, replicaSourceInfo)
}
// SetReplicationFilterOptions implements the BinlogReplicaController interface.
func (d *doltBinlogReplicaController) SetReplicationFilterOptions(_ *sql.Context, options []binlogreplication.ReplicationOption) error {
for _, option := range options {
switch strings.ToUpper(option.Name) {
case "REPLICATE_DO_TABLE":
value, err := getOptionValueAsTableNames(option)
if err != nil {
return err
}
err = d.filters.setDoTables(value)
if err != nil {
return err
}
case "REPLICATE_IGNORE_TABLE":
value, err := getOptionValueAsTableNames(option)
if err != nil {
return err
}
err = d.filters.setIgnoreTables(value)
if err != nil {
return err
}
default:
return fmt.Errorf("unsupported replication filter option: %s", option.Name)
}
}
// TODO: Consider persisting filter settings. MySQL doesn't actually do this... unlike CHANGE REPLICATION SOURCE,
// CHANGE REPLICATION FILTER requires users to re-apply the filter options every time a server is restarted,
// or to pass them to mysqld on the command line or in configuration. Since we don't want to force users
// to specify these on the command line, we should consider diverging from MySQL behavior here slightly and
// persisting the filter configuration options if customers want this.
return nil
}
// GetReplicaStatus implements the BinlogReplicaController interface
func (d *doltBinlogReplicaController) GetReplicaStatus(ctx *sql.Context) (*binlogreplication.ReplicaStatus, error) {
replicaSourceInfo, err := loadReplicationConfiguration(ctx)
if err != nil {
return nil, err
}
if replicaSourceInfo == nil {
return nil, nil
}
d.mu.Lock()
defer d.mu.Unlock()
var copy = d.status
copy.SourceUser = replicaSourceInfo.User
copy.SourceHost = replicaSourceInfo.Host
copy.SourcePort = uint(replicaSourceInfo.Port)
copy.SourceServerUuid = replicaSourceInfo.Uuid
copy.ConnectRetry = replicaSourceInfo.ConnectRetryInterval
copy.SourceRetryCount = replicaSourceInfo.ConnectRetryCount
copy.ReplicateDoTables = d.filters.getDoTables()
copy.ReplicateIgnoreTables = d.filters.getIgnoreTables()
return &copy, nil
}
// ResetReplica implements the BinlogReplicaController interface
func (d *doltBinlogReplicaController) ResetReplica(ctx *sql.Context, resetAll bool) error {
d.mu.Lock()
defer d.mu.Unlock()
if d.status.ReplicaIoRunning != binlogreplication.ReplicaIoNotRunning ||
d.status.ReplicaSqlRunning != binlogreplication.ReplicaSqlNotRunning {
return fmt.Errorf("unable to reset replica while replication is running; stop replication and try again")
}
// Reset error status
d.status.LastIoErrNumber = 0
d.status.LastSqlErrNumber = 0
d.status.LastIoErrorTimestamp = nil
d.status.LastSqlErrorTimestamp = nil
d.status.LastSqlError = ""
d.status.LastIoError = ""
if resetAll {
err := deleteReplicationConfiguration(ctx)
if err != nil {
return err
}
d.filters = nil
}
return nil
}
// updateStatus allows the caller to safely update the replica controller's status. The controller locks it's mutex
// before the specified function |f| is called, and unlocks it after |f| is finished running. The current status is
// passed into the callback function |f| and the caller can safely update or copy any fields they need.
func (d *doltBinlogReplicaController) updateStatus(f func(status *binlogreplication.ReplicaStatus)) {
d.mu.Lock()
defer d.mu.Unlock()
f(&d.status)
}
// setIoError updates the current replication status with the specific |errno| and |message| to describe an IO error.
func (d *doltBinlogReplicaController) setIoError(errno uint, message string) {
d.mu.Lock()
defer d.mu.Unlock()
currentTime := time.Now()
d.status.LastIoErrorTimestamp = &currentTime
d.status.LastIoErrNumber = errno
d.status.LastIoError = message
}
// setSqlError updates the current replication status with the specific |errno| and |message| to describe an SQL error.
func (d *doltBinlogReplicaController) setSqlError(errno uint, message string) {
d.mu.Lock()
defer d.mu.Unlock()
currentTime := time.Now()
d.status.LastSqlErrorTimestamp = &currentTime
d.status.LastSqlErrNumber = errno
d.status.LastSqlError = message
}
// ------------------------------------------------
func getOptionValueAsString(option binlogreplication.ReplicationOption) (string, error) {
stringOptionValue, ok := option.Value.(binlogreplication.StringReplicationOptionValue)
if ok {
return stringOptionValue.GetValueAsString(), nil
}
return "", fmt.Errorf("unsupported value type for option %q; found %T, "+
"but expected a string", option.Name, option.Value.GetValue())
}
func getOptionValueAsInt(option binlogreplication.ReplicationOption) (int, error) {
integerOptionValue, ok := option.Value.(binlogreplication.IntegerReplicationOptionValue)
if ok {
return integerOptionValue.GetValueAsInt(), nil
}
return 0, fmt.Errorf("unsupported value type for option %q; found %T, "+
"but expected an integer", option.Name, option.Value.GetValue())
}
func getOptionValueAsTableNames(option binlogreplication.ReplicationOption) ([]sql.UnresolvedTable, error) {
tableNamesOptionValue, ok := option.Value.(binlogreplication.TableNamesReplicationOptionValue)
if ok {
return tableNamesOptionValue.GetValueAsTableList(), nil
}
return nil, fmt.Errorf("unsupported value type for option %q; found %T, "+
"but expected a list of tables", option.Name, option.Value.GetValue())
}
func verifyAllTablesAreQualified(urts []sql.UnresolvedTable) error {
for _, urt := range urts {
if urt.Database() == "" {
return fmt.Errorf("no database specified for table '%s'; "+
"all filter table names must be qualified with a database name", urt.Name())
}
}
return nil
}

View File

@@ -0,0 +1,159 @@
// Copyright 2023 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package binlogreplication
import (
"fmt"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/mysql"
"sync"
)
// filterConfiguration defines the binlog filtering rules applied on the replica.
type filterConfiguration struct {
// doTables holds a map of database name to map of table names, indicating tables that SHOULD be replicated.
doTables map[string]map[string]struct{}
// ignoreTables holds a map of database name to map of table names, indicating tables that should NOT be replicated.
ignoreTables map[string]map[string]struct{}
// mu guards against concurrent access to the filter configuration data.
mu *sync.Mutex
}
// newFilterConfiguration creates a new filterConfiguration instance and initializes members.
func newFilterConfiguration() *filterConfiguration {
return &filterConfiguration{
doTables: make(map[string]map[string]struct{}),
ignoreTables: make(map[string]map[string]struct{}),
mu: &sync.Mutex{},
}
}
// setDoTables sets the tables that are allowed to replicate and returns an error if any problems were
// encountered, such as unqualified tables being specified in |urts|. If any DoTables were previously configured,
// they are cleared out before the new tables are set as the value of DoTables.
func (fc *filterConfiguration) setDoTables(urts []sql.UnresolvedTable) error {
err := verifyAllTablesAreQualified(urts)
if err != nil {
return err
}
fc.mu.Lock()
defer fc.mu.Unlock()
// Setting new replication filters clears out any existing filters
fc.doTables = make(map[string]map[string]struct{})
for _, urt := range urts {
if fc.doTables[urt.Database()] == nil {
fc.doTables[urt.Database()] = make(map[string]struct{})
}
tableMap := fc.doTables[urt.Database()]
tableMap[urt.Name()] = struct{}{}
}
return nil
}
// setIgnoreTables sets the tables that are NOT allowed to replicate and returns an error if any problems were
// encountered, such as unqualified tables being specified in |urts|. If any IgnoreTables were previously configured,
// they are cleared out before the new tables are set as the value of IgnoreTables.
func (fc *filterConfiguration) setIgnoreTables(urts []sql.UnresolvedTable) error {
err := verifyAllTablesAreQualified(urts)
if err != nil {
return err
}
fc.mu.Lock()
defer fc.mu.Unlock()
// Setting new replication filters clears out any existing filters
fc.ignoreTables = make(map[string]map[string]struct{})
for _, urt := range urts {
if fc.ignoreTables[urt.Database()] == nil {
fc.ignoreTables[urt.Database()] = make(map[string]struct{})
}
tableMap := fc.ignoreTables[urt.Database()]
tableMap[urt.Name()] = struct{}{}
}
return nil
}
// isTableFilteredOut returns true if the table identified by |tableMap| has been filtered out on this replica and
// should not have any updates applied from binlog messages.
func (fc *filterConfiguration) isTableFilteredOut(tableMap *mysql.TableMap) bool {
if fc == nil {
return false
}
fc.mu.Lock()
defer fc.mu.Unlock()
// If any filter doTable options are specified, then a table MUST be listed in the set
// for it to be replicated. doTables options are processed BEFORE ignoreTables options.
// If a table appears in both doTable and ignoreTables, it is ignored.
// https://dev.mysql.com/doc/refman/8.0/en/replication-rules-table-options.html
if len(fc.doTables) > 0 {
if doTables, ok := fc.doTables[tableMap.Database]; ok {
if _, ok := doTables[tableMap.Name]; !ok {
logger.Tracef("skipping table %s.%s (not in doTables) ", tableMap.Database, tableMap.Name)
return true
}
}
}
if len(fc.ignoreTables) > 0 {
if ignoredTables, ok := fc.ignoreTables[tableMap.Database]; ok {
if _, ok := ignoredTables[tableMap.Name]; ok {
// If this table is being ignored, don't process any further
logger.Tracef("skipping table %s.%s (in ignoreTables)", tableMap.Database, tableMap.Name)
return true
}
}
}
return false
}
// getDoTables returns a slice of qualified table names that are configured to be replicated.
func (fc *filterConfiguration) getDoTables() []string {
fc.mu.Lock()
defer fc.mu.Unlock()
return convertFilterMapToStringSlice(fc.doTables)
}
// getIgnoreTables returns a slice of qualified table names that are configured to be filtered out of replication.
func (fc *filterConfiguration) getIgnoreTables() []string {
fc.mu.Lock()
defer fc.mu.Unlock()
return convertFilterMapToStringSlice(fc.ignoreTables)
}
// convertFilterMapToStringSlice converts the specified |filterMap| into a string slice, by iterating over every
// key in the top level map, which stores a database name, and for each of those keys, iterating over every key
// in the inner map, which stores a table name. Each table name is qualified with the matching database name and the
// results are returned as a slice of qualified table names.
func convertFilterMapToStringSlice(filterMap map[string]map[string]struct{}) []string {
if filterMap == nil {
return nil
}
tableNames := make([]string, 0, len(filterMap))
for dbName, tableMap := range filterMap {
for tableName, _ := range tableMap {
tableNames = append(tableNames, fmt.Sprintf("%s.%s", dbName, tableName))
}
}
return tableNames
}