diff --git a/go/libraries/doltcore/sqle/cluster/commithook.go b/go/libraries/doltcore/sqle/cluster/commithook.go index 1b540f028d..37ae58896e 100644 --- a/go/libraries/doltcore/sqle/cluster/commithook.go +++ b/go/libraries/doltcore/sqle/cluster/commithook.go @@ -108,6 +108,7 @@ func (h *commithook) replicate(ctx context.Context) { defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.") h.mu.Lock() defer h.mu.Unlock() + shouldHeartbeat := false for { lgr := h.logger() // Shutdown for context canceled. @@ -138,11 +139,17 @@ func (h *commithook) replicate(ctx context.Context) { h.nextHeadIncomingTime = time.Now() } else if h.shouldReplicate() { h.attemptReplicate(ctx) + shouldHeartbeat = false } else { lgr.Tracef("cluster/commithook: background thread: waiting for signal.") if h.waitNotify != nil { h.waitNotify() } + if shouldHeartbeat { + h.attemptHeartbeat(ctx) + } else { + shouldHeartbeat = true + } h.cond.Wait() lgr.Tracef("cluster/commithook: background thread: woken up.") } @@ -175,6 +182,37 @@ func (h *commithook) primaryNeedsInit() bool { return h.role == RolePrimary && h.nextHead == (hash.Hash{}) } +// Called by the replicate thread to periodically heartbeat liveness to a +// standby if we are a primary. These heartbeats are best effort and currently +// do not affect the data plane much. +// +// preconditions: h.mu is locked and shouldReplicate() returned false. +func (h *commithook) attemptHeartbeat(ctx context.Context) { + if h.role != RolePrimary { + return + } + head := h.lastPushedHead + if head.IsEmpty() { + return + } + destDB := h.destDB + if destDB == nil { + return + } + ctx, h.cancelReplicate = context.WithTimeout(ctx, 5*time.Second) + defer func() { + if h.cancelReplicate != nil { + h.cancelReplicate() + } + h.cancelReplicate = nil + }() + h.mu.Unlock() + datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB) + cs := datas.ChunkStoreFromDatabase(datasDB) + cs.Commit(ctx, head, head) + h.mu.Lock() +} + // Called by the replicate thread to push the nextHead to the destDB and set // its root to the new value. // diff --git a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml index 1bd5b6ec00..04c1959a4d 100644 --- a/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml +++ b/integration-tests/go-sql-server-driver/tests/sql-server-cluster.yaml @@ -925,6 +925,74 @@ tests: result: columns: ["count(*)"] rows: [["15"]] +- name: last_updated heartbeats + multi_repos: + - name: server1 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3309 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 + server: + args: ["--config", "server.yaml"] + port: 3309 + - name: server2 + with_files: + - name: server.yaml + contents: | + log_level: trace + listener: + host: 0.0.0.0 + port: 3310 + cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 + server: + args: ["--config", "server.yaml"] + port: 3310 + connections: + - on: server1 + queries: + - exec: 'create database repo1' + - query: "call dolt_assume_cluster_role('standby', 2)" + result: + columns: ["status"] + rows: [["0"]] + - on: server1 + queries: + - query: "call dolt_assume_cluster_role('primary', 3)" + result: + columns: ["status"] + rows: [["0"]] + - on: server2 + queries: + - query: "SELECT TIMESTAMPDIFF(SECOND, CONVERT_TZ(last_update, 'GMT', @@GLOBAL.time_zone), NOW()) < 5 AS within_threshold FROM dolt_cluster.dolt_cluster_status;" + result: + columns: ["within_threshold"] + rows: [["1"]] + - query: "SELECT SLEEP(5)" + result: + columns: ["SLEEP(5)"] + rows: [["0"]] + - query: "SELECT TIMESTAMPDIFF(SECOND, CONVERT_TZ(last_update, 'GMT', @@GLOBAL.time_zone), NOW()) < 5 AS within_threshold FROM dolt_cluster.dolt_cluster_status;" + result: + columns: ["within_threshold"] + rows: [["1"]] - name: create new database, clone a database, primary replicates to standby, standby has both databases multi_repos: - name: server1