diff --git a/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java index a77fc4a55..f7664e006 100644 --- a/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java +++ b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java @@ -41,4 +41,13 @@ public final class GeneralProvider { public static void showTableByNodeUnitHandlerFinished() { } + + public static void beforeSlowLogClose() { + } + + public static void afterSlowLogClose() { + } + + public static void runFlushLogTask() { + } } diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index 524096e94..b05ed7440 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -175,7 +175,7 @@ public final class SystemConfig { private int flushSlowLogPeriod = 1; //second private int flushSlowLogSize = 1000; //row private int sqlSlowTime = 100; //ms - private int slowQueueOverflowPolicy = 2; + private int slowQueueOverflowPolicy = 1; //general log private int enableGeneralLog = 0; diff --git a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java index ad9eae156..fd472633c 100644 --- a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java +++ b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java @@ -5,6 +5,8 @@ package com.actiontech.dble.log; +import com.actiontech.dble.btrace.provider.GeneralProvider; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -137,6 +139,7 @@ public class DailyRotateLogStore { force(false); close(); + GeneralProvider.afterSlowLogClose(); File file; file = new File(fileName); file.renameTo(target); diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java index d0f13c88a..28ceb066c 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java @@ -8,6 +8,7 @@ package com.actiontech.dble.log.slow; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.btrace.provider.GeneralProvider; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.DailyRotateLogStore; import com.actiontech.dble.server.status.SlowQueryLog; @@ -26,6 +27,7 @@ public class SlowQueryLogProcessor extends Thread { private BlockingQueue queue; private DailyRotateLogStore store; private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; private long logSize = 0; private long lastLogSize = 0; private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. started with:\n" + @@ -41,37 +43,44 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { SlowQueryLogEntry log; - scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + initFlushLogTask(); try { store.open(); while (SlowQueryLog.getInstance().isEnableSlowLog()) { try { log = queue.poll(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - continue; - } - if (log == null) { - continue; - } - - if (writeLog(log)) { - logSize++; - } - - synchronized (this) { - if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { - flushLog(); + if (log == null) { + continue; } + + if (writeLog(log)) { + logSize++; + } + + synchronized (this) { + if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { + flushLog(); + } + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } - // disable slow_query_log, end task - while ((log = queue.poll()) != null) { - if (writeLog(log)) { - logSize++; + // disable slow_query_log, need to place all the remaining elements in the queue + while (true) { + try { + if ((log = queue.poll()) == null) { + break; + } + if (writeLog(log)) { + logSize++; + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } } catch (IOException e) { - LOGGER.info("transaction log error:", e); + LOGGER.warn("transaction log error:", e); store.close(); } finally { scheduler.shutdown(); @@ -80,6 +89,15 @@ public class SlowQueryLogProcessor extends Thread { } } + public void initFlushLogTask() { + scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + } + + public void cancelFlushLogTask() { + scheduledFuture.cancel(false); + } + + private synchronized boolean writeLog(SlowQueryLogEntry log) throws IOException { if (log == null) { return false; @@ -104,8 +122,8 @@ public class SlowQueryLogProcessor extends Thread { try { store.force(false); } catch (IOException e) { - LOGGER.info("transaction log error:", e); - store.close(); + LOGGER.warn("flush slow log error:", e); + GeneralProvider.beforeSlowLogClose(); } } @@ -114,6 +132,7 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { synchronized (this) { + GeneralProvider.runFlushLogTask(); flushLog(); } } diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index 9d50859cf..c3089b091 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -28,6 +28,7 @@ import com.actiontech.dble.services.rwsplit.RWSplitMultiHandler; import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.services.rwsplit.handle.PSHandler; import com.actiontech.dble.services.rwsplit.handle.PreparedStatementHolder; +import com.actiontech.dble.statistic.sql.StatisticListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -199,6 +200,7 @@ public class RWSplitNonBlockingSession extends Session { if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) { long statementId = ByteUtil.readUB4(originPacket, 5); PreparedStatementHolder holder = rwSplitService.getPrepareStatement(statementId); + StatisticListener.getInstance().record(rwSplitService, r -> r.onFrontendSetSql(getService().getSchema(), holder.getPrepareSql())); if (holder.isMustMaster() && conn.getInstance().isReadInstance()) { holder.setExecuteOrigin(originPacket); PSHandler psHandler = new PSHandler(rwSplitService, holder); diff --git a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java index 3093a052c..51796b177 100644 --- a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java +++ b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java @@ -7,7 +7,6 @@ package com.actiontech.dble.server.status; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.slow.SlowQueryLogProcessor; - import com.actiontech.dble.server.trace.TraceResult; import com.actiontech.dble.services.mysqlsharding.ShardingService; import org.slf4j.Logger; @@ -81,6 +80,10 @@ public final class SlowQueryLog { public void setFlushPeriod(int flushPeriod) { this.flushPeriod = flushPeriod; + if (this.enableSlowLog && processor != null) { + processor.cancelFlushLogTask(); + processor.initFlushLogTask(); + } } public int getFlushSize() { diff --git a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java index 290474e72..1b5482e94 100644 --- a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java +++ b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java @@ -111,7 +111,7 @@ public class TraceResult implements Cloneable { public void setResponseTime(final ShardingService shardingService, boolean isSuccess) { if (this.requestEnd == null) { this.requestEnd = TraceRecord.currenTime(); - if (this.isCompletedV1() && + if (SlowQueryLog.getInstance().isEnableSlowLog() && this.isCompletedV1() && isSuccess && getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) { SlowQueryLog.getInstance().putSlowQueryLog(shardingService, this.clone()); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java index 5597e50dd..46ec4390a 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java @@ -6,10 +6,10 @@ package com.actiontech.dble.services.manager.response; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod { private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class); public static void execute(ManagerService service, int time) { - if (time < 0) { + if (time <= 0) { service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct"); return; } diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java index 28e44be97..c5508b04b 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitHandler.java @@ -224,7 +224,7 @@ public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler, loadDataClean(); initDbClean(); rwSplitService.getSession2().unbind(); - reason = "Connection {dbInstance[" + rwSplitService.getConnection().getHost() + ":" + rwSplitService.getConnection().getPort() + "],DbGroup[" + + reason = "Connection {dbInstance[" + service.getConnection().getHost() + ":" + service.getConnection().getPort() + "],DbGroup[" + rwSplitService.getUserConfig().getDbGroup() + "],threadID[" + ((MySQLResponseService) service).getConnection().getThreadId() + "]} was closed ,reason is [" + reason + "]"; writeErrorMsg(rwSplitService.nextPacketId(), reason); diff --git a/src/main/java/com/actiontech/dble/singleton/SystemParams.java b/src/main/java/com/actiontech/dble/singleton/SystemParams.java index d5d652ea0..f4b8de131 100644 --- a/src/main/java/com/actiontech/dble/singleton/SystemParams.java +++ b/src/main/java/com/actiontech/dble/singleton/SystemParams.java @@ -192,7 +192,7 @@ public final class SystemParams { params.add(new ParamInfo("sqlSlowTime", SlowQueryLog.getInstance().getSlowTime() + "ms", "The threshold of Slow Query, the default is 100ms")); params.add(new ParamInfo("flushSlowLogPeriod", SlowQueryLog.getInstance().getFlushPeriod() + "s", "The period for flushing log to disk, the default is 1 second")); params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000")); - params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 2")); + params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 1")); params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() ? "1" : "0", "Enable or disable alert")); params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows, the default value is false")); params.add(new ParamInfo("maxRowSizeToFile", LoadDataBatch.getInstance().getSize() + "", "The maximum row size,if over this value,row data will be saved to file when load data.The default value is 100000")); diff --git a/src/main/resources/bootstrap_template.cnf b/src/main/resources/bootstrap_template.cnf index 5c08e5f09..78dbf6a69 100644 --- a/src/main/resources/bootstrap_template.cnf +++ b/src/main/resources/bootstrap_template.cnf @@ -173,7 +173,7 @@ # the threshold for judging if the query is slow , unit is millisecond -DsqlSlowTime=100 # slow log queue overflow policy --DslowQueueOverflowPolicy=2 +-DslowQueueOverflowPolicy=1 # used for load data,maxCharsPerColumn means max chars length for per column when load data -DmaxCharsPerColumn=65535