diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go
index 44a475607e..0a74b55284 100644
--- a/go/libraries/doltcore/sqle/cluster/commithook.go
+++ b/go/libraries/doltcore/sqle/cluster/commithook.go
@@ -48,6 +48,7 @@ type commithook struct {
 	nextHeadIncomingTime time.Time
 	lastSuccess          time.Time
 	currentError         *string
+	cancelReplicate      func()
 
 	role Role
 
@@ -168,6 +169,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
 	toPush := h.nextHead
 	incomingTime := h.nextHeadIncomingTime
 	destDB := h.destDB
+	ctx, h.cancelReplicate = context.WithCancel(ctx)
+	defer func() {
+		if h.cancelReplicate != nil {
+			h.cancelReplicate()
+		}
+		h.cancelReplicate = nil
+	}()
 	h.mu.Unlock()
 
 	if destDB == nil {
@@ -183,6 +191,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
 		if toPush == h.nextHead {
 			h.nextPushAttempt = time.Now().Add(1 * time.Second)
 		}
+		h.cancelReplicate = nil
 		return
 	}
 	lgr.Tracef("cluster/commithook: fetched destDB")
@@ -208,20 +217,22 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
 	}
 
 	h.mu.Lock()
-	if err == nil {
-		h.currentError = nil
-		lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
-		h.lastPushedHead = toPush
-		h.lastSuccess = incomingTime
-		h.nextPushAttempt = time.Time{}
-	} else {
-		h.currentError = new(string)
-		*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
-		lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
-		// add some delay if a new head didn't come in while we were pushing.
-		if toPush == h.nextHead {
-			// TODO: We could add some backoff here.
-			h.nextPushAttempt = time.Now().Add(1 * time.Second)
+	if h.role == RolePrimary {
+		if err == nil {
+			h.currentError = nil
+			lgr.Tracef("cluster/commithook: successfully committed chunks on destDB")
+			h.lastPushedHead = toPush
+			h.lastSuccess = incomingTime
+			h.nextPushAttempt = time.Time{}
+		} else {
+			h.currentError = new(string)
+			*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
+			lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
+			// add some delay if a new head didn't come in while we were pushing.
+			if toPush == h.nextHead {
+				// TODO: We could add some backoff here.
+				h.nextPushAttempt = time.Now().Add(1 * time.Second)
+			}
 		}
 	}
 }
@@ -298,6 +309,10 @@ func (h *commithook) setRole(role Role) {
 	h.nextPushAttempt = time.Time{}
 	h.role = role
 	h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
+	if h.cancelReplicate != nil {
+		h.cancelReplicate()
+		h.cancelReplicate = nil
+	}
 	h.cond.Signal()
 }
diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go
index db1bc3b0a9..91c755ac58 100644
--- a/go/libraries/doltcore/sqle/cluster/controller.go
+++ b/go/libraries/doltcore/sqle/cluster/controller.go
@@ -97,8 +97,10 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig)
 	}
 	ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
 	ret.sinterceptor.setRole(role, epoch)
+	ret.sinterceptor.roleSetter = ret.setRoleAndEpoch
 	ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{})
 	ret.cinterceptor.setRole(role, epoch)
+	ret.cinterceptor.roleSetter = ret.setRoleAndEpoch
 	return ret, nil
 }
 
@@ -142,7 +144,7 @@ func (c *Controller) ManageDatabaseProvider(p dbProvider) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	c.provider = p
-	c.provider.SetIsStandby(c.role == RoleStandby)
+	c.setProviderIsStandby(c.role == RoleStandby)
 }
 
 func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
@@ -286,17 +288,23 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
 	if epoch == c.epoch && role == string(c.role) {
 		return nil
 	}
-	if epoch == c.epoch {
-		return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
-	}
-	if epoch < c.epoch {
-		return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
-	}
 	if role != "primary" && role != "standby" {
 		return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
 	}
+	if epoch < c.epoch {
+		return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
+	}
+	if epoch == c.epoch {
+		// This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and
+		// other signals that the cluster is misconfigured.
+		isallowed := !graceful && role == "standby"
+		if !isallowed {
+			return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
+		}
+	}
+
 	changedrole := role != string(c.role)
 
 	if changedrole {
@@ -321,8 +329,10 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
 	c.refreshSystemVars()
 	c.cinterceptor.setRole(c.role, c.epoch)
 	c.sinterceptor.setRole(c.role, c.epoch)
-	for _, h := range c.commithooks {
-		h.setRole(c.role)
+	if changedrole {
+		for _, h := range c.commithooks {
+			h.setRole(c.role)
+		}
 	}
 	return c.persistVariables()
 }
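A note on the commithook and Controller hunks above: attemptReplicate now derives a cancelable Context for every push and publishes the cancel function on the commithook struct, and setRole calls that function, so an in-flight push to the standby is torn down as soon as the role changes instead of running to completion under the old role. Below is a minimal, self-contained sketch of this cancel-on-role-change pattern; it is an illustration under assumed names (worker, attempt, setRole), not the Dolt code itself.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type worker struct {
	mu     sync.Mutex
	cancel func() // non-nil while an attempt is in flight
}

func (w *worker) attempt(ctx context.Context) {
	// Install the cancel func under the same mutex that guards the role,
	// so a concurrent role change can never miss a started attempt.
	w.mu.Lock()
	ctx, w.cancel = context.WithCancel(ctx)
	w.mu.Unlock()
	defer func() {
		w.mu.Lock()
		if w.cancel != nil {
			w.cancel()
			w.cancel = nil
		}
		w.mu.Unlock()
	}()
	select {
	case <-time.After(5 * time.Second): // stands in for the remote push
		fmt.Println("push finished")
	case <-ctx.Done():
		fmt.Println("push canceled:", ctx.Err())
	}
}

func (w *worker) setRole() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cancel != nil {
		w.cancel() // tear down the in-flight attempt immediately
		w.cancel = nil
	}
}

func main() {
	var w worker
	go w.attempt(context.Background())
	time.Sleep(100 * time.Millisecond)
	w.setRole() // the goroutine prints "push canceled: context canceled"
	time.Sleep(100 * time.Millisecond)
}

The same reasoning explains the new h.role == RolePrimary guard around recording the push result: a canceled attempt can finish after the role has already flipped to standby, and its outcome must not be recorded as current replication state.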
diff --git a/go/libraries/doltcore/sqle/cluster/interceptors.go b/go/libraries/doltcore/sqle/cluster/interceptors.go
index a0b40f0093..ff37aea5e3 100644
--- a/go/libraries/doltcore/sqle/cluster/interceptors.go
+++ b/go/libraries/doltcore/sqle/cluster/interceptors.go
@@ -36,19 +36,20 @@ const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
 // interceptor anytime it changes. In turn, this interceptor:
 // * adds the server's current role and epoch to the request headers for every
 // outbound request.
-// * fails all outgoing requests immediately with codes.Unavailable if the role
-// == RoleStandby, since this server should not be replicating when it believes
-// it is a standby.
+// * fails all outgoing requests immediately with codes.FailedPrecondition if
+// the role == RoleStandby, since this server should not be replicating when it
+// believes it is a standby.
 // * watches returned response headers for a situation which causes this server
 // to force downgrade from primary to standby. In particular, when a returned
 // response header asserts that the standby replica is a primary at a higher
 // epoch than this server, this incterceptor coordinates with the Controller to
 // immediately transition to standby and to stop replicating to the standby.
 type clientinterceptor struct {
-	lgr   *logrus.Entry
-	role  Role
-	epoch int
-	mu    sync.Mutex
+	lgr        *logrus.Entry
+	role       Role
+	epoch      int
+	mu         sync.Mutex
+	roleSetter func(role string, epoch int, graceful bool) error
 }
 
 func (ci *clientinterceptor) setRole(role Role, epoch int) {
@@ -67,8 +68,9 @@ func (ci *clientinterceptor) getRole() (Role, int) {
 func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
 	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
 		role, epoch := ci.getRole()
+		ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
 		if role == RoleStandby {
-			return nil, status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
+			return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
 		}
 		ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
 		var header metadata.MD
@@ -81,8 +83,9 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
 func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
 	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
 		role, epoch := ci.getRole()
+		ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
 		if role == RoleStandby {
-			return status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
+			return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
 		}
 		ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
 		var header metadata.MD
@@ -95,18 +98,21 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
 
 func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) {
 	epochs := header.Get(clusterRoleEpochHeader)
 	roles := header.Get(clusterRoleHeader)
-	if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) {
+	if len(epochs) > 0 && len(roles) > 0 {
 		if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
-			if respepoch == epoch {
-				ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
-				// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
-			} else if respepoch > epoch {
-				// The server we replicate to thinks it is the primary at a higher epoch than us...
-				ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
-				// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
+			if roles[0] == string(RolePrimary) {
+				if respepoch == epoch {
+					ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
+					ci.roleSetter(string(RoleStandby), respepoch, false)
+				} else if respepoch > epoch {
+					// The server we replicate to thinks it is the primary at a higher epoch than us...
+					ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
+					ci.roleSetter(string(RoleStandby), respepoch, false)
+				}
 			}
 		}
+		return
 	}
+	ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata")
 }
 
 func (ci *clientinterceptor) Options() []grpc.DialOption {
@@ -122,7 +128,7 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
 // interceptor anytime it changes. In turn, this interceptor:
 // * adds the server's current role and epoch to the response headers for every
 // request.
-// * fails all incoming requests immediately with codes.Unavailable if the
+// * fails all incoming requests immediately with codes.FailedPrecondition if the
 // current role == RolePrimary, since nothing should be replicating to us in
 // that state.
 // * watches incoming request headers for a situation which causes this server
@@ -131,10 +137,11 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
 // than our current epoch, this interceptor coordinates with the Controller to
 // immediately transition to standby and allow replication requests through.
 type serverinterceptor struct {
-	lgr   *logrus.Entry
-	role  Role
-	epoch int
-	mu    sync.Mutex
+	lgr        *logrus.Entry
+	role       Role
+	epoch      int
+	mu         sync.Mutex
+	roleSetter func(role string, epoch int, graceful bool) error
 }
 
 func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
@@ -150,7 +157,7 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
 		}
 		if role == RolePrimary {
 			// As a primary, we do not accept replication requests.
-			return status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
+			return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
 		}
 		return handler(srv, ss)
 	}
@@ -169,7 +176,7 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
 		}
 		if role == RolePrimary {
 			// As a primary, we do not accept replication requests.
-			return nil, status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
+			return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
 		}
 		return handler(ctx, req)
 	}
@@ -186,12 +193,12 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role,
 				// at the same epoch. We will become standby
 				// and our peer will become standby. An
 				// operator will need to get involved.
-				si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
-				// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
+				si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
+				si.roleSetter(string(RoleStandby), reqepoch, false)
 			} else if reqepoch > epoch {
 				// The client replicating to us thinks it is the primary at a higher epoch than us.
-				si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
-				// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
+				si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
+				si.roleSetter(string(RoleStandby), reqepoch, false)
 			}
 		}
 	}
diff --git a/go/libraries/doltcore/sqle/cluster/interceptors_test.go b/go/libraries/doltcore/sqle/cluster/interceptors_test.go
index 132b3bab8e..0c4f83f303 100644
--- a/go/libraries/doltcore/sqle/cluster/interceptors_test.go
+++ b/go/libraries/doltcore/sqle/cluster/interceptors_test.go
@@ -20,6 +20,8 @@ import (
 	"sync"
 	"testing"
 
+	"github.com/sirupsen/logrus"
+
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"google.golang.org/grpc"
@@ -43,6 +45,12 @@ func (s *server) Watch(req *grpc_health_v1.HealthCheckRequest, ss grpc_health_v1
 	return status.Errorf(codes.Unimplemented, "method Watch not implemented")
 }
 
+func noopSetRole(string, int, bool) error {
+	return nil
+}
+
+var lgr = logrus.StandardLogger().WithFields(logrus.Fields{})
+
 func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), serveropts []grpc.ServerOption, dialopts []grpc.DialOption) *server {
 	addr, err := net.ResolveUnixAddr("unix", "test_grpc.socket")
 	require.NoError(t, err)
@@ -85,6 +93,8 @@ func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient),
 func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
 	var si serverinterceptor
 	si.setRole(RoleStandby, 10)
+	si.roleSetter = noopSetRole
+	si.lgr = lgr
 	withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		var md metadata.MD
 		_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
@@ -101,6 +111,8 @@ func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
 func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
 	var si serverinterceptor
 	si.setRole(RoleStandby, 10)
+	si.roleSetter = noopSetRole
+	si.lgr = lgr
 	withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		var md metadata.MD
 		srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
@@ -119,15 +131,17 @@ func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
 func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
 	var si serverinterceptor
 	si.setRole(RolePrimary, 10)
+	si.roleSetter = noopSetRole
+	si.lgr = lgr
 	srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		ctx := metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
 		_, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
-		assert.Equal(t, codes.Unavailable, status.Code(err))
+		assert.Equal(t, codes.FailedPrecondition, status.Code(err))
 		ctx = metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
 		ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{})
 		assert.NoError(t, err)
 		_, err = ss.Recv()
-		assert.Equal(t, codes.Unavailable, status.Code(err))
+		assert.Equal(t, codes.FailedPrecondition, status.Code(err))
 	}, si.Options(), nil)
 	assert.Nil(t, srv.md)
 }
@@ -135,6 +149,8 @@ func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
 func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
 	var ci clientinterceptor
 	ci.setRole(RolePrimary, 10)
+	ci.roleSetter = noopSetRole
+	ci.lgr = lgr
 	srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
 		assert.Equal(t, codes.Unimplemented, status.Code(err))
@@ -150,6 +166,8 @@ func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
 func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
 	var ci clientinterceptor
 	ci.setRole(RolePrimary, 10)
+	ci.roleSetter = noopSetRole
+	ci.lgr = lgr
 	srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
 		require.NoError(t, err)
@@ -167,14 +185,16 @@ func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
 func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) {
 	var ci clientinterceptor
 	ci.setRole(RolePrimary, 10)
+	ci.roleSetter = noopSetRole
+	ci.lgr = lgr
 	srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
 		_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
 		assert.Equal(t, codes.Unimplemented, status.Code(err))
 		ci.setRole(RoleStandby, 11)
 		_, err = client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
-		assert.Equal(t, codes.Unavailable, status.Code(err))
+		assert.Equal(t, codes.FailedPrecondition, status.Code(err))
 		_, err = client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
-		assert.Equal(t, codes.Unavailable, status.Code(err))
+		assert.Equal(t, codes.FailedPrecondition, status.Code(err))
 	}, nil, ci.Options())
 	if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
 		assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])
server.yaml & +# serverone_pid=$! +# +# wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 +# +# cd ../servertwo +# +# echo " +# log_level: trace +# user: +# name: dolt +# listener: +# host: 0.0.0.0 +# port: ${SERVERTWO_MYSQL_PORT} +# behavior: +# read_only: false +# autocommit: true +# cluster: +# standby_remotes: +# - name: standby +# remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} +# bootstrap_role: primary +# bootstrap_epoch: 10 +# remotesapi: +# port: ${SERVERTWO_GRPC_PORT}" > server.yaml +# +# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake +# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" +# DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true +# +# (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) +# (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) +# DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & +# servertwo_pid=$! +# +# wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 +# +# # Run a query to make sure everyone sees everyone... +# run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1 +# +# server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10" +# server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10" +# +# kill $servertwo_pid +# wait $servertwo_pid +# +# kill $serverone_pid +# wait $serverone_pid +# } + +@test "sql-server-cluster: an older primary comes up, becomes a standby and does not overwrite newer primary" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 15 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! 
+ + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10" + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +} + +@test "sql-server-cluster: a new primary comes up, old primary becomes a standby and has its state overwritten" { + cd serverone + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERONE_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 10 + remotesapi: + port: ${SERVERONE_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + serverone_pid=$! 
+ + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + + cd ../servertwo + + cd repo1 + dolt sql -q 'create table vals (i int primary key)' + dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)' + cd ../ + + echo " +log_level: trace +user: + name: dolt +listener: + host: 0.0.0.0 + port: ${SERVERTWO_MYSQL_PORT} +behavior: + read_only: false + autocommit: true +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} + bootstrap_role: primary + bootstrap_epoch: 15 + remotesapi: + port: ${SERVERTWO_GRPC_PORT}" > server.yaml + + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake + DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests" + DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true + + (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1) + (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2) + DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & + servertwo_pid=$! + + wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 + + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15" + server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10" + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,15" + + kill $servertwo_pid + wait $servertwo_pid + + kill $serverone_pid + wait $serverone_pid +}