Merge pull request #4667 from dolthub/aaron/sql-cluster-server-interceptor-prepare-allow-non-cluster-traffic

go/libraries/doltcore/sqle/cluster: interceptors.go: Add initial support in the server interceptor for allowing non-cluster clients to access the remotesapi.
Aaron Son
2022-10-31 13:54:39 -07:00
committed by GitHub
2 changed files with 133 additions and 59 deletions
@@ -30,6 +30,15 @@ import (
const clusterRoleHeader = "x-dolt-cluster-role"
const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
// writeEndpoints records the ChunkStoreService methods which mutate the
// remote. Non-cluster traffic to these endpoints is currently rejected with
// codes.Unimplemented; all other non-cluster traffic is rejected with
// codes.Unauthenticated.
var writeEndpoints map[string]bool

func init() {
    writeEndpoints = make(map[string]bool)
    writeEndpoints["/dolt.services.remotesapi.v1alpha1.ChunkStoreService/Commit"] = true
    writeEndpoints["/dolt.services.remotesapi.v1alpha1.ChunkStoreService/AddTableFiles"] = true
    writeEndpoints["/dolt.services.remotesapi.v1alpha1.ChunkStoreService/GetUploadLocations"] = true
}
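The keys are full gRPC method names in the "/package.Service/Method" form that gRPC reports through grpc.UnaryServerInfo.FullMethod and grpc.StreamServerInfo.FullMethod. A minimal sketch of the lookup the interceptors below perform in the context of this file; the isWriteMethod helper is made up for illustration and does not exist in this change:

// isWriteMethod reports whether fullMethod is one of the remotesapi endpoints
// that mutates the remote. fullMethod is expected in gRPC's
// "/package.Service/Method" form, e.g. the Commit key registered above.
func isWriteMethod(fullMethod string) bool {
    return writeEndpoints[fullMethod]
}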
// clientinterceptor is installed as a Unary and Stream client interceptor on
// the client conns that are used to communicate with standby remotes. The
// cluster.Controller sets this server's current Role and role epoch on the
@@ -134,17 +143,21 @@ func (ci *clientinterceptor) Options() []grpc.DialOption {
// serverinterceptor is installed as a Unary and Stream interceptor on a
// ChunkStoreServer which is serving a SQL database as a standby remote. The
// cluster.Controller sets this server's current Role and role epoch on the
// interceptor anytime it changes. In turn, this interceptor has the following
// behavior:
// * for any incoming standby traffic, it adds the server's current role and
// epoch to the response headers of every request.
// * for any incoming standby traffic, it fails the request immediately with
// codes.FailedPrecondition if the current role != RoleStandby, since nothing
// should be replicating to us in that state.
// * it watches incoming request headers for a situation which causes this
// server to force downgrade from primary to standby. In particular, when an
// incoming request asserts that the client is the current primary at an epoch
// higher than our current epoch, this interceptor coordinates with the
// Controller to immediately transition to standby and allow replication
// requests through.
// * for incoming requests which are not standby traffic, it currently fails
// the request with codes.Unauthenticated. Eventually, it will allow read-only
// traffic through which is authenticated and authorized.
type serverinterceptor struct {
    lgr  *logrus.Entry
    role Role
@@ -154,71 +167,90 @@ type serverinterceptor struct {
}
func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
    return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
        fromStandby := false
        if md, ok := metadata.FromIncomingContext(ss.Context()); ok {
            role, epoch := si.getRole()
            fromStandby = si.handleRequestHeaders(md, role, epoch)
        }
        if fromStandby {
            // After handleRequestHeaders, our role may have changed, so we fetch it again here.
            role, epoch := si.getRole()
            if err := grpc.SetHeader(ss.Context(), metadata.Pairs(clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))); err != nil {
                return err
            }
            if role == RolePrimary {
                // As a primary, we do not accept replication requests.
                return status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
            }
            if role == RoleDetectedBrokenConfig {
                // In detected_broken_config, we do not accept replication requests.
                return status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
            }
            return handler(srv, ss)
        } else if isWrite := writeEndpoints[info.FullMethod]; isWrite {
            // Non-cluster traffic to a write endpoint is not supported yet.
            return status.Error(codes.Unimplemented, "unimplemented")
        } else {
            // Non-cluster read traffic will eventually be authenticated and
            // authorized; for now it is rejected.
            return status.Error(codes.Unauthenticated, "unauthenticated")
        }
    }
}
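Whether a request counts as standby traffic above is determined purely by the presence of the cluster role and epoch headers, which the clientinterceptor on the replicating peer attaches to its outgoing calls. A self-contained sketch of such an outgoing context, assuming only the grpc metadata package; the epoch value and the idea of building the context by hand are illustrative, since in the real system the clientinterceptor does this automatically:

package main

import (
    "context"
    "strconv"

    "google.golang.org/grpc/metadata"
)

func main() {
    // A request built from this context would be treated as cluster
    // (standby replication) traffic by the serverinterceptor; a request
    // without these headers falls into the non-cluster branches above.
    epoch := 12 // illustrative epoch value
    ctx := metadata.AppendToOutgoingContext(context.Background(),
        "x-dolt-cluster-role", "primary",
        "x-dolt-cluster-role-epoch", strconv.Itoa(epoch),
    )
    _ = ctx // ctx would be used for the outgoing ChunkStoreService RPC
}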
func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
        fromStandby := false
        if md, ok := metadata.FromIncomingContext(ctx); ok {
            role, epoch := si.getRole()
            fromStandby = si.handleRequestHeaders(md, role, epoch)
        }
        if fromStandby {
            // After handleRequestHeaders, our role may have changed, so we fetch it again here.
            role, epoch := si.getRole()
            if err := grpc.SetHeader(ctx, metadata.Pairs(clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))); err != nil {
                return nil, err
            }
            if role == RolePrimary {
                // As a primary, we do not accept replication requests.
                return nil, status.Error(codes.FailedPrecondition, "this server is a primary and is not currently accepting replication")
            }
            if role == RoleDetectedBrokenConfig {
                // In detected_broken_config, we do not accept replication requests.
                return nil, status.Error(codes.FailedPrecondition, "this server is currently in detected_broken_config and is not currently accepting replication")
            }
            return handler(ctx, req)
        } else if isWrite := writeEndpoints[info.FullMethod]; isWrite {
            // Non-cluster traffic to a write endpoint is not supported yet.
            return nil, status.Error(codes.Unimplemented, "unimplemented")
        } else {
            // Non-cluster read traffic will eventually be authenticated and
            // authorized; for now it is rejected.
            return nil, status.Error(codes.Unauthenticated, "unauthenticated")
        }
    }
}
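Taken together, a plain (non-cluster) remotesapi client distinguishes these outcomes only by the gRPC status code it gets back. A rough sketch of that mapping, assuming nothing beyond the grpc codes and status packages; the classify helper is invented for illustration:

package main

import (
    "fmt"

    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// classify describes how the serverinterceptor disposed of a request, based
// only on the status code of the error returned to the client.
func classify(err error) string {
    switch status.Code(err) {
    case codes.Unauthenticated:
        return "non-cluster read traffic: rejected until authenticated read-only access lands"
    case codes.Unimplemented:
        return "non-cluster write endpoint (Commit, AddTableFiles, GetUploadLocations): not supported"
    case codes.FailedPrecondition:
        return "cluster replication traffic rejected by the role check"
    case codes.OK:
        return "request was passed through to the underlying ChunkStoreService handler"
    default:
        return "some other failure from the underlying handler"
    }
}

func main() {
    // With err coming back from a remotesapi RPC:
    err := status.Error(codes.Unauthenticated, "unauthenticated") // illustrative
    fmt.Println(classify(err))
}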
// handleRequestHeaders inspects the cluster role and epoch headers on an
// incoming request and reports whether the request came from a standby
// replica: it returns true if the request carried the cluster headers, false
// otherwise.
func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role, epoch int) bool {
    epochs := header.Get(clusterRoleEpochHeader)
    roles := header.Get(clusterRoleHeader)
    if len(epochs) > 0 && len(roles) > 0 {
        if roles[0] == string(RolePrimary) && role == RolePrimary {
            if reqepoch, err := strconv.Atoi(epochs[0]); err == nil {
                if reqepoch == epoch {
                    // Misconfiguration in the cluster means this
                    // server and its standby are marked as primary
                    // at the same epoch. We go to
                    // detected_broken_config, and so will our peer.
                    // An operator will need to get involved.
                    si.lgr.Errorf("cluster: serverinterceptor: this server and its standby replica are both primary at the same epoch. force transitioning to detected_broken_config.")
                    si.roleSetter(string(RoleDetectedBrokenConfig), reqepoch)
                } else if reqepoch > epoch {
                    // The client replicating to us thinks it is the primary at a higher epoch than us.
                    si.lgr.Warnf("cluster: serverinterceptor: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
                    si.roleSetter(string(RoleStandby), reqepoch)
                }
            }
        }
        // The request carried cluster role headers, so it came from a standby replica.
        return true
    }
    return false
}
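As a worked example of the epoch comparison above, with made-up numbers: a server that is primary at epoch 10 transitions to detected_broken_config when the incoming request also claims primary at epoch 10, transitions to standby when the request claims primary at epoch 12, and keeps its role when the request claims epoch 9 (the role check in the interceptors then rejects that stale request). A minimal sketch of just that decision, with a hypothetical decide helper standing in for the interceptor's side effects:

package main

import "fmt"

// decide mirrors the epoch comparison in handleRequestHeaders for the case
// where both this server and the requesting peer claim to be primary. It
// returns the role this server forces itself into.
func decide(localEpoch, reqEpoch int) string {
    switch {
    case reqEpoch == localEpoch:
        // Two primaries at the same epoch: operator intervention needed.
        return "detected_broken_config"
    case reqEpoch > localEpoch:
        // The peer is primary at a newer epoch; yield to it.
        return "standby"
    default:
        // Stale peer; keep our role and let the role check reject the request.
        return "primary"
    }
}

func main() {
    fmt.Println(decide(10, 10)) // detected_broken_config
    fmt.Println(decide(10, 12)) // standby
    fmt.Println(decide(10, 9))  // primary
}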
func (si *serverinterceptor) Options() []grpc.ServerOption {
@@ -17,6 +17,7 @@ package cluster
import (
"context"
"net"
"strconv"
"sync"
"testing"
@@ -89,6 +90,47 @@ func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient),
return hs
}
// outboundCtx returns a context carrying the cluster role and epoch request
// headers that the clientinterceptor would normally attach. With no arguments
// it returns a plain context, i.e. non-cluster traffic.
func outboundCtx(vals ...interface{}) context.Context {
ctx := context.Background()
if len(vals) == 0 {
return ctx
}
if len(vals) == 2 {
return metadata.AppendToOutgoingContext(ctx,
clusterRoleHeader, string(vals[0].(Role)),
clusterRoleEpochHeader, strconv.Itoa(vals[1].(int)))
}
panic("bad test --- outboundCtx must take 0 or 2 values")
}
func TestServerInterceptorUnauthenticatedWithoutClientHeaders(t *testing.T) {
var si serverinterceptor
si.roleSetter = noopSetRole
si.lgr = lgr
si.setRole(RoleStandby, 10)
t.Run("Standby", func(t *testing.T) {
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unauthenticated, status.Code(err))
srv, err := client.Watch(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unauthenticated, status.Code(err))
}, si.Options(), nil)
})
si.setRole(RolePrimary, 10)
t.Run("Primary", func(t *testing.T) {
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unauthenticated, status.Code(err))
srv, err := client.Watch(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unauthenticated, status.Code(err))
}, si.Options(), nil)
})
}
func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
var si serverinterceptor
si.setRole(RoleStandby, 10)
@@ -96,7 +138,7 @@ func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
si.lgr = lgr
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
_, err := client.Check(outboundCtx(RolePrimary, 10), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
assert.Equal(t, codes.Unimplemented, status.Code(err))
if assert.Len(t, md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "standby", md.Get(clusterRoleHeader)[0])
@@ -114,7 +156,7 @@ func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
si.lgr = lgr
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
srv, err := client.Watch(outboundCtx(RolePrimary, 10), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
require.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unimplemented, status.Code(err))
@@ -133,10 +175,10 @@ func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
si.roleSetter = noopSetRole
si.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
ctx := metadata.AppendToOutgoingContext(outboundCtx(RoleStandby, 10), "test-header", "test-header-value")
_, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
ctx = metadata.AppendToOutgoingContext(outboundCtx(RoleStandby, 10), "test-header", "test-header-value")
ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
_, err = ss.Recv()
@@ -151,7 +193,7 @@ func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
@@ -168,7 +210,7 @@ func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
srv, err := client.Watch(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unimplemented, status.Code(err))
@@ -187,12 +229,12 @@ func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) {
ci.roleSetter = noopSetRole
ci.lgr = lgr
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
ci.setRole(RoleStandby, 11)
_, err = client.Check(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
_, err = client.Watch(outboundCtx(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.FailedPrecondition, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {