go: sqle: cluster: Add logging to our replication commit hook. Make the commit hook attempt to true-up standby on shutdown.

This commit is contained in:
Aaron Son
2022-09-27 14:46:38 -07:00
parent e648d9bceb
commit 98f4321ec8
5 changed files with 111 additions and 55 deletions

View File

@@ -124,7 +124,7 @@ func Serve(
}
}
clusterController, err := cluster.NewController(serverConfig.ClusterConfig(), mrEnv.Config())
clusterController, err := cluster.NewController(lgr, serverConfig.ClusterConfig(), mrEnv.Config())
if err != nil {
return err, nil
}

View File

@@ -22,6 +22,7 @@ import (
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/store/datas"
@@ -31,6 +32,7 @@ import (
var _ doltdb.CommitHook = (*commithook)(nil)
type commithook struct {
lgr *logrus.Entry
remotename string
dbname string
lout io.Writer
@@ -45,10 +47,10 @@ type commithook struct {
role Role
// The standby replica to which the new root gets replicated.
destDB *doltdb.DoltDB
destDB *doltdb.DoltDB
// When we first start replicating to the destination, we lazily
// instantiate the remote and we do not treat failures as terminal.
destDBF func() (*doltdb.DoltDB, error)
destDBF func(context.Context) (*doltdb.DoltDB, error)
// This database, which we are replicating from. In our current
// configuration, it is local to this server process.
srcDB *doltdb.DoltDB
@@ -58,8 +60,9 @@ type commithook struct {
var errDestDBRootHashMoved error = errors.New("sqle: cluster: standby replication: destination database root hash moved during our write, while it is assumed we are the only writer.")
func newCommitHook(remotename, dbname string, role Role, destDBF func() (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
func newCommitHook(lgr *logrus.Logger, remotename, dbname string, role Role, destDBF func(context.Context) (*doltdb.DoltDB, error), srcDB *doltdb.DoltDB, tempDir string) *commithook {
var ret commithook
ret.lgr = lgr.WithField("thread", "Standby Replication - "+dbname+" to "+remotename)
ret.remotename = remotename
ret.dbname = dbname
ret.role = role
@@ -76,67 +79,108 @@ func (h *commithook) Run(bt *sql.BackgroundThreads) error {
func (h *commithook) replicate(ctx context.Context) {
defer h.wg.Done()
defer h.lgr.Tracef("cluster/commithook: background thread: replicate: shutdown.")
h.mu.Lock()
defer h.mu.Unlock()
for {
// Shutdown for context canceled.
if ctx.Err() != nil {
h.lgr.Tracef("commithook replicate thread exiting; saw ctx.Err(): %v", ctx.Err())
if h.shouldReplicate() {
// attempt a last true-up of our standby as we shutdown
// TODO: context.WithDeadline based on config / convention?
h.attemptReplicate(context.Background())
}
return
}
if h.role == RolePrimary && h.nextHead == (hash.Hash{}) {
h.lgr.Tracef("cluster/commithook: fetching current head.")
// When the replicate thread comes up, it attempts to replicate the current head.
datasDB := doltdb.HackDatasDatabaseFromDoltDB(h.srcDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var err error
h.nextHead, err = cs.Root(ctx)
if err != nil {
// TODO: if err != nil, something is really wrong; should shutdown or backoff.
h.lgr.Warning("standby replication thread failed to load database root: %v", err)
h.nextHead = hash.Hash{}
}
} else if h.role == RolePrimary && h.nextHead != h.lastPushedHead && (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt)) {
toPush := h.nextHead
destDB := h.destDB
h.mu.Unlock()
if destDB == nil {
var err error
destDB, err = h.destDBF()
if err != nil {
h.mu.Lock()
// TODO: Log this.
continue
}
h.mu.Lock()
h.destDB = destDB
h.mu.Unlock()
}
err := destDB.PullChunks(ctx, h.tempDir, h.srcDB, toPush, nil, nil)
if err == nil {
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
cs := datas.ChunkStoreFromDatabase(datasDB)
var curRootHash hash.Hash
if curRootHash, err = cs.Root(ctx); err == nil {
var ok bool
ok, err = cs.Commit(ctx, toPush, curRootHash)
if err == nil && !ok {
err = errDestDBRootHashMoved
}
}
}
h.mu.Lock()
if err == nil {
h.lastPushedHead = toPush
h.lastPushedSuccess = time.Now()
h.nextPushAttempt = time.Time{}
} else {
if toPush == h.nextHead {
// TODO: We could add some backoff here.
h.nextPushAttempt = time.Now().Add(1 * time.Second)
}
}
} else if h.shouldReplicate() {
h.attemptReplicate(ctx)
} else {
h.lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
h.cond.Wait()
h.lgr.Tracef("cluster/commithook: background thread: woken up.")
}
}
}
// shouldReplicate reports whether the replicate thread currently has work
// to do: we are the primary, the locally committed head differs from the
// last head successfully pushed to the standby, and any retry backoff
// window has elapsed.
//
// called with h.mu locked.
func (h *commithook) shouldReplicate() bool {
	// Only the primary pushes to the standby.
	if h.role != RolePrimary {
		return false
	}
	// Nothing new to push since the last successful replication.
	if h.nextHead == h.lastPushedHead {
		return false
	}
	// Zero nextPushAttempt means "retry immediately"; otherwise wait
	// out the backoff set by a previous failed attempt.
	return (h.nextPushAttempt == (time.Time{}) || time.Now().After(h.nextPushAttempt))
}
// Called by the replicate thread to push the nextHead to the destDB and set
// its root to the new value.
//
// preconditions: h.mu is locked and shouldReplicate() returned true.
// when this function returns, h.mu is locked.
func (h *commithook) attemptReplicate(ctx context.Context) {
	// Snapshot the work to do under the lock, then release it for the
	// (potentially slow) remote I/O below.
	toPush := h.nextHead
	destDB := h.destDB
	h.mu.Unlock()
	if destDB == nil {
		// Lazily instantiate the remote. Failures here are not
		// terminal: we schedule a retry and return.
		h.lgr.Tracef("cluster/commithook: attempting to fetch destDB.")
		var err error
		destDB, err = h.destDBF(ctx)
		if err != nil {
			h.lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err)
			h.mu.Lock()
			// TODO: We could add some backoff here.
			h.nextPushAttempt = time.Now().Add(1 * time.Second)
			return
		}
		h.lgr.Tracef("cluster/commithook: fetched destDB")
		// Cache the remote for subsequent attempts.
		h.mu.Lock()
		h.destDB = destDB
		h.mu.Unlock()
	}
	h.lgr.Tracef("cluster/commithook: pushing chunks for root hash %v to destDB", toPush.String())
	err := destDB.PullChunks(ctx, h.tempDir, h.srcDB, toPush, nil, nil)
	if err == nil {
		h.lgr.Tracef("cluster/commithook: successfully pushed chunks, setting root")
		datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
		cs := datas.ChunkStoreFromDatabase(datasDB)
		var curRootHash hash.Hash
		if curRootHash, err = cs.Root(ctx); err == nil {
			var ok bool
			// Compare-and-set the standby's root. If it moved under
			// us, someone else wrote to the standby, violating our
			// single-writer assumption.
			ok, err = cs.Commit(ctx, toPush, curRootHash)
			if err == nil && !ok {
				err = errDestDBRootHashMoved
			}
		}
	}
	h.mu.Lock()
	if err == nil {
		// NOTE(review): fixed log-message typo "Commited" -> "Committed".
		h.lgr.Tracef("cluster/commithook: successfully Committed chunks on destDB")
		h.lastPushedHead = toPush
		h.lastPushedSuccess = time.Now()
		h.nextPushAttempt = time.Time{}
	} else {
		h.lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
		// add some delay if a new head didn't come in while we were pushing.
		if toPush == h.nextHead {
			// TODO: We could add some backoff here.
			h.nextPushAttempt = time.Now().Add(1 * time.Second)
		}
	}
}
@@ -144,11 +188,13 @@ func (h *commithook) replicate(ctx context.Context) {
// TODO: Would be more efficient to only tick when we have outstanding work...
func (h *commithook) tick(ctx context.Context) {
defer h.wg.Done()
defer h.lgr.Trace("tick thread returning")
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
time.Sleep(1 * time.Second)
return
case <-ticker.C:
h.cond.Signal()
@@ -158,12 +204,15 @@ func (h *commithook) tick(ctx context.Context) {
// run starts the commit hook's background threads — the replicate loop and
// the periodic tick — then blocks until ctx is canceled. On cancellation it
// signals the condition variable so the replicate thread (which may be
// blocked in cond.Wait) can observe the canceled context, and waits for
// both threads to exit before returning.
func (h *commithook) run(ctx context.Context) {
	// The hook comes up attempting to replicate the current head.
	h.lgr.Tracef("cluster/commithook: background thread: running.")
	// Two goroutines: replicate and tick; each calls wg.Done on exit.
	h.wg.Add(2)
	go h.replicate(ctx)
	go h.tick(ctx)
	<-ctx.Done()
	h.lgr.Tracef("cluster/commithook: background thread: requested shutdown, signaling replication thread.")
	h.cond.Signal()
	h.wg.Wait()
	h.lgr.Tracef("cluster/commithook: background thread: completed.")
}
func (h *commithook) setRole(role Role) {

View File

@@ -21,6 +21,7 @@ import (
"sync"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
@@ -47,6 +48,7 @@ type Controller struct {
commithooks []*commithook
sinterceptor serverinterceptor
cinterceptor clientinterceptor
lgr *logrus.Logger
}
type sqlvars interface {
@@ -62,7 +64,7 @@ const (
DoltClusterRoleEpochVariable = "dolt_cluster_role_epoch"
)
func NewController(cfg Config, pCfg config.ReadWriteConfig) (*Controller, error) {
func NewController(lgr *logrus.Logger, cfg Config, pCfg config.ReadWriteConfig) (*Controller, error) {
if cfg == nil {
return nil, nil
}
@@ -77,6 +79,7 @@ func NewController(cfg Config, pCfg config.ReadWriteConfig) (*Controller, error)
role: role,
epoch: epoch,
commithooks: make([]*commithook, 0),
lgr: lgr,
}, nil
}
@@ -101,7 +104,7 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
if denv == nil {
continue
}
hooks, err := applyCommitHooks(ctx, db.Name(), bt, denv, c.cfg, c.role)
hooks, err := applyCommitHooks(ctx, c.lgr, db.Name(), bt, denv, c.cfg, c.role)
if err != nil {
return err
}
@@ -110,7 +113,7 @@ func (c *Controller) ApplyStandbyReplicationConfig(ctx context.Context, bt *sql.
return nil
}
func applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv, cfg Config, role Role) ([]*commithook, error) {
func applyCommitHooks(ctx context.Context, lgr *logrus.Logger, name string, bt *sql.BackgroundThreads, denv *env.DoltEnv, cfg Config, role Role) ([]*commithook, error) {
ttfdir, err := denv.TempTableFilesDir()
if err != nil {
return nil, err
@@ -125,7 +128,7 @@ func applyCommitHooks(ctx context.Context, name string, bt *sql.BackgroundThread
if !ok {
return nil, fmt.Errorf("sqle: cluster: standby replication: destination remote %s does not exist on database %s", r.Name(), name)
}
commitHook := newCommitHook(r.Name(), name, role, func() (*doltdb.DoltDB, error) {
commitHook := newCommitHook(lgr, r.Name(), name, role, func(ctx context.Context) (*doltdb.DoltDB, error) {
return remote.GetRemoteDB(ctx, types.Format_Default, denv)
}, denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)

View File

@@ -15,6 +15,7 @@
package cluster
import (
"context"
"strings"
"github.com/dolthub/go-mysql-server/sql"
@@ -31,7 +32,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
return orig
}
return func(ctx *sql.Context, pro sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error {
var remoteDBs []func() (*doltdb.DoltDB, error)
var remoteDBs []func(context.Context) (*doltdb.DoltDB, error)
for _, r := range controller.cfg.StandbyRemotes() {
// TODO: url sanitize name
remoteUrl := strings.Replace(r.RemoteURLTemplate(), dsess.URLTemplateDatabasePlaceholder, name, -1)
@@ -44,7 +45,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
return err
}
remoteDBs = append(remoteDBs, func() (*doltdb.DoltDB, error) {
remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) {
return r.GetRemoteDB(ctx, types.Format_Default, denv)
})
}
@@ -60,7 +61,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
if err != nil {
return err
}
commitHook := newCommitHook(r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir)
commitHook := newCommitHook(controller.lgr, r.Name(), name, role, remoteDBs[i], denv.DoltDB, ttfdir)
denv.DoltDB.PrependCommitHook(ctx, commitHook)
controller.registerCommitHook(commitHook)
if err := commitHook.Run(bt); err != nil {

View File

@@ -17,7 +17,7 @@ setup() {
}
teardown() {
stop_sql_server
stop_sql_server "1"
teardown_common
}
@@ -53,6 +53,7 @@ cluster:
SERVER_PID=
echo "
log_level: trace
user:
name: dolt
listener:
@@ -79,6 +80,7 @@ cluster:
@test "sql-server-cluster: dolt_assume_cluster_role" {
echo "
log_level: trace
user:
name: dolt
listener:
@@ -134,6 +136,7 @@ cluster:
@test "sql-server-cluster: create database makes a new remote" {
echo "
log_level: trace
user:
name: dolt
listener: