diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 1b540f028d..c29bc10ddd 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -108,6 +108,7 @@ func (h *commithook) replicate(ctx context.Context) { defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.") h.mu.Lock() defer h.mu.Unlock() + shouldHeartbeat := false for { lgr := h.logger() // Shutdown for context canceled. @@ -138,11 +139,17 @@ func (h *commithook) replicate(ctx context.Context) { h.nextHeadIncomingTime = time.Now() } else if h.shouldReplicate() { h.attemptReplicate(ctx) + shouldHeartbeat = false } else { lgr.Tracef("cluster/commithook: background thread: waiting for signal.") if h.waitNotify != nil { h.waitNotify() } + if shouldHeartbeat { + h.attemptHeartbeat(ctx) + } else { + shouldHeartbeat = true + } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -175,6 +182,37 @@ func (h *commithook) primaryNeedsInit() bool { return h.role == RolePrimary && h.nextHead == (hash.Hash{}) } +// Called by the replicate thread to periodically heartbeat liveness to a +// standby if we are a primary. These heartbeats are best effort and currently +// do not affect the data plane much. +// +// preconditions: h.mu is locked and shouldReplicate() returned false. +func (h *commithook) attemptHeartbeat(ctx context.Context) { + if h.role != RolePrimary { + return + } + head := h.lastPushedHead + if head.IsEmpty() { + return + } + destDB := h.destDB + if destDB == nil { + return + } + ctx, h.cancelReplicate = context.WithTimeout(ctx, 5 * time.Second) + defer func() { + if h.cancelReplicate != nil { + h.cancelReplicate() + } + h.cancelReplicate = nil + }() + h.mu.Unlock() + datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB) + cs := datas.ChunkStoreFromDatabase(datasDB) + cs.Commit(ctx, head, head) + h.mu.Lock() +} + // Called by the replicate thread to push the nextHead to the destDB and set // its root to the new value. //