mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-13 19:29:58 -05:00
Merge pull request #4433 from dolthub/aaron/sql-cluster-standby-updated-stats
go: sqle: cluster: Add a ChunkStore wrapper for use in our remotesrvServer to record successful commits on the standby.
This commit is contained in:
@@ -225,12 +225,13 @@ func Serve(
|
||||
var remoteSrv *remotesrv.Server
|
||||
if serverConfig.RemotesapiPort() != nil {
|
||||
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
|
||||
remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
args := sqle.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
Logger: logrus.NewEntry(lgr),
|
||||
ReadOnly: true,
|
||||
HttpPort: *serverConfig.RemotesapiPort(),
|
||||
GrpcPort: *serverConfig.RemotesapiPort(),
|
||||
})
|
||||
remoteSrv = remotesrv.NewServer(args)
|
||||
listeners, err := remoteSrv.Listeners()
|
||||
if err != nil {
|
||||
lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err)
|
||||
@@ -251,12 +252,10 @@ func Serve(
|
||||
var clusterRemoteSrv *remotesrv.Server
|
||||
if clusterController != nil {
|
||||
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
|
||||
clusterRemoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
Logger: logrus.NewEntry(lgr),
|
||||
HttpPort: clusterController.RemoteSrvPort(),
|
||||
GrpcPort: clusterController.RemoteSrvPort(),
|
||||
Options: clusterController.ServerOptions(),
|
||||
args := clusterController.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
|
||||
Logger: logrus.NewEntry(lgr),
|
||||
})
|
||||
clusterRemoteSrv = remotesrv.NewServer(args)
|
||||
listeners, err := clusterRemoteSrv.Listeners()
|
||||
if err != nil {
|
||||
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)
|
||||
|
||||
@@ -44,9 +44,9 @@ type commithook struct {
|
||||
cond *sync.Cond
|
||||
nextHead hash.Hash
|
||||
lastPushedHead hash.Hash
|
||||
lastPushedSuccess time.Time
|
||||
nextPushAttempt time.Time
|
||||
nextHeadIncomingTime time.Time
|
||||
lastSuccess time.Time
|
||||
currentError *string
|
||||
|
||||
role Role
|
||||
@@ -212,7 +212,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
|
||||
h.currentError = nil
|
||||
lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
|
||||
h.lastPushedHead = toPush
|
||||
h.lastPushedSuccess = incomingTime
|
||||
h.lastSuccess = incomingTime
|
||||
h.nextPushAttempt = time.Time{}
|
||||
} else {
|
||||
h.currentError = new(string)
|
||||
@@ -241,20 +241,19 @@ func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.T
|
||||
// Operationally, failure to replicate a write for a long time is a
|
||||
// problem that merits investigation, regardless of how many pending
|
||||
// writes are failing to replicate.
|
||||
*replicationLag = time.Now().Sub(h.lastPushedSuccess)
|
||||
*replicationLag = time.Now().Sub(h.lastSuccess)
|
||||
}
|
||||
}
|
||||
|
||||
if h.lastPushedSuccess != (time.Time{}) {
|
||||
lastUpdate := new(time.Time)
|
||||
*lastUpdate = h.lastPushedSuccess
|
||||
}
|
||||
}
|
||||
|
||||
if h.lastSuccess != (time.Time{}) {
|
||||
lastUpdate = new(time.Time)
|
||||
*lastUpdate = h.lastSuccess
|
||||
}
|
||||
|
||||
currentErr = h.currentError
|
||||
|
||||
// TODO: lastUpdate in Standby role.
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -277,6 +276,16 @@ func (h *commithook) tick(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *commithook) recordSuccessfulRemoteSrvCommit() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
if h.role == RolePrimary {
|
||||
return
|
||||
}
|
||||
h.lastSuccess = time.Now()
|
||||
h.currentError = nil
|
||||
}
|
||||
|
||||
func (h *commithook) setRole(role Role) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
@@ -285,7 +294,7 @@ func (h *commithook) setRole(role Role) {
|
||||
h.currentError = nil
|
||||
h.nextHead = hash.Hash{}
|
||||
h.lastPushedHead = hash.Hash{}
|
||||
h.lastPushedSuccess = time.Time{}
|
||||
h.lastSuccess = time.Time{}
|
||||
h.nextPushAttempt = time.Time{}
|
||||
h.role = role
|
||||
h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role)))
|
||||
|
||||
@@ -27,6 +27,7 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/env"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/config"
|
||||
@@ -316,3 +317,25 @@ func (c *Controller) GetClusterStatus() []clusterdb.ReplicaStatus {
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (c *Controller) recordSuccessfulRemoteSrvCommit(name string) {
|
||||
c.lgr.Tracef("standby replica received push and updated database %s", name)
|
||||
c.mu.Lock()
|
||||
commithooks := make([]*commithook, len(c.commithooks))
|
||||
copy(commithooks, c.commithooks)
|
||||
c.mu.Unlock()
|
||||
for _, c := range commithooks {
|
||||
if c.dbname == name {
|
||||
c.recordSuccessfulRemoteSrvCommit()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs {
|
||||
args.HttpPort = c.RemoteSrvPort()
|
||||
args.GrpcPort = c.RemoteSrvPort()
|
||||
args.Options = c.ServerOptions()
|
||||
args = sqle.RemoteSrvServerArgs(ctx, args)
|
||||
args.DBCache = remotesrvStoreCache{args.DBCache, c}
|
||||
return args
|
||||
}
|
||||
|
||||
49
go/libraries/doltcore/sqle/cluster/remotesrv.go
Normal file
49
go/libraries/doltcore/sqle/cluster/remotesrv.go
Normal file
@@ -0,0 +1,49 @@
|
||||
// Copyright 2022 Dolthub, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package cluster
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
type remotesrvStoreCache struct {
|
||||
remotesrv.DBCache
|
||||
controller *Controller
|
||||
}
|
||||
|
||||
func (s remotesrvStoreCache) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
|
||||
rss, err := s.DBCache.Get(path, nbfVerStr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return remotesrvStore{rss, path, s.controller}, nil
|
||||
}
|
||||
|
||||
type remotesrvStore struct {
|
||||
remotesrv.RemoteSrvStore
|
||||
path string
|
||||
controller *Controller
|
||||
}
|
||||
|
||||
func (rss remotesrvStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
|
||||
res, err := rss.RemoteSrvStore.Commit(ctx, current, last)
|
||||
if err == nil && res {
|
||||
rss.controller.recordSuccessfulRemoteSrvCommit(rss.path)
|
||||
}
|
||||
return res, err
|
||||
}
|
||||
@@ -50,9 +50,9 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
func NewRemoteSrvServer(ctx *sql.Context, args remotesrv.ServerArgs) *remotesrv.Server {
|
||||
func RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs {
|
||||
sess := dsess.DSessFromSess(ctx.Session)
|
||||
args.FS = sess.Provider().FileSystem()
|
||||
args.DBCache = remotesrvStore{ctx}
|
||||
return remotesrv.NewServer(args)
|
||||
return args
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user