Merge pull request #3719 from actiontech/fix/slowlog-2211

fix: after the slow log queue is full, discard the subsequent records that need to be added(cherry-pick)
This commit is contained in:
LUA
2023-06-02 10:23:07 +08:00
committed by guoaomen
9 changed files with 64 additions and 30 deletions

View File

@@ -41,4 +41,13 @@ public final class GeneralProvider {
public static void showTableByNodeUnitHandlerFinished() {
}
public static void beforeSlowLogClose() {
}
public static void afterSlowLogClose() {
}
public static void runFlushLogTask() {
}
}

View File

@@ -173,7 +173,7 @@ public final class SystemConfig {
private int flushSlowLogPeriod = 1; //second
private int flushSlowLogSize = 1000; //row
private int sqlSlowTime = 100; //ms
private int slowQueueOverflowPolicy = 2;
private int slowQueueOverflowPolicy = 1;
//general log
private int enableGeneralLog = 0;

View File

@@ -5,6 +5,8 @@
package com.actiontech.dble.log;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -137,6 +139,7 @@ public class DailyRotateLogStore {
force(false);
close();
GeneralProvider.afterSlowLogClose();
File file;
file = new File(fileName);
file.renameTo(target);

View File

@@ -8,6 +8,7 @@ package com.actiontech.dble.log.slow;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.DailyRotateLogStore;
import com.actiontech.dble.server.status.SlowQueryLog;
@@ -26,6 +27,7 @@ public class SlowQueryLogProcessor extends Thread {
private BlockingQueue<SlowQueryLogEntry> queue;
private DailyRotateLogStore store;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private long logSize = 0;
private long lastLogSize = 0;
private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. started with:\n" +
@@ -41,37 +43,44 @@ public class SlowQueryLogProcessor extends Thread {
@Override
public void run() {
SlowQueryLogEntry log;
scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
initFlushLogTask();
try {
store.open();
while (SlowQueryLog.getInstance().isEnableSlowLog()) {
try {
log = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
continue;
}
if (log == null) {
continue;
}
if (writeLog(log)) {
logSize++;
}
synchronized (this) {
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
flushLog();
if (log == null) {
continue;
}
if (writeLog(log)) {
logSize++;
}
synchronized (this) {
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
flushLog();
}
}
} catch (Throwable e) {
LOGGER.warn("slow log error:", e);
}
}
// disable slow_query_log, end task
while ((log = queue.poll()) != null) {
if (writeLog(log)) {
logSize++;
// disable slow_query_log, need to place all the remaining elements in the queue
while (true) {
try {
if ((log = queue.poll()) == null) {
break;
}
if (writeLog(log)) {
logSize++;
}
} catch (Throwable e) {
LOGGER.warn("slow log error:", e);
}
}
} catch (IOException e) {
LOGGER.info("transaction log error:", e);
LOGGER.warn("transaction log error:", e);
store.close();
} finally {
scheduler.shutdown();
@@ -80,6 +89,15 @@ public class SlowQueryLogProcessor extends Thread {
}
}
public void initFlushLogTask() {
scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
}
public void cancelFlushLogTask() {
scheduledFuture.cancel(false);
}
private synchronized boolean writeLog(SlowQueryLogEntry log) throws IOException {
if (log == null) {
return false;
@@ -104,8 +122,8 @@ public class SlowQueryLogProcessor extends Thread {
try {
store.force(false);
} catch (IOException e) {
LOGGER.info("transaction log error:", e);
store.close();
LOGGER.warn("flush slow log error:", e);
GeneralProvider.beforeSlowLogClose();
}
}
@@ -114,6 +132,7 @@ public class SlowQueryLogProcessor extends Thread {
@Override
public void run() {
synchronized (this) {
GeneralProvider.runFlushLogTask();
flushLog();
}
}

View File

@@ -7,7 +7,6 @@ package com.actiontech.dble.server.status;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.slow.SlowQueryLogProcessor;
import com.actiontech.dble.server.trace.TraceResult;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import org.slf4j.Logger;
@@ -81,6 +80,10 @@ public final class SlowQueryLog {
public void setFlushPeriod(int flushPeriod) {
this.flushPeriod = flushPeriod;
if (this.enableSlowLog && processor != null) {
processor.cancelFlushLogTask();
processor.initFlushLogTask();
}
}
public int getFlushSize() {

View File

@@ -111,7 +111,7 @@ public class TraceResult implements Cloneable {
public void setResponseTime(final ShardingService shardingService, boolean isSuccess) {
if (this.requestEnd == null) {
this.requestEnd = TraceRecord.currenTime();
if (this.isCompletedV1() &&
if (SlowQueryLog.getInstance().isEnableSlowLog() && this.isCompletedV1() &&
isSuccess && getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
SlowQueryLog.getInstance().putSlowQueryLog(shardingService, this.clone());
}

View File

@@ -6,10 +6,10 @@
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod {
private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class);
public static void execute(ManagerService service, int time) {
if (time < 0) {
if (time <= 0) {
service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct");
return;
}

View File

@@ -187,7 +187,7 @@ public final class SystemParams {
params.add(new ParamInfo("sqlSlowTime", SlowQueryLog.getInstance().getSlowTime() + "ms", "The threshold of Slow Query, the default is 100ms"));
params.add(new ParamInfo("flushSlowLogPeriod", SlowQueryLog.getInstance().getFlushPeriod() + "s", "The period for flushing log to disk, the default is 1 second"));
params.add(new ParamInfo("flushSlowLogSize", SlowQueryLog.getInstance().getFlushSize() + "", "The max size for flushing log to disk, the default is 1000"));
params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 2"));
params.add(new ParamInfo("slowQueueOverflowPolicy", SlowQueryLog.getInstance().getQueueOverflowPolicy() + "", "Slow log queue overflow policy, the default is 1"));
params.add(new ParamInfo("enableAlert", AlertUtil.isEnable() ? "1" : "0", "Enable or disable alert"));
params.add(new ParamInfo("capClientFoundRows", CapClientFoundRows.getInstance().isEnableCapClientFoundRows() + "", "Whether to turn on EOF_Packet to return found rows, the default value is false"));
params.add(new ParamInfo("maxRowSizeToFile", LoadDataBatch.getInstance().getSize() + "", "The maximum row size,if over this value,row data will be saved to file when load data.The default value is 100000"));

View File

@@ -173,7 +173,7 @@
# the threshold for judging if the query is slow , unit is millisecond
-DsqlSlowTime=100
# slow log queue overflow policy
-DslowQueueOverflowPolicy=2
-DslowQueueOverflowPolicy=1
# used for load data,maxCharsPerColumn means max chars length for per column when load data
-DmaxCharsPerColumn=65535