diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 7fe5c731b0..0f2eabc84f 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -225,12 +225,13 @@ func Serve( var remoteSrv *remotesrv.Server if serverConfig.RemotesapiPort() != nil { if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil { - remoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{ + args := sqle.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{ Logger: logrus.NewEntry(lgr), ReadOnly: true, HttpPort: *serverConfig.RemotesapiPort(), GrpcPort: *serverConfig.RemotesapiPort(), }) + remoteSrv = remotesrv.NewServer(args) listeners, err := remoteSrv.Listeners() if err != nil { lgr.Errorf("error starting remotesapi server listeners on port %d: %v", *serverConfig.RemotesapiPort(), err) @@ -251,12 +252,10 @@ func Serve( var clusterRemoteSrv *remotesrv.Server if clusterController != nil { if remoteSrvSqlCtx, err := sqlEngine.NewContext(context.Background()); err == nil { - clusterRemoteSrv = sqle.NewRemoteSrvServer(remoteSrvSqlCtx, remotesrv.ServerArgs{ - Logger: logrus.NewEntry(lgr), - HttpPort: clusterController.RemoteSrvPort(), - GrpcPort: clusterController.RemoteSrvPort(), - Options: clusterController.ServerOptions(), + args := clusterController.RemoteSrvServerArgs(remoteSrvSqlCtx, remotesrv.ServerArgs{ + Logger: logrus.NewEntry(lgr), }) + clusterRemoteSrv = remotesrv.NewServer(args) listeners, err := clusterRemoteSrv.Listeners() if err != nil { lgr.Errorf("error starting remotesapi server listeners for cluster config on port %d: %v", clusterController.RemoteSrvPort(), err) diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index cdcb653582..44a475607e 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -44,9 +44,9 @@ type commithook struct { cond *sync.Cond nextHead hash.Hash lastPushedHead hash.Hash - lastPushedSuccess time.Time nextPushAttempt time.Time nextHeadIncomingTime time.Time + lastSuccess time.Time currentError *string role Role @@ -212,7 +212,7 @@ func (h *commithook) attemptReplicate(ctx context.Context) { h.currentError = nil lgr.Tracef("cluster/commithook: successfully Commited chunks on destDB") h.lastPushedHead = toPush - h.lastPushedSuccess = incomingTime + h.lastSuccess = incomingTime h.nextPushAttempt = time.Time{} } else { h.currentError = new(string) @@ -241,20 +241,19 @@ func (h *commithook) status() (replicationLag *time.Duration, lastUpdate *time.T // Operationally, failure to replicate a write for a long time is a // problem that merits investigation, regardless of how many pending // writes are failing to replicate. - *replicationLag = time.Now().Sub(h.lastPushedSuccess) + *replicationLag = time.Now().Sub(h.lastSuccess) } } - if h.lastPushedSuccess != (time.Time{}) { - lastUpdate := new(time.Time) - *lastUpdate = h.lastPushedSuccess - } + } + + if h.lastSuccess != (time.Time{}) { + lastUpdate = new(time.Time) + *lastUpdate = h.lastSuccess } currentErr = h.currentError - // TODO: lastUpdate in Standby role. - return } @@ -277,6 +276,16 @@ func (h *commithook) tick(ctx context.Context) { } } +func (h *commithook) recordSuccessfulRemoteSrvCommit() { + h.mu.Lock() + defer h.mu.Unlock() + if h.role == RolePrimary { + return + } + h.lastSuccess = time.Now() + h.currentError = nil +} + func (h *commithook) setRole(role Role) { h.mu.Lock() defer h.mu.Unlock() @@ -285,7 +294,7 @@ func (h *commithook) setRole(role Role) { h.currentError = nil h.nextHead = hash.Hash{} h.lastPushedHead = hash.Hash{} - h.lastPushedSuccess = time.Time{} + h.lastSuccess = time.Time{} h.nextPushAttempt = time.Time{} h.role = role h.lgr.Store(h.rootLgr.WithField(logFieldRole, string(role))) diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index dc9f0d5db4..9014fb56c7 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/dbfactory" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/remotesrv" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/clusterdb" "github.com/dolthub/dolt/go/libraries/utils/config" @@ -316,3 +317,25 @@ func (c *Controller) GetClusterStatus() []clusterdb.ReplicaStatus { } return ret } + +func (c *Controller) recordSuccessfulRemoteSrvCommit(name string) { + c.lgr.Tracef("standby replica received push and updated database %s", name) + c.mu.Lock() + commithooks := make([]*commithook, len(c.commithooks)) + copy(commithooks, c.commithooks) + c.mu.Unlock() + for _, c := range commithooks { + if c.dbname == name { + c.recordSuccessfulRemoteSrvCommit() + } + } +} + +func (c *Controller) RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs { + args.HttpPort = c.RemoteSrvPort() + args.GrpcPort = c.RemoteSrvPort() + args.Options = c.ServerOptions() + args = sqle.RemoteSrvServerArgs(ctx, args) + args.DBCache = remotesrvStoreCache{args.DBCache, c} + return args +} diff --git a/go/libraries/doltcore/sqle/cluster/remotesrv.go b/go/libraries/doltcore/sqle/cluster/remotesrv.go new file mode 100644 index 0000000000..6bff32a7e0 --- /dev/null +++ b/go/libraries/doltcore/sqle/cluster/remotesrv.go @@ -0,0 +1,49 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cluster + +import ( + "context" + + "github.com/dolthub/dolt/go/libraries/doltcore/remotesrv" + "github.com/dolthub/dolt/go/store/hash" +) + +type remotesrvStoreCache struct { + remotesrv.DBCache + controller *Controller +} + +func (s remotesrvStoreCache) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, error) { + rss, err := s.DBCache.Get(path, nbfVerStr) + if err != nil { + return nil, err + } + return remotesrvStore{rss, path, s.controller}, nil +} + +type remotesrvStore struct { + remotesrv.RemoteSrvStore + path string + controller *Controller +} + +func (rss remotesrvStore) Commit(ctx context.Context, current, last hash.Hash) (bool, error) { + res, err := rss.RemoteSrvStore.Commit(ctx, current, last) + if err == nil && res { + rss.controller.recordSuccessfulRemoteSrvCommit(rss.path) + } + return res, err +} diff --git a/go/libraries/doltcore/sqle/remotesrv.go b/go/libraries/doltcore/sqle/remotesrv.go index 8de3cdfee5..8932cd040d 100644 --- a/go/libraries/doltcore/sqle/remotesrv.go +++ b/go/libraries/doltcore/sqle/remotesrv.go @@ -50,9 +50,9 @@ func (s remotesrvStore) Get(path, nbfVerStr string) (remotesrv.RemoteSrvStore, e return rss, nil } -func NewRemoteSrvServer(ctx *sql.Context, args remotesrv.ServerArgs) *remotesrv.Server { +func RemoteSrvServerArgs(ctx *sql.Context, args remotesrv.ServerArgs) remotesrv.ServerArgs { sess := dsess.DSessFromSess(ctx.Session) args.FS = sess.Provider().FileSystem() args.DBCache = remotesrvStore{ctx} - return remotesrv.NewServer(args) + return args }