mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-05 16:15:41 -06:00
Merge pull request #7482 from dolthub/aaron/cluster-standby-takes-epoch-from-primary
go: sqle/cluster: When in standby mode, take the epoch of the primary.
This commit is contained in:
@@ -204,11 +204,11 @@ type serverinterceptor struct {
|
||||
|
||||
func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
|
||||
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
|
||||
fromStandby := false
|
||||
fromClusterMember := false
|
||||
if md, ok := metadata.FromIncomingContext(ss.Context()); ok {
|
||||
fromStandby = si.handleRequestHeaders(md)
|
||||
fromClusterMember = si.handleRequestHeaders(md)
|
||||
}
|
||||
if fromStandby {
|
||||
if fromClusterMember {
|
||||
if err := si.authenticate(ss.Context()); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -236,11 +236,11 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
|
||||
|
||||
func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
|
||||
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
||||
fromStandby := false
|
||||
fromClusterMember := false
|
||||
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
||||
fromStandby = si.handleRequestHeaders(md)
|
||||
fromClusterMember = si.handleRequestHeaders(md)
|
||||
}
|
||||
if fromStandby {
|
||||
if fromClusterMember {
|
||||
if err := si.authenticate(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -271,9 +271,9 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD) bool {
|
||||
epochs := header.Get(clusterRoleEpochHeader)
|
||||
roles := header.Get(clusterRoleHeader)
|
||||
if len(epochs) > 0 && len(roles) > 0 {
|
||||
if roles[0] == string(RolePrimary) && role == RolePrimary {
|
||||
if roles[0] == string(RolePrimary) {
|
||||
if reqepoch, err := strconv.Atoi(epochs[0]); err == nil {
|
||||
if reqepoch == epoch {
|
||||
if reqepoch == epoch && role == RolePrimary {
|
||||
// Misconfiguration in the cluster means this
|
||||
// server and its standby are marked as Primary
|
||||
// at the same epoch. We will become standby
|
||||
@@ -282,13 +282,17 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD) bool {
|
||||
si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.")
|
||||
si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch)
|
||||
} else if reqepoch > epoch {
|
||||
// The client replicating to us thinks it is the primary at a higher epoch than us.
|
||||
si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
|
||||
if role == RolePrimary {
|
||||
// The client replicating to us thinks it is the primary at a higher epoch than us.
|
||||
si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
|
||||
} else if role == RoleDetectedBrokenConfig {
|
||||
si.lgr.Warnf("cluster: serverinterceptor: this server is detected_broken_config at epoch %d. the server replicating to it is primary at epoch %d. transitioning to standby.", epoch, reqepoch)
|
||||
}
|
||||
si.roleSetter(string(RoleStandby), reqepoch)
|
||||
}
|
||||
}
|
||||
}
|
||||
// returns true if the request was from a standby replica, false otherwise
|
||||
// returns true if the request was from a cluster replica, false otherwise
|
||||
return true
|
||||
}
|
||||
return false
|
||||
|
||||
@@ -337,6 +337,117 @@ tests:
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- exec: "create table vals (i int primary key)"
|
||||
- name: standby server takes primary epoch
|
||||
multi_repos:
|
||||
- name: server1
|
||||
repos:
|
||||
- name: repo1
|
||||
with_remotes:
|
||||
- name: standby
|
||||
url: http://localhost:3852/repo1
|
||||
- name: repo2
|
||||
with_remotes:
|
||||
- name: standby
|
||||
url: http://localhost:3852/repo2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3309
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3852/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: 3851
|
||||
server:
|
||||
args: ["--port", "3309"]
|
||||
port: 3309
|
||||
- name: server2
|
||||
repos:
|
||||
- name: repo1
|
||||
with_remotes:
|
||||
- name: standby
|
||||
url: http://localhost:3851/repo1
|
||||
- name: repo2
|
||||
with_remotes:
|
||||
- name: standby
|
||||
url: http://localhost:3851/repo2
|
||||
with_files:
|
||||
- name: server.yaml
|
||||
contents: |
|
||||
log_level: trace
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: 3310
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:3851/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 1
|
||||
remotesapi:
|
||||
port: 3852
|
||||
server:
|
||||
args: ["--config", "server.yaml"]
|
||||
port: 3310
|
||||
connections:
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- exec: "create table vals (i int primary key)"
|
||||
- exec: "insert into vals values (1),(2),(3),(4),(5)"
|
||||
restart_server:
|
||||
args: ["--config", "server.yaml"]
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: "use dolt_cluster"
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by `database` asc"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows:
|
||||
- ["repo1","standby","primary","10","0","NULL"]
|
||||
- ["repo2","standby","primary","10","0","NULL"]
|
||||
retry_attempts: 100
|
||||
- on: server2
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- query: "select count(*) from vals"
|
||||
result:
|
||||
columns: ["count(*)"]
|
||||
rows: [["5"]]
|
||||
- query: "select @@global.dolt_cluster_role_epoch"
|
||||
result:
|
||||
columns: ["@@global.dolt_cluster_role_epoch"]
|
||||
rows: [["10"]]
|
||||
- on: server1
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- exec: "call dolt_assume_cluster_role('primary', 11)"
|
||||
- exec: "insert into vals values (6),(7),(8),(9),(10)"
|
||||
- exec: "use dolt_cluster"
|
||||
- query: "select `database`, standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by `database` asc"
|
||||
result:
|
||||
columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
|
||||
rows:
|
||||
- ["repo1","standby","primary","11","0","NULL"]
|
||||
- ["repo2","standby","primary","11","0","NULL"]
|
||||
retry_attempts: 100
|
||||
- on: server2
|
||||
queries:
|
||||
- exec: "use repo1"
|
||||
- query: "select count(*) from vals"
|
||||
result:
|
||||
columns: ["count(*)"]
|
||||
rows: [["10"]]
|
||||
- query: "select @@global.dolt_cluster_role_epoch"
|
||||
result:
|
||||
columns: ["@@global.dolt_cluster_role_epoch"]
|
||||
rows: [["11"]]
|
||||
- name: standby transitioned to primary becomes read write
|
||||
multi_repos:
|
||||
- name: server1
|
||||
|
||||
Reference in New Issue
Block a user