mirror of
https://github.com/actiontech/dble.git
synced 2026-01-08 05:40:15 -06:00
Merge branch 'master' into 1860
This commit is contained in:
@@ -42,4 +42,6 @@ public final class AlarmCode {
|
||||
public static final String DB_INSTANCE_LOWER_CASE_ERROR = "DBLE_DB_INSTANCE_LOWER_CASE_ERROR"; //Resolve by trigger
|
||||
public static final String DB_SLAVE_INSTANCE_DELAY = "DBLE_DB_SLAVE_INSTANCE_DELAY"; //Resolve by trigger
|
||||
public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL";
|
||||
public static final String SLOW_QUERY_QUEUE_POLICY_ABORT = "SLOW_QUERY_QUEUE_POLICY_ABORT";
|
||||
public static final String SLOW_QUERY_QUEUE_POLICY_WAIT = "SLOW_QUERY_QUEUE_POLICY_WAIT";
|
||||
}
|
||||
|
||||
@@ -173,6 +173,7 @@ public final class SystemConfig {
|
||||
private int flushSlowLogPeriod = 1; //second
|
||||
private int flushSlowLogSize = 1000; //row
|
||||
private int sqlSlowTime = 100; //ms
|
||||
private int slowQueueOverflowPolicy = 2;
|
||||
|
||||
//general log
|
||||
private int enableGeneralLog = 0;
|
||||
@@ -1867,6 +1868,19 @@ public final class SystemConfig {
|
||||
}
|
||||
|
||||
|
||||
public int getSlowQueueOverflowPolicy() {
|
||||
return slowQueueOverflowPolicy;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public void setSlowQueueOverflowPolicy(int slowQueueOverflowPolicy) {
|
||||
if (slowQueueOverflowPolicy == 1 || slowQueueOverflowPolicy == 2) {
|
||||
this.slowQueueOverflowPolicy = slowQueueOverflowPolicy;
|
||||
} else {
|
||||
problemReporter.warn(String.format(WARNING_FORMAT, "slowQueueOverflowPolicy", slowQueueOverflowPolicy, this.slowQueueOverflowPolicy));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SystemConfig [" +
|
||||
@@ -1940,6 +1954,7 @@ public final class SystemConfig {
|
||||
", flushSlowLogPeriod=" + flushSlowLogPeriod +
|
||||
", flushSlowLogSize=" + flushSlowLogSize +
|
||||
", sqlSlowTime=" + sqlSlowTime +
|
||||
", slowQueueOverflowPolicy=" + slowQueueOverflowPolicy +
|
||||
", enableAlert=" + enableAlert +
|
||||
", maxCharsPerColumn=" + maxCharsPerColumn +
|
||||
", maxRowSizeToFile=" + maxRowSizeToFile +
|
||||
|
||||
@@ -5,6 +5,9 @@
|
||||
|
||||
package com.actiontech.dble.log.slow;
|
||||
|
||||
import com.actiontech.dble.alarm.AlarmCode;
|
||||
import com.actiontech.dble.alarm.Alert;
|
||||
import com.actiontech.dble.alarm.AlertUtil;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.log.DailyRotateLogStore;
|
||||
import com.actiontech.dble.server.status.SlowQueryLog;
|
||||
@@ -119,10 +122,28 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
|
||||
public void putSlowQueryLog(ShardingService service, TraceResult log) {
|
||||
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
|
||||
|
||||
try {
|
||||
final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS);
|
||||
if (!enQueue) {
|
||||
LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString());
|
||||
boolean enQueue = queue.offer(logEntry);
|
||||
if (!enQueue && SlowQueryLog.getInstance().getQueueOverflowPolicy() == 1) {
|
||||
//abort
|
||||
String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}";
|
||||
LOGGER.warn(errorMsg);
|
||||
// alert
|
||||
AlertUtil.alertSelf(AlarmCode.SLOW_QUERY_QUEUE_POLICY_ABORT, Alert.AlertLevel.WARN, errorMsg, null);
|
||||
} else if (!enQueue && SlowQueryLog.getInstance().getQueueOverflowPolicy() == 2) {
|
||||
//wait 3s
|
||||
long start = System.nanoTime();
|
||||
boolean offerFlag = queue.offer(logEntry, 3, TimeUnit.SECONDS);
|
||||
String costTime = String.valueOf((System.nanoTime() - start) / 1000000);
|
||||
if (offerFlag) {
|
||||
String errorMsg = "since there are too many slow query logs to be written, the write channel is blocked, and the returned slow SQL execution time will include the write channel blocking time:" + costTime + "(ms).";
|
||||
LOGGER.warn(errorMsg);
|
||||
// alert
|
||||
AlertUtil.alertSelf(AlarmCode.SLOW_QUERY_QUEUE_POLICY_WAIT, Alert.AlertLevel.WARN, errorMsg, null);
|
||||
} else {
|
||||
LOGGER.warn("slow log queue has so many item and waiting time:3s exceeded. Discard log entry: {} ", logEntry.toString());
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.info(" ", e);
|
||||
|
||||
@@ -29,6 +29,7 @@ public final class ManagerParseReload {
|
||||
public static final int LOAD_DATA_NUM = 14;
|
||||
public static final int SAMPLING_RATE = 15;
|
||||
public static final int XAID_CHECK_PERIOD = 16;
|
||||
public static final int SLOW_QUERY_QUEUE_POLICY = 17;
|
||||
|
||||
public static int parse(String stmt, int offset) {
|
||||
int i = offset;
|
||||
@@ -320,6 +321,9 @@ public final class ManagerParseReload {
|
||||
case 'F':
|
||||
case 'f':
|
||||
return slowQueryFlushCheck(stmt, offset);
|
||||
case 'Q':
|
||||
case 'q':
|
||||
return slowQueryQueuePolicyCheck(stmt, offset);
|
||||
default:
|
||||
return OTHER;
|
||||
}
|
||||
@@ -425,6 +429,32 @@ public final class ManagerParseReload {
|
||||
return OTHER;
|
||||
}
|
||||
|
||||
private static int slowQueryQueuePolicyCheck(String stmt, int offset) {
|
||||
if (stmt.length() > offset + 4) {
|
||||
char c1 = stmt.charAt(++offset);
|
||||
char c2 = stmt.charAt(++offset);
|
||||
char c3 = stmt.charAt(++offset);
|
||||
char c4 = stmt.charAt(++offset);
|
||||
char c5 = stmt.charAt(++offset);
|
||||
char c6 = stmt.charAt(++offset);
|
||||
char c7 = stmt.charAt(++offset);
|
||||
char c8 = stmt.charAt(++offset);
|
||||
char c9 = stmt.charAt(++offset);
|
||||
char c10 = stmt.charAt(++offset);
|
||||
char c11 = stmt.charAt(++offset);
|
||||
|
||||
// reload @@slow_query.queue_policy
|
||||
if ((c1 == 'U' || c1 == 'u') && (c2 == 'E' || c2 == 'e') && (c3 == 'U' || c3 == 'u') &&
|
||||
(c4 == 'E' || c4 == 'e') && (c5 == '_') && (c6 == 'P' || c6 == 'p') && (c7 == 'O' || c7 == 'o') &&
|
||||
(c8 == 'L' || c8 == 'l') && (c9 == 'I' || c9 == 'i') && (c10 == 'C' || c10 == 'c') &&
|
||||
(c11 == 'Y' || c11 == 'y') && (stmt.length() > ++offset)) {
|
||||
return (offset << 8) | SLOW_QUERY_QUEUE_POLICY;
|
||||
}
|
||||
return OTHER;
|
||||
}
|
||||
return OTHER;
|
||||
}
|
||||
|
||||
private static int slowQueryFlushSizeCheck(String stmt, int offset) {
|
||||
if (stmt.length() > offset + 3) {
|
||||
char c1 = stmt.charAt(++offset);
|
||||
|
||||
@@ -22,6 +22,7 @@ public final class SlowQueryLog {
|
||||
private volatile int slowTime; //ms
|
||||
private volatile int flushPeriod;
|
||||
private volatile int flushSize;
|
||||
private volatile int queueOverflowPolicy;
|
||||
private static final SlowQueryLog INSTANCE = new SlowQueryLog();
|
||||
private volatile SlowQueryLogProcessor processor = new SlowQueryLogProcessor();
|
||||
|
||||
@@ -29,6 +30,7 @@ public final class SlowQueryLog {
|
||||
this.slowTime = SystemConfig.getInstance().getSqlSlowTime();
|
||||
this.flushPeriod = SystemConfig.getInstance().getFlushSlowLogPeriod();
|
||||
this.flushSize = SystemConfig.getInstance().getFlushSlowLogSize();
|
||||
this.queueOverflowPolicy = SystemConfig.getInstance().getSlowQueueOverflowPolicy();
|
||||
}
|
||||
|
||||
public static SlowQueryLog getInstance() {
|
||||
@@ -57,6 +59,14 @@ public final class SlowQueryLog {
|
||||
}
|
||||
}
|
||||
|
||||
public int getQueueOverflowPolicy() {
|
||||
return queueOverflowPolicy;
|
||||
}
|
||||
|
||||
public void setQueueOverflowPolicy(int queueOverflowPolicy) {
|
||||
this.queueOverflowPolicy = queueOverflowPolicy;
|
||||
}
|
||||
|
||||
public int getSlowTime() {
|
||||
return slowTime;
|
||||
}
|
||||
|
||||
@@ -44,6 +44,9 @@ public final class ReloadHandler {
|
||||
case ManagerParseReload.SLOW_QUERY_FLUSH_SIZE:
|
||||
ReloadSlowQueryFlushSize.execute(service, ParseUtil.getSQLId(stmt, rs >>> SHIFT));
|
||||
break;
|
||||
case ManagerParseReload.SLOW_QUERY_QUEUE_POLICY:
|
||||
ReloadSlowQueuePolicy.execute(service, ParseUtil.getSQLId(stmt, rs >>> SHIFT));
|
||||
break;
|
||||
case ManagerParseReload.QUERY_CF:
|
||||
String filter = ParseUtil.parseString(stmt);
|
||||
ReloadQueryCf.execute(service, filter);
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.services.manager.response;
|
||||
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.server.status.SlowQueryLog;
|
||||
import com.actiontech.dble.services.manager.ManagerService;
|
||||
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public final class ReloadSlowQueuePolicy {
|
||||
private ReloadSlowQueuePolicy() {
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueuePolicy.class);
|
||||
|
||||
public static void execute(ManagerService service, int policy) {
|
||||
if (policy != 1 && policy != 2) {
|
||||
service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "slow queue overflow policy is only supported as 1 or 2");
|
||||
return;
|
||||
}
|
||||
try {
|
||||
WriteDynamicBootstrap.getInstance().changeValue("slowQueueOverflowPolicy", String.valueOf(policy));
|
||||
} catch (IOException e) {
|
||||
String msg = " reload @@slow_query.queue_policy failed";
|
||||
LOGGER.warn(service + " " + msg, e);
|
||||
service.writeErrMessage(ErrorCode.ER_YES, msg);
|
||||
return;
|
||||
}
|
||||
SlowQueryLog.getInstance().setQueueOverflowPolicy(policy);
|
||||
LOGGER.info(service + " reload @@slow_query.queue_policy=" + policy + " success by manager");
|
||||
|
||||
OkPacket ok = new OkPacket();
|
||||
ok.setPacketId(1);
|
||||
ok.setAffectedRows(1);
|
||||
ok.setServerStatus(2);
|
||||
ok.setMessage("reload @@slow_query.queue_policy success".getBytes());
|
||||
ok.write(service.getConnection());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -191,6 +191,7 @@ public final class ShowHelp {
|
||||
HELPS.put("reload @@slow_query.flushperiod", "Reset the flush period");
|
||||
HELPS.put("show @@slow_query.flushsize", "Show the min flush size for writing to disk");
|
||||
HELPS.put("reload @@slow_query.flushsize", "Reset the flush size");
|
||||
HELPS.put("reload @@slow_query.queue_policy", "Reset the queue policy");
|
||||
|
||||
//create database
|
||||
HELPS.put("create database @@shardingNode ='dn......'", "create database for shardingNode in config");
|
||||
|
||||
@@ -186,6 +186,7 @@ public final class SystemParams {
|
||||
params.add(new ParamInfo("sqlSlowTime", SlowQueryLog.getInstance().getSlowTime() + "ms", "The threshold of Slow Query, the default is 100ms"));
|
||||
params.add(new ParamInfo("flushSlowLogPeriod", SlowQueryLog.getInstance().getFlushPeriod() + "s", "The period for flushing log to disk, the default is 1 second"));
|
||||
params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000"));
|
||||
params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 2"));
|
||||
params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() + "", "Enable or disable alert"));
|
||||
params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows, the default value is false"));
|
||||
params.add(new ParamInfo("maxRowSizeToFile", LoadDataBatch.getInstance().getSize() + "", "The maximum row size,if over this value,row data will be saved to file when load data.The default value is 100000"));
|
||||
|
||||
@@ -172,6 +172,8 @@
|
||||
-DflushSlowLogSize=1000
|
||||
# the threshold for judging if the query is slow , unit is millisecond
|
||||
-DsqlSlowTime=100
|
||||
# slow log queue overflow policy
|
||||
-DslowQueueOverflowPolicy=2
|
||||
|
||||
# used for load data,maxCharsPerColumn means max chars length for per column when load data
|
||||
-DmaxCharsPerColumn=65535
|
||||
|
||||
Reference in New Issue
Block a user