go: sqle: cluster: Add a role, detected_broken_config, to use when we see two primaries at the same epoch.

This commit is contained in:
Aaron Son
2022-09-29 17:14:31 -07:00
parent e477a0a3be
commit 0b719f0b24
5 changed files with 118 additions and 87 deletions
@@ -15,6 +15,8 @@
package cluster
import (
"errors"
"github.com/dolthub/go-mysql-server/sql"
)
@@ -29,6 +31,9 @@ func newAssumeRoleProcedure(controller *Controller) sql.ExternalStoredProcedureD
},
},
Function: func(ctx *sql.Context, role string, epoch int) (sql.RowIter, error) {
if role == string(RoleDetectedBrokenConfig) {
return nil, errors.New("cannot set role to detected_broken_config; valid values are 'primary' and 'standby'")
}
err := controller.setRoleAndEpoch(role, epoch, true /* graceful */)
if err != nil {
return nil, err
@@ -48,7 +48,7 @@ type commithook struct {
nextHeadIncomingTime time.Time
lastSuccess time.Time
currentError *string
cancelReplicate func()
cancelReplicate func()
role Role
@@ -290,7 +290,7 @@ func (h *commithook) tick(ctx context.Context) {
func (h *commithook) recordSuccessfulRemoteSrvCommit() {
h.mu.Lock()
defer h.mu.Unlock()
if h.role == RolePrimary {
if h.role != RoleStandby {
return
}
h.lastSuccess = time.Now()
@@ -313,9 +313,14 @@ func (h *commithook) setRole(role Role) {
h.cancelReplicate()
h.cancelReplicate = nil
}
if role == RoleDetectedBrokenConfig {
h.currentError = &errDetectedBrokenConfigStr
}
h.cond.Signal()
}
var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch"
// Execute on this commithook updates the target root hash we're attempting to
// replicate and wakes the replication thread.
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
@@ -38,6 +38,7 @@ type Role string
const RolePrimary Role = "primary"
const RoleStandby Role = "standby"
const RoleDetectedBrokenConfig Role = "detected_broken_config"
const PersistentConfigPrefix = "sqlserver.cluster"
@@ -144,7 +145,7 @@ func (c *Controller) ManageDatabaseProvider(p dbProvider) {
c.mu.Lock()
defer c.mu.Unlock()
c.provider = p
c.setProviderIsStandby(c.role == RoleStandby)
c.setProviderIsStandby(c.role != RolePrimary)
}
func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
@@ -246,6 +247,7 @@ func (c *Controller) persistVariables() error {
func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (Role, int, error) {
toset := make(map[string]string)
persistentRole := pCfg.GetStringOrDefault(DoltClusterRoleVariable, "")
var roleFromPersistentConfig bool
persistentEpoch := pCfg.GetStringOrDefault(DoltClusterRoleEpochVariable, "")
if persistentRole == "" {
if cfg.BootstrapRole() != "" {
@@ -257,6 +259,7 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
}
toset[DoltClusterRoleVariable] = persistentRole
} else {
roleFromPersistentConfig = true
lgr.Tracef("cluster/controller: persisted cluster role is %s", persistentRole)
}
if persistentEpoch == "" {
@@ -267,7 +270,10 @@ func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.Rea
lgr.Tracef("cluster/controller: persisted cluster role epoch is %s", persistentEpoch)
}
if persistentRole != string(RolePrimary) && persistentRole != string(RoleStandby) {
return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole)
isallowed := persistentRole == string(RoleDetectedBrokenConfig) && roleFromPersistentConfig
if !isallowed {
return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"standby\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole)
}
}
epochi, err := strconv.Atoi(persistentEpoch)
if err != nil {
@@ -289,7 +295,7 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
return nil
}
if role != "primary" && role != "standby" {
if role != string(RolePrimary) && role != string(RoleStandby) && role != string(RoleDetectedBrokenConfig) {
return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
}
@@ -299,7 +305,7 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
if epoch == c.epoch {
// This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and
// other signals that the cluster is misconfigured.
isallowed := !graceful && role == "standby"
isallowed := !graceful && (role == string(RoleStandby) || role == string(RoleDetectedBrokenConfig))
if !isallowed {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
}
@@ -318,6 +324,8 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
} else {
c.immediateTransitionToStandby()
}
} else if role == string(RoleDetectedBrokenConfig) {
c.immediateTransitionToStandby()
} else {
c.transitionToPrimary()
}
@@ -72,6 +72,9 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
if role == RoleStandby {
return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
}
if role == RoleDetectedBrokenConfig {
return nil, status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
stream, err := streamer(ctx, desc, cc, method, append(opts, grpc.Header(&header))...)
@@ -87,6 +90,9 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
if role == RoleStandby {
return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
}
if role == RoleDetectedBrokenConfig {
return status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
err := invoker(ctx, method, req, reply, cc, append(opts, grpc.Header(&header))...)
@@ -102,13 +108,16 @@ func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
if roles[0] == string(RolePrimary) {
if respepoch == epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
ci.roleSetter(string(RoleStandby), respepoch, false)
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch, false)
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
ci.roleSetter(string(RoleStandby), respepoch, false)
}
} else if roles[0] == string(RoleDetectedBrokenConfig) && respepoch >= epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server learned from its standby that the standby is in detected_broken_config. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch, false)
}
}
}
@@ -159,6 +168,10 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
// As a primary, we do not accept replication requests.
return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
}
if role == RoleDetectedBrokenConfig {
// In detected_broken_config, we do not accept replication requests.
return status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently accepting replication")
}
return handler(srv, ss)
}
}
@@ -193,8 +206,8 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role,
// at the same epoch. We will become standby
// and our peer will become standby. An
// operator will need to get involved.
si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
si.roleSetter(string(RoleStandby), reqepoch, false)
si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.")
si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch, false)
} else if reqepoch > epoch {
// The client replicating to us thinks it is the primary at a higher epoch than us.
si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
+77 -77
View File
@@ -502,83 +502,83 @@ cluster:
wait $serverone_pid
}
# @test "sql-server-cluster: misconfigured cluster with primaries at same epoch, both transition to standby" {
# cd serverone
#
# echo "
# log_level: trace
# user:
# name: dolt
# listener:
# host: 0.0.0.0
# port: ${SERVERONE_MYSQL_PORT}
# behavior:
# read_only: false
# autocommit: true
# cluster:
# standby_remotes:
# - name: standby
# remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
# bootstrap_role: primary
# bootstrap_epoch: 10
# remotesapi:
# port: ${SERVERONE_GRPC_PORT}" > server.yaml
#
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
# DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
#
# (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
# (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
# DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
# serverone_pid=$!
#
# wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
#
# cd ../servertwo
#
# echo "
# log_level: trace
# user:
# name: dolt
# listener:
# host: 0.0.0.0
# port: ${SERVERTWO_MYSQL_PORT}
# behavior:
# read_only: false
# autocommit: true
# cluster:
# standby_remotes:
# - name: standby
# remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
# bootstrap_role: primary
# bootstrap_epoch: 10
# remotesapi:
# port: ${SERVERTWO_GRPC_PORT}" > server.yaml
#
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
# DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
#
# (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
# (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
# DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
# servertwo_pid=$!
#
# wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
#
# # Run a query to make sure everyone sees everyone...
# run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1
#
# server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
# server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
#
# kill $servertwo_pid
# wait $servertwo_pid
#
# kill $serverone_pid
# wait $serverone_pid
# }
# Regression test: when two servers are both configured as primary at the same
# epoch, each must detect the conflict and transition to detected_broken_config
# (refusing writes) rather than silently dropping to standby.
@test "sql-server-cluster: misconfigured cluster with primaries at same epoch, both transition to detected_broken_config" {
cd serverone
# serverone: bootstrap as primary at epoch 10, replicating to servertwo.
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERONE_GRPC_PORT}" > server.yaml
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
serverone_pid=$!
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
cd ../servertwo
# servertwo: ALSO bootstrap as primary at the same epoch 10, replicating back
# to serverone — this is the deliberate misconfiguration under test.
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERTWO_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
(cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
(cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
servertwo_pid=$!
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
# Run a query to make sure everyone sees everyone...
run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1
# Both servers must end up in detected_broken_config at the conflicting epoch.
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\ndetected_broken_config,10"
# Tear down both servers.
kill $servertwo_pid
wait $servertwo_pid
kill $serverone_pid
wait $serverone_pid
}
@test "sql-server-cluster: an older primary comes up, becomes a standby and does not overwrite newer primary" {
cd serverone