go: sqle: cluster: Add a ChunkStore wrapper for use in our remotesrv Server to record successful commits on the standby.

This commit is contained in:
Aaron Son
2022-09-28 17:03:56 -07:00
parent f2a43f1377
commit 8f6f40937e
5 changed files with 98 additions and 18 deletions

View File

@@ -225,12 +225,13 @@ func Serve(
var remoteSrv *remotesrv.Server
if serverConfig.RemotesapiPort() != nil {
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
args := sqle.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
ReadOnly: true,
HttpPort: *serverConfig.RemotesapiPort(),
GrpcPort: *serverConfig.RemotesapiPort(),
})
remoteSrv = remotesrv.NewServer(args)
listeners, err := remoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err)
@@ -251,12 +252,10 @@ func Serve(
var clusterRemoteSrv *remotesrv.Server
if clusterController != nil {
if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil {
clusterRemoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
HttpPort: clusterController.RemoteSrvPort(),
GrpcPort: clusterController.RemoteSrvPort(),
Options: clusterController.ServerOptions(),
args := clusterController.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{
Logger: logrus.NewEntry(lgr),
})
clusterRemoteSrv = remotesrv.NewServer(args)
listeners, err := clusterRemoteSrv.Listeners()
if err != nil {
lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err)

View File

@@ -43,9 +43,9 @@ type commithook struct {
cond *sync.Cond
nextHead hash.Hash
lastPushedHead hash.Hash
lastPushedSuccess time.Time
nextPushAttempt time.Time
nextHeadIncomingTime time.Time
lastSuccess time.Time
currentError *string
role Role
@@ -188,7 +188,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) {
h.currentError = nil
h.lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB")
h.lastPushedHead = toPush
h.lastPushedSuccess = incomingTime
h.lastSuccess = incomingTime
h.nextPushAttempt = time.Time{}
} else {
h.currentError = new(string)
@@ -217,20 +217,19 @@ func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.T
// Operationally, failure to replicate a write for a long time is a
// problem that merits investigation, regardless of how many pending
// writes are failing to replicate.
*replicationLag = time.Now().Sub(h.lastPushedSuccess)
*replicationLag = time.Now().Sub(h.lastSuccess)
}
}
if h.lastPushedSuccess != (time.Time{}) {
lastUpdate := new(time.Time)
*lastUpdate = h.lastPushedSuccess
}
}
if h.lastSuccess != (time.Time{}) {
lastUpdate = new(time.Time)
*lastUpdate = h.lastSuccess
}
currentErr = h.currentError
// TODO: lastUpdate in Standby role.
return
}
@@ -264,6 +263,16 @@ func (h *commithook) run(ctx context.Context) {
h.lgr.Tracef("cluster/commithook: background thread: completed.")
}
func (h *commithook) recordSuccessfulRemoteSrvCommit() {
h.mu.Lock()
defer h.mu.Unlock()
if h.role == RolePrimary {
return
}
h.lastSuccess = time.Now()
h.currentError = nil
}
func (h *commithook) setRole(role Role) {
h.mu.Lock()
defer h.mu.Unlock()
@@ -272,7 +281,7 @@ func (h *commithook) setRole(role Role) {
h.currentError = nil
h.nextHead = hash.Hash{}
h.lastPushedHead = hash.Hash{}
h.lastPushedSuccess = time.Time{}
h.lastSuccess = time.Time{}
h.nextPushAttempt = time.Time{}
h.role = role
h.lgr = h.rootLgr.WithField("role", string(role))

View File

@@ -27,6 +27,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/dbfactory"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb"
"github.com/dolthub/dolt/go/libraries/utils/config"
@@ -314,3 +315,25 @@ func (c *Controller) GetClusterStatus() []clusterdb.ReplicaStatus {
}
return ret
}
func (c *Controller) recordSuccessfulRemoteSrvCommit(name string) {
c.lgr.Tracef("standby replica received push and updated database %s", name)
c.mu.Lock()
commithooks := make([]*commithook, len(c.commithooks))
copy(commithooks, c.commithooks)
c.mu.Unlock()
for _, c := range commithooks {
if c.dbname == name {
c.recordSuccessfulRemoteSrvCommit()
}
}
}
func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs {
args.HttpPort = c.RemoteSrvPort()
args.GrpcPort = c.RemoteSrvPort()
args.Options = c.ServerOptions()
args = sqle.RemoteSrvServerArgs(ctx, args)
args.DBCache = remotesrvStoreCache{args.DBCache, c}
return args
}

View File

@@ -0,0 +1,49 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cluster
import (
"context"
"github.com/dolthub/dolt/go/libraries/doltcore/remotesrv"
"github.com/dolthub/dolt/go/store/hash"
)
type remotesrvStoreCache struct {
remotesrv.DBCache
controller *Controller
}
func (s remotesrvStoreCache) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) {
rss, err := s.DBCache.Get(path, nbfVerStr)
if err != nil {
return nil, err
}
return remotesrvStore{rss, path, s.controller}, nil
}
type remotesrvStore struct {
remotesrv.RemoteSrvStore
path string
controller *Controller
}
func (rss remotesrvStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) {
res, err := rss.RemoteSrvStore.Commit(ctx, current, last)
if err == nil && res {
rss.controller.recordSuccessfulRemoteSrvCommit(rss.path)
}
return res, err
}

View File

@@ -50,9 +50,9 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e
return rss, nil
}
func NewRemoteSrvServer(ctx *sql.Context, args remotesrv.ServerArgs) *remotesrv.Server {
func RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs {
sess := dsess.DSessFromSess(ctx.Session)
args.FS = sess.Provider().FileSystem()
args.DBCache = remotesrvStore{ctx}
return remotesrv.NewServer(args)
return args
}