mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-03 10:08:59 -06:00
go: sqle: cluster: Dolt databases on a standby are read only. They accept writes when the server transitions to primary.
This commit is contained in:
@@ -114,6 +114,7 @@ func NewSqlEngine(
|
||||
|
||||
config.ClusterController.RegisterStoredProcedures(pro)
|
||||
pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook)
|
||||
config.ClusterController.ManageDatabaseProvider(pro)
|
||||
|
||||
// Load in privileges from file, if it exists
|
||||
persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath)
|
||||
|
||||
@@ -270,7 +270,7 @@ func Serve(
|
||||
clusterController.ManageQueryConnections(
|
||||
mySQLServer.SessionManager().Iter,
|
||||
sqlEngine.GetUnderlyingEngine().ProcessList.Kill,
|
||||
func(uint32) {}, // TODO: mySQLServer.SessionManager().KillConnection,
|
||||
func(uint32) {}, // TODO: mySQLServer.SessionManager().KillConnection,
|
||||
)
|
||||
} else {
|
||||
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
|
||||
|
||||
@@ -17,6 +17,7 @@ package remotesrv
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
@@ -38,6 +39,8 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
var ErrUnimplemented = errors.New("unimplemented")
|
||||
|
||||
type RemoteChunkStore struct {
|
||||
HttpHost string
|
||||
csCache DBCache
|
||||
@@ -79,7 +82,10 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -133,7 +139,10 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -199,7 +208,10 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
|
||||
nextPath := getRepoPath(req)
|
||||
if nextPath != repoPath {
|
||||
repoPath = nextPath
|
||||
cs = rs.getStore(logger, repoPath)
|
||||
cs, err = rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cs == nil {
|
||||
return status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
@@ -292,7 +304,10 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -341,7 +356,10 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -349,7 +367,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
|
||||
|
||||
logger.Printf("found %s", repoPath)
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
err = cs.Rebase(ctx)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred during processing of Rebace rpc of %s details: %v", repoPath, err)
|
||||
@@ -364,7 +382,10 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -385,7 +406,10 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -399,7 +423,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
|
||||
updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount)
|
||||
}
|
||||
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
err = cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
@@ -426,12 +450,15 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
|
||||
cs, err := rs.getOrCreateStore(logger, repoPath, req.ClientRepoFormat.NbfVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
}
|
||||
|
||||
err := cs.Rebase(ctx)
|
||||
err = cs.Rebase(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -453,7 +480,10 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -522,7 +552,10 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
defer func() { logger.Println("finished") }()
|
||||
|
||||
repoPath := getRepoPath(req)
|
||||
cs := rs.getStore(logger, repoPath)
|
||||
cs, err := rs.getStore(logger, repoPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cs == nil {
|
||||
return nil, status.Error(codes.Internal, "Could not get chunkstore")
|
||||
@@ -536,7 +569,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
updates[hash.New(cti.Hash).String()] = int(cti.ChunkCount)
|
||||
}
|
||||
|
||||
err := cs.AddTableFilesToManifest(ctx, updates)
|
||||
err = cs.AddTableFilesToManifest(ctx, updates)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("error occurred updating the manifest: %s", err.Error())
|
||||
@@ -546,18 +579,20 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
|
||||
return &remotesapi.AddTableFilesResponse{Success: true}, nil
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) RemoteSrvStore {
|
||||
func (rs *RemoteChunkStore) getStore(logger *logrus.Entry, repoPath string) (RemoteSrvStore, error) {
|
||||
return rs.getOrCreateStore(logger, repoPath, types.Format_Default.VersionString())
|
||||
}
|
||||
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) RemoteSrvStore {
|
||||
func (rs *RemoteChunkStore) getOrCreateStore(logger *logrus.Entry, repoPath, nbfVerStr string) (RemoteSrvStore, error) {
|
||||
cs, err := rs.csCache.Get(repoPath, nbfVerStr)
|
||||
|
||||
if err != nil {
|
||||
logger.Printf("Failed to retrieve chunkstore for %s\n", repoPath)
|
||||
if errors.Is(err, ErrUnimplemented) {
|
||||
return nil, status.Error(codes.Unimplemented, err.Error())
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cs
|
||||
return cs, nil
|
||||
}
|
||||
|
||||
var requestId int32
|
||||
|
||||
@@ -53,6 +53,7 @@ type Controller struct {
|
||||
cinterceptor clientinterceptor
|
||||
lgr *logrus.Logger
|
||||
|
||||
provider dbProvider
|
||||
iterSessions IterSessions
|
||||
killQuery func(uint32)
|
||||
killConnection func(uint32)
|
||||
@@ -62,6 +63,12 @@ type sqlvars interface {
|
||||
AddSystemVariables(sysVars []sql.SystemVariable)
|
||||
}
|
||||
|
||||
// We can manage certain aspects of the exposed databases on the server through
|
||||
// this.
|
||||
type dbProvider interface {
|
||||
SetIsStandby(bool)
|
||||
}
|
||||
|
||||
type procedurestore interface {
|
||||
Register(sql.ExternalStoredProcedureDetails)
|
||||
}
|
||||
@@ -128,7 +135,20 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
|
||||
|
||||
type IterSessions func(func(sql.Session) (bool, error)) error
|
||||
|
||||
func (c *Controller) ManageDatabaseProvider(p dbProvider) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.provider = p
|
||||
c.provider.SetIsStandby(c.role == RoleStandby)
|
||||
}
|
||||
|
||||
func (c *Controller) ManageQueryConnections(iterSessions IterSessions, killQuery, killConnection func(uint32)) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.iterSessions = iterSessions
|
||||
@@ -279,27 +299,25 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int, graceful bool) erro
|
||||
|
||||
changedrole := role != string(c.role)
|
||||
|
||||
c.role = Role(role)
|
||||
c.epoch = epoch
|
||||
|
||||
if changedrole {
|
||||
var err error
|
||||
if c.role == RoleStandby {
|
||||
if role == string(RoleStandby) {
|
||||
if graceful {
|
||||
err = c.gracefulTransitionToStandby()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err = c.immediateTransitionToStandby()
|
||||
// TODO: this should not fail; if it does, we still prevent transition for now.
|
||||
c.immediateTransitionToStandby()
|
||||
}
|
||||
} else {
|
||||
err = c.transitionToPrimary()
|
||||
// TODO: this should not fail; if it does, we still prevent transition for now.
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
c.transitionToPrimary()
|
||||
}
|
||||
}
|
||||
|
||||
c.role = Role(role)
|
||||
c.epoch = epoch
|
||||
|
||||
c.refreshSystemVars()
|
||||
c.cinterceptor.setRole(c.role, c.epoch)
|
||||
c.sinterceptor.setRole(c.role, c.epoch)
|
||||
@@ -374,7 +392,19 @@ func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.Server
|
||||
// * Replicate all databases to their standby remotes.
|
||||
// * If success, return success.
|
||||
// * If failure, set all databases in database_provider back to their original state. Return failure.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) gracefulTransitionToStandby() error {
|
||||
c.setProviderIsStandby(true)
|
||||
c.killRunningQueries()
|
||||
// TODO: this can block with c.mu held, although we are not too
|
||||
// interested in the server proceeding gracefully while this is
|
||||
// happening.
|
||||
if err := c.waitForHooksToReplicate(); err != nil {
|
||||
c.setProviderIsStandby(false)
|
||||
c.killRunningQueries()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -382,7 +412,11 @@ func (c *Controller) gracefulTransitionToStandby() error {
|
||||
// * Set all databases in database_provider to read-only.
|
||||
// * Kill all running queries in GMS.
|
||||
// * Return success. NOTE: we do not attempt to replicate to the standby.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) immediateTransitionToStandby() error {
|
||||
c.setProviderIsStandby(true)
|
||||
c.killRunningQueries()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -390,20 +424,40 @@ func (c *Controller) immediateTransitionToStandby() error {
|
||||
// * Set all databases in database_provider back to their original mode: read-write or read only.
|
||||
// * Kill all running queries in GMS.
|
||||
// * Return success.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) transitionToPrimary() error {
|
||||
c.setProviderIsStandby(false)
|
||||
c.killRunningQueries()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kills all running queries in the managed GMS engine.
|
||||
// called with c.mu held
|
||||
func (c *Controller) killRunningQueries() {
|
||||
c.mu.Lock()
|
||||
iterSessions, killQuery, killConnection := c.iterSessions, c.killQuery, c.killConnection
|
||||
c.mu.Unlock()
|
||||
if c.iterSessions != nil {
|
||||
iterSessions(func(session sql.Session) (stop bool, err error) {
|
||||
killQuery(session.ID())
|
||||
killConnection(session.ID())
|
||||
c.iterSessions(func(session sql.Session) (stop bool, err error) {
|
||||
c.killQuery(session.ID())
|
||||
c.killConnection(session.ID())
|
||||
return
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// called with c.mu held
|
||||
func (c *Controller) setProviderIsStandby(standby bool) {
|
||||
if c.provider != nil {
|
||||
c.provider.SetIsStandby(standby)
|
||||
}
|
||||
}
|
||||
|
||||
// Called during a graceful transition from primary to standby. Waits until all
|
||||
// commithooks report nextHead == lastPushedHead.
|
||||
//
|
||||
// TODO: Implement this.
|
||||
// TODO: Report errors from commit hooks or add a deadline here or something.
|
||||
//
|
||||
// called with c.mu held
|
||||
func (c *Controller) waitForHooksToReplicate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -55,6 +55,7 @@ type DoltDatabaseProvider struct {
|
||||
remoteDialer dbfactory.GRPCDialProvider // TODO: why isn't this a method defined on the remote object
|
||||
|
||||
dbFactoryUrl string
|
||||
isStandby *bool
|
||||
}
|
||||
|
||||
var _ sql.DatabaseProvider = (*DoltDatabaseProvider)(nil)
|
||||
@@ -116,6 +117,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files
|
||||
defaultBranch: defaultBranch,
|
||||
dbFactoryUrl: doltdb.LocalDirDoltDB,
|
||||
InitDatabaseHook: ConfigureReplicationDatabaseHook,
|
||||
isStandby: new(bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -148,6 +150,15 @@ func (p DoltDatabaseProvider) FileSystem() filesys.Filesys {
|
||||
return p.fs
|
||||
}
|
||||
|
||||
// If this DatabaseProvider is set to standby |true|, it returns every dolt
|
||||
// database as a read only database. Set back to |false| to get read-write
|
||||
// behavior from dolt databases again.
|
||||
func (p DoltDatabaseProvider) SetIsStandby(standby bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
*p.isStandby = standby
|
||||
}
|
||||
|
||||
// FileSystemForDatabase returns a filesystem, with the working directory set to the root directory
|
||||
// of the requested database. If the requested database isn't found, a database not found error
|
||||
// is returned.
|
||||
@@ -168,9 +179,10 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
var ok bool
|
||||
p.mu.RLock()
|
||||
db, ok = p.databases[formatDbMapKeyName(name)]
|
||||
standby := *p.isStandby
|
||||
p.mu.RUnlock()
|
||||
if ok {
|
||||
return db, nil
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
db, _, ok, err = p.databaseForRevision(ctx, name)
|
||||
@@ -189,7 +201,21 @@ func (p DoltDatabaseProvider) Database(ctx *sql.Context, name string) (db sql.Da
|
||||
}
|
||||
|
||||
// Don't track revision databases, just instantiate them on demand
|
||||
return db, nil
|
||||
return wrapForStandby(db, standby), nil
|
||||
}
|
||||
|
||||
func wrapForStandby(db sql.Database, standby bool) sql.Database {
|
||||
if !standby {
|
||||
return db
|
||||
}
|
||||
if _, ok := db.(ReadOnlyDatabase); ok {
|
||||
return db
|
||||
}
|
||||
if db, ok := db.(Database); ok {
|
||||
// :-/. Hopefully it's not too sliced.
|
||||
return ReadOnlyDatabase{db}
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
// attemptCloneReplica attempts to clone a database from the configured replication remote URL template, returning an error
|
||||
|
||||
@@ -15,8 +15,6 @@
|
||||
package sqle
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/dolthub/go-mysql-server/sql"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
@@ -37,15 +35,15 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ddb, ok := db.(Database)
|
||||
sdb, ok := db.(SqlDatabase)
|
||||
if !ok {
|
||||
return nil, errors.New("unimplemented")
|
||||
return nil, remotesrv.ErrUnimplemented
|
||||
}
|
||||
datasdb := doltdb.HackDatasDatabaseFromDoltDB(ddb.DbData().Ddb)
|
||||
datasdb := doltdb.HackDatasDatabaseFromDoltDB(sdb.DbData().Ddb)
|
||||
cs := datas.ChunkStoreFromDatabase(datasdb)
|
||||
rss, ok := cs.(remotesrv.RemoteSrvStore)
|
||||
if !ok {
|
||||
return nil, errors.New("unimplemented")
|
||||
return nil, remotesrv.ErrUnimplemented
|
||||
}
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
@@ -131,7 +131,9 @@ cluster:
|
||||
# same role, new epoch
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
|
||||
# new role, new epoch
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
|
||||
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13');" "" 1
|
||||
# we assert on a new connection, since the server may have killed the old one on the transition.
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
|
||||
|
||||
# Server comes back up with latest assumed role.
|
||||
kill $SERVER_PID
|
||||
@@ -205,6 +207,7 @@ cluster:
|
||||
cd serverone
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
@@ -236,6 +239,7 @@ cluster:
|
||||
cd ../servertwo
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
@@ -280,3 +284,215 @@ cluster:
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" =~ "| 5 " ]] || false
|
||||
}
|
||||
|
||||
@test "sql-server-cluster: booted standby server is read only" {
|
||||
cd serverone
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
|
||||
cd ..
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
SERVER_PID=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
|
||||
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
|
||||
[[ "$output" =~ "Database repo1 is read-only" ]] || false
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
|
||||
}
|
||||
|
||||
@test "sql-server-cluster: booted primary server is read write" {
|
||||
cd serverone
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
|
||||
cd ..
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
SERVER_PID=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" ""
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10"
|
||||
}
|
||||
|
||||
@test "sql-server-cluster: standby transitioned to primary becomes writable" {
|
||||
cd serverone
|
||||
|
||||
cd repo1
|
||||
dolt sql -q 'create table vals (i int primary key)'
|
||||
dolt sql -q 'insert into vals values (1), (2), (3), (4), (5)'
|
||||
cd ..
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
SERVER_PID=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
|
||||
[[ "$output" =~ "Database repo1 is read-only" ]] || false
|
||||
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', 11)" "" 1
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 0
|
||||
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n10"
|
||||
}
|
||||
|
||||
@test "sql-server-cluster: primary transitioned to standby becomes read only" {
|
||||
# In order to gracefully transition to standby, we will need the standby running.
|
||||
cd serverone
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERONE_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
|
||||
bootstrap_role: standby
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERONE_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
serverone_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
|
||||
|
||||
cd ../servertwo
|
||||
|
||||
echo "
|
||||
log_level: trace
|
||||
user:
|
||||
name: dolt
|
||||
listener:
|
||||
host: 0.0.0.0
|
||||
port: ${SERVERTWO_MYSQL_PORT}
|
||||
behavior:
|
||||
read_only: false
|
||||
autocommit: true
|
||||
cluster:
|
||||
standby_remotes:
|
||||
- name: standby
|
||||
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
|
||||
bootstrap_role: primary
|
||||
bootstrap_epoch: 10
|
||||
remotesapi:
|
||||
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
|
||||
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
|
||||
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
|
||||
|
||||
(cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
|
||||
(cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
|
||||
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
|
||||
servertwo_pid=$!
|
||||
|
||||
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
|
||||
|
||||
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)"
|
||||
run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', 11)" "" 1
|
||||
run server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "insert into vals values (6),(7),(8),(9),(10)" "" 1
|
||||
[[ "$output" =~ "Database repo1 is read-only" ]] || false
|
||||
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "select count(*) from vals" "count(*)\n5"
|
||||
|
||||
kill $servertwo_pid
|
||||
wait $servertwo_pid
|
||||
|
||||
kill $serverone_pid
|
||||
wait $serverone_pid
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user