diff --git a/go/libraries/doltcore/sqle/cluster/interceptors.go b/go/libraries/doltcore/sqle/cluster/interceptors.go
index 049be9e3fd..c078aa4bcc 100644
--- a/go/libraries/doltcore/sqle/cluster/interceptors.go
+++ b/go/libraries/doltcore/sqle/cluster/interceptors.go
@@ -204,11 +204,11 @@ type serverinterceptor struct {
 func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
 	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
-		fromStandby := false
+		fromClusterMember := false
 		if md, ok := metadata.FromIncomingContext(ss.Context()); ok {
-			fromStandby = si.handleRequestHeaders(md)
+			fromClusterMember = si.handleRequestHeaders(md)
 		}
-		if fromStandby {
+		if fromClusterMember {
 			if err := si.authenticate(ss.Context()); err != nil {
 				return err
 			}
@@ -236,11 +236,11 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
 func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
 	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
-		fromStandby := false
+		fromClusterMember := false
 		if md, ok := metadata.FromIncomingContext(ctx); ok {
-			fromStandby = si.handleRequestHeaders(md)
+			fromClusterMember = si.handleRequestHeaders(md)
 		}
-		if fromStandby {
+		if fromClusterMember {
 			if err := si.authenticate(ctx); err != nil {
 				return nil, err
 			}
@@ -271,9 +271,9 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD) bool {
 	epochs := header.Get(clusterRoleEpochHeader)
 	roles := header.Get(clusterRoleHeader)
 	if len(epochs) > 0 && len(roles) > 0 {
-		if roles[0] == string(RolePrimary) && role == RolePrimary {
+		if roles[0] == string(RolePrimary) {
 			if reqepoch, err := strconv.Atoi(epochs[0]); err == nil {
-				if reqepoch == epoch {
+				if reqepoch == epoch && role == RolePrimary {
 					// Misconfiguration in the cluster means this
 					// server and its standby are marked as Primary
 					// at the same epoch. We will become standby
@@ -282,13 +282,17 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD) bool {
 					si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.")
 					si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch)
 				} else if reqepoch > epoch {
-					// The client replicating to us thinks it is the primary at a higher epoch than us.
-					si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
+					if role == RolePrimary {
+						// The client replicating to us thinks it is the primary at a higher epoch than us.
+						si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
+					} else if role == RoleDetectedBrokenConfig {
+						si.lgr.Warnf("cluster: serverinterceptor: this server is detected_broken_config at epoch %d. the server replicating to it is primary at epoch %d. transitioning to standby.", epoch, reqepoch)
+					}
 					si.roleSetter(string(RoleStandby), reqepoch)
 				}
 			}
 		}
-		// returns true if the request was from a standby replica, false otherwise
+		// returns true if the request was from a cluster replica, false otherwise
 		return true
 	}
 	return false
diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml
index 0d62bd39bf..739b6d4046 100644
--- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml
+++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml
@@ -337,6 +337,117 @@ tests:
   queries:
   - exec: "use repo1"
   - exec: "create table vals (i int primary key)"
+- name: standby server takes primary epoch
+  multi_repos:
+  - name: server1
+    repos:
+    - name: repo1
+      with_remotes:
+      - name: standby
+        url: http://localhost:3852/repo1
+    - name: repo2
+      with_remotes:
+      - name: standby
+        url: http://localhost:3852/repo2
+    with_files:
+    - name: server.yaml
+      contents: |
+        log_level: trace
+        listener:
+          host: 0.0.0.0
+          port: 3309
+        cluster:
+          standby_remotes:
+          - name: standby
+            remote_url_template: http://localhost:3852/{database}
+          bootstrap_role: primary
+          bootstrap_epoch: 10
+          remotesapi:
+            port: 3851
+    server:
+      args: ["--port", "3309"]
+      port: 3309
+  - name: server2
+    repos:
+    - name: repo1
+      with_remotes:
+      - name: standby
+        url: http://localhost:3851/repo1
+    - name: repo2
+      with_remotes:
+      - name: standby
+        url: http://localhost:3851/repo2
+    with_files:
+    - name: server.yaml
+      contents: |
+        log_level: trace
+        listener:
+          host: 0.0.0.0
+          port: 3310
+        cluster:
+          standby_remotes:
+          - name: standby
+            remote_url_template: http://localhost:3851/{database}
+          bootstrap_role: standby
+          bootstrap_epoch: 1
+          remotesapi:
+            port: 3852
+    server:
+      args: ["--config", "server.yaml"]
+      port: 3310
+  connections:
+  - on: server1
+    queries:
+    - exec: "use repo1"
+    - exec: "create table vals (i int primary key)"
+    - exec: "insert into vals values (1),(2),(3),(4),(5)"
+    restart_server:
+      args: ["--config", "server.yaml"]
+  - on: server1
+    queries:
+    - exec: "use dolt_cluster"
+    - query: "select `database`, standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by `database` asc"
+      result:
+        columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
+        rows:
+        - ["repo1","standby","primary","10","0","NULL"]
+        - ["repo2","standby","primary","10","0","NULL"]
+      retry_attempts: 100
+  - on: server2
+    queries:
+    - exec: "use repo1"
+    - query: "select count(*) from vals"
+      result:
+        columns: ["count(*)"]
+        rows: [["5"]]
+    - query: "select @@global.dolt_cluster_role_epoch"
+      result:
+        columns: ["@@global.dolt_cluster_role_epoch"]
+        rows: [["10"]]
+  - on: server1
+    queries:
+    - exec: "use repo1"
+    - exec: "call dolt_assume_cluster_role('primary', 11)"
+    - exec: "insert into vals values (6),(7),(8),(9),(10)"
+    - exec: "use dolt_cluster"
+    - query: "select `database`, standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by `database` asc"
+      result:
+        columns: ["database","standby_remote","role","epoch","replication_lag_millis","current_error"]
+        rows:
+        - ["repo1","standby","primary","11","0","NULL"]
+        - ["repo2","standby","primary","11","0","NULL"]
+      retry_attempts: 100
+  - on: server2
+    queries:
+    - exec: "use repo1"
+    - query: "select count(*) from vals"
+      result:
+        columns: ["count(*)"]
+        rows: [["10"]]
+    - query: "select @@global.dolt_cluster_role_epoch"
+      result:
+        columns: ["@@global.dolt_cluster_role_epoch"]
+        rows: [["11"]]
 - name: standby transitioned to primary becomes read write
   multi_repos:
   - name: server1
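
For reviewers, here is a minimal standalone sketch of the decision logic that `handleRequestHeaders` implements after this patch. It is an illustration, not the merged code: the `Role` type, the `applyPrimaryHeaders` function, and the plain-string header arguments are hypothetical stand-ins for the real types and gRPC metadata plumbing in `go/libraries/doltcore/sqle/cluster`.

```go
package main

import (
	"fmt"
	"strconv"
)

// Role is a stand-in for the cluster role type; the constant names
// mirror those referenced in the diff.
type Role string

const (
	RolePrimary              Role = "primary"
	RoleStandby              Role = "standby"
	RoleDetectedBrokenConfig Role = "detected_broken_config"
)

// applyPrimaryHeaders sketches the post-patch behavior: any request
// carrying cluster role headers counts as coming from a cluster member,
// and a caller claiming primary at a strictly higher epoch demotes this
// server to standby whether it is currently primary or
// detected_broken_config. It returns the possibly updated role and
// epoch, plus whether the request came from a cluster member.
func applyPrimaryHeaders(role Role, epoch int, reqRole, reqEpochStr string) (Role, int, bool) {
	if reqRole == "" || reqEpochStr == "" {
		// No cluster headers: an ordinary client request.
		return role, epoch, false
	}
	if reqRole == string(RolePrimary) {
		if reqEpoch, err := strconv.Atoi(reqEpochStr); err == nil {
			if reqEpoch == epoch && role == RolePrimary {
				// Two primaries at the same epoch: misconfiguration.
				role = RoleDetectedBrokenConfig
			} else if reqEpoch > epoch {
				// The caller is primary at a higher epoch; follow it.
				role, epoch = RoleStandby, reqEpoch
			}
		}
	}
	return role, epoch, true
}

func main() {
	// A standby bootstrapped at epoch 1 receives replication from a
	// primary at epoch 10 and adopts the higher epoch, which is the
	// behavior the new "standby server takes primary epoch" test pins.
	role, epoch, fromCluster := applyPrimaryHeaders(RoleStandby, 1, "primary", "10")
	fmt.Println(role, epoch, fromCluster) // standby 10 true
}
```

Note the key difference from the pre-patch code: the old guard `roles[0] == string(RolePrimary) && role == RolePrimary` meant only a server that already believed itself primary would react to these headers, so a standby never advanced its epoch; moving the `role == RolePrimary` check inside the epoch comparison lets standby and detected_broken_config servers track the primary's epoch as well.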