From 37abe22af8a38d80919ad50ed5054b0540aa1045 Mon Sep 17 00:00:00 2001 From: guoaomen Date: Tue, 23 May 2023 14:15:09 +0800 Subject: [PATCH] fix: once the slow log queue is full, discard any further records that would be added; after an abnormal disk flush, the fileChannel is no longer closed --- .../dble/btrace/provider/GeneralProvider.java | 9 +++ .../dble/config/model/SystemConfig.java | 2 +- .../dble/log/DailyRotateLogStore.java | 3 + .../dble/log/slow/SlowQueryLogProcessor.java | 63 ++++++++++++------- .../dble/server/status/SlowQueryLog.java | 5 +- .../dble/server/trace/TraceResult.java | 2 +- .../response/ReloadSlowQueryFlushPeriod.java | 6 +- 7 files changed, 62 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java index bc8b5d45e..5469cf196 100644 --- a/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java +++ b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java @@ -41,4 +41,13 @@ public final class GeneralProvider { public static void showTableByNodeUnitHandlerFinished() { } + + public static void beforeSlowLogClose() { + } + + public static void afterSlowLogClose() { + } + + public static void runFlushLogTask() { + } } diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index 4e60ca4f0..a41380c6c 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -173,7 +173,7 @@ public final class SystemConfig { private int flushSlowLogPeriod = 1; //second private int flushSlowLogSize = 1000; //row private int sqlSlowTime = 100; //ms - private int slowQueueOverflowPolicy = 2; + private int slowQueueOverflowPolicy = 1; //general log private int enableGeneralLog = 0; diff --git 
a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java index 4cd2de769..b50fd5ca8 100644 --- a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java +++ b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java @@ -5,6 +5,8 @@ package com.actiontech.dble.log; +import com.actiontech.dble.btrace.provider.GeneralProvider; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -137,6 +139,7 @@ public class DailyRotateLogStore { force(false); close(); + GeneralProvider.afterSlowLogClose(); File file; file = new File(fileName); file.renameTo(target); diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java index 2ee2b3993..8923260b5 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java @@ -8,6 +8,7 @@ package com.actiontech.dble.log.slow; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.btrace.provider.GeneralProvider; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.DailyRotateLogStore; import com.actiontech.dble.server.status.SlowQueryLog; @@ -26,6 +27,7 @@ public class SlowQueryLogProcessor extends Thread { private BlockingQueue queue; private DailyRotateLogStore store; private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; private long logSize = 0; private long lastLogSize = 0; private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. 
started with:\n" + @@ -41,37 +43,44 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { SlowQueryLogEntry log; - scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + initFlushLogTask(); try { store.open(); while (SlowQueryLog.getInstance().isEnableSlowLog()) { try { log = queue.poll(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - continue; - } - if (log == null) { - continue; - } - - if (writeLog(log)) { - logSize++; - } - - synchronized (this) { - if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { - flushLog(); + if (log == null) { + continue; } + + if (writeLog(log)) { + logSize++; + } + + synchronized (this) { + if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { + flushLog(); + } + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } - // disable slow_query_log, end task - while ((log = queue.poll()) != null) { - if (writeLog(log)) { - logSize++; + // disable slow_query_log, need to place all the remaining elements in the queue + while (true) { + try { + if ((log = queue.poll()) == null) { + break; + } + if (writeLog(log)) { + logSize++; + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } } catch (IOException e) { - LOGGER.info("transaction log error:", e); + LOGGER.warn("transaction log error:", e); store.close(); } finally { scheduler.shutdown(); @@ -80,6 +89,15 @@ public class SlowQueryLogProcessor extends Thread { } } + public void initFlushLogTask() { + scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + } + + public void cancelFlushLogTask() { + scheduledFuture.cancel(false); + } + + private synchronized boolean writeLog(SlowQueryLogEntry log) throws IOException { if (log == null) { return 
false; @@ -104,8 +122,8 @@ public class SlowQueryLogProcessor extends Thread { try { store.force(false); } catch (IOException e) { - LOGGER.info("transaction log error:", e); - store.close(); + LOGGER.warn("flush slow log error:", e); + GeneralProvider.beforeSlowLogClose(); } } @@ -114,6 +132,7 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { synchronized (this) { + GeneralProvider.runFlushLogTask(); flushLog(); } } diff --git a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java index 3b7540c27..c3e38370e 100644 --- a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java +++ b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java @@ -7,7 +7,6 @@ package com.actiontech.dble.server.status; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.slow.SlowQueryLogProcessor; - import com.actiontech.dble.server.trace.TraceResult; import com.actiontech.dble.services.mysqlsharding.ShardingService; import org.slf4j.Logger; @@ -81,6 +80,10 @@ public final class SlowQueryLog { public void setFlushPeriod(int flushPeriod) { this.flushPeriod = flushPeriod; + if (this.enableSlowLog && processor != null) { + processor.cancelFlushLogTask(); + processor.initFlushLogTask(); + } } public int getFlushSize() { diff --git a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java index ad42f8542..b88e7f492 100644 --- a/src/main/java/com/actiontech/dble/server/trace/TraceResult.java +++ b/src/main/java/com/actiontech/dble/server/trace/TraceResult.java @@ -111,7 +111,7 @@ public class TraceResult implements Cloneable { public void setResponseTime(final ShardingService shardingService, boolean isSuccess) { if (this.requestEnd == null) { this.requestEnd = TraceRecord.currenTime(); - if (this.isCompletedV1() && + if 
(SlowQueryLog.getInstance().isEnableSlowLog() && this.isCompletedV1() && isSuccess && getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) { SlowQueryLog.getInstance().putSlowQueryLog(shardingService, this.clone()); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java index c98a09005..3eb031e26 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java @@ -6,10 +6,10 @@ package com.actiontech.dble.services.manager.response; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod { private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class); public static void execute(ManagerService service, int time) { - if (time < 0) { + if (time <= 0) { service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct"); return; }