From f2a43f1377daa3f4465d46c1cd449affa7fff116 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 28 Sep 2022 15:45:01 -0700 Subject: [PATCH] go: sqle: cluster: Add a dolt_cluster database, exposing a dolt_cluster_status table which queries replication status. --- go/cmd/dolt/commands/engine/sqlengine.go | 6 + .../doltcore/sqle/cluster/commithook.go | 52 +++++--- .../doltcore/sqle/cluster/controller.go | 33 +++++ .../doltcore/sqle/cluster/dialprovider.go | 12 ++ .../doltcore/sqle/cluster/initdbhook.go | 11 +- .../sqle/clusterdb/cluster_status_table.go | 122 ++++++++++++++++++ .../doltcore/sqle/clusterdb/database.go | 71 ++++++++++ .../bats/sql-server-cluster.bats | 26 ++-- 8 files changed, 299 insertions(+), 34 deletions(-) create mode 100644 go/libraries/doltcore/sqle/clusterdb/cluster_status_table.go create mode 100644 go/libraries/doltcore/sqle/clusterdb/database.go diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index 8de8a77380..9984d8a8d8 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -99,6 +99,12 @@ func NewSqlEngine( all := append(dsqleDBsAsSqlDBs(dbs), infoDB) locations = append(locations, nil) + clusterDB := config.ClusterController.ClusterDatabase() + if clusterDB != nil { + all = append(all, clusterDB) + locations = append(locations, nil) + } + b := env.GetDefaultInitBranch(mrEnv.Config()) pro, err := dsqle.NewDoltDatabaseProviderWithDatabases(b, mrEnv.FileSystem(), all, locations) if err != nil { diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index c10f9616c5..42c345eb82 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -17,6 +17,7 @@ package cluster import ( "context" "errors" + "fmt" "io" "sync" "time" @@ -45,6 +46,7 @@ type commithook struct { lastPushedSuccess time.Time nextPushAttempt time.Time 
nextHeadIncomingTime time.Time + currentError *string  role Role @@ -149,7 +151,9 @@ func (h *commithook) attemptReplicate(ctx context.Context) { var err error destDB, err = h.destDBF(ctx) if err != nil { - h.lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v.", err) + h.lgr.Warnf("cluster/commithook: could not replicate to standby: error fetching destDB: %v", err) h.mu.Lock() + h.currentError = new(string) + *h.currentError = fmt.Sprintf("could not replicate to standby: error fetching destDB: %v", err) // TODO: We could add some backoff here. if toPush == h.nextHead { @@ -181,11 +185,14 @@ func (h *commithook) attemptReplicate(ctx context.Context) { h.mu.Lock() if err == nil { + h.currentError = nil h.lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB") h.lastPushedHead = toPush h.lastPushedSuccess = incomingTime h.nextPushAttempt = time.Time{} } else { + h.currentError = new(string) + *h.currentError = fmt.Sprintf("failed to commit chunks on destDB: %v", err) h.lgr.Warnf("cluster/commithook: failed to commit chunks on destDB: %v", err) // add some delay if a new head didn't come in while we were pushing. if toPush == h.nextHead { @@ -195,24 +202,36 @@ func (h *commithook) attemptReplicate(ctx context.Context) { } } -func (h *commithook) replicationLag() time.Duration { +func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.Time, currentErr *string) { h.mu.Lock() defer h.mu.Unlock() - if h.role == RoleStandby { - return time.Duration(0) + if h.role == RolePrimary { + if h.lastPushedHead != (hash.Hash{}) { + replicationLag = new(time.Duration) + if h.nextHead != h.lastPushedHead { + // We return the wallclock time between now and the last time we were + // successful.
If h.nextHeadIncomingTime is significantly earlier than + // time.Now(), because the server has not received a write in a long + // time, then this metric may report a high number when the number of + // seconds of writes outstanding could actually be much smaller. + // Operationally, failure to replicate a write for a long time is a + // problem that merits investigation, regardless of how many pending + // writes are failing to replicate. + *replicationLag = time.Now().Sub(h.lastPushedSuccess) + } + } + + if h.lastPushedSuccess != (time.Time{}) { + lastUpdate = new(time.Time) + *lastUpdate = h.lastPushedSuccess + } + } - if h.nextHead == h.lastPushedHead { - return time.Duration(0) - } - // We return the wallclock time between now and the last time we were - // successful. If h.nextHeadIncomingTime is significantly earlier than - // time.Now(), because the server has not received a write in a long - // time, then this metric may report a high number when the number of - // seconds of writes outstanding could actually be much smaller. - // Operationally, failure to replicate a write for a long time is a - // problem that merits investigation, regardless of how many pending - // writes are failing to replicate. - return time.Now().Sub(h.lastPushedSuccess) + + currentErr = h.currentError + + // TODO: lastUpdate in Standby role. + + return } // TODO: Would be more efficient to only tick when we have outstanding work... @@ -250,6 +269,7 @@ func (h *commithook) setRole(role Role) { defer h.mu.Unlock() // Reset head-to-push and timers here. When we transition into Primary, // the replicate() loop will take these from the current chunk store.
+ h.currentError = nil h.nextHead = hash.Hash{} h.lastPushedHead = hash.Hash{} h.lastPushedSuccess = time.Time{} diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 744a854f22..81fbe2c845 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -28,6 +28,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb" "github.com/dolthub/dolt/go/libraries/utils/config" "github.com/dolthub/dolt/go/store/types" ) @@ -157,6 +158,13 @@ func (c *Controller) RegisterStoredProcedures(store procedurestore) { store.Register(newAssumeRoleProcedure(c)) } +func (c *Controller) ClusterDatabase() sql.Database { + if c == nil { + return nil + } + return clusterdb.NewClusterDatabase(c) +} + func (c *Controller) RemoteSrvPort() int { if c == nil { return -1 @@ -281,3 +289,28 @@ func (c *Controller) registerCommitHook(hook *commithook) { defer c.mu.Unlock() c.commithooks = append(c.commithooks, hook) } + +func (c *Controller) GetClusterStatus() []clusterdb.ReplicaStatus { + if c == nil { + return []clusterdb.ReplicaStatus{} + } + c.mu.Lock() + epoch, role := c.epoch, c.role + commithooks := make([]*commithook, len(c.commithooks)) + copy(commithooks, c.commithooks) + c.mu.Unlock() + ret := make([]clusterdb.ReplicaStatus, len(commithooks)) + for i, c := range commithooks { + lag, lastUpdate, currentErrorStr := c.status() + ret[i] = clusterdb.ReplicaStatus{ + Database: c.dbname, + Remote: c.remotename, + Role: string(role), + Epoch: epoch, + ReplicationLag: lag, + LastUpdate: lastUpdate, + CurrentError: currentErrorStr, + } + } + return ret +} diff --git a/go/libraries/doltcore/sqle/cluster/dialprovider.go b/go/libraries/doltcore/sqle/cluster/dialprovider.go index d2a61a406d..9562db17fa 
100644 --- a/go/libraries/doltcore/sqle/cluster/dialprovider.go +++ b/go/libraries/doltcore/sqle/cluster/dialprovider.go @@ -15,7 +15,10 @@ package cluster import ( + "time" + "google.golang.org/grpc" + "google.golang.org/grpc/backoff" "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" "github.com/dolthub/dolt/go/libraries/doltcore/grpcendpoint" @@ -38,5 +41,14 @@ func (p grpcDialProvider) GetGRPCDialParams(config grpcendpoint.Config) (string, return "", nil, err } opts = append(opts, p.ci.Options()...) + opts = append(opts, grpc.WithConnectParams(grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 250 * time.Millisecond, + Multiplier: 1.6, + Jitter: 0.6, + MaxDelay: 10 * time.Second, + }, + MinConnectTimeout: 250 * time.Millisecond, + })) return endpoint, opts, nil } diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index b9f23c1c6f..7713c91540 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -32,6 +32,12 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig return orig } return func(ctx *sql.Context, pro sqle.DoltDatabaseProvider, name string, denv *env.DoltEnv) error { + var err error + err = orig(ctx, pro, name, denv) + if err != nil { + return err + } + dialprovider := controller.gRPCDialProvider(denv) var remoteDBs []func(context.Context) (*doltdb.DoltDB, error) for _, r := range controller.cfg.StandbyRemotes() { @@ -51,11 +57,6 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads, orig }) } - err := orig(ctx, pro, name, denv) - if err != nil { - return err - } - role, _ := controller.roleAndEpoch() for i, r := range controller.cfg.StandbyRemotes() { ttfdir, err := denv.TempTableFilesDir() diff --git a/go/libraries/doltcore/sqle/clusterdb/cluster_status_table.go b/go/libraries/doltcore/sqle/clusterdb/cluster_status_table.go new file mode 100644 index 
0000000000..cca51c6163 --- /dev/null +++ b/go/libraries/doltcore/sqle/clusterdb/cluster_status_table.go @@ -0,0 +1,122 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clusterdb + +import ( + "time" + + "github.com/dolthub/go-mysql-server/sql" +) + +type ReplicaStatus struct { + // The name of the database this replica status represents. + Database string + // The role this server is currently running as. "primary" or "standby". + Role string + // The epoch of this server's current role. + Epoch int + // The standby remote that this replica status represents. + Remote string + // The current replication lag. NULL when we are a standby. + ReplicationLag *time.Duration + // As a standby, the last time we received a root update. + // As a primary, the last time we pushed a root update to the standby. + LastUpdate *time.Time + // A string describing the last encountered error. NULL when we are a + // standby. NULL when our last replication attempt succeeded. 
+ CurrentError *string +} + +type ClusterStatusProvider interface { + GetClusterStatus() []ReplicaStatus +} + +var _ sql.Table = ClusterStatusTable{} + +type partition struct { +} + +func (p *partition) Key() []byte { + return []byte("FULL") +} + +func NewClusterStatusTable(provider ClusterStatusProvider) sql.Table { + return ClusterStatusTable{provider} +} + +type ClusterStatusTable struct { + provider ClusterStatusProvider +} + +func (t ClusterStatusTable) Name() string { + return StatusTableName +} + +func (t ClusterStatusTable) String() string { + return StatusTableName +} + +func (t ClusterStatusTable) Collation() sql.CollationID { + return sql.Collation_Default +} + +func (t ClusterStatusTable) Partitions(*sql.Context) (sql.PartitionIter, error) { + return sql.PartitionsToPartitionIter((*partition)(nil)), nil +} + +func (t ClusterStatusTable) PartitionRows(*sql.Context, sql.Partition) (sql.RowIter, error) { + if t.provider == nil { + return sql.RowsToRowIter(), nil + } + return sql.RowsToRowIter(replicaStatusesToRows(t.provider.GetClusterStatus())...), nil +} + +func replicaStatusesToRows(rss []ReplicaStatus) []sql.Row { + ret := make([]sql.Row, len(rss)) + for i, rs := range rss { + ret[i] = replicaStatusToRow(rs) + } + return ret +} + +func replicaStatusToRow(rs ReplicaStatus) sql.Row { + ret := make(sql.Row, 7) + ret[0] = rs.Database + ret[1] = rs.Remote + ret[2] = rs.Role + ret[3] = int64(rs.Epoch) + if rs.ReplicationLag != nil { + ret[4] = rs.ReplicationLag.Milliseconds() + } + if rs.LastUpdate != nil { + ret[5] = *rs.LastUpdate + } + if rs.CurrentError != nil { + ret[6] = *rs.CurrentError + } + return ret +} + +func (t ClusterStatusTable) Schema() sql.Schema { + return sql.Schema{ + {Name: "database", Type: sql.Text, Source: StatusTableName, PrimaryKey: true, Nullable: false}, + {Name: "standby_remote", Type: sql.Text, Source: StatusTableName, PrimaryKey: true, Nullable: false}, + {Name: "role", Type: sql.Text, Source: StatusTableName, PrimaryKey: 
false, Nullable: false}, + {Name: "epoch", Type: sql.Int64, Source: StatusTableName, PrimaryKey: false, Nullable: false}, + {Name: "replication_lag_millis", Type: sql.Int64, Source: StatusTableName, PrimaryKey: false, Nullable: true}, + {Name: "last_update", Type: sql.Datetime, Source: StatusTableName, PrimaryKey: false, Nullable: true}, + {Name: "current_error", Type: sql.Text, Source: StatusTableName, PrimaryKey: false, Nullable: true}, + } +} diff --git a/go/libraries/doltcore/sqle/clusterdb/database.go b/go/libraries/doltcore/sqle/clusterdb/database.go new file mode 100644 index 0000000000..8880ed9085 --- /dev/null +++ b/go/libraries/doltcore/sqle/clusterdb/database.go @@ -0,0 +1,71 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package clusterdb + +import ( + "errors" + "strings" + + "github.com/dolthub/go-mysql-server/sql" +) + +type database struct { + statusProvider ClusterStatusProvider +} + +var _ sql.Database = database{} + +const StatusTableName = "dolt_cluster_status" + +func (database) Name() string { + return "dolt_cluster" +} + +func (db database) GetTableInsensitive(ctx *sql.Context, tblName string) (sql.Table, bool, error) { + tblName = strings.ToLower(tblName) + if tblName == StatusTableName { + return NewClusterStatusTable(db.statusProvider), true, nil + } + return nil, false, nil +} + +func (database) GetTableNames(ctx *sql.Context) ([]string, error) { + return []string{StatusTableName}, nil +} + +func NewClusterDatabase(p ClusterStatusProvider) sql.Database { + return database{p} +} + +// Implement StoredProcedureDatabase so that external stored procedures are available. +var _ sql.StoredProcedureDatabase = database{} + +func (database) GetStoredProcedures(ctx *sql.Context) ([]sql.StoredProcedureDetails, error) { + return nil, nil +} + +func (database) SaveStoredProcedure(ctx *sql.Context, spd sql.StoredProcedureDetails) error { + return errors.New("unimplemented") +} + +func (database) DropStoredProcedure(ctx *sql.Context, name string) error { + return errors.New("unimplemented") +} + +var _ sql.ReadOnlyDatabase = database{} + +func (database) IsReadOnly() bool { + return true +} diff --git a/integration-tests/bats/sql-server-cluster.bats b/integration-tests/bats/sql-server-cluster.bats index 62d5f04cc3..083295b65c 100644 --- a/integration-tests/bats/sql-server-cluster.bats +++ b/integration-tests/bats/sql-server-cluster.bats @@ -56,14 +56,13 @@ cluster: SERVER_PID=$! 
wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 - server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10" + server_query_with_port "${SERVERONE_MYSQL_PORT}" dolt_cluster 1 dolt "" "select @@GLOBAL.dolt_cluster_role, @@GLOBAL.dolt_cluster_role_epoch;select "'`database`'", standby_remote, role, epoch from dolt_cluster_status order by "'`database`'" asc" "@@GLOBAL.dolt_cluster_role,@@GLOBAL.dolt_cluster_role_epoch\nstandby,10;database,standby_remote,role,epoch\nrepo1,standby,standby,10\nrepo2,standby,standby,10" kill $SERVER_PID wait $SERVER_PID SERVER_PID= echo " -log_level: trace user: name: dolt listener: @@ -92,7 +91,6 @@ cluster: cd serverone echo " -log_level: trace user: name: dolt listener: @@ -150,7 +148,6 @@ cluster: cd serverone echo " -log_level: trace user: name: dolt listener: @@ -181,7 +178,6 @@ cluster: cd serverone echo " -log_level: trace user: name: dolt listener: @@ -209,7 +205,6 @@ cluster: cd serverone echo " -log_level: trace user: name: dolt listener: @@ -222,7 +217,7 @@ cluster: standby_remotes: - name: standby remote_url_template: http://localhost:${SERVERTWO_GRPC_PORT}/{database} - bootstrap_role: primary + bootstrap_role: standby bootstrap_epoch: 10 remotesapi: port: ${SERVERONE_GRPC_PORT}" > server.yaml @@ -236,10 +231,11 @@ cluster: DOLT_ROOT_PATH=`pwd` dolt sql-server --config server.yaml & serverone_pid=$! + wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 + cd ../servertwo echo " -log_level: trace user: name: dolt listener: @@ -252,7 +248,7 @@ cluster: standby_remotes: - name: standby remote_url_template: http://localhost:${SERVERONE_GRPC_PORT}/{database} - bootstrap_role: standby + bootstrap_role: primary bootstrap_epoch: 10 remotesapi: port: ${SERVERTWO_GRPC_PORT}" > server.yaml @@ -267,14 +263,18 @@ cluster: servertwo_pid=$! 
wait_for_connection "${SERVERTWO_MYSQL_PORT}" 5000 - wait_for_connection "${SERVERONE_MYSQL_PORT}" 5000 - server_query_with_port "${SERVERONE_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5);" + + server_query_with_port "${SERVERTWO_MYSQL_PORT}" repo1 1 dolt "" "create table vals (i int primary key);insert into vals values (1),(2),(3),(4),(5)" + + server_query_with_port "${SERVERTWO_MYSQL_PORT}" dolt_cluster 1 dolt "" "select "'`database`'", standby_remote, role, epoch, replication_lag_millis, current_error from dolt_cluster_status order by "'`database`'" asc" "database,standby_remote,role,epoch,replication_lag_millis,current_error\nrepo1,standby,primary,10,0,None\nrepo2,standby,primary,10,0,None" + + kill $servertwo_pid + wait $servertwo_pid kill $serverone_pid wait $serverone_pid - kill $servertwo_pid - wait $servertwo_pid + cd ../serverone run env DOLT_ROOT_PATH=`pwd` dolt sql -q 'select count(*) from vals' [ "$status" -eq 0 ]