Merge pull request #5949 from dolthub/aaron/cluster-replication-heartbeats

go: sqle: cluster: commithook: Periodically heartbeat to a standby when we are primary. This allows replication_lag on the standby to more accurately reflect the possible drift locally.
This commit is contained in:
Aaron Son
2023-05-16 16:51:08 -07:00
committed by GitHub
2 changed files with 106 additions and 0 deletions

View File

@@ -108,6 +108,7 @@ func (h *commithook) replicate(ctx context.Context) {
defer h.logger().Tracef("cluster/commithook: background thread: replicate: shutdown.")
h.mu.Lock()
defer h.mu.Unlock()
shouldHeartbeat := false
for {
lgr := h.logger()
// Shutdown for context canceled.
@@ -138,11 +139,17 @@ func (h *commithook) replicate(ctx context.Context) {
h.nextHeadIncomingTime = time.Now()
} else if h.shouldReplicate() {
h.attemptReplicate(ctx)
shouldHeartbeat = false
} else {
lgr.Tracef("cluster/commithook: background thread: waiting for signal.")
if h.waitNotify != nil {
h.waitNotify()
}
if shouldHeartbeat {
h.attemptHeartbeat(ctx)
} else {
shouldHeartbeat = true
}
h.cond.Wait()
lgr.Tracef("cluster/commithook: background thread: woken up.")
}
@@ -175,6 +182,37 @@ func (h *commithook) primaryNeedsInit() bool {
return h.role == RolePrimary && h.nextHead == (hash.Hash{})
}
// Called by the replicate thread to periodically heartbeat liveness to a
// standby if we are a primary. These heartbeats are best effort and currently
// do not affect the data plane much.
//
// The heartbeat is a no-op when we are not the primary, when h.lastPushedHead
// is empty, or when there is no destDB. Otherwise it calls Commit on the
// destination's chunk store, passing h.lastPushedHead for both hash
// arguments, bounded by a five second timeout. A failed heartbeat is only
// logged at trace level; it is not retried here and does not otherwise alter
// the hook's state.
//
// While the remote call is in flight, h.mu is released and h.cancelReplicate
// holds the cancel function for the call's context. Both are restored before
// returning: h.mu is held again and h.cancelReplicate is nil.
//
// preconditions: h.mu is locked and shouldReplicate() returned false.
func (h *commithook) attemptHeartbeat(ctx context.Context) {
	// Upper bound on how long a single heartbeat may block the replicate thread.
	const heartbeatTimeout = 5 * time.Second

	if h.role != RolePrimary {
		return
	}
	head := h.lastPushedHead
	if head.IsEmpty() {
		return
	}
	destDB := h.destDB
	if destDB == nil {
		return
	}
	lgr := h.logger()

	ctx, h.cancelReplicate = context.WithTimeout(ctx, heartbeatTimeout)
	defer func() {
		// The nil check tolerates h.cancelReplicate having been cleared while
		// h.mu was released below.
		if h.cancelReplicate != nil {
			h.cancelReplicate()
		}
		h.cancelReplicate = nil
	}()

	// Do the network round trip without holding h.mu. Re-acquiring the lock in
	// a defer guarantees that it is held again before the cleanup above runs
	// and before control returns to the caller, even if the remote call panics.
	func() {
		h.mu.Unlock()
		defer h.mu.Lock()
		datasDB := doltdb.HackDatasDatabaseFromDoltDB(destDB)
		cs := datas.ChunkStoreFromDatabase(datasDB)
		if _, err := cs.Commit(ctx, head, head); err != nil {
			// Best effort: a missed heartbeat is reported but otherwise ignored.
			lgr.Tracef("cluster/commithook: heartbeat to standby failed: %v", err)
		}
	}()
}
// Called by the replicate thread to push the nextHead to the destDB and set
// its root to the new value.
//

View File

@@ -925,6 +925,74 @@ tests:
result:
columns: ["count(*)"]
rows: [["15"]]
# Verifies that the standby's last_update in dolt_cluster.dolt_cluster_status
# stays fresh while this test issues no further queries against the primary:
# the same "less than 5 seconds old" check must pass both before and after a
# 5 second sleep on the standby.
- name: last_updated heartbeats
multi_repos:
- name: server1
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3309
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3852/{database}
bootstrap_role: primary
bootstrap_epoch: 1
remotesapi:
port: 3851
server:
args: ["--config", "server.yaml"]
port: 3309
- name: server2
with_files:
- name: server.yaml
contents: |
log_level: trace
listener:
host: 0.0.0.0
port: 3310
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3851/{database}
bootstrap_role: standby
bootstrap_epoch: 1
remotesapi:
port: 3852
server:
args: ["--config", "server.yaml"]
port: 3310
connections:
# server1 bootstraps as primary: create a database there, then demote it to
# standby at epoch 2.
# NOTE(review): the reason for the standby/primary round trip is not evident
# from this test alone - confirm against the cluster role-transition docs.
- on: server1
queries:
- exec: 'create database repo1'
- query: "call dolt_assume_cluster_role('standby', 2)"
result:
columns: ["status"]
rows: [["0"]]
# In a separate connection entry, promote server1 back to primary at epoch 3.
- on: server1
queries:
- query: "call dolt_assume_cluster_role('primary', 3)"
result:
columns: ["status"]
rows: [["0"]]
# On the standby: last_update must be less than 5 seconds behind NOW().
- on: server2
queries:
- query: "SELECT TIMESTAMPDIFF(SECOND, CONVERT_TZ(last_update, 'GMT', @@GLOBAL.time_zone), NOW()) < 5 AS within_threshold FROM dolt_cluster.dolt_cluster_status;"
result:
columns: ["within_threshold"]
rows: [["1"]]
# Sleep for the full threshold so that a last_update which was not refreshed
# in the meantime would fail the repeated check below.
- query: "SELECT SLEEP(5)"
result:
columns: ["SLEEP(5)"]
rows: [["0"]]
- query: "SELECT TIMESTAMPDIFF(SECOND, CONVERT_TZ(last_update, 'GMT', @@GLOBAL.time_zone), NOW()) < 5 AS within_threshold FROM dolt_cluster.dolt_cluster_status;"
result:
columns: ["within_threshold"]
rows: [["1"]]
- name: create new database, clone a database, primary replicates to standby, standby has both databases
multi_repos:
- name: server1