From 2acea6b909473925e64f978d784243aa78e511bf Mon Sep 17 00:00:00 2001 From: guoaomen Date: Tue, 23 May 2023 14:15:09 +0800 Subject: [PATCH] fix: after the slow log queue is full, discard the subsequent records that need to be added after abnormal disk flush, fileChannel will not close --- .../dble/btrace/provider/GeneralProvider.java | 14 ++++ .../dble/log/DailyRotateLogStore.java | 3 + .../dble/log/slow/SlowQueryLogProcessor.java | 75 ++++++++++++------- .../dble/server/NonBlockingSession.java | 2 +- .../dble/server/status/SlowQueryLog.java | 5 +- .../response/ReloadSlowQueryFlushPeriod.java | 6 +- 6 files changed, 71 insertions(+), 34 deletions(-) create mode 100644 src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java diff --git a/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java new file mode 100644 index 000000000..95efe3600 --- /dev/null +++ b/src/main/java/com/actiontech/dble/btrace/provider/GeneralProvider.java @@ -0,0 +1,14 @@ +package com.actiontech.dble.btrace.provider; + +public final class GeneralProvider { + private GeneralProvider() { + } + public static void beforeSlowLogClose() { + } + + public static void afterSlowLogClose() { + } + + public static void runFlushLogTask() { + } +} diff --git a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java index 667bcb9b1..becfbe658 100644 --- a/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java +++ b/src/main/java/com/actiontech/dble/log/DailyRotateLogStore.java @@ -5,6 +5,8 @@ package com.actiontech.dble.log; +import com.actiontech.dble.btrace.provider.GeneralProvider; + import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; @@ -148,6 +150,7 @@ public class DailyRotateLogStore { force(false); close(); + GeneralProvider.afterSlowLogClose(); File file; file = new File(fileName); file.renameTo(target); diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java index 021243b34..b9a0daeb0 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java @@ -5,6 +5,7 @@ package com.actiontech.dble.log.slow; +import com.actiontech.dble.btrace.provider.GeneralProvider; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.DailyRotateLogStore; import com.actiontech.dble.server.status.SlowQueryLog; @@ -23,6 +24,7 @@ public class SlowQueryLogProcessor extends Thread { private BlockingQueue queue; private DailyRotateLogStore store; private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; private long logSize = 0; private long lastLogSize = 0; private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. started with:\n" + @@ -38,37 +40,44 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { SlowQueryLogEntry log; - scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + initFlushLogTask(); try { store.open(); while (SlowQueryLog.getInstance().isEnableSlowLog()) { try { log = queue.poll(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - continue; - } - if (log == null) { - continue; - } - - if (writeLog(log)) { - logSize++; - } - - synchronized (this) { - if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { - flushLog(); + if (log == null) { + continue; } + + if (writeLog(log)) { + logSize++; + } + + synchronized (this) { + if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) { + flushLog(); + } + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } - // disable slow_query_log, end task - while ((log = queue.poll()) != null) { - if (writeLog(log)) { - logSize++; + // disable slow_query_log, need to place all the remaining elements in the queue + while (true) { + try { + if ((log = queue.poll()) == null) { + break; + } + if (writeLog(log)) { + logSize++; + } + } catch (Throwable e) { + LOGGER.warn("slow log error:", e); } } } catch (IOException e) { - LOGGER.info("transaction log error:", e); + LOGGER.warn("transaction log error:", e); store.close(); } finally { scheduler.shutdown(); @@ -77,6 +86,15 @@ public class SlowQueryLogProcessor extends Thread { } } + public void initFlushLogTask() { + scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS); + } + + public void cancelFlushLogTask() { + scheduledFuture.cancel(false); + } + + private synchronized boolean writeLog(SlowQueryLogEntry log) throws IOException { if (log == null) { return false; @@ -101,8 +119,8 @@ public class SlowQueryLogProcessor extends Thread { try { store.force(false); } catch (IOException e) { - LOGGER.info("transaction log error:", e); - store.close(); + LOGGER.warn("flush slow log error:", e); + GeneralProvider.beforeSlowLogClose(); } } @@ -111,6 +129,7 @@ public class SlowQueryLogProcessor extends Thread { @Override public void run() { synchronized (this) { + GeneralProvider.runFlushLogTask(); flushLog(); } } @@ -120,13 +139,11 @@ public class SlowQueryLogProcessor extends Thread { public void putSlowQueryLog(ShardingService service, TraceResult log) { if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) { SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId()); - try { - final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS); - if (!enQueue) { - LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString()); - } - } catch (InterruptedException e) { - LOGGER.info(" ", e); + final boolean enQueue = queue.offer(logEntry); + if (!enQueue) { + //abort + String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}"; + LOGGER.warn(errorMsg); } } } diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index ef2b381b3..c2a09f96f 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -318,7 +318,7 @@ public class NonBlockingSession extends Session { if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) { responseTime = System.nanoTime(); traceResult.setVeryEnd(responseTime); - if (isSuccess) { + if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) { SlowQueryLog.getInstance().putSlowQueryLog(this.shardingService, (TraceResult) traceResult.clone()); } } diff --git a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java index c3a56a190..750df92ed 100644 --- a/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java +++ b/src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java @@ -7,7 +7,6 @@ package com.actiontech.dble.server.status; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.slow.SlowQueryLogProcessor; - import com.actiontech.dble.server.trace.TraceResult; import com.actiontech.dble.services.mysqlsharding.ShardingService; import org.slf4j.Logger; @@ -71,6 +70,10 @@ public final class SlowQueryLog { public void setFlushPeriod(int flushPeriod) { this.flushPeriod = flushPeriod; + if (this.enableSlowLog && processor != null) { + processor.cancelFlushLogTask(); + processor.initFlushLogTask(); + } } public int getFlushSize() { diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java index 9bfd48b9e..ae4ebd1c9 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadSlowQueryFlushPeriod.java @@ -6,10 +6,10 @@ package com.actiontech.dble.services.manager.response; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.services.manager.ManagerService; -import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.server.status.SlowQueryLog; +import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod { private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class); public static void execute(ManagerService service, int time) { - if (time < 0) { + if (time <= 0) { service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct"); return; }