mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-14 02:08:41 -05:00
sql-server: cluster: Run the cluster remotesapi server in read-write mode.
This commit is contained in:
@@ -99,10 +99,8 @@ func NewSqlEngine(
|
||||
}
|
||||
pro = pro.WithRemoteDialer(mrEnv.RemoteDialProvider())
|
||||
|
||||
if config.ClusterController != nil {
|
||||
config.ClusterController.ManageSystemVariables(sql.SystemVariables)
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
}
|
||||
config.ClusterController.ManageSystemVariables(sql.SystemVariables)
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
|
||||
// Load in privileges from file, if it exists
|
||||
persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath)
|
||||
|
||||
@@ -214,7 +214,12 @@ func Serve(
|
||||
var remoteSrv *remotesrv.Server
|
||||
if serverConfig.RemotesapiPort() != nil {
|
||||
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
|
||||
remoteSrv = sqle.NewRemoteSrvServer(logrus.NewEntry(lgr), remoteSrvSqlCtx, *serverConfig.RemotesapiPort())
|
||||
remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
Logger: logrus.NewEntry(lgr),
|
||||
ReadOnly: true,
|
||||
HttpPort: *serverConfig.RemotesapiPort(),
|
||||
GrpcPort: *serverConfig.RemotesapiPort(),
|
||||
})
|
||||
listeners, err := remoteSrv.Listeners()
|
||||
if err != nil {
|
||||
lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err)
|
||||
@@ -232,6 +237,31 @@ func Serve(
|
||||
}
|
||||
}
|
||||
|
||||
var clusterRemoteSrv *remotesrv.Server
|
||||
if clusterController != nil {
|
||||
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
|
||||
clusterRemoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
Logger: logrus.NewEntry(lgr),
|
||||
HttpPort: clusterController.RemoteSrvPort(),
|
||||
GrpcPort: clusterController.RemoteSrvPort(),
|
||||
})
|
||||
listeners, err := clusterRemoteSrv.Listeners()
|
||||
if err != nil {
|
||||
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)
|
||||
startError = err
|
||||
return
|
||||
} else {
|
||||
go func() {
|
||||
clusterRemoteSrv.Serve(listeners)
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
|
||||
startError = err
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if ok, f := mrEnv.IsLocked(); ok {
|
||||
startError = env.ErrActiveServerLock.New(f)
|
||||
return
|
||||
@@ -248,6 +278,9 @@ func Serve(
|
||||
if remoteSrv != nil {
|
||||
remoteSrv.GracefulStop()
|
||||
}
|
||||
if clusterRemoteSrv != nil {
|
||||
clusterRemoteSrv.GracefulStop()
|
||||
}
|
||||
|
||||
return mySQLServer.Close()
|
||||
})
|
||||
|
||||
@@ -70,6 +70,9 @@ func NewController(cfg Config, pCfg config.ReadWriteConfig) (*Controller, error)
|
||||
}
|
||||
|
||||
func (c *Controller) ManageSystemVariables(variables sqlvars) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.systemVars = variables
|
||||
@@ -77,9 +80,19 @@ func (c *Controller) ManageSystemVariables(variables sqlvars) {
|
||||
}
|
||||
|
||||
func (c *Controller) RegisterStoredProcedures(store procedurestore) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
store.Register(newAssumeRoleProcedure(c))
|
||||
}
|
||||
|
||||
func (c *Controller) RemoteSrvPort() int {
|
||||
if c == nil {
|
||||
return -1
|
||||
}
|
||||
return c.cfg.RemotesAPIConfig().Port()
|
||||
}
|
||||
|
||||
func (c *Controller) refreshSystemVars() {
|
||||
role, epoch := string(c.role), c.epoch
|
||||
vars := []sql.SystemVariable{
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
"errors"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
|
||||
@@ -51,14 +50,9 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
func NewRemoteSrvServer(lgr *logrus.Entry, ctx *sql.Context, port int) *remotesrv.Server {
|
||||
func NewRemoteSrvServer(ctx *sql.Context, args remotesrv.ServerArgs) *remotesrv.Server {
|
||||
sess := dsess.DSessFromSess(ctx.Session)
|
||||
return remotesrv.NewServer(remotesrv.ServerArgs{
|
||||
Logger: lgr,
|
||||
HttpPort: port,
|
||||
GrpcPort: port,
|
||||
FS: sess.Provider().FileSystem(),
|
||||
DBCache: remotesrvStore{ctx},
|
||||
ReadOnly: true,
|
||||
})
|
||||
args.FS = sess.Provider().FileSystem()
|
||||
args.DBCache = remotesrvStore{ctx}
|
||||
return remotesrv.NewServer(args)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user