diff --git a/src/main/java/com/actiontech/dble/alarm/AlarmCode.java b/src/main/java/com/actiontech/dble/alarm/AlarmCode.java index 6db95b5f2..1e9548dca 100644 --- a/src/main/java/com/actiontech/dble/alarm/AlarmCode.java +++ b/src/main/java/com/actiontech/dble/alarm/AlarmCode.java @@ -42,4 +42,6 @@ public final class AlarmCode { public static final String DB_INSTANCE_LOWER_CASE_ERROR = "DBLE_DB_INSTANCE_LOWER_CASE_ERROR"; //Resolve by trigger public static final String DB_SLAVE_INSTANCE_DELAY = "DBLE_DB_SLAVE_INSTANCE_DELAY"; //Resolve by trigger public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL"; + public static final String SLOW_QUERY_QUEUE_POLICY_ABORT = "SLOW_QUERY_QUEUE_POLICY_ABORT"; + public static final String SLOW_QUERY_QUEUE_POLICY_WAIT = "SLOW_QUERY_QUEUE_POLICY_WAIT"; } diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index 23df5895f..511550a59 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -173,6 +173,7 @@ public final class SystemConfig { private int flushSlowLogPeriod = 1; //second private int flushSlowLogSize = 1000; //row private int sqlSlowTime = 100; //ms + private int slowQueueOverflowPolicy = 2; //general log private int enableGeneralLog = 0; @@ -1867,6 +1868,19 @@ public final class SystemConfig { } + public int getSlowQueueOverflowPolicy() { + return slowQueueOverflowPolicy; + } + + @SuppressWarnings("unused") + public void setSlowQueueOverflowPolicy(int slowQueueOverflowPolicy) { + if (slowQueueOverflowPolicy == 1 || slowQueueOverflowPolicy == 2) { + this.slowQueueOverflowPolicy = slowQueueOverflowPolicy; + } else { + problemReporter.warn(String.format(WARNING_FORMAT, "slowQueueOverflowPolicy", slowQueueOverflowPolicy, this.slowQueueOverflowPolicy)); + } + } + @Override public String toString() { return "SystemConfig [" + @@ -1940,6 +1954,7 @@ public final class SystemConfig { ", flushSlowLogPeriod=" + flushSlowLogPeriod + ", flushSlowLogSize=" + flushSlowLogSize + ", sqlSlowTime=" + sqlSlowTime + + ", slowQueueOverflowPolicy=" + slowQueueOverflowPolicy + ", enableAlert=" + enableAlert + ", maxCharsPerColumn=" + maxCharsPerColumn + ", maxRowSizeToFile=" + maxRowSizeToFile + diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java index c4fef42bd..2ee2b3993 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java @@ -5,6 +5,9 @@ package com.actiontech.dble.log.slow; +import com.actiontech.dble.alarm.AlarmCode; +import com.actiontech.dble.alarm.Alert; +import com.actiontech.dble.alarm.AlertUtil; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.DailyRotateLogStore; import com.actiontech.dble.server.status.SlowQueryLog; @@ -119,10 +122,28 @@ public class SlowQueryLogProcessor extends Thread { public void putSlowQueryLog(ShardingService service, TraceResult log) { SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId()); + try { - final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS); - if (!enQueue) { - LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString()); + boolean enQueue = queue.offer(logEntry); + if (!enQueue && SlowQueryLog.getInstance().getQueueOverflowPolicy() == 1) { + //abort + String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}"; + LOGGER.warn(errorMsg); + // alert + AlertUtil.alertSelf(AlarmCode.SLOW_QUERY_QUEUE_POLICY_ABORT, Alert.AlertLevel.WARN, errorMsg, null); + } else if (!enQueue && SlowQueryLog.getInstance().getQueueOverflowPolicy() == 2) { + //wait 3s + long start = System.nanoTime(); + boolean offerFlag = queue.offer(logEntry, 3, TimeUnit.SECONDS); + String costTime = String.valueOf((System.nanoTime() - start) / 1000000); + if (offerFlag) { + String errorMsg = "since there are too many slow query logs to be written, the write channel is blocked, and the returned slow SQL execution time will include the write channel blocking time:" + costTime + "(ms)."; + LOGGER.warn(errorMsg); + // alert + AlertUtil.alertSelf(AlarmCode.SLOW_QUERY_QUEUE_POLICY_WAIT, Alert.AlertLevel.WARN, errorMsg, null); + } else { + LOGGER.warn("slow log queue has so many item and waiting time:3s exceeded. Discard log entry: {} ", logEntry.toString()); + } } } catch (InterruptedException e) { LOGGER.info(" ", e); diff --git a/src/main/java/com/actiontech/dble/route/parser/ManagerParseReload.java b/src/main/java/com/actiontech/dble/route/parser/ManagerParseReload.java index 3064496a8..5b9d57b71 100644 --- a/src/main/java/com/actiontech/dble/route/parser/ManagerParseReload.java +++ b/src/main/java/com/actiontech/dble/route/parser/ManagerParseReload.java @@ -29,6 +29,7 @@ public final class ManagerParseReload { public static final int LOAD_DATA_NUM = 14; public static final int SAMPLING_RATE = 15; public static final int XAID_CHECK_PERIOD = 16; + public static final int SLOW_QUERY_QUEUE_POLICY = 17; public static int parse(String stmt, int offset) { int i = offset; @@ -320,6 +321,9 @@ public final class ManagerParseReload { case 'F': case 'f': return slowQueryFlushCheck(stmt, offset); + case 'Q': + case 'q': + return slowQueryQueuePolicyCheck(stmt, offset); default: return OTHER; } @@ -425,6 +429,32 @@ public final class ManagerParseReload { return OTHER; } + private static int slowQueryQueuePolicyCheck(String stmt, int offset) { + if (stmt.length() > offset + 4) { + char c1 = stmt.charAt(++offset); + char c2 = stmt.charAt(++offset); + char c3 = stmt.charAt(++offset); + char c4 = stmt.charAt(++offset); + char c5 = stmt.charAt(++offset); + char c6 = stmt.charAt(++offset); + char c7 = stmt.charAt(++offset); + char c8 = stmt.charAt(++offset); + char c9 = stmt.charAt(++offset); + char c10 = stmt.charAt(++offset); + char c11 = stmt.charAt(++offset); + + // reload @@slow_query.queue_policy + if ((c1 == 'U' || c1 == 'u') && (c2 == 'E' || c2 == 'e') && (c3 == 'U' || c3 == 'u') && + (c4 == 'E' || c4 == 'e') && (c5 == '_') && (c6 == 'P' || c6 == 'p') && (c7 == 'O' || c7 == 'o') && + (c8 == 'L' || c8 == 'l') && (c9 == 'I' || c9 == 'i') && (c10 == 'C' || c10 == 'c') && + (c11 == 'Y' || c11 == 'y') && (stmt.length() > ++offset)) { + return (offset << 8) | SLOW_QUERY_QUEUE_POLICY; + } + return OTHER; + } + return OTHER; + } + private static int slowQueryFlushSizeCheck(String stmt, int offset) { if (stmt.length() > offset + 3) { char c1 = stmt.charAt(++offset); diff --git a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java index 84a654ac4..3b7540c27 100644 --- a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java +++ b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java @@ -22,6 +22,7 @@ public final class SlowQueryLog { private volatile int slowTime; //ms private volatile int flushPeriod; private volatile int flushSize; + private volatile int queueOverflowPolicy; private static final SlowQueryLog INSTANCE = new SlowQueryLog(); private volatile SlowQueryLogProcessor processor = new SlowQueryLogProcessor(); @@ -29,6 +30,7 @@ public final class SlowQueryLog { this.slowTime = SystemConfig.getInstance().getSqlSlowTime(); this.flushPeriod = SystemConfig.getInstance().getFlushSlowLogPeriod(); this.flushSize = SystemConfig.getInstance().getFlushSlowLogSize(); + this.queueOverflowPolicy = SystemConfig.getInstance().getSlowQueueOverflowPolicy(); } public static SlowQueryLog getInstance() { @@ -57,6 +59,14 @@ public final class SlowQueryLog { } } + public int getQueueOverflowPolicy() { + return queueOverflowPolicy; + } + + public void setQueueOverflowPolicy(int queueOverflowPolicy) { + this.queueOverflowPolicy = queueOverflowPolicy; + } + public int getSlowTime() { return slowTime; } diff --git a/src/main/java/com/actiontech/dble/services/manager/handler/ReloadHandler.java b/src/main/java/com/actiontech/dble/services/manager/handler/ReloadHandler.java index 85cdf2915..dd39d8771 100644 --- a/src/main/java/com/actiontech/dble/services/manager/handler/ReloadHandler.java +++ b/src/main/java/com/actiontech/dble/services/manager/handler/ReloadHandler.java @@ -44,6 +44,9 @@ public final class ReloadHandler { case ManagerParseReload.SLOW_QUERY_FLUSH_SIZE: ReloadSlowQueryFlushSize.execute(service, ParseUtil.getSQLId(stmt, rs >>> SHIFT)); break; + case ManagerParseReload.SLOW_QUERY_QUEUE_POLICY: + ReloadSlowQueuePolicy.execute(service, ParseUtil.getSQLId(stmt, rs >>> SHIFT)); + break; case ManagerParseReload.QUERY_CF: String filter = ParseUtil.parseString(stmt); ReloadQueryCf.execute(service, filter); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueuePolicy.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueuePolicy.java new file mode 100644 index 000000000..1743f0fc4 --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueuePolicy.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.services.manager.response; + +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.net.mysql.OkPacket; +import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public final class ReloadSlowQueuePolicy { + private ReloadSlowQueuePolicy() { + } + + private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueuePolicy.class); + + public static void execute(ManagerService service, int policy) { + if (policy != 1 && policy != 2) { + service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "slow queue overflow policy is only supported as 1 or 2"); + return; + } + try { + WriteDynamicBootstrap.getInstance().changeValue("slowQueueOverflowPolicy", String.valueOf(policy)); + } catch (IOException e) { + String msg = " reload @@slow_query.queue_policy failed"; + LOGGER.warn(service + " " + msg, e); + service.writeErrMessage(ErrorCode.ER_YES, msg); + return; + } + SlowQueryLog.getInstance().setQueueOverflowPolicy(policy); + LOGGER.info(service + " reload @@slow_query.queue_policy=" + policy + " success by manager"); + + OkPacket ok = new OkPacket(); + ok.setPacketId(1); + ok.setAffectedRows(1); + ok.setServerStatus(2); + ok.setMessage("reload @@slow_query.queue_policy success".getBytes()); + ok.write(service.getConnection()); + } + +} diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java index 3b57d6505..4e59c0f17 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowHelp.java @@ -191,6 +191,7 @@ public final class ShowHelp { HELPS.put("reload @@slow_query.flushperiod", "Reset the flush period"); HELPS.put("show @@slow_query.flushsize", "Show the min flush size for writing to disk"); HELPS.put("reload @@slow_query.flushsize", "Reset the flush size"); + HELPS.put("reload @@slow_query.queue_policy", "Reset the queue policy"); //create database HELPS.put("create database @@shardingNode ='dn......'", "create database for shardingNode in config"); diff --git a/src/main/java/com/actiontech/dble/singleton/SystemParams.java b/src/main/java/com/actiontech/dble/singleton/SystemParams.java index 51e0dcefe..dd4459b7e 100644 --- a/src/main/java/com/actiontech/dble/singleton/SystemParams.java +++ b/src/main/java/com/actiontech/dble/singleton/SystemParams.java @@ -186,6 +186,7 @@ public final class SystemParams { params.add(new ParamInfo("sqlSlowTime", SlowQueryLog.getInstance().getSlowTime() + "ms", "The threshold of Slow Query, the default is 100ms")); params.add(new ParamInfo("flushSlowLogPeriod", SlowQueryLog.getInstance().getFlushPeriod() + "s", "The period for flushing log to disk, the default is 1 second")); params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000")); + params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 2")); params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() + "", "Enable or disable alert")); params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows, the default value is false")); params.add(new ParamInfo("maxRowSizeToFile", LoadDataBatch.getInstance().getSize() + "", "The maximum row size,if over this value,row data will be saved to file when load data.The default value is 100000")); diff --git a/src/main/resources/bootstrap_template.cnf b/src/main/resources/bootstrap_template.cnf index 14054f741..e75da29ab 100644 --- a/src/main/resources/bootstrap_template.cnf +++ b/src/main/resources/bootstrap_template.cnf @@ -172,6 +172,8 @@ -DflushSlowLogSize=1000 # the threshold for judging if the query is slow , unit is millisecond -DsqlSlowTime=100 +# slow log queue overflow policy +-DslowQueueOverflowPolicy=2 # used for load data,maxCharsPerColumn means max chars length for per column when load data -DmaxCharsPerColumn=65535