mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-09 11:19:01 -05:00
go: sqle: cluster: commithook: Periodically heartbeat to a standby when we are primary. This allows replication_lag on the standby to more accurately reflect the possible drift locally.
This commit is contained in:
@@ -108,6 +108,7 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.")
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
shouldHeartbeat := false
|
||||
for {
|
||||
lgr := h.logger()
|
||||
// Shutdown for context canceled.
|
||||
@@ -138,11 +139,17 @@ func (h *commithook) replicate(ctx context.Context) {
|
||||
h.nextHeadIncomingTime = time.Now()
|
||||
} else if h.shouldReplicate() {
|
||||
h.attemptReplicate(ctx)
|
||||
shouldHeartbeat = false
|
||||
} else {
|
||||
lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
|
||||
if h.waitNotify != nil {
|
||||
h.waitNotify()
|
||||
}
|
||||
if shouldHeartbeat {
|
||||
h.attemptHeartbeat(ctx)
|
||||
} else {
|
||||
shouldHeartbeat = true
|
||||
}
|
||||
h.cond.Wait()
|
||||
lgr.Tracef("cluster/commithook: background thread: woken up.")
|
||||
}
|
||||
@@ -175,6 +182,37 @@ func (h *commithook) primaryNeedsInit() bool {
|
||||
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
|
||||
}
|
||||
|
||||
// Called by the replicate thread to periodically heartbeat liveness to a
|
||||
// standby if we are a primary. These heartbeats are best effort and currently
|
||||
// do not affect the data plane much.
|
||||
//
|
||||
// preconditions: h.mu is locked and shouldReplicate() returned false.
|
||||
func (h *commithook) attemptHeartbeat(ctx context.Context) {
|
||||
if h.role != RolePrimary {
|
||||
return
|
||||
}
|
||||
head := h.lastPushedHead
|
||||
if head.IsEmpty() {
|
||||
return
|
||||
}
|
||||
destDB := h.destDB
|
||||
if destDB == nil {
|
||||
return
|
||||
}
|
||||
ctx, h.cancelReplicate = context.WithTimeout(ctx, 5 * time.Second)
|
||||
defer func() {
|
||||
if h.cancelReplicate != nil {
|
||||
h.cancelReplicate()
|
||||
}
|
||||
h.cancelReplicate = nil
|
||||
}()
|
||||
h.mu.Unlock()
|
||||
datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
|
||||
cs := datas.ChunkStoreFromDatabase(datasDB)
|
||||
cs.Commit(ctx, head, head)
|
||||
h.mu.Lock()
|
||||
}
|
||||
|
||||
// Called by the replicate thread to push the nextHead to the destDB and set
|
||||
// its root to the new value.
|
||||
//
|
||||
|
||||
Reference in New Issue
Block a user