go/libraries/doltcore/sqle/cluster: Improve the robustness of graceful transitions to standby role.

Some subtle races could cause graceful transitions to fail when they could have
succeeded or could cause replication to block for a long time while the attempt
to transition was ongoing.
This commit is contained in:
Aaron Son
2023-03-22 14:43:18 -07:00
parent e0001293b1
commit 396a60f7ed
3 changed files with 108 additions and 71 deletions
@@ -167,12 +167,6 @@ func (h *commithook) isCaughtUp() bool {
return h.nextHead == h.lastPushedHead
}
func (h *commithook) isCaughtUpLocking() bool {
h.mu.Lock()
defer h.mu.Unlock()
return h.isCaughtUp()
}
// called with h.mu locked.
func (h *commithook) primaryNeedsInit() bool {
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
@@ -340,10 +334,17 @@ func (h *commithook) setRole(role Role) {
h.cond.Signal()
}
func (h *commithook) setWaitNotify(f func()) {
func (h *commithook) setWaitNotify(f func()) bool {
h.mu.Lock()
defer h.mu.Unlock()
if f != nil {
if h.waitNotify != nil {
return false
}
f()
}
h.waitNotify = f
return true
}
var errDetectedBrokenConfigStr = "error: more than one server was configured as primary in the same epoch. this server has stopped accepting writes. choose a primary in the cluster and call dolt_assume_cluster_role() on servers in the cluster to start replication at a higher epoch"
@@ -352,11 +353,11 @@ var errDetectedBrokenConfigStr = "error: more than one server was configured as
// replicate and wakes the replication thread.
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
lgr := h.logger()
lgr.Warnf("cluster/commithook: Execute called post commit")
lgr.Tracef("cluster/commithook: Execute called post commit")
cs := datas.ChunkStoreFromDatabase(db)
root, err := cs.Root(ctx)
if err != nil {
lgr.Warnf("cluster/commithook: Execute: error retrieving local database root: %v", err)
lgr.Errorf("cluster/commithook: Execute: error retrieving local database root: %v", err)
return err
}
h.mu.Lock()
@@ -382,8 +382,15 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool, save
var err error
if role == string(RoleStandby) {
if graceful {
beforeRole, beforeEpoch := c.role, c.epoch
err = c.gracefulTransitionToStandby(saveConnID)
if err == nil && (beforeRole != c.role || beforeEpoch != c.epoch) {
// The role or epoch moved out from under us while we were unlocked and transitioning to standby.
err = fmt.Errorf("error assuming role '%s' at epoch %d: the role configuration changed while we were replicating to our standbys. Please try again", role, epoch)
}
if err != nil {
c.setProviderIsStandby(c.role != RolePrimary)
c.killRunningQueries(saveConnID)
return false, err
}
} else {
@@ -491,12 +498,9 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
func (c *Controller) gracefulTransitionToStandby(saveConnID int) error {
c.setProviderIsStandby(true)
c.killRunningQueries(saveConnID)
// TODO: this can block with c.mu held, although we are not too
// interested in the server proceeding gracefully while this is
// happening.
// waitForHooksToReplicate will release the lock while it
// blocks, but will return with the lock held.
if err := c.waitForHooksToReplicate(); err != nil {
c.setProviderIsStandby(false)
c.killRunningQueries(saveConnID)
return err
}
return nil
@@ -559,25 +563,30 @@ const waitForHooksToReplicateTimeout = 10 * time.Second
//
// called with c.mu held
func (c *Controller) waitForHooksToReplicate() error {
caughtup := make([]bool, len(c.commithooks))
commithooks := make([]*commithook, len(c.commithooks))
copy(commithooks, c.commithooks)
caughtup := make([]bool, len(commithooks))
var wg sync.WaitGroup
wg.Add(len(c.commithooks))
for li, lch := range c.commithooks {
wg.Add(len(commithooks))
for li, lch := range commithooks {
i := li
ch := lch
if ch.isCaughtUpLocking() {
caughtup[i] = true
wg.Done()
} else {
ch.setWaitNotify(func() {
// called with ch.mu locked.
if !caughtup[i] && ch.isCaughtUp() {
caughtup[i] = true
wg.Done()
}
})
ok := ch.setWaitNotify(func() {
// called with ch.mu locked.
if !caughtup[i] && ch.isCaughtUp() {
caughtup[i] = true
wg.Done()
}
})
if !ok {
for j := li - 1; j >= 0; j-- {
commithooks[j].setWaitNotify(nil)
}
c.lgr.Warnf("cluster/controller: failed to wait for graceful transition to standby; there were concurrent attempts to transition.")
return errors.New("cluster/controller: failed to transition from primary to standby gracefully; did not gain exclusive access to commithooks.")
}
}
c.mu.Unlock()
done := make(chan struct{})
go func() {
wg.Wait()
@@ -590,21 +599,25 @@ func (c *Controller) waitForHooksToReplicate() error {
case <-time.After(waitForHooksToReplicateTimeout):
success = false
}
for _, ch := range c.commithooks {
c.mu.Lock()
for _, ch := range commithooks {
ch.setWaitNotify(nil)
}
// make certain we don't leak the wg.Wait goroutine in the failure case.
for _, b := range caughtup {
if !b {
wg.Done()
if !success {
// make certain we don't leak the wg.Wait goroutine in the failure case.
// at this point, none of the callbacks will ever be called again and
// ch.setWaitNotify grabs a lock and so establishes the happens before.
for _, b := range caughtup {
if !b {
wg.Done()
}
}
}
if success {
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
return nil
} else {
<-done
c.lgr.Warnf("cluster/controller: failed to replicate all databases to all standbys; not transitioning to standby.")
return errors.New("cluster/controller: failed to transition from primary to standby gracefully; could not replicate databases to standby in a timely manner.")
} else {
c.lgr.Tracef("cluster/controller: successfully replicated all databases to all standbys; transitioning to standby.")
return nil
}
}
@@ -43,6 +43,20 @@ func init() {
writeEndpoints["/dolt.services.remotesapi.v1alpha1.ChunkStoreService/GetUploadLocations"] = true
}
func isLikelyServerResponse(err error) bool {
code := status.Code(err)
switch code {
case codes.Unavailable:
fallthrough
case codes.DeadlineExceeded:
fallthrough
case codes.Canceled:
return false
default:
return true
}
}
// clientinterceptor is installed as a Unary and Stream client interceptor on
// the client conns that are used to communicate with standby remotes. The
// cluster.Controller sets this server's current Role and role epoch on the
@@ -83,15 +97,15 @@ func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
role, epoch := ci.getRole()
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
if role == RoleStandby {
return nil, status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
return nil, status.Error(codes.FailedPrecondition, "cluster: clientinterceptor: this server is a standby and is not currently replicating to its standby")
}
if role == RoleDetectedBrokenConfig {
return nil, status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
return nil, status.Error(codes.FailedPrecondition, "cluster: clientinterceptor: this server is in detected_broken_config and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
stream, err := streamer(ctx, desc, cc, method, append(opts, grpc.Header(&header))...)
ci.handleResponseHeaders(header, role, epoch)
ci.handleResponseHeaders(header, err)
return stream, err
}
}
@@ -101,40 +115,50 @@ func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
role, epoch := ci.getRole()
ci.lgr.Tracef("cluster: clientinterceptor: processing request to %s, role %s", method, string(role))
if role == RoleStandby {
return status.Error(codes.FailedPrecondition, "this server is a standby and is not currently replicating to its standby")
return status.Error(codes.FailedPrecondition, "cluster: clientinterceptor: this server is a standby and is not currently replicating to its standby")
}
if role == RoleDetectedBrokenConfig {
return status.Error(codes.FailedPrecondition, "this server is in detected_broken_config and is not currently replicating to its standby")
return status.Error(codes.FailedPrecondition, "cluster: clientinterceptor: this server is in detected_broken_config and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
err := invoker(ctx, method, req, reply, cc, append(opts, grpc.Header(&header))...)
ci.handleResponseHeaders(header, role, epoch)
ci.handleResponseHeaders(header, err)
return err
}
}
func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) {
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 {
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
if roles[0] == string(RolePrimary) {
if respepoch == epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch)
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
ci.roleSetter(string(RoleStandby), respepoch)
}
} else if roles[0] == string(RoleDetectedBrokenConfig) && respepoch >= epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server learned from its standby that the standby is in detected_broken_config. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respepoch)
}
}
func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, err error) {
role, epoch := ci.getRole()
if role != RolePrimary {
// By the time we process this response, we were no longer a primary.
return
}
respEpochs := header.Get(clusterRoleEpochHeader)
respRoles := header.Get(clusterRoleHeader)
if len(respEpochs) > 0 && len(respRoles) > 0 {
respRole := respRoles[0]
respEpoch, err := strconv.Atoi(respEpochs[0])
if err == nil {
if respRole == string(RolePrimary) {
if respEpoch == epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server and the server replicating to it are both primary at the same epoch. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respEpoch)
} else if respEpoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: clientinterceptor: this server is primary at epoch %d. a server it attempted to replicate to is primary at epoch %d. force transitioning to standby.", epoch, respEpoch)
ci.roleSetter(string(RoleStandby), respEpoch)
}
} else if respRole == string(RoleDetectedBrokenConfig) && respEpoch >= epoch {
ci.lgr.Errorf("cluster: clientinterceptor: this server learned from its standby that the standby is in detected_broken_config at the same or higher epoch. force transitioning to detected_broken_config.")
ci.roleSetter(string(RoleDetectedBrokenConfig), respEpoch)
}
} else {
ci.lgr.Errorf("cluster: clientinterceptor: failed to parse epoch in response header; something is wrong: %v", err)
}
} else if isLikelyServerResponse(err) {
ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata")
}
ci.lgr.Warnf("cluster: clientinterceptor: response was missing role and epoch metadata")
}
func (ci *clientinterceptor) Options() []grpc.DialOption {
@@ -182,8 +206,7 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
fromStandby := false
if md, ok := metadata.FromIncomingContext(ss.Context()); ok {
role, epoch := si.getRole()
fromStandby = si.handleRequestHeaders(md, role, epoch)
fromStandby = si.handleRequestHeaders(md)
}
if fromStandby {
if err := si.authenticate(ss.Context()); err != nil {
@@ -199,7 +222,7 @@ func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
}
if role == RoleDetectedBrokenConfig {
// As a primary, we do not accept replication requests.
// In detected_broken_config we do not accept replication requests.
return status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
}
return handler(srv, ss)
@@ -215,8 +238,7 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
fromStandby := false
if md, ok := metadata.FromIncomingContext(ctx); ok {
role, epoch := si.getRole()
fromStandby = si.handleRequestHeaders(md, role, epoch)
fromStandby = si.handleRequestHeaders(md)
}
if fromStandby {
if err := si.authenticate(ctx); err != nil {
@@ -232,7 +254,7 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
}
if role == RoleDetectedBrokenConfig {
// As a primary, we do not accept replication requests.
// In detected_broken_config we do not accept replication requests.
return nil, status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
}
return handler(ctx, req)
@@ -244,7 +266,8 @@ func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
}
}
func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role, epoch int) bool {
func (si *serverinterceptor) handleRequestHeaders(header metadata.MD) bool {
role, epoch := si.getRole()
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 {