mirror of https://github.com/dolthub/dolt.git, synced 2026-01-07 00:39:44 -06:00
go: sqle: cluster: client and server interceptors are able to transition server cluster role when they see a newer primary.
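The change wires a roleSetter callback (Controller.setRoleAndEpoch) into both gRPC interceptors so that, instead of the old TODO comments, they can force this server into the standby role when a peer's metadata claims a primary at the same or a newer epoch. A minimal sketch of that rule, with a hypothetical helper name standing in for the real handleResponseHeaders / handleRequestHeaders methods (assumed to live in the cluster package next to Role, RolePrimary, and RoleStandby):

// maybeForceStandby is a hypothetical distillation of the epoch check the
// interceptors now run against the peer's role/epoch metadata; sketch only.
func maybeForceStandby(peerRole string, peerEpoch, ourEpoch int, roleSetter func(role string, epoch int, graceful bool) error) {
	if peerRole != string(RolePrimary) {
		return // only a peer that claims to be primary can force a transition
	}
	if peerEpoch >= ourEpoch {
		// The peer is primary at our epoch or a newer one: force a
		// non-graceful transition to standby at the peer's epoch.
		_ = roleSetter(string(RoleStandby), peerEpoch, false)
	}
}

The graceful argument is false so that the same-epoch case is accepted by setRoleAndEpoch, which otherwise rejects a role change without an epoch bump (see the controller diff below).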
@@ -48,6 +48,7 @@ type commithook struct {
nextHeadIncomingTime time.Time
lastSuccess time.Time
currentError *string
cancelReplicate func()

role Role

@@ -168,6 +169,13 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
toPush := h.nextHead
incomingTime := h.nextHeadIncomingTime
destDB := h.destDB
ctx, h.cancelReplicate = context.WithCancel(ctx)
defer func() {
if h.cancelReplicate != nil {
h.cancelReplicate()
}
h.cancelReplicate = nil
}()
h.mu.Unlock()

if destDB == nil {
@@ -183,6 +191,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
if toPush == h.nextHead {
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
h.cancelReplicate = nil
return
}
lgr.Tracef("cluster/commithook: fetched destDB")
@@ -208,20 +217,22 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
}

h.mu.Lock()
if err == nil {
h.currentError = nil
lgr.Tracef("cluster/commithook: successfully Committed chunks on destDB")
h.lastPushedHead = toPush
h.lastSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
// add some delay if a new head didn't come in while we were pushing.
if toPush == h.nextHead {
// TODO: We could add some backoff here.
h.nextPushAttempt = time.Now().Add(1 * time.Second)
if h.role == RolePrimary {
if err == nil {
h.currentError = nil
lgr.Tracef("cluster/commithook: successfully Committed chunks on destDB")
h.lastPushedHead = toPush
h.lastSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
// add some delay if a new head didn't come in while we were pushing.
if toPush == h.nextHead {
// TODO: We could add some backoff here.
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
}
}
}
@@ -298,6 +309,10 @@ func (h *commithook) setRole(role Role) {
h.nextPushAttempt = time.Time{}
h.role = role
h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
if h.cancelReplicate != nil {
h.cancelReplicate()
h.cancelReplicate = nil
}
h.cond.Signal()
}

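Taken together, the commithook hunks above implement a cancel-on-demotion pattern: attemptReplicate stores a context.CancelFunc in the new cancelReplicate field, and setRole invokes it so an in-flight push stops as soon as the server stops being primary. A self-contained sketch of the same pattern, using a hypothetical worker type rather than the actual commithook:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical stand-in for commithook: it keeps the cancel
// function of the current replication attempt so a role change can stop it.
type worker struct {
	mu     sync.Mutex
	cancel context.CancelFunc
}

func (w *worker) replicate(parent context.Context) {
	w.mu.Lock()
	ctx, cancel := context.WithCancel(parent)
	w.cancel = cancel
	w.mu.Unlock()
	defer func() {
		w.mu.Lock()
		if w.cancel != nil {
			w.cancel()
			w.cancel = nil
		}
		w.mu.Unlock()
	}()
	select {
	case <-time.After(5 * time.Second): // stands in for the push to the standby
		fmt.Println("push finished")
	case <-ctx.Done():
		fmt.Println("push canceled:", ctx.Err())
	}
}

// setStandby mirrors what commithook.setRole now does: cancel any in-flight push on demotion.
func (w *worker) setStandby() {
	w.mu.Lock()
	defer w.mu.Unlock()
	if w.cancel != nil {
		w.cancel()
		w.cancel = nil
	}
}

func main() {
	var w worker
	go w.replicate(context.Background())
	time.Sleep(100 * time.Millisecond)
	w.setStandby() // prints "push canceled: context canceled"
	time.Sleep(100 * time.Millisecond)
}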
@@ -97,8 +97,10 @@ func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig)
}
ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.sinterceptor.setRole(role, epoch)
ret.sinterceptor.roleSetter = ret.setRoleAndEpoch
ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.cinterceptor.setRole(role, epoch)
ret.cinterceptor.roleSetter = ret.setRoleAndEpoch
return ret, nil
}

@@ -142,7 +144,7 @@ func (c *Controller) ManageDatabaseProvider(p dbProvider) {
c.mu.Lock()
defer c.mu.Unlock()
c.provider = p
c.provider.SetIsStandby(c.role == RoleStandby)
c.setProviderIsStandby(c.role == RoleStandby)
}

func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery func(uint32), killConnection func(uint32) error) {
@@ -286,17 +288,23 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
if epoch == c.epoch && role == string(c.role) {
return nil
}
if epoch == c.epoch {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
}
if epoch < c.epoch {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
}

if role != "primary" && role != "standby" {
return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
}

if epoch < c.epoch {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
}
if epoch == c.epoch {
// This is allowed for non-graceful transitions to 'standby', which only occur from interceptors and
// other signals that the cluster is misconfigured.
isallowed := !graceful && role == "standby"
if !isallowed {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, c.epoch, c.role)
}
}

changedrole := role != string(c.role)

if changedrole {
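The reordering above changes the validation sequence in setRoleAndEpoch: the role string is checked first, a lower epoch is always rejected, and an equal epoch is now tolerated only for the non-graceful standby transitions that the interceptors trigger. Condensed into a single hypothetical function (not the actual method, which also updates interceptors, commit hooks, and system variables; assumes a package that imports "fmt"):

// validateRoleTransition is a hypothetical condensation of the checks
// Controller.setRoleAndEpoch performs before it changes any state.
func validateRoleTransition(curRole string, curEpoch int, role string, epoch int, graceful bool) error {
	if epoch == curEpoch && role == curRole {
		return nil // no-op: already in the requested state
	}
	if role != "primary" && role != "standby" {
		return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
	}
	if epoch < curEpoch {
		return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, curEpoch)
	}
	if epoch == curEpoch && (graceful || role != "standby") {
		// Only a forced (non-graceful) transition to standby may reuse the current epoch.
		return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d with different role, '%s'", role, epoch, curEpoch, curRole)
	}
	return nil
}

The interceptors pass graceful == false, which is what lets a same-epoch downgrade to standby go through.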
@@ -321,8 +329,10 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
c.refreshSystemVars()
c.cinterceptor.setRole(c.role, c.epoch)
c.sinterceptor.setRole(c.role, c.epoch)
for _, h := range c.commithooks {
h.setRole(c.role)
if changedrole {
for _, h := range c.commithooks {
h.setRole(c.role)
}
}
return c.persistVariables()
}

@@ -36,19 +36,20 @@ const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
// interceptor anytime it changes. In turn, this interceptor:
// * adds the server's current role and epoch to the request headers for every
// outbound request.
// * fails all outgoing requests immediately with codes.Unavailable if the role
// == RoleStandby, since this server should not be replicating when it believes
// it is a standby.
// * fails all outgoing requests immediately with codes.FailedPrecondition if
// the role == RoleStandby, since this server should not be replicating when it
// believes it is a standby.
// * watches returned response headers for a situation which causes this server
// to force downgrade from primary to standby. In particular, when a returned
// response header asserts that the standby replica is a primary at a higher
// epoch than this server, this interceptor coordinates with the Controller to
// immediately transition to standby and to stop replicating to the standby.
type clientinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
roleSetter func(role string, epoch int, graceful bool) error
}

func (ci *clientinterceptor) setRole(role Role, epoch int) {
@@ -67,8 +68,9 @@ func (ci *clientinterceptor) getRole() (Role, int) {
func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
role, epoch := ci.getRole()
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
if role == RoleStandby {
return nil, status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
@@ -81,8 +83,9 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
role, epoch := ci.getRole()
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
if role == RoleStandby {
return status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
@@ -95,18 +98,21 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) {
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) {
if len(epochs) > 0 && len(roles) > 0 {
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
if respepoch == epoch {
ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
if roles[0] == string(RolePrimary) {
if respepoch == epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
ci.roleSetter(string(RoleStandby), respepoch, false)
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
ci.roleSetter(string(RoleStandby), respepoch, false)
}
}
}
}
ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata")
}

func (ci *clientinterceptor) Options() []grpc.DialOption {
@@ -122,7 +128,7 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
// interceptor anytime it changes. In turn, this interceptor:
// * adds the server's current role and epoch to the response headers for every
// request.
// * fails all incoming requests immediately with codes.Unavailable if the
// * fails all incoming requests immediately with codes.FailedPrecondition if the
// current role == RolePrimary, since nothing should be replicating to us in
// that state.
// * watches incoming request headers for a situation which causes this server
@@ -131,10 +137,11 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
// than our current epoch, this interceptor coordinates with the Controller to
// immediately transition to standby and allow replication requests through.
type serverinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
roleSetter func(role string, epoch int, graceful bool) error
}

func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
@@ -150,7 +157,7 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
}
if role == RolePrimary {
// As a primary, we do not accept replication requests.
return status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
}
return handler(srv, ss)
}
@@ -169,7 +176,7 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
}
if role == RolePrimary {
// As a primary, we do not accept replication requests.
return nil, status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
}
return handler(ctx, req)
}
@@ -186,12 +193,12 @@ func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role,
// at the same epoch. We will become standby
// and our peer will become standby. An
// operator will need to get involved.
si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
si.roleSetter(string(RoleStandby), reqepoch, false)
} else if reqepoch > epoch {
// The client replicating to us thinks it is the primary at a higher epoch than us.
si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
si.roleSetter(string(RoleStandby), reqepoch, false)
}
}
}

@@ -20,6 +20,8 @@ import (
"sync"
"testing"

"github.com/sirupsen/logrus"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@@ -43,6 +45,12 @@ func (s *server) Watch(req *grpc_health_v1.HealthCheckRequest, ss grpc_health_v1
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}

func noopSetRole(string, int, bool) error {
return nil
}

var lgr = logrus.StandardLogger().WithFields(logrus.Fields{})

func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), serveropts []grpc.ServerOption, dialopts []grpc.DialOption) *server {
addr, err := net.ResolveUnixAddr("unix", "test_grpc.socket")
require.NoError(t, err)
@@ -85,6 +93,8 @@ func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient),
func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
var si serverinterceptor
si.setRole(RoleStandby, 10)
si.roleSetter = noopSetRole
si.lgr = lgr
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
@@ -101,6 +111,8 @@ func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
var si serverinterceptor
si.setRole(RoleStandby, 10)
si.roleSetter = noopSetRole
si.lgr = lgr
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
@@ -119,15 +131,17 @@ func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
var si serverinterceptor
si.setRole(RolePrimary, 10)
si.roleSetter = noopSetRole
si.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
ctx := metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
_, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
ctx = metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
_, err = ss.Recv()
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
}, si.Options(), nil)
assert.Nil(t, srv.md)
}
@@ -135,6 +149,8 @@ func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
@@ -150,6 +166,8 @@ func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
@@ -167,14 +185,16 @@ func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
ci.setRole(RoleStandby, 11)
_, err = client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
_, err = client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])

@@ -501,3 +501,252 @@ cluster:
kill $serverone_pid
wait $serverone_pid
}

# @test "sql-server-cluster: misconfigured cluster with primaries at same epoch, both transition to standby" {
# cd serverone
#
# echo "
# log_level: trace
# user:
#   name: dolt
# listener:
#   host: 0.0.0.0
#   port: ${SERVERONE_MYSQL_PORT}
# behavior:
#   read_only: false
#   autocommit: true
# cluster:
#   standby_remotes:
#   - name: standby
#     remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
#   bootstrap_role: primary
#   bootstrap_epoch: 10
#   remotesapi:
#     port: ${SERVERONE_GRPC_PORT}" > server.yaml
#
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
# DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
#
# (cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
# (cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
# DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
# serverone_pid=$!
#
# wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
#
# cd ../servertwo
#
# echo "
# log_level: trace
# user:
#   name: dolt
# listener:
#   host: 0.0.0.0
#   port: ${SERVERTWO_MYSQL_PORT}
# behavior:
#   read_only: false
#   autocommit: true
# cluster:
#   standby_remotes:
#   - name: standby
#     remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
#   bootstrap_role: primary
#   bootstrap_epoch: 10
#   remotesapi:
#     port: ${SERVERTWO_GRPC_PORT}" > server.yaml
#
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
# DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
# DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
#
# (cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
# (cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
# DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
# servertwo_pid=$!
#
# wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
#
# # Run a query to make sure everyone sees everyone...
# run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "CREATE TABLE vals (i int primary key)" "" 1
#
# server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
# server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
#
# kill $servertwo_pid
# wait $servertwo_pid
#
# kill $serverone_pid
# wait $serverone_pid
# }

@test "sql-server-cluster: an older primary comes up, becomes a standby and does not overwrite newer primary" {
|
||||
cd serverone
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)'
|
||||
cd ../
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 15
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
serverone_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
cd ../servertwo
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)'
|
||||
cd ../
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERTWO_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
servertwo_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
|
||||
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10"
|
||||
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15"
|
||||
|
||||
kill $servertwo_pid
|
||||
wait $servertwo_pid
|
||||
|
||||
kill $serverone_pid
|
||||
wait $serverone_pid
|
||||
}
|
||||
|
||||
@test "sql-server-cluster: a new primary comes up, old primary becomes a standby and has its state overwritten" {
|
||||
cd serverone
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1),(2),(3),(4),(5)'
|
||||
cd ../
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
serverone_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
cd ../servertwo
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10)'
|
||||
cd ../
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERTWO_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 15
|
||||
remotesapi:
|
||||
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
servertwo_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
|
||||
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,15"
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "SELECT count(*) FROM vals" "count(*)\n10"
|
||||
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "SELECT @@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,15"
|
||||
|
||||
kill $servertwo_pid
|
||||
wait $servertwo_pid
|
||||
|
||||
kill $serverone_pid
|
||||
wait $serverone_pid
|
||||
}
|
||||
|
||||