Merge pull request #4911 from dolthub/aaron/dolt-standby-replication-remotesapi-listen-address

go/cmd/dolt/commands/sqlserver: Allow a server in a cluster to configure which address it listens on for replication traffic.
This commit is contained in:
Aaron Son
2022-12-02 13:44:05 -08:00
committed by GitHub
6 changed files with 44 additions and 34 deletions

View File

@@ -229,11 +229,12 @@ func Serve(
var remoteSrv *remotesrv.Server
if serverConfig.RemotesapiPort() != nil {
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
listenaddr := fmt.Sprintf(":%d", *serverConfig.RemotesapiPort())
args := sqle.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
ReadOnly: true,
HttpPort: *serverConfig.RemotesapiPort(),
GrpcPort: *serverConfig.RemotesapiPort(),
Logger: logrus.NewEntry(lgr),
ReadOnly: true,
HttpListenAddr: listenaddr,
GrpcListenAddr: listenaddr,
})
remoteSrv, err = remotesrv.NewServer(args)
if err != nil {
@@ -282,7 +283,7 @@ func Serve(
listeners, err := clusterRemoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)
lgr.Errorf("error starting remotesapi server listeners for cluster config on %s: %v", clusterController.RemoteSrvListenAddr(), err)
startError = err
return
}

View File

