mirror of
https://github.com/actiontech/dble.git
synced 2026-05-06 06:20:42 -05:00
fix[ATK-4637]: add Jackson dependencies and enhance MySQL delay detection logic
This commit is contained in:
@@ -26,6 +26,7 @@
|
||||
<grpc.version>1.75.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
|
||||
<netty.version>4.1.124.Final</netty.version>
|
||||
<log4j2.version>2.18.0</log4j2.version>
|
||||
<jackson.version>2.15.0</jackson.version>
|
||||
</properties>
|
||||
<repositories>
|
||||
<repository>
|
||||
@@ -130,6 +131,27 @@
|
||||
<groupId>de.ruedigermoeller</groupId>
|
||||
<artifactId>fst</artifactId>
|
||||
<version>2.57</version>
|
||||
<exclusions>
|
||||
<exclusion>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-annotations</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-core</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
|
||||
@@ -56,9 +56,16 @@ public class MySQLDelayDetector extends MySQLDetector {
|
||||
heartbeat.setSlaveBehindMaster((int) delayVal);
|
||||
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
|
||||
} else {
|
||||
// master and slave maybe switch
|
||||
heartbeat.setSlaveBehindMaster(null);
|
||||
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
|
||||
if (heartbeat.getStatus() != MySQLHeartbeat.OK_STATUS) {
|
||||
long updatedLogic = dbGroup.getLogicTimestamp().updateAndGet(current -> Math.max(current, delay));
|
||||
LOGGER.warn("delay detection rebased logic_timestamp to {} for dbGroup {}", updatedLogic, dbGroup.getGroupName());
|
||||
heartbeat.setSlaveBehindMaster(0);
|
||||
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_NORMAL);
|
||||
} else {
|
||||
// master and slave maybe switch
|
||||
heartbeat.setSlaveBehindMaster(null);
|
||||
heartbeat.setDbSynStatus(MySQLHeartbeat.DB_SYN_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -84,7 +84,8 @@ public class MySQLHeartbeat {
|
||||
this.heartbeatTimeout = dbInstance.getDbGroupConfig().getHeartbeatTimeout();
|
||||
this.isDelayDetection = dbInstance.getDbGroupConfig().isDelayDetection();
|
||||
if (isDelayDetection) {
|
||||
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(), dbInstance.getDbGroupConfig().getDelayDatabase());
|
||||
this.heartbeatSQL = getDetectorSql(dbInstance.getDbGroupConfig().getName(),
|
||||
dbInstance.getDbGroupConfig().getDelayDatabase(), dbInstance.isReadInstance());
|
||||
} else {
|
||||
this.heartbeatSQL = source.getDbGroupConfig().getHeartbeatSQL();
|
||||
}
|
||||
@@ -181,12 +182,12 @@ public class MySQLHeartbeat {
|
||||
}
|
||||
}
|
||||
|
||||
private String getDetectorSql(String dbGroupName, String delayDatabase) {
|
||||
private String getDetectorSql(String dbGroupName, String delayDatabase, boolean readInstance) {
|
||||
String[] str = {"dble", dbGroupName, SystemConfig.getInstance().getInstanceName()};
|
||||
String sourceName = Joiner.on("_").join(str);
|
||||
String sqlTableName = delayDatabase + ".u_delay ";
|
||||
String detectorSql;
|
||||
if (!source.isReadInstance()) {
|
||||
if (!readInstance) {
|
||||
String update = "replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)";
|
||||
detectorSql = convert(update, Lists.newArrayList(sqlTableName, sourceName));
|
||||
} else {
|
||||
@@ -199,9 +200,14 @@ public class MySQLHeartbeat {
|
||||
private String convert(String template, List<String> list) {
|
||||
StringBuilder sb = new StringBuilder(template);
|
||||
String replace = "?";
|
||||
int fromIndex = 0;
|
||||
for (String str : list) {
|
||||
int index = sb.indexOf(replace);
|
||||
sb.replace(index, index + 1, str);
|
||||
int index = sb.indexOf(replace, fromIndex);
|
||||
if (index < 0) {
|
||||
throw new IllegalArgumentException("heartbeat sql template placeholder '?' not enough, template=" + template + ", values=" + list);
|
||||
}
|
||||
sb.replace(index, index + replace.length(), str);
|
||||
fromIndex = index + str.length();
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
@@ -387,11 +393,17 @@ public class MySQLHeartbeat {
|
||||
}
|
||||
|
||||
String getHeartbeatSQL() {
|
||||
if (isDelayDetection && !source.isReadInstance()) {
|
||||
return convert(heartbeatSQL, Lists.newArrayList(String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
|
||||
} else {
|
||||
return heartbeatSQL;
|
||||
if (isDelayDetection) {
|
||||
boolean readInstance = source.isReadInstance();
|
||||
String detectorSql = getDetectorSql(source.getDbGroupConfig().getName(),
|
||||
source.getDbGroupConfig().getDelayDatabase(), readInstance);
|
||||
if (!readInstance) {
|
||||
return convert(detectorSql, Lists.newArrayList(String.valueOf(LocalDateTime.now()),
|
||||
String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())));
|
||||
}
|
||||
return detectorSql;
|
||||
}
|
||||
return heartbeatSQL;
|
||||
}
|
||||
|
||||
public DbInstanceSyncRecorder getAsyncRecorder() {
|
||||
|
||||
Reference in New Issue
Block a user