Merge pull request #4429 from dolthub/aaron/sql-cluster-status-table

go: sqle: cluster: Add a dolt_cluster database, exposing a dolt_cluster_status table which queries replication status.
This commit is contained in:
Aaron Son
2022-09-29 15:24:25 -07:00
committed by GitHub
8 changed files with 298 additions and 33 deletions

View File

@@ -99,6 +99,12 @@ func NewSqlEngine(
all := append(dsqleDBsAsSqlDBs(dbs), infoDB)
locations = append(locations, nil)
clusterDB := config.ClusterController.ClusterDatabase()
if clusterDB != nil {
all = append(all, clusterDB)
locations = append(locations, nil)
}
b := env.GetDefaultInitBranch(mrEnv.Config())
pro, err := dsqle.NewDoltDatabaseProviderWithDatabases(b, mrEnv.FileSystem(), all, locations)
if err != nil {

View File

@@ -17,6 +17,7 @@ package cluster
import (
"context"
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
@@ -46,6 +47,7 @@ type commithook struct {
lastPushedSuccess time.Time
nextPushAttempt time.Time
nextHeadIncomingTime time.Time
currentError *string
role Role
@@ -173,6 +175,8 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
var err error
destDB, err = h.destDBF(ctx)
if err != nil {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("could not replicate to standby: error fetching destDB: %v", err)
lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err)
h.mu.Lock()
// TODO: We could add some backoff here.
@@ -205,11 +209,14 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
h.mu.Lock()
if err == nil {
h.currentError = nil
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
h.lastPushedHead = toPush
h.lastPushedSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
h.currentError = new(string)
*h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err)
lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err)
// add some delay if a new head didn't come in while we were pushing.
if toPush == h.nextHead {
@@ -219,24 +226,36 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
}
}
func (h *commithook) replicationLag() time.Duration {
func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.Time, currentErr *string) {
h.mu.Lock()
defer h.mu.Unlock()
if h.role == RoleStandby {
return time.Duration(0)
if h.role == RolePrimary {
if h.lastPushedHead != (hash.Hash{}) {
replicationLag = new(time.Duration)
if h.nextHead != h.lastPushedHead {
// We return the wallclock time between now and the last time we were
// successful. If h.nextHeadIncomingTime is significantly earlier than
// time.Now(), because the server has not received a write in a long
// time, then this metric may report a high number when the number of
// seconds of writes outstanding could actually be much smaller.
// Operationally, failure to replicate a write for a long time is a
// problem that merits investigation, regardless of how many pending
// writes are failing to replicate.
*replicationLag = time.Now().Sub(h.lastPushedSuccess)
}
}
if h.lastPushedSuccess != (time.Time{}) {
lastUpdate := new(time.Time)
*lastUpdate = h.lastPushedSuccess
}
}
if h.nextHead == h.lastPushedHead {
return time.Duration(0)
}
// We return the wallclock time between now and the last time we were
// successful. If h.nextHeadIncomingTime is significantly earlier than
// time.Now(), because the server has not received a write in a long
// time, then this metric may report a high number when the number of
// seconds of writes outstanding could actually be much smaller.
// Operationally, failure to replicate a write for a long time is a
// problem that merits investigation, regardless of how many pending
// writes are failing to replicate.
return time.Now().Sub(h.lastPushedSuccess)
currentErr = h.currentError
// TODO: lastUpdate in Standby role.
return
}
func (h *commithook) logger() *logrus.Entry {
@@ -263,6 +282,7 @@ func (h *commithook) setRole(role Role) {
defer h.mu.Unlock()
// Reset head-to-push and timers here. When we transition into Primary,
// the replicate() loop will take these from the current chunk store.
h.currentError = nil
h.nextHead = hash.Hash{}
h.lastPushedHead = hash.Hash{}
h.lastPushedSuccess = time.Time{}

View File

@@ -28,6 +28,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb"
"github.com/dolthub/dolt/go/libraries/utils/config"
"github.com/dolthub/dolt/go/store/types"
)
@@ -159,6 +160,13 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) {
store.Register(newAssumeRoleProcedure(c))
}
// ClusterDatabase returns the dolt_cluster system database, which exposes
// replication status through the dolt_cluster_status table. It returns nil
// (no database) when clustering is not configured, i.e. the Controller
// itself is nil.
func (c *Controller) ClusterDatabase() sql.Database {
if c == nil {
return nil
}
return clusterdb.NewClusterDatabase(c)
}
func (c *Controller) RemoteSrvPort() int {
if c == nil {
return -1
@@ -283,3 +291,28 @@ func (c *Controller) registerCommitHook(hook *commithook) {
defer c.mu.Unlock()
c.commithooks = append(c.commithooks, hook)
}
// GetClusterStatus returns one ReplicaStatus per (database, standby remote)
// pair, reflecting the controller's current role and epoch and each
// commithook's replication state. Safe to call on a nil Controller, which
// returns an empty (non-nil) slice.
func (c *Controller) GetClusterStatus() []clusterdb.ReplicaStatus {
	if c == nil {
		return []clusterdb.ReplicaStatus{}
	}
	c.mu.Lock()
	epoch, role := c.epoch, c.role
	// Snapshot the commithooks so we do not hold c.mu while querying each
	// hook's status below (each hook takes its own lock).
	commithooks := make([]*commithook, len(c.commithooks))
	copy(commithooks, c.commithooks)
	c.mu.Unlock()
	ret := make([]clusterdb.ReplicaStatus, len(commithooks))
	// NB: the loop variable is named hook rather than c to avoid shadowing
	// the receiver, which the original code did.
	for i, hook := range commithooks {
		lag, lastUpdate, currentErrorStr := hook.status()
		ret[i] = clusterdb.ReplicaStatus{
			Database:       hook.dbname,
			Remote:         hook.remotename,
			Role:           string(role),
			Epoch:          epoch,
			ReplicationLag: lag,
			LastUpdate:     lastUpdate,
			CurrentError:   currentErrorStr,
		}
	}
	return ret
}

View File

@@ -15,7 +15,10 @@
package cluster
import (
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/grpcendpoint"
@@ -38,5 +41,14 @@ func (p grpcDialProvider) GetGRPCDialParams(config grpcendpoint.Config) (string,
return "", nil, err
}
opts = append(opts, p.ci.Options()...)
opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoff.Config{
BaseDelay: 250 * time.Millisecond,
Multiplier: 1.6,
Jitter: 0.6,
MaxDelay: 10 * time.Second,
},
MinConnectTimeout: 250 * time.Millisecond,
}))
return endpoint, opts, nil
}

View File

@@ -32,6 +32,12 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
return orig
}
return func(ctx *sql.Context, pro sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error {
var err error
err = orig(ctx, pro, name, denv)
if err != nil {
return err
}
dialprovider := controller.gRPCDialProvider(denv)
var remoteDBs []func(context.Context) (*doltdb.DoltDB, error)
for _, r := range controller.cfg.StandbyRemotes() {
@@ -51,11 +57,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig
})
}
err := orig(ctx, pro, name, denv)
if err != nil {
return err
}
role, _ := controller.roleAndEpoch()
for i, r := range controller.cfg.StandbyRemotes() {
ttfdir, err := denv.TempTableFilesDir()

View File

@@ -0,0 +1,122 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clusterdb
import (
"time"
"github.com/dolthub/go-mysql-server/sql"
)
// ReplicaStatus is one row of replication status: the state of replication
// from one database on this server to one of its standby remotes.
type ReplicaStatus struct {
// The name of the database this replica status represents.
Database string
// The role this server is currently running as. "primary" or "standby".
Role string
// The epoch of this server's current role.
Epoch int
// The standby remote that this replica status represents.
Remote string
// The current replication lag. NULL when we are a standby.
ReplicationLag *time.Duration
// As a standby, the last time we received a root update.
// As a primary, the last time we pushed a root update to the standby.
LastUpdate *time.Time
// A string describing the last encountered error. NULL when we are a
// standby. NULL when our last replication attempt succeeded.
CurrentError *string
}
// ClusterStatusProvider supplies the rows backing the dolt_cluster_status
// table.
type ClusterStatusProvider interface {
GetClusterStatus() []ReplicaStatus
}
var _ sql.Table = ClusterStatusTable{}
// partition is the single, degenerate partition of the status table.
type partition struct {
}
// Key identifies the partition; there is only ever one.
func (p *partition) Key() []byte {
return []byte("FULL")
}
// NewClusterStatusTable returns a dolt_cluster_status table backed by the
// given provider. A nil provider yields an always-empty table.
func NewClusterStatusTable(provider ClusterStatusProvider) sql.Table {
return ClusterStatusTable{provider}
}
// ClusterStatusTable is the sql.Table implementation for dolt_cluster_status.
type ClusterStatusTable struct {
provider ClusterStatusProvider
}
// Name implements sql.Table.
func (t ClusterStatusTable) Name() string {
return StatusTableName
}
// String implements fmt.Stringer and sql.Table.
func (t ClusterStatusTable) String() string {
return StatusTableName
}
// Collation implements sql.Table; the table uses the default collation.
func (t ClusterStatusTable) Collation() sql.CollationID {
return sql.Collation_Default
}
// Partitions implements sql.Table; the table has a single partition.
func (t ClusterStatusTable) Partitions(*sql.Context) (sql.PartitionIter, error) {
return sql.PartitionsToPartitionIter((*partition)(nil)), nil
}
// PartitionRows implements sql.Table. The partition argument is ignored
// (there is only one partition); it materializes one row per replica status.
func (t ClusterStatusTable) PartitionRows(*sql.Context, sql.Partition) (sql.RowIter, error) {
	if t.provider == nil {
		// No provider configured: the table is empty.
		return sql.RowsToRowIter(), nil
	}
	statuses := t.provider.GetClusterStatus()
	rows := replicaStatusesToRows(statuses)
	return sql.RowsToRowIter(rows...), nil
}
// replicaStatusesToRows converts each ReplicaStatus into its sql.Row form.
func replicaStatusesToRows(rss []ReplicaStatus) []sql.Row {
	rows := make([]sql.Row, 0, len(rss))
	for _, rs := range rss {
		rows = append(rows, replicaStatusToRow(rs))
	}
	return rows
}
// replicaStatusToRow lays out a ReplicaStatus in dolt_cluster_status column
// order: database, standby_remote, role, epoch, replication_lag_millis,
// last_update, current_error. The last three are nil (SQL NULL) when unset.
func replicaStatusToRow(rs ReplicaStatus) sql.Row {
	row := sql.Row{rs.Database, rs.Remote, rs.Role, int64(rs.Epoch), nil, nil, nil}
	if rs.ReplicationLag != nil {
		row[4] = rs.ReplicationLag.Milliseconds()
	}
	if rs.LastUpdate != nil {
		row[5] = *rs.LastUpdate
	}
	if rs.CurrentError != nil {
		row[6] = *rs.CurrentError
	}
	return row
}
// Schema implements sql.Table. The columns mirror the layout produced by
// replicaStatusToRow: (database, standby_remote) is the primary key; the
// lag, last-update and error columns are nullable.
func (t ClusterStatusTable) Schema() sql.Schema {
return sql.Schema{
{Name: "database", Type: sql.Text, Source: StatusTableName, PrimaryKey: true, Nullable: false},
{Name: "standby_remote", Type: sql.Text, Source: StatusTableName, PrimaryKey: true, Nullable: false},
{Name: "role", Type: sql.Text, Source: StatusTableName, PrimaryKey: false, Nullable: false},
{Name: "epoch", Type: sql.Int64, Source: StatusTableName, PrimaryKey: false, Nullable: false},
{Name: "replication_lag_millis", Type: sql.Int64, Source: StatusTableName, PrimaryKey: false, Nullable: true},
{Name: "last_update", Type: sql.Datetime, Source: StatusTableName, PrimaryKey: false, Nullable: true},
{Name: "current_error", Type: sql.Text, Source: StatusTableName, PrimaryKey: false, Nullable: true},
}
}

View File

@@ -0,0 +1,71 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clusterdb
import (
"errors"
"strings"
"github.com/dolthub/go-mysql-server/sql"
)
// database is the dolt_cluster system database. It exposes cluster
// replication status via the dolt_cluster_status table.
type database struct {
statusProvider ClusterStatusProvider
}
var _ sql.Database = database{}
// StatusTableName is the name of the replication status table in the
// dolt_cluster database.
const StatusTableName = "dolt_cluster_status"
// Name implements sql.Database.
func (database) Name() string {
return "dolt_cluster"
}
// GetTableInsensitive implements sql.Database, matching the requested table
// name case-insensitively. dolt_cluster_status is the only table.
func (db database) GetTableInsensitive(ctx *sql.Context, tblName string) (sql.Table, bool, error) {
	if strings.ToLower(tblName) == StatusTableName {
		return NewClusterStatusTable(db.statusProvider), true, nil
	}
	return nil, false, nil
}
// GetTableNames implements sql.Database; dolt_cluster_status is the only
// table.
func (database) GetTableNames(ctx *sql.Context) ([]string, error) {
return []string{StatusTableName}, nil
}
// NewClusterDatabase returns the dolt_cluster database backed by the given
// status provider.
func NewClusterDatabase(p ClusterStatusProvider) sql.Database {
return database{p}
}
// Implement StoredProcedureDatabase so that external stored procedures are available.
var _ sql.StoredProcedureDatabase = database{}
// GetStoredProcedures implements sql.StoredProcedureDatabase. There are no
// database-local procedures; only externally registered ones apply.
func (database) GetStoredProcedures(ctx *sql.Context) ([]sql.StoredProcedureDetails, error) {
return nil, nil
}
// SaveStoredProcedure implements sql.StoredProcedureDatabase. The database
// is read-only, so creating procedures is not supported.
func (database) SaveStoredProcedure(ctx *sql.Context, spd sql.StoredProcedureDetails) error {
return errors.New("unimplemented")
}
// DropStoredProcedure implements sql.StoredProcedureDatabase. The database
// is read-only, so dropping procedures is not supported.
func (database) DropStoredProcedure(ctx *sql.Context, name string) error {
return errors.New("unimplemented")
}
var _ sql.ReadOnlyDatabase = database{}
// IsReadOnly implements sql.ReadOnlyDatabase; dolt_cluster is always
// read-only.
func (database) IsReadOnly() bool {
return true
}

View File

@@ -56,14 +56,13 @@ cluster:
SERVER_PID=$!
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10"
server_query_with_port "${SERVERONE_MYSQL_PORT}" dolt_cluster 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;select "'`database`'", standby_remote, role, epoch from dolt_cluster_status order by "'`database`'" asc" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10;database,standby_remote,role,epoch\nrepo1,standby,standby,10\nrepo2,standby,standby,10"
kill $SERVER_PID
wait $SERVER_PID
SERVER_PID=
echo "
log_level: trace
user:
name: dolt
listener:
@@ -92,7 +91,6 @@ cluster:
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
@@ -150,7 +148,6 @@ cluster:
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
@@ -181,7 +178,6 @@ cluster:
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
@@ -209,7 +205,6 @@ cluster:
cd serverone
echo "
log_level: trace
user:
name: dolt
listener:
@@ -222,7 +217,7 @@ cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database}
bootstrap_role: primary
bootstrap_role: standby
bootstrap_epoch: 10
remotesapi:
port: ${SERVERONE_GRPC_PORT}" > server.yaml
@@ -236,10 +231,11 @@ cluster:
DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml &
serverone_pid=$!
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
cd ../servertwo
echo "
log_level: trace
user:
name: dolt
listener:
@@ -252,7 +248,7 @@ cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database}
bootstrap_role: standby
bootstrap_role: primary
bootstrap_epoch: 10
remotesapi:
port: ${SERVERTWO_GRPC_PORT}" > server.yaml
@@ -267,14 +263,18 @@ cluster:
servertwo_pid=$!
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000
server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5);"
server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)"
server_query_with_port "${SERVERTWO_MYSQL_PORT}" dolt_cluster 1 dolt "" "select "'`database`'", standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by "'`database`'" asc" "database,standby_remote,role,epoch,replication_lag_millis,current_error\nrepo1,standby,primary,10,0,None\nrepo2,standby,primary,10,0,None"
kill $servertwo_pid
wait $servertwo_pid
kill $serverone_pid
wait $serverone_pid
kill $servertwo_pid
wait $servertwo_pid
cd ../serverone
run env DOLT_ROOT_PATH=`pwd` dolt sql -q 'select count(*) from vals'
[ "$status" -eq 0 ]