Merge pull request #4422 from dolthub/aaron/cluster-replication-hook

go/libraries/doltcore/sqle: Implement standby replication hook.
Authored by Aaron Son on 2022-09-29 10:08:54 -07:00; committed by GitHub.
23 changed files with 1274 additions and 122 deletions

go/Godeps/LICENSES (generated)
View File

@@ -1806,7 +1806,7 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
================================================================================
================================================================================
= github.com/cenkalti/backoff licensed under: =
= github.com/cenkalti/backoff/v4 licensed under: =
The MIT License (MIT)

View File

@@ -88,6 +88,13 @@ func NewSqlEngine(
return nil, err
}
config.ClusterController.ManageSystemVariables(sql.SystemVariables)
err = config.ClusterController.ApplyStandbyReplicationConfig(ctx, bThreads, mrEnv, dbs...)
if err != nil {
return nil, err
}
infoDB := information_schema.NewInformationSchemaDatabase()
all := append(dsqleDBsAsSqlDBs(dbs), infoDB)
locations = append(locations, nil)
@@ -99,10 +106,8 @@ func NewSqlEngine(
}
pro = pro.WithRemoteDialer(mrEnv.RemoteDialProvider())
if config.ClusterController != nil {
config.ClusterController.ManageSystemVariables(sql.SystemVariables)
config.ClusterController.RegisterStoredProcedures(pro)
}
config.ClusterController.RegisterStoredProcedures(pro)
pro.InitDatabaseHook = cluster.NewInitDatabaseHook(config.ClusterController, bThreads, pro.InitDatabaseHook)
// Load in privileges from file, if it exists
persister := mysql_file_handler.NewPersister(config.PrivFilePath, config.DoltCfgDirPath)

View File

@@ -124,7 +124,7 @@ func Serve(
}
}
clusterController, err := cluster.NewController(serverConfig.ClusterConfig(), mrEnv.Config())
clusterController, err := cluster.NewController(lgr, serverConfig.ClusterConfig(), mrEnv.Config())
if err != nil {
return err, nil
}
@@ -225,7 +225,12 @@ func Serve(
var remoteSrv *remotesrv.Server
if serverConfig.RemotesapiPort() != nil {
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
remoteSrv = sqle.NewRemoteSrvServer(logrus.NewEntry(lgr), remoteSrvSqlCtx, *serverConfig.RemotesapiPort())
remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
ReadOnly: true,
HttpPort: *serverConfig.RemotesapiPort(),
GrpcPort: *serverConfig.RemotesapiPort(),
})
listeners, err := remoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err)
@@ -243,6 +248,32 @@ func Serve(
}
}
var clusterRemoteSrv *remotesrv.Server
if clusterController != nil {
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
clusterRemoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
HttpPort: clusterController.RemoteSrvPort(),
GrpcPort: clusterController.RemoteSrvPort(),
Options: clusterController.ServerOptions(),
})
listeners, err := clusterRemoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)
startError = err
return
} else {
go func() {
clusterRemoteSrv.Serve(listeners)
}()
}
} else {
lgr.Errorf("error creating SQL engine context for remotesapi server: %v", err)
startError = err
return
}
}
if ok, f := mrEnv.IsLocked(); ok {
startError = env.ErrActiveServerLock.New(f)
return
@@ -259,6 +290,9 @@ func Serve(
if remoteSrv != nil {
remoteSrv.GracefulStop()
}
if clusterRemoteSrv != nil {
clusterRemoteSrv.GracefulStop()
}
return mySQLServer.Close()
})

View File

@@ -10,7 +10,6 @@ require (
github.com/aws/aws-sdk-go v1.32.6
github.com/bcicen/jstream v1.0.0
github.com/boltdb/bolt v1.3.1
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/denisbrodbeck/machineid v1.0.1
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
github.com/dolthub/fslock v0.0.3
@@ -48,7 +47,7 @@ require (
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20220829200755-d48e67d00261
google.golang.org/api v0.32.0
google.golang.org/grpc v1.37.0
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.27.1
gopkg.in/square/go-jose.v2 v2.5.1
gopkg.in/src-d/go-errors.v1 v1.0.0
@@ -57,6 +56,7 @@ require (
require (
github.com/aliyun/aliyun-oss-go-sdk v2.2.5+incompatible
github.com/cenkalti/backoff/v4 v4.1.3
github.com/dolthub/go-mysql-server v0.12.1-0.20220929062247-323a847921de
github.com/google/flatbuffers v2.0.6+incompatible
github.com/kch42/buzhash v0.0.0-20160816060738-9bdec3dec7c6

View File

@@ -130,8 +130,9 @@ github.com/bombsimon/wsl/v3 v3.1.0/go.mod h1:st10JtZYLE4D5sC7b8xV4zTKZwAQjCH/Hy2
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@@ -201,7 +202,6 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
@@ -1171,8 +1171,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM
google.golang.org/grpc v1.32.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.37.0 h1:uSZWeQJX5j11bIQ4AJoj+McDBo29cY1MCoC1wO3ts+c=
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -90,6 +90,10 @@ func (ph *PushOnWriteHook) HandleError(ctx context.Context, err error) error {
return nil
}
// ExecuteForWorkingSets implements CommitHook; push-on-write replication only runs for commits, not working set updates.
func (*PushOnWriteHook) ExecuteForWorkingSets() bool {
return false
}
// SetLogger implements CommitHook
func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
ph.out = wr
@@ -126,6 +130,10 @@ func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *DoltDB, tm
return &AsyncPushOnWriteHook{ch: ch}, nil
}
// ExecuteForWorkingSets implements CommitHook; async push replication only runs for commits, not working set updates.
func (*AsyncPushOnWriteHook) ExecuteForWorkingSets() bool {
return false
}
// Execute implements CommitHook, replicates head updates to the destDb field
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
addr, _ := ds.MaybeHeadAddr()
@@ -188,6 +196,10 @@ func (lh *LogHook) SetLogger(ctx context.Context, wr io.Writer) error {
return nil
}
// ExecuteForWorkingSets implements CommitHook; the log hook only runs for commits, not working set updates.
func (*LogHook) ExecuteForWorkingSets() bool {
return false
}
func RunAsyncReplicationThreads(bThreads *sql.BackgroundThreads, ch chan PushArg, destDB *DoltDB, tmpDir string, logger io.Writer) error {
mu := &sync.Mutex{}
var newHeads = make(map[string]PushArg, asyncPushBufferSize)

View File

@@ -1269,6 +1269,11 @@ func (ddb *DoltDB) SetCommitHooks(ctx context.Context, postHooks []CommitHook) *
return ddb
}
// PrependCommitHook adds a CommitHook to the front of this database's commit hook list.
func (ddb *DoltDB) PrependCommitHook(ctx context.Context, hook CommitHook) *DoltDB {
ddb.db = ddb.db.SetCommitHooks(ctx, append([]CommitHook{hook}, ddb.db.PostCommitHooks()...))
return ddb
}
func (ddb *DoltDB) SetCommitHookLogger(ctx context.Context, wr io.Writer) *DoltDB {
if ddb.db.Database != nil {
ddb.db = ddb.db.SetCommitHookLogger(ctx, wr)
@@ -1281,7 +1286,7 @@ func (ddb *DoltDB) ExecuteCommitHooks(ctx context.Context, datasetId string) err
if err != nil {
return err
}
ddb.db.ExecuteCommitHooks(ctx, ds)
ddb.db.ExecuteCommitHooks(ctx, ds, false)
return nil
}

View File

@@ -36,6 +36,8 @@ type CommitHook interface {
HandleError(ctx context.Context, err error) error
// SetLogger lets clients specify an output stream for HandleError
SetLogger(ctx context.Context, wr io.Writer) error
// ExecuteForWorkingSets reports whether this hook should also be executed for working set updates, in addition to commits and head moves.
ExecuteForWorkingSets() bool
}
func (db hooksDatabase) SetCommitHooks(ctx context.Context, postHooks []CommitHook) hooksDatabase {
@@ -54,12 +56,14 @@ func (db hooksDatabase) PostCommitHooks() []CommitHook {
return db.postCommitHooks
}
func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset) {
func (db hooksDatabase) ExecuteCommitHooks(ctx context.Context, ds datas.Dataset, onlyWS bool) {
var err error
for _, hook := range db.postCommitHooks {
err = hook.Execute(ctx, ds, db)
if err != nil {
hook.HandleError(ctx, err)
if !onlyWS || hook.ExecuteForWorkingSets() {
err = hook.Execute(ctx, ds, db)
if err != nil {
hook.HandleError(ctx, err)
}
}
}
}
@@ -79,7 +83,7 @@ func (db hooksDatabase) CommitWithWorkingSet(
prevWsHash,
opts)
if err == nil {
db.ExecuteCommitHooks(ctx, commitDS)
db.ExecuteCommitHooks(ctx, commitDS, false)
}
return commitDS, workingSetDS, err
}
@@ -87,7 +91,7 @@ func (db hooksDatabase) CommitWithWorkingSet(
func (db hooksDatabase) Commit(ctx context.Context, ds datas.Dataset, v types.Value, opts datas.CommitOptions) (datas.Dataset, error) {
ds, err := db.Database.Commit(ctx, ds, v, opts)
if err == nil {
db.ExecuteCommitHooks(ctx, ds)
db.ExecuteCommitHooks(ctx, ds, false)
}
return ds, err
}
@@ -95,7 +99,7 @@ func (db hooksDatabase) Commit(ctx context.Context, ds datas.Dataset, v types.Va
func (db hooksDatabase) SetHead(ctx context.Context, ds datas.Dataset, newHeadAddr hash.Hash) (datas.Dataset, error) {
ds, err := db.Database.SetHead(ctx, ds, newHeadAddr)
if err == nil {
db.ExecuteCommitHooks(ctx, ds)
db.ExecuteCommitHooks(ctx, ds, false)
}
return ds, err
}
@@ -103,7 +107,7 @@ func (db hooksDatabase) SetHead(ctx context.Context, ds datas.Dataset, newHeadAd
func (db hooksDatabase) FastForward(ctx context.Context, ds datas.Dataset, newHeadAddr hash.Hash) (datas.Dataset, error) {
ds, err := db.Database.FastForward(ctx, ds, newHeadAddr)
if err == nil {
db.ExecuteCommitHooks(ctx, ds)
db.ExecuteCommitHooks(ctx, ds, false)
}
return ds, err
}
@@ -111,7 +115,15 @@ func (db hooksDatabase) FastForward(ctx context.Context, ds datas.Dataset, newHe
func (db hooksDatabase) Delete(ctx context.Context, ds datas.Dataset) (datas.Dataset, error) {
ds, err := db.Database.Delete(ctx, ds)
if err == nil {
db.ExecuteCommitHooks(ctx, datas.NewHeadlessDataset(ds.Database(), ds.ID()))
db.ExecuteCommitHooks(ctx, datas.NewHeadlessDataset(ds.Database(), ds.ID()), false)
}
return ds, err
}
// UpdateWorkingSet delegates to the underlying database and then runs only the commit hooks that opt in through ExecuteForWorkingSets.
func (db hooksDatabase) UpdateWorkingSet(ctx context.Context, ds datas.Dataset, workingSet datas.WorkingSetSpec, prevHash hash.Hash) (datas.Dataset, error) {
ds, err := db.Database.UpdateWorkingSet(ctx, ds, workingSet, prevHash)
if err == nil {
db.ExecuteCommitHooks(ctx, ds, true)
}
return ds, err
}
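
The onlyWS flag introduced above is what keeps most hooks quiet on working-set-only updates: a hook runs when the update is a real commit or head move, or when it opts in through ExecuteForWorkingSets. A self-contained sketch of that gating, using illustrative hook types rather than Dolt's:

package main

import "fmt"

// Hook mirrors the shape of the CommitHook gating used above; the types here
// are illustrative stand-ins, not Dolt's real implementations.
type Hook interface {
	Execute(head string) error
	ExecuteForWorkingSets() bool
}

type pushOnWrite struct{}

func (pushOnWrite) Execute(head string) error   { fmt.Println("push", head); return nil }
func (pushOnWrite) ExecuteForWorkingSets() bool { return false }

type standbyReplicator struct{}

func (standbyReplicator) Execute(head string) error   { fmt.Println("replicate", head); return nil }
func (standbyReplicator) ExecuteForWorkingSets() bool { return true }

// executeHooks applies the same filter as hooksDatabase.ExecuteCommitHooks:
// on a working-set-only update (onlyWS == true) only opted-in hooks run.
func executeHooks(hooks []Hook, head string, onlyWS bool) {
	for _, h := range hooks {
		if !onlyWS || h.ExecuteForWorkingSets() {
			_ = h.Execute(head)
		}
	}
}

func main() {
	hooks := []Hook{pushOnWrite{}, standbyReplicator{}}
	executeHooks(hooks, "abc123", false) // commit: both hooks run
	executeHooks(hooks, "abc123", true)  // working set update: only the replicator runs
}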

View File

@@ -54,6 +54,7 @@ type ServerArgs struct {
FS filesys.Filesys
DBCache DBCache
ReadOnly bool
Options []grpc.ServerOption
}
func NewServer(args ServerArgs) *Server {
@@ -66,7 +67,7 @@ func NewServer(args ServerArgs) *Server {
s.wg.Add(2)
s.grpcPort = args.GrpcPort
s.grpcSrv = grpc.NewServer(grpc.MaxRecvMsgSize(128 * 1024 * 1024))
s.grpcSrv = grpc.NewServer(append([]grpc.ServerOption{grpc.MaxRecvMsgSize(128 * 1024 * 1024)}, args.Options...)...)
var chnkSt remotesapi.ChunkStoreServiceServer = NewHttpFSBackedChunkStore(args.Logger, args.HttpHost, args.DBCache, args.FS)
if args.ReadOnly {
chnkSt = ReadOnlyChunkStore{chnkSt}
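
The new Options field and the append above let callers layer extra grpc.ServerOptions (such as the cluster interceptors added later in this change) on top of the existing MaxRecvMsgSize default. A minimal sketch of the same merging pattern, with an illustrative pass-through interceptor:

package main

import (
	"context"

	"google.golang.org/grpc"
)

// newGRPCServer mirrors the construction in NewServer above: package defaults
// come first and caller-supplied options are appended after them, so extras
// such as chained interceptors do not clobber MaxRecvMsgSize.
func newGRPCServer(extra ...grpc.ServerOption) *grpc.Server {
	defaults := []grpc.ServerOption{grpc.MaxRecvMsgSize(128 * 1024 * 1024)}
	return grpc.NewServer(append(defaults, extra...)...)
}

func main() {
	// A pass-through unary interceptor stands in for the cluster serverinterceptor's options.
	passthrough := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		return handler(ctx, req)
	}
	srv := newGRPCServer(grpc.ChainUnaryInterceptor(passthrough))
	defer srv.Stop()
}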

View File

@@ -30,7 +30,7 @@ import (
"sync/atomic"
"time"
"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
@@ -80,13 +80,16 @@ const (
var tracer = otel.Tracer("github.com/dolthub/dolt/go/libraries/doltcore/remotestorage")
var uploadRetryParams = backoff.NewExponentialBackOff()
var downRetryParams = backoff.NewExponentialBackOff()
func downloadBackOff(ctx context.Context) backoff.BackOff {
ret := backoff.NewExponentialBackOff()
ret.MaxInterval = 5 * time.Second
return backoff.WithContext(backoff.WithMaxRetries(ret, downRetryCount), ctx)
}
func init() {
uploadRetryParams.MaxInterval = 5 * time.Second
downRetryParams.MaxInterval = 5 * time.Second
func uploadBackOff(ctx context.Context) backoff.BackOff {
ret := backoff.NewExponentialBackOff()
ret.MaxInterval = 5 * time.Second
return backoff.WithContext(backoff.WithMaxRetries(ret, uploadRetryCount), ctx)
}
// Only hedge downloads of ranges < 4MB in length for now.
@@ -617,7 +620,7 @@ func (dcs *DoltChunkStore) getDLLocs(ctx context.Context, hashes []hash.Hash) (d
}
return processGrpcErr(err)
}
return backoff.Retry(op, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return backoff.Retry(op, grpcBackOff(ctx))
})
if err := eg.Wait(); err != nil {
@@ -1009,7 +1012,7 @@ func (dcs *DoltChunkStore) uploadTableFileWithRetries(ctx context.Context, table
return nil
}
return backoff.Retry(op, backoff.WithMaxRetries(uploadRetryParams, uploadRetryCount))
return backoff.Retry(op, uploadBackOff(ctx))
}
type Sizer interface {
@@ -1213,7 +1216,7 @@ func rangeDownloadWithRetries(ctx context.Context, stats StatsRecorder, fetcher
}
dstart := time.Now()
err := backoff.Retry(op, backoff.WithMaxRetries(downRetryParams, downRetryCount))
err := backoff.Retry(op, downloadBackOff(ctx))
if err != nil {
return nil, err
}
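
This file's switch to backoff/v4 replaces the shared, package-level ExponentialBackOff values with a fresh, per-call policy that is both retry-capped and cancellable through the caller's context. A standalone sketch of that pattern, assuming only the cenkalti/backoff/v4 API shown above:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

// downloadBackOff mirrors the per-call construction above: a fresh
// ExponentialBackOff per attempt, a retry budget, and cancellation via ctx.
func downloadBackOff(ctx context.Context, retries uint64) backoff.BackOff {
	ret := backoff.NewExponentialBackOff()
	ret.MaxInterval = 5 * time.Second
	return backoff.WithContext(backoff.WithMaxRetries(ret, retries), ctx)
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	// Retry stops on success, on exhausting the retry budget, or when ctx is done.
	err := backoff.Retry(op, downloadBackOff(ctx, 5))
	fmt.Println("attempts:", attempts, "err:", err)
}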

View File

@@ -20,7 +20,7 @@ import (
"fmt"
"net/http"
"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -46,30 +46,12 @@ func processHttpResp(resp *http.Response, err error) error {
// processGrpcErr converts an error from a gRPC call into a RetriableCallState
func processGrpcErr(err error) error {
if err == nil {
return nil
}
st, ok := status.FromError(err)
if !ok {
return err
}
switch st.Code() {
case codes.OK:
return nil
case codes.Canceled,
codes.Unknown,
codes.DeadlineExceeded,
codes.Aborted,
codes.Internal,
codes.DataLoss,
codes.ResourceExhausted,
codes.Unavailable:
return err
case codes.InvalidArgument,
codes.NotFound,
codes.AlreadyExists,

View File

@@ -17,20 +17,22 @@ package remotestorage
import (
"context"
"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"google.golang.org/grpc"
)
const (
csClientRetries = 5
grpcRetries = 5
)
var csRetryParams = backoff.NewExponentialBackOff()
func grpcBackOff(ctx context.Context) backoff.BackOff {
ret := backoff.NewExponentialBackOff()
return backoff.WithContext(backoff.WithMaxRetries(ret, grpcRetries), ctx)
}
func RetryingUnaryClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
doit := func() error {
err := invoker(ctx, method, req, reply, cc, opts...)
return processGrpcErr(err)
return processGrpcErr(invoker(ctx, method, req, reply, cc, opts...))
}
return backoff.Retry(doit, backoff.WithMaxRetries(csRetryParams, csClientRetries))
return backoff.Retry(doit, grpcBackOff(ctx))
}

View File

@@ -0,0 +1,309 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"errors"
"io"
"sync"
"sync/atomic"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
)
var _ doltdb.CommitHook = (*commithook)(nil)
type commithook struct {
rootLgr *logrus.Entry
lgr atomic.Value // *logrus.Entry
remotename string
dbname string
lout io.Writer
mu sync.Mutex
wg sync.WaitGroup
cond *sync.Cond
nextHead hash.Hash
lastPushedHead hash.Hash
lastPushedSuccess time.Time
nextPushAttempt time.Time
nextHeadIncomingTime time.Time
role Role
// The standby replica to which the new root gets replicated.
destDB *doltdb.DoltDB
// When we first start replicating to the destination, we lazily
// instantiate the remote and we do not treat failures as terminal.
destDBF func(context.Context) (*doltdb.DoltDB, error)
// This database, which we are replicating from. In our current
// configuration, it is local to this server process.
srcDB *doltdb.DoltDB
tempDir string
}
var errDestDBRootHashMoved error = errors.New("cluster/commithook: standby replication: destination database root hash moved during our write, while it is assumed we are the only writer.")
const logFieldThread = "thread"
const logFieldRole = "role"
func newCommitHook(lgr *logrus.Logger, remotename, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
var ret commithook
ret.rootLgr = lgr.WithField(logFieldThread, "Standby Replication - "+dbname+" to "+remotename)
ret.lgr.Store(ret.rootLgr.WithField(logFieldRole, string(role)))
ret.remotename = remotename
ret.dbname = dbname
ret.role = role
ret.destDBF = destDBF
ret.srcDB = srcDB
ret.tempDir = tempDir
ret.cond = sync.NewCond(&ret.mu)
return &ret
}
func (h *commithook) Run(bt *sql.BackgroundThreads) error {
return bt.Add("Standby Replication - "+h.dbname+" to "+h.remotename, h.run)
}
func (h *commithook) run(ctx context.Context) {
// The hook comes up attempting to replicate the current head.
h.logger().Tracef("cluster/commithook: background thread: running.")
h.wg.Add(2)
go h.replicate(ctx)
go h.tick(ctx)
<-ctx.Done()
h.logger().Tracef("cluster/commithook: background thread: requested shutdown, signaling replication thread.")
h.cond.Signal()
h.wg.Wait()
h.logger().Tracef("cluster/commithook: background thread: completed.")
}
func (h *commithook) replicate(ctx context.Context) {
defer h.wg.Done()
defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.")
h.mu.Lock()
defer h.mu.Unlock()
for {
lgr := h.logger()
// Shutdown for context canceled.
if ctx.Err() != nil {
lgr.Tracef("cluster/commithook replicate thread exiting; saw ctx.Err(): %v", ctx.Err())
if h.shouldReplicate() {
// attempt a last true-up of our standby as we shut down
// TODO: context.WithDeadline based on config / convention?
h.attemptReplicate(context.Background())
}
return
}
if h.primaryNeedsInit() {
lgr.Tracef("cluster/commithook: fetching current head.")
// When the replicate thread comes up, it attempts to replicate the current head.
datasDB := doltdb.HackDatasDatabaseFromDoltDB(h.srcDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var err error
h.nextHead, err = cs.Root(ctx)
if err != nil {
// TODO: if err != nil, something is really wrong; should shutdown or backoff.
lgr.Warningf("standby replication thread failed to load database root: %v", err)
h.nextHead = hash.Hash{}
}
// We do not know when this head was written, but we
// are starting to try to replicate it now.
h.nextHeadIncomingTime = time.Now()
} else if h.shouldReplicate() {
h.attemptReplicate(ctx)
} else {
lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
h.cond.Wait()
lgr.Tracef("cluster/commithook: background thread: woken up.")
}
}
}
// called with h.mu locked.
func (h *commithook) shouldReplicate() bool {
if h.role != RolePrimary {
return false
}
if h.nextHead == h.lastPushedHead {
return false
}
return (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt))
}
// called with h.mu locked.
func (h *commithook) primaryNeedsInit() bool {
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
}
// Called by the replicate thread to push the nextHead to the destDB and set
// its root to the new value.
//
// preconditions: h.mu is locked and shouldReplicate() returned true.
// when this function returns, h.mu is locked.
func (h *commithook) attemptReplicate(ctx context.Context) {
lgr := h.logger()
toPush := h.nextHead
incomingTime := h.nextHeadIncomingTime
destDB := h.destDB
h.mu.Unlock()
if destDB == nil {
lgr.Tracef("cluster/commithook: attempting to fetch destDB.")
var err error
destDB, err = h.destDBF(ctx)
if err != nil {
lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err)
h.mu.Lock()
// TODO: We could add some backoff here.
if toPush == h.nextHead {
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
return
}
lgr.Tracef("cluster/commithook: fetched destDB")
h.mu.Lock()
h.destDB = destDB
h.mu.Unlock()
}
lgr.Tracef("cluster/commithook: pushing chunks for root hash %v to destDB", toPush.String())
err := destDB.PullChunks(ctx, h.tempDir, h.srcDB, toPush, nil, nil)
if err == nil {
lgr.Tracef("cluster/commithook: successfully pushed chunks, setting root")
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var curRootHash hash.Hash
if curRootHash, err = cs.Root(ctx); err == nil {
var ok bool
ok, err = cs.Commit(ctx, toPush, curRootHash)
if err == nil && !ok {
err = errDestDBRootHashMoved
}
}
}
h.mu.Lock()
if err == nil {
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
h.lastPushedHead = toPush
h.lastPushedSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
// add some delay if a new head didn't come in while we were pushing.
if toPush == h.nextHead {
// TODO: We could add some backoff here.
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
}
}
func (h *commithook) replicationLag() time.Duration {
h.mu.Lock()
defer h.mu.Unlock()
if h.role == RoleStandby {
return time.Duration(0)
}
if h.nextHead == h.lastPushedHead {
return time.Duration(0)
}
// We return the wallclock time between now and the last time we were
// successful. If h.nextHeadIncomingTime is significantly earlier than
// time.Now(), because the server has not received a write in a long
// time, then this metric may report a high number when the number of
// seconds of writes outstanding could actually be much smaller.
// Operationally, failure to replicate a write for a long time is a
// problem that merits investigation, regardless of how many pending
// writes are failing to replicate.
return time.Now().Sub(h.lastPushedSuccess)
}
func (h *commithook) logger() *logrus.Entry {
return h.lgr.Load().(*logrus.Entry)
}
// TODO: Would be more efficient to only tick when we have outstanding work...
func (h *commithook) tick(ctx context.Context) {
defer h.wg.Done()
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
h.cond.Signal()
}
}
}
func (h *commithook) setRole(role Role) {
h.mu.Lock()
defer h.mu.Unlock()
// Reset head-to-push and timers here. When we transition into Primary,
// the replicate() loop will take these from the current chunk store.
h.nextHead = hash.Hash{}
h.lastPushedHead = hash.Hash{}
h.lastPushedSuccess = time.Time{}
h.nextPushAttempt = time.Time{}
h.role = role
h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
h.cond.Signal()
}
// Execute on this commithook updates the target root hash we're attempting to
// replicate and wakes the replication thread.
func (h *commithook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
lgr := h.logger()
lgr.Warnf("cluster/commithook: Execute called post commit")
cs := datas.ChunkStoreFromDatabase(db)
root, err := cs.Root(ctx)
if err != nil {
lgr.Warnf("cluster/commithook: Execute: error retrieving local database root: %v", err)
return err
}
h.mu.Lock()
defer h.mu.Unlock()
if root != h.nextHead {
lgr.Tracef("signaling replication thread to push new head: %v", root.String())
h.nextHeadIncomingTime = time.Now()
h.nextHead = root
h.nextPushAttempt = time.Time{}
h.cond.Signal()
}
return nil
}
func (h *commithook) HandleError(ctx context.Context, err error) error {
return nil
}
func (h *commithook) SetLogger(ctx context.Context, wr io.Writer) error {
h.lout = wr
return nil
}
func (h *commithook) ExecuteForWorkingSets() bool {
return true
}
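
The new commithook is built around a single replicate goroutine driven by a mutex and condition variable: Execute (and the ticker) publish the latest root hash and signal, the worker snapshots it, releases the lock while pushing chunks, and reacquires it to record the result. A stripped-down, self-contained sketch of that signaling shape, with illustrative names in place of Dolt's types:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// replicator is a minimal analogue of the commithook's Execute/replicate
// pairing: writers publish the latest head under the mutex and signal, and a
// single worker snapshots it and "pushes" it with the lock released.
type replicator struct {
	mu       sync.Mutex
	cond     *sync.Cond
	nextHead string
	pushed   string
}

func newReplicator() *replicator {
	r := &replicator{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// Execute corresponds to the CommitHook callback: record the new head and wake the worker.
func (r *replicator) Execute(head string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if head != r.nextHead {
		r.nextHead = head
		r.cond.Signal()
	}
}

// run corresponds to the replicate loop. The real hook also signals the cond
// from a ticker and on shutdown so Wait never blocks forever; this sketch
// simply relies on main exiting.
func (r *replicator) run(ctx context.Context) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for ctx.Err() == nil {
		if r.nextHead == r.pushed {
			r.cond.Wait()
			continue
		}
		toPush := r.nextHead
		r.mu.Unlock()
		fmt.Println("pushing", toPush) // stands in for PullChunks plus the chunk store Commit
		r.mu.Lock()
		r.pushed = toPush
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	r := newReplicator()
	go r.run(ctx)
	r.Execute("head-1")
	r.Execute("head-2")
	time.Sleep(100 * time.Millisecond) // give the worker time to drain; real code waits on a WaitGroup
}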

View File

@@ -15,13 +15,21 @@
package cluster
import (
"context"
"fmt"
"strconv"
"sync"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/store/types"
)
type Role string
@@ -38,6 +46,10 @@ type Controller struct {
epoch int
systemVars sqlvars
mu sync.Mutex
commithooks []*commithook
sinterceptor serverinterceptor
cinterceptor clientinterceptor
lgr *logrus.Logger
}
type sqlvars interface {
@@ -53,34 +65,111 @@ const (
DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch"
)
func NewController(cfg Config, pCfg config.ReadWriteConfig) (*Controller, error) {
func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (*Controller, error) {
if cfg == nil {
return nil, nil
}
pCfg = config.NewPrefixConfig(pCfg, PersistentConfigPrefix)
role, epoch, err := applyBootstrapClusterConfig(cfg, pCfg)
role, epoch, err := applyBootstrapClusterConfig(lgr, cfg, pCfg)
if err != nil {
return nil, err
}
return &Controller{
ret := &Controller{
cfg: cfg,
persistentCfg: pCfg,
role: role,
epoch: epoch,
}, nil
commithooks: make([]*commithook, 0),
lgr: lgr,
}
ret.sinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.sinterceptor.setRole(role, epoch)
ret.cinterceptor.lgr = lgr.WithFields(logrus.Fields{})
ret.cinterceptor.setRole(role, epoch)
return ret, nil
}
func (c *Controller) ManageSystemVariables(variables sqlvars) {
if c == nil {
return
}
c.mu.Lock()
defer c.mu.Unlock()
c.systemVars = variables
c.refreshSystemVars()
}
func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.BackgroundThreads, mrEnv *env.MultiRepoEnv, dbs ...sqle.SqlDatabase) error {
if c == nil {
return nil
}
c.mu.Lock()
defer c.mu.Unlock()
for _, db := range dbs {
denv := mrEnv.GetEnv(db.Name())
if denv == nil {
continue
}
c.lgr.Tracef("cluster/controller: applying commit hooks for %s with role %s", db.Name(), string(c.role))
hooks, err := c.applyCommitHooks(ctx, db.Name(), bt, denv)
if err != nil {
return err
}
c.commithooks = append(c.commithooks, hooks...)
}
return nil
}
func (c *Controller) applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv) ([]*commithook, error) {
ttfdir, err := denv.TempTableFilesDir()
if err != nil {
return nil, err
}
remotes, err := denv.GetRemotes()
if err != nil {
return nil, err
}
dialprovider := c.gRPCDialProvider(denv)
var hooks []*commithook
for _, r := range c.cfg.StandbyRemotes() {
remote, ok := remotes[r.Name()]
if !ok {
return nil, fmt.Errorf("sqle: cluster: standby replication: destination remote %s does not exist on database %s", r.Name(), name)
}
commitHook := newCommitHook(c.lgr, r.Name(), name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) {
return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider)
}, denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
if err := commitHook.Run(bt); err != nil {
return nil, err
}
hooks = append(hooks, commitHook)
}
return hooks, nil
}
func (c *Controller) gRPCDialProvider(denv *env.DoltEnv) dbfactory.GRPCDialProvider {
return grpcDialProvider{env.NewGRPCDialProviderFromDoltEnv(denv), &c.cinterceptor}
}
func (c *Controller) RegisterStoredProcedures(store procedurestore) {
if c == nil {
return
}
store.Register(newAssumeRoleProcedure(c))
}
func (c *Controller) RemoteSrvPort() int {
if c == nil {
return -1
}
return c.cfg.RemotesAPIConfig().Port()
}
func (c *Controller) ServerOptions() []grpc.ServerOption {
return c.sinterceptor.Options()
}
func (c *Controller) refreshSystemVars() {
role, epoch := string(c.role), c.epoch
vars := []sql.SystemVariable{
@@ -109,21 +198,28 @@ func (c *Controller) persistVariables() error {
return c.persistentCfg.SetStrings(toset)
}
func applyBootstrapClusterConfig(cfg Config, pCfg config.ReadWriteConfig) (Role, int, error) {
func applyBootstrapClusterConfig(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (Role, int, error) {
toset := make(map[string]string)
persistentRole := pCfg.GetStringOrDefault(DoltClusterRoleVariable, "")
persistentEpoch := pCfg.GetStringOrDefault(DoltClusterRoleEpochVariable, "")
if persistentRole == "" {
if cfg.BootstrapRole() != "" {
lgr.Tracef("cluster/controller: persisted cluster role was empty, apply bootstrap_role %s", cfg.BootstrapRole())
persistentRole = cfg.BootstrapRole()
} else {
lgr.Trace("cluster/controller: persisted cluster role was empty, bootstrap_role was empty: defaulted to primary")
persistentRole = "primary"
}
toset[DoltClusterRoleVariable] = persistentRole
} else {
lgr.Tracef("cluster/controller: persisted cluster role is %s", persistentRole)
}
if persistentEpoch == "" {
persistentEpoch = strconv.Itoa(cfg.BootstrapEpoch())
lgr.Tracef("cluster/controller: persisted cluster role epoch is empty, took boostrap_epoch: %s", persistentEpoch)
toset[DoltClusterRoleEpochVariable] = persistentEpoch
} else {
lgr.Tracef("cluster/controller: persisted cluster role epoch is %s", persistentEpoch)
}
if persistentRole != string(RolePrimary) && persistentRole != string(RoleStandby) {
return "", 0, fmt.Errorf("persisted role %s.%s = %s must be \"primary\" or \"secondary\"", PersistentConfigPrefix, DoltClusterRoleVariable, persistentRole)
@@ -153,18 +249,37 @@ func (c *Controller) setRoleAndEpoch(role string, epoch int) error {
if epoch < c.epoch {
return fmt.Errorf("error assuming role '%s' at epoch %d; already at epoch %d", role, epoch, c.epoch)
}
if role == string(c.role) {
c.epoch = epoch
c.refreshSystemVars()
return c.persistVariables()
}
if role != "primary" && role != "standby" {
return fmt.Errorf("error assuming role '%s'; valid roles are 'primary' and 'standby'", role)
}
// TODO: Role is transitioning...lots of stuff to do.
changedrole := role != string(c.role)
c.role = Role(role)
c.epoch = epoch
if changedrole {
// TODO: Role is transitioning...lots of stuff to do.
}
c.refreshSystemVars()
c.cinterceptor.setRole(c.role, c.epoch)
c.sinterceptor.setRole(c.role, c.epoch)
for _, h := range c.commithooks {
h.setRole(c.role)
}
return c.persistVariables()
}
func (c *Controller) roleAndEpoch() (Role, int) {
c.mu.Lock()
defer c.mu.Unlock()
return c.role, c.epoch
}
func (c *Controller) registerCommitHook(hook *commithook) {
c.mu.Lock()
defer c.mu.Unlock()
c.commithooks = append(c.commithooks, hook)
}
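
applyBootstrapClusterConfig gives previously persisted role/epoch values precedence over bootstrap_role and bootstrap_epoch from the YAML config, which is exactly what the first bats test at the bottom of this change exercises. A compact sketch of that precedence rule; the helper function is illustrative, not Dolt's API:

package main

import (
	"fmt"
	"strconv"
)

// bootstrapRoleAndEpoch mirrors the precedence in applyBootstrapClusterConfig:
// a previously persisted role/epoch wins; otherwise bootstrap_role (default
// "primary") and bootstrap_epoch seed the persisted values.
func bootstrapRoleAndEpoch(persistedRole, persistedEpoch, bootstrapRole string, bootstrapEpoch int) (string, int, error) {
	if persistedRole == "" {
		persistedRole = bootstrapRole
		if persistedRole == "" {
			persistedRole = "primary"
		}
	}
	if persistedEpoch == "" {
		persistedEpoch = strconv.Itoa(bootstrapEpoch)
	}
	if persistedRole != "primary" && persistedRole != "standby" {
		return "", 0, fmt.Errorf("persisted role %q must be \"primary\" or \"standby\"", persistedRole)
	}
	epoch, err := strconv.Atoi(persistedEpoch)
	if err != nil {
		return "", 0, err
	}
	return persistedRole, epoch, nil
}

func main() {
	// Persisted values ("standby", 10) win over bootstrap values ("primary", 0),
	// matching the "persisted role and epoch take precedence" bats test.
	role, epoch, _ := bootstrapRoleAndEpoch("standby", "10", "primary", 0)
	fmt.Println(role, epoch) // standby 10
}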

View File

@@ -0,0 +1,42 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"google.golang.org/grpc"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/grpcendpoint"
)
// We wrap the default environment dial provider. In the standby replication
// case, we want the following differences:
//
// - client interceptors for transmitting our replication role.
// - do not use environment credentials. (for now).
type grpcDialProvider struct {
orig dbfactory.GRPCDialProvider
ci *clientinterceptor
}
func (p grpcDialProvider) GetGRPCDialParams(config grpcendpoint.Config) (string, []grpc.DialOption, error) {
config.WithEnvCreds = false
endpoint, opts, err := p.orig.GetGRPCDialParams(config)
if err != nil {
return "", nil, err
}
opts = append(opts, p.ci.Options()...)
return endpoint, opts, nil
}

View File

@@ -0,0 +1,75 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"strings"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/types"
)
func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig sqle.InitDatabaseHook) sqle.InitDatabaseHook {
if controller == nil {
return orig
}
return func(ctx *sql.Context, pro sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error {
dialprovider := controller.gRPCDialProvider(denv)
var remoteDBs []func(context.Context) (*doltdb.DoltDB, error)
for _, r := range controller.cfg.StandbyRemotes() {
// TODO: url sanitize name
remoteUrl := strings.Replace(r.RemoteURLTemplate(), dsess.URLTemplateDatabasePlaceholder, name, -1)
// TODO: Assert remotesapi URL.
r := env.NewRemote(r.Name(), remoteUrl, nil)
err := denv.AddRemote(r)
if err != nil {
return err
}
remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) {
return r.GetRemoteDB(ctx, types.Format_Default, dialprovider)
})
}
err := orig(ctx, pro, name, denv)
if err != nil {
return err
}
role, _ := controller.roleAndEpoch()
for i, r := range controller.cfg.StandbyRemotes() {
ttfdir, err := denv.TempTableFilesDir()
if err != nil {
return err
}
commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
controller.registerCommitHook(commitHook)
if err := commitHook.Run(bt); err != nil {
return err
}
}
return nil
}
}
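
NewInitDatabaseHook wraps the provider's existing InitDatabaseHook rather than replacing it: it registers the standby remotes, delegates to the original hook, then prepends and starts the cluster commit hooks for the new database. A toy sketch of that decorator shape, with hypothetical signatures standing in for sqle.InitDatabaseHook:

package main

import "fmt"

// InitHook is a stand-in for sqle.InitDatabaseHook: called once per newly
// created database. The names below are illustrative, not Dolt's API.
type InitHook func(dbName string) error

// withClusterSetup decorates an existing hook the same way NewInitDatabaseHook
// does: cluster-specific setup happens around a call to the original hook.
func withClusterSetup(orig InitHook) InitHook {
	return func(dbName string) error {
		fmt.Println("add standby remotes for", dbName)
		if err := orig(dbName); err != nil {
			return err
		}
		fmt.Println("prepend and start standby commit hooks for", dbName)
		return nil
	}
}

func main() {
	base := func(dbName string) error {
		fmt.Println("default replication setup for", dbName)
		return nil
	}
	hook := withClusterSetup(base)
	_ = hook("repo1")
}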

View File

@@ -0,0 +1,218 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"strconv"
"sync"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
const clusterRoleHeader = "x-dolt-cluster-role"
const clusterRoleEpochHeader = "x-dolt-cluster-role-epoch"
// clientinterceptor is installed as a Unary and Stream client interceptor on
// the client conns that are used to communicate with standby remotes. The
// cluster.Controller sets this server's current Role and role epoch on the
// interceptor anytime it changes. In turn, this interceptor:
// * adds the server's current role and epoch to the request headers for every
// outbound request.
// * fails all outgoing requests immediately with codes.Unavailable if the role
// == RoleStandby, since this server should not be replicating when it believes
// it is a standby.
// * watches returned response headers for a situation which causes this server
// to force downgrade from primary to standby. In particular, when a returned
// response header asserts that the standby replica is a primary at a higher
// epoch than this server, this interceptor coordinates with the Controller to
// immediately transition to standby and to stop replicating to the standby.
type clientinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
}
func (ci *clientinterceptor) setRole(role Role, epoch int) {
ci.mu.Lock()
defer ci.mu.Unlock()
ci.role = role
ci.epoch = epoch
}
func (ci *clientinterceptor) getRole() (Role, int) {
ci.mu.Lock()
defer ci.mu.Unlock()
return ci.role, ci.epoch
}
func (ci *clientinterceptor) Stream() grpc.StreamClientInterceptor {
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
role, epoch := ci.getRole()
if role == RoleStandby {
return nil, status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
stream, err := streamer(ctx, desc, cc, method, append(opts, grpc.Header(&header))...)
ci.handleResponseHeaders(header, role, epoch)
return stream, err
}
}
func (ci *clientinterceptor) Unary() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
role, epoch := ci.getRole()
if role == RoleStandby {
return status.Error(codes.Unavailable, "this server is a standby and is not currently replicating to its standby")
}
ctx = metadata.AppendToOutgoingContext(ctx, clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))
var header metadata.MD
err := invoker(ctx, method, req, reply, cc, append(opts, grpc.Header(&header))...)
ci.handleResponseHeaders(header, role, epoch)
return err
}
}
func (ci *clientinterceptor) handleResponseHeaders(header metadata.MD, role Role, epoch int) {
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) {
if respepoch, err := strconv.Atoi(epochs[0]); err == nil {
if respepoch == epoch {
ci.lgr.Errorf("cluster: this server and the server replicating to it are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
} else if respepoch > epoch {
// The server we replicate to thinks it is the primary at a higher epoch than us...
ci.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, respepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |respepoch|...
}
}
}
}
func (ci *clientinterceptor) Options() []grpc.DialOption {
return []grpc.DialOption{
grpc.WithChainUnaryInterceptor(ci.Unary()),
grpc.WithChainStreamInterceptor(ci.Stream()),
}
}
// serverinterceptor is installed as a Unary and Stream interceptor on a
// ChunkStoreServer which is serving a SQL database as a standby remote. The
// cluster.Controller sets this server's current Role and role epoch on the
// interceptor anytime it changes. In turn, this interceptor:
// * adds the server's current role and epoch to the response headers for every
// request.
// * fails all incoming requests immediately with codes.Unavailable if the
// current role == RolePrimary, since nothing should be replicating to us in
// that state.
// * watches incoming request headers for a situation which causes this server
// to force downgrade from primary to standby. In particular, when an incoming
// request asserts that the client is the current primary at an epoch higher
// than our current epoch, this interceptor coordinates with the Controller to
// immediately transition to standby and allow replication requests through.
type serverinterceptor struct {
lgr *logrus.Entry
role Role
epoch int
mu sync.Mutex
}
func (si *serverinterceptor) Stream() grpc.StreamServerInterceptor {
return func(srv interface{}, ss grpc.ServerStream, into *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if md, ok := metadata.FromIncomingContext(ss.Context()); ok {
role, epoch := si.getRole()
si.handleRequestHeaders(md, role, epoch)
}
// After handleRequestHeaders, our role may have changed, so we fetch it again here.
role, epoch := si.getRole()
if err := grpc.SetHeader(ss.Context(), metadata.Pairs(clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))); err != nil {
return err
}
if role == RolePrimary {
// As a primary, we do not accept replication requests.
return status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
}
return handler(srv, ss)
}
}
func (si *serverinterceptor) Unary() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
role, epoch := si.getRole()
si.handleRequestHeaders(md, role, epoch)
}
// After handleRequestHeaders, our role may have changed, so we fetch it again here.
role, epoch := si.getRole()
if err := grpc.SetHeader(ctx, metadata.Pairs(clusterRoleHeader, string(role), clusterRoleEpochHeader, strconv.Itoa(epoch))); err != nil {
return nil, err
}
if role == RolePrimary {
// As a primary, we do not accept replication requests.
return nil, status.Error(codes.Unavailable, "this server is a primary and is not currently accepting replication")
}
return handler(ctx, req)
}
}
func (si *serverinterceptor) handleRequestHeaders(header metadata.MD, role Role, epoch int) {
epochs := header.Get(clusterRoleEpochHeader)
roles := header.Get(clusterRoleHeader)
if len(epochs) > 0 && len(roles) > 0 && roles[0] == string(RolePrimary) && role == RolePrimary {
if reqepoch, err := strconv.Atoi(epochs[0]); err == nil {
if reqepoch == epoch {
// Misconfiguration in the cluster means this
// server and its standby are marked as Primary
// at the same epoch. We will become standby
// and our peer will become standby. An
// operator will need to get involved.
si.lgr.Errorf("cluster: this server and its standby replica are both primary at the same epoch. force transitioning to standby.")
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
} else if reqepoch > epoch {
// The client replicating to us thinks it is the primary at a higher epoch than us.
si.lgr.Warnf("cluster: this server is primary at epoch %d. the server replicating to it is primary at epoch %d. force transitioning to standby.", epoch, reqepoch)
// TODO: Signal to controller that we are forced to become a standby at epoch |reqepoch|
}
}
}
}
func (si *serverinterceptor) Options() []grpc.ServerOption {
return []grpc.ServerOption{
grpc.ChainUnaryInterceptor(si.Unary()),
grpc.ChainStreamInterceptor(si.Stream()),
}
}
func (si *serverinterceptor) setRole(role Role, epoch int) {
si.mu.Lock()
defer si.mu.Unlock()
si.role = role
si.epoch = epoch
}
func (si *serverinterceptor) getRole() (Role, int) {
si.mu.Lock()
defer si.mu.Unlock()
return si.role, si.epoch
}

View File

@@ -0,0 +1,185 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"net"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
type server struct {
md metadata.MD
}
func (s *server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
s.md, _ = metadata.FromIncomingContext(ctx)
return nil, status.Errorf(codes.Unimplemented, "method Check not implemented")
}
func (s *server) Watch(req *grpc_health_v1.HealthCheckRequest, ss grpc_health_v1.Health_WatchServer) error {
s.md, _ = metadata.FromIncomingContext(ss.Context())
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
}
func withClient(t *testing.T, cb func(*testing.T, grpc_health_v1.HealthClient), serveropts []grpc.ServerOption, dialopts []grpc.DialOption) *server {
addr, err := net.ResolveUnixAddr("unix", "test_grpc.socket")
require.NoError(t, err)
lis, err := net.ListenUnix("unix", addr)
require.NoError(t, err)
var wg sync.WaitGroup
var srvErr error
wg.Add(1)
srv := grpc.NewServer(serveropts...)
hs := new(server)
grpc_health_v1.RegisterHealthServer(srv, hs)
defer func() {
if srv != nil {
srv.GracefulStop()
wg.Wait()
}
}()
go func() {
defer wg.Done()
srvErr = srv.Serve(lis)
}()
cc, err := grpc.Dial("unix:test_grpc.socket", append([]grpc.DialOption{grpc.WithInsecure()}, dialopts...)...)
require.NoError(t, err)
client := grpc_health_v1.NewHealthClient(cc)
cb(t, client)
srv.GracefulStop()
wg.Wait()
srv = nil
require.NoError(t, srvErr)
return hs
}
func TestServerInterceptorAddsUnaryResponseHeaders(t *testing.T) {
var si serverinterceptor
si.setRole(RoleStandby, 10)
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
assert.Equal(t, codes.Unimplemented, status.Code(err))
if assert.Len(t, md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "standby", md.Get(clusterRoleHeader)[0])
}
if assert.Len(t, md.Get(clusterRoleEpochHeader), 1) {
assert.Equal(t, "10", md.Get(clusterRoleEpochHeader)[0])
}
}, si.Options(), nil)
}
func TestServerInterceptorAddsStreamResponseHeaders(t *testing.T) {
var si serverinterceptor
si.setRole(RoleStandby, 10)
withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
var md metadata.MD
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{}, grpc.Header(&md))
require.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unimplemented, status.Code(err))
if assert.Len(t, md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "standby", md.Get(clusterRoleHeader)[0])
}
if assert.Len(t, md.Get(clusterRoleEpochHeader), 1) {
assert.Equal(t, "10", md.Get(clusterRoleEpochHeader)[0])
}
}, si.Options(), nil)
}
func TestServerInterceptorAsPrimaryDoesNotSendRequest(t *testing.T) {
var si serverinterceptor
si.setRole(RolePrimary, 10)
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
ctx := metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
_, err := client.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
ctx = metadata.AppendToOutgoingContext(context.Background(), "test-header", "test-header-value")
ss, err := client.Watch(ctx, &grpc_health_v1.HealthCheckRequest{})
assert.NoError(t, err)
_, err = ss.Recv()
assert.Equal(t, codes.Unavailable, status.Code(err))
}, si.Options(), nil)
assert.Nil(t, srv.md)
}
func TestClientInterceptorAddsUnaryRequestHeaders(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])
}
if assert.Len(t, srv.md.Get(clusterRoleEpochHeader), 1) {
assert.Equal(t, "10", srv.md.Get(clusterRoleEpochHeader)[0])
}
}
func TestClientInterceptorAddsStreamRequestHeaders(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
srv, err := client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
require.NoError(t, err)
_, err = srv.Recv()
assert.Equal(t, codes.Unimplemented, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])
}
if assert.Len(t, srv.md.Get(clusterRoleEpochHeader), 1) {
assert.Equal(t, "10", srv.md.Get(clusterRoleEpochHeader)[0])
}
}
func TestClientInterceptorAsStandbyDoesNotSendRequest(t *testing.T) {
var ci clientinterceptor
ci.setRole(RolePrimary, 10)
srv := withClient(t, func(t *testing.T, client grpc_health_v1.HealthClient) {
_, err := client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unimplemented, status.Code(err))
ci.setRole(RoleStandby, 11)
_, err = client.Check(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
_, err = client.Watch(context.Background(), &grpc_health_v1.HealthCheckRequest{})
assert.Equal(t, codes.Unavailable, status.Code(err))
}, nil, ci.Options())
if assert.Len(t, srv.md.Get(clusterRoleHeader), 1) {
assert.Equal(t, "primary", srv.md.Get(clusterRoleHeader)[0])
}
if assert.Len(t, srv.md.Get(clusterRoleEpochHeader), 1) {
assert.Equal(t, "10", srv.md.Get(clusterRoleEpochHeader)[0])
}
}

View File

@@ -47,6 +47,7 @@ type DoltDatabaseProvider struct {
databases map[string]sql.Database
functions map[string]sql.Function
externalProcedures sql.ExternalStoredProcedureRegistry
InitDatabaseHook InitDatabaseHook
mu *sync.RWMutex
defaultBranch string
@@ -114,6 +115,7 @@ func NewDoltDatabaseProviderWithDatabases(defaultBranch string, fs filesys.Files
fs: fs,
defaultBranch: defaultBranch,
dbFactoryUrl: doltdb.LocalDirDoltDB,
InitDatabaseHook: ConfigureReplicationDatabaseHook,
}, nil
}
@@ -347,8 +349,10 @@ func (p DoltDatabaseProvider) CreateDatabase(ctx *sql.Context, name string) erro
return err
}
// If replication is configured, set it up for the new database as well
err = p.configureReplication(ctx, name, newEnv)
// If we have an initialization hook, invoke it. By default, this will
// be ConfigureReplicationDatabaseHook, which will set up replication
// for the new database if a remote url template is set.
err = p.InitDatabaseHook(ctx, p, name, newEnv)
if err != nil {
return err
}
@@ -365,9 +369,11 @@ func (p DoltDatabaseProvider) CreateDatabase(ctx *sql.Context, name string) erro
return sess.AddDB(ctx, dbstate)
}
type InitDatabaseHook func(ctx *sql.Context, pro DoltDatabaseProvider, name string, env *env.DoltEnv) error
// configureReplication sets up replication for a newly created database as necessary
// TODO: consider the replication heads / all heads setting
func (p DoltDatabaseProvider) configureReplication(ctx *sql.Context, name string, newEnv *env.DoltEnv) error {
func ConfigureReplicationDatabaseHook(ctx *sql.Context, p DoltDatabaseProvider, name string, newEnv *env.DoltEnv) error {
_, replicationRemoteName, _ := sql.SystemVariables.GetGlobal(dsess.ReplicateToRemote)
if replicationRemoteName == "" {
return nil

View File

@@ -18,7 +18,6 @@ import (
"errors"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
@@ -51,14 +50,9 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
return rss, nil
}
func NewRemoteSrvServer(lgr *logrus.Entry, ctx *sql.Context, port int) *remotesrv.Server {
func NewRemoteSrvServer(ctx *sql.Context, args remotesrv.ServerArgs) *remotesrv.Server {
sess := dsess.DSessFromSess(ctx.Session)
return remotesrv.NewServer(remotesrv.ServerArgs{
Logger: lgr,
HttpPort: port,
GrpcPort: port,
FS: sess.Provider().FileSystem(),
DBCache: remotesrvStore{ctx},
ReadOnly: true,
})
args.FS = sess.Provider().FileSystem()
args.DBCache = remotesrvStore{ctx}
return remotesrv.NewServer(args)
}

View File

@@ -19,7 +19,7 @@ import (
"errors"
"io"
"github.com/cenkalti/backoff"
"github.com/cenkalti/backoff/v4"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"

View File

@@ -190,13 +190,12 @@ stop_sql_server() {
wait=$1
if [ ! -z "$SERVER_PID" ]; then
kill $SERVER_PID
if [ $wait ]; then
while ps -p $SERVER_PID > /dev/null; do
sleep .1;
done
fi;
fi
if [ $wait ]; then
while ps -p $SERVER_PID > /dev/null; do
sleep .1;
done
fi;
SERVER_PID=
}

View File

@@ -3,119 +3,137 @@ load $BATS_TEST_DIRNAME/helper/common.bash
load $BATS_TEST_DIRNAME/helper/query-server-common.bash
make_repo() {
mkdir "$1"
cd "$1"
dolt init
cd ..
mkdir -p "$1"
(cd "$1"; dolt init)
}
SERVERONE_MYSQL_PORT=3309
SERVERONE_GRPC_PORT=50051
SERVERTWO_MYSQL_PORT=3310
SERVERTWO_GRPC_PORT=50052
setup() {
skiponwindows "tests are flaky on Windows"
setup_no_dolt_init
make_repo repo1
make_repo repo2
make_repo serverone/repo1
make_repo serverone/repo2
make_repo servertwo/repo1
make_repo servertwo/repo2
}
teardown() {
stop_sql_server
stop_sql_server "1"
teardown_common
}
@test "sql-server-cluster: persisted role and epoch take precedence over bootstrap values" {
cd serverone
echo "
user:
name: dolt
listener:
host: 0.0.0.0
port: 3309
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: doltdb-1
remote_url_template: http://doltdb-1.doltdb:50051/{database}
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: standby
bootstrap_epoch: 10
remotesapi:
port: 50051" > server.yaml
port: ${SERVERONE_GRPC_PORT}" > server.yaml
(cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1)
(cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2)
dolt sql-server --config server.yaml &
SERVER_PID=$!
wait_for_connection 3309 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port 3309 repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
kill $SERVER_PID
wait $SERVER_PID
SERVER_PID=
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: 3309
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: doltdb-1
remote_url_template: http://doltdb-1.doltdb:50051/{database}
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 0
remotesapi:
port: 50051" > server.yaml
port: ${SERVERONE_GRPC_PORT}" > server.yaml
dolt sql-server --config server.yaml &
SERVER_PID=$!
wait_for_connection 3309 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port 3309 repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
}
@test "sql-server-cluster: dolt_assume_cluster_role" {
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: 3309
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: doltdb-1
remote_url_template: http://doltdb-1.doltdb:50051/{database}
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: standby
bootstrap_epoch: 10
remotesapi:
port: 50051" > server.yaml
port: ${SERVERONE_GRPC_PORT}" > server.yaml
(cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1)
(cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2)
dolt sql-server --config server.yaml &
SERVER_PID=$!
wait_for_connection 3309 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
# stale epoch
run server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '9');" "" 1
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '9');" "" 1
[[ "$output" =~ "error assuming role" ]] || false
# wrong role at current epoch
run server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '10');" "" 1
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '10');" "" 1
[[ "$output" =~ "error assuming role" ]] || false
# wrong role name
run server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('backup', '11');" "" 1
run server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('backup', '11');" "" 1
[[ "$output" =~ "error assuming role" ]] || false
# successes
# same role, same epoch
server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '10');" "status\n0"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '10');" "status\n0"
# same role, new epoch
server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('standby', '12'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,12"
# new role, new epoch
server_query_with_port 3309 repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "call dolt_assume_cluster_role('primary', '13'); select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "status\n0;@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
# Server comes back up with latest assumed role.
kill $SERVER_PID
@@ -123,7 +141,142 @@ cluster:
SERVER_PID=
dolt sql-server --config server.yaml &
SERVER_PID=$!
wait_for_connection 3309 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port 3309 repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nprimary,13"
}
@test "sql-server-cluster: create database makes a new remote" {
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERONE_GRPC_PORT}" > server.yaml
(cd repo1 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo1)
(cd repo2 && dolt remote add standby http://localhost:"${SERVERTWO_GRPC_PORT}"/repo2)
dolt sql-server --config server.yaml &
SERVER_PID=$!
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create database a_new_database;use a_new_database;select name, url from dolt_remotes" ";;name,url\nstandby,http://localhost:50052/a_new_database"
}
@test "sql-server-cluster: sql-server fails to start if a configured remote is missing" {
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERONE_GRPC_PORT}" > server.yaml
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
run dolt sql-server --config server.yaml
[ "$status" -ne 0 ]
[[ "$output" =~ "destination remote standby does not exist" ]] || false
}
@test "sql-server-cluster: primary comes up and replicates to standby" {
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERONE_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERONE_GRPC_PORT}" > server.yaml
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
(cd repo1 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo1)
(cd repo2 && dolt remote add standby http://localhost:${SERVERTWO_GRPC_PORT}/repo2)
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
serverone_pid=$!
cd ../servertwo
echo "
log_level: trace
user:
name: dolt
listener:
host: 0.0.0.0
port: ${SERVERTWO_MYSQL_PORT}
behavior:
read_only: false
autocommit: true
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
bootstrap_role: standby
bootstrap_epoch: 10
remotesapi:
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.email bats@email.fake
DOLT_ROOT_PATH=`pwd` dolt config --global --add user.name "Bats Tests"
DOLT_ROOT_PATH=`pwd` dolt config --global --add metrics.disabled true
(cd repo1 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo1)
(cd repo2 && dolt remote add standby http://localhost:${SERVERONE_GRPC_PORT}/repo2)
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
servertwo_pid=$!
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5);"
kill $serverone_pid
wait $serverone_pid
kill $servertwo_pid
wait $servertwo_pid
run env DOLT_ROOT_PATH=`pwd` dolt sql -q 'select count(*) from vals'
[ "$status" -eq 0 ]
[[ "$output" =~ "| 5 " ]] || false
}
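The BATS helpers above drive these assertions over the MySQL wire protocol. For readers reproducing the role/epoch check outside of bats, a hedged Go sketch follows; the driver choice, DSN, and hard-coded port, user, and database simply mirror the serverone test config and are illustrative only:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // illustrative client driver choice
)

// Connect to the serverone instance from the tests above (port 3309, user
// "dolt", empty password, database repo1) and read the cluster role globals.
func main() {
	db, err := sql.Open("mysql", "dolt:@tcp(127.0.0.1:3309)/repo1")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	var role string
	var epoch int
	err = db.QueryRow("select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch").Scan(&role, &epoch)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(role, epoch)
}
```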