@@ -521,6 +521,7 @@ func (c *ClusterYAMLConfig) RemotesAPIConfig() cluster.RemotesAPIConfig {
}
type clusterRemotesAPIYAMLConfig struct {
Addr_ string `yaml:"address"`
Port_ int `yaml:"port"`
TLSKey_ string `yaml:"tls_key"`
TLSCert_ string `yaml:"tls_cert"`
@@ -529,6 +530,10 @@ type clusterRemotesAPIYAMLConfig struct {
DNSMatches []string `yaml:"server_name_dns"`
}
func (c clusterRemotesAPIYAMLConfig) Address() string {
return c.Addr_
}
func (c clusterRemotesAPIYAMLConfig) Port() int {
return c.Port_
}

View File

@@ -17,7 +17,6 @@ package remotesrv
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"strings"
@@ -36,10 +35,11 @@ type Server struct {
wg sync.WaitGroup
stopChan chan struct{}
grpcPort int
grpcSrv *grpc.Server
httpPort int
httpSrv http.Server
grpcListenAddr string
httpListenAddr string
grpcSrv *grpc.Server
httpSrv http.Server
tlsConfig *tls.Config
}
@@ -52,8 +52,10 @@ func (s *Server) GracefulStop() {
type ServerArgs struct {
Logger *logrus.Entry
HttpHost string
HttpPort int
GrpcPort int
HttpListenAddr string
GrpcListenAddr string
FS filesys.Filesys
DBCache DBCache
ReadOnly bool
@@ -87,7 +89,7 @@ func NewServer(args ServerArgs) (*Server, error) {
s.tlsConfig = args.TLSConfig
s.wg.Add(2)
s.grpcPort = args.GrpcPort
s.grpcListenAddr = args.GrpcListenAddr
s.grpcSrv = grpc.NewServer(append([]grpc.ServerOption{grpc.MaxRecvMsgSize(128 * 1024 * 1024)}, args.Options...)...)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS, scheme, sealer)
if args.ReadOnly {
@@ -99,15 +101,15 @@ func NewServer(args ServerArgs) (*Server, error) {
if args.HttpInterceptor != nil {
handler = args.HttpInterceptor(handler)
}
if args.HttpPort == args.GrpcPort {
if args.HttpListenAddr == args.GrpcListenAddr {
handler = grpcMultiplexHandler(s.grpcSrv, handler)
} else {
s.wg.Add(2)
}
s.httpPort = args.HttpPort
s.httpListenAddr = args.HttpListenAddr
s.httpSrv = http.Server{
Addr: fmt.Sprintf(":%d", args.HttpPort),
Addr: args.HttpListenAddr,
Handler: handler,
}
@@ -136,20 +138,20 @@ func (s *Server) Listeners() (Listeners, error) {
var grpcListener net.Listener
var err error
if s.tlsConfig != nil {
httpListener, err = tls.Listen("tcp", fmt.Sprintf(":%d", s.httpPort), s.tlsConfig)
httpListener, err = tls.Listen("tcp", s.httpListenAddr, s.tlsConfig)
} else {
httpListener, err = net.Listen("tcp", fmt.Sprintf(":%d", s.httpPort))
httpListener, err = net.Listen("tcp", s.httpListenAddr)
}
if err != nil {
return Listeners{}, err
}
if s.httpPort == s.grpcPort {
if s.httpListenAddr == s.grpcListenAddr {
return Listeners{http: httpListener}, nil
}
if s.tlsConfig != nil {
grpcListener, err = tls.Listen("tcp", fmt.Sprintf(":%d", s.grpcPort), s.tlsConfig)
grpcListener, err = tls.Listen("tcp", s.grpcListenAddr, s.tlsConfig)
} else {
grpcListener, err = net.Listen("tcp", fmt.Sprintf(":%d", s.grpcPort))
grpcListener, err = net.Listen("tcp", s.grpcListenAddr)
}
if err != nil {
httpListener.Close()
@@ -162,7 +164,7 @@ func (s *Server) Serve(listeners Listeners) {
if listeners.grpc != nil {
go func() {
defer s.wg.Done()
logrus.Println("Starting grpc server on port", s.grpcPort)
logrus.Println("Starting grpc server on", s.grpcListenAddr)
err := s.grpcSrv.Serve(listeners.grpc)
logrus.Println("grpc server exited. error:", err)
}()
@@ -175,7 +177,7 @@ func (s *Server) Serve(listeners Listeners) {
go func() {
defer s.wg.Done()
logrus.Println("Starting http server on port", s.httpPort)
logrus.Println("Starting http server on", s.httpListenAddr)
err := s.httpSrv.Serve(listeners.http)
logrus.Println("http server exited. exit error:", err)
}()

View File

@@ -22,6 +22,7 @@ type Config interface {
}
type RemotesAPIConfig interface {
Address() string
Port() int
TLSKey() string
TLSCert() string

View File

@@ -267,11 +267,11 @@ func (c *Controller) ClusterDatabase() sql.Database {
return clusterdb.NewClusterDatabase(c)
}
func (c *Controller) RemoteSrvPort() int {
func (c *Controller) RemoteSrvListenAddr() string {
if c == nil {
return -1
return ""
}
return c.cfg.RemotesAPIConfig().Port()
return fmt.Sprintf("%s:%d", c.cfg.RemotesAPIConfig().Address(), c.cfg.RemotesAPIConfig().Port())
}
func (c *Controller) ServerOptions() []grpc.ServerOption {
@@ -458,8 +458,9 @@ func (c *Controller) recordSuccessfulRemoteSrvCommit(name string) {
}
func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs {
args.HttpPort = c.RemoteSrvPort()
args.GrpcPort = c.RemoteSrvPort()
listenaddr := c.RemoteSrvListenAddr()
args.HttpListenAddr = listenaddr
args.GrpcListenAddr = listenaddr
args.Options = c.ServerOptions()
args = sqle.RemoteSrvServerArgs(ctx, args)
args.DBCache = remotesrvStoreCache{args.DBCache, c}

View File

@@ -82,12 +82,12 @@ func main() {
}
server, err := remotesrv.NewServer(remotesrv.ServerArgs{
HttpHost: *httpHostParam,
HttpPort: *httpPortParam,
GrpcPort: *grpcPortParam,
FS: fs,
DBCache: dbCache,
ReadOnly: *readOnlyParam,
HttpHost: *httpHostParam,
HttpListenAddr: fmt.Sprintf(":%d", *httpPortParam),
GrpcListenAddr: fmt.Sprintf(":%d", *grpcPortParam),
FS: fs,
DBCache: dbCache,
ReadOnly: *readOnlyParam,
})
if err != nil {
log.Fatalf("error creating remotesrv Server: %v\n", err)