fix: after the slow log queue is full, discard the subsequent records that need to be added

after abnormal disk flush, fileChannel will not close
This commit is contained in:
guoaomen
2023-06-02 13:31:58 +08:00
parent 511c0292ad
commit f65edaa52b
6 changed files with 61 additions and 20 deletions

View File

@@ -0,0 +1,14 @@
package com.actiontech.dble.btrace.provider;
public final class GeneralProvider {
private GeneralProvider() {
}
public static void beforeSlowLogClose() {
}
public static void afterSlowLogClose() {
}
public static void runFlushLogTask() {
}
}

View File

@@ -5,6 +5,8 @@
package com.actiontech.dble.log;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@@ -148,6 +150,7 @@ public class DailyRotateLogStore {
force(false);
close();
GeneralProvider.afterSlowLogClose();
File file;
file = new File(fileName);
file.renameTo(target);

View File

@@ -5,6 +5,7 @@
package com.actiontech.dble.log.slow;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.DailyRotateLogStore;
import com.actiontech.dble.server.ServerConnection;
@@ -24,6 +25,7 @@ public class SlowQueryLogProcessor extends Thread {
private BlockingQueue<SlowQueryLogEntry> queue;
private DailyRotateLogStore store;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private long logSize = 0;
private long lastLogSize = 0;
private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. started with:\n" +
@@ -39,30 +41,39 @@ public class SlowQueryLogProcessor extends Thread {
@Override
public void run() {
SlowQueryLogEntry log;
scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
initFlushLogTask();
try {
store.open();
while (SlowQueryLog.getInstance().isEnableSlowLog()) {
try {
log = queue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
continue;
}
if (log == null) {
continue;
}
writeLog(log);
logSize++;
synchronized (this) {
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
flushLog();
if (log == null) {
continue;
}
writeLog(log);
logSize++;
synchronized (this) {
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
flushLog();
}
}
} catch (Throwable e) {
LOGGER.warn("slow log error:", e);
}
}
// disable slow_query_log, end task
while ((log = queue.poll()) != null) {
writeLog(log);
logSize++;
// disable slow_query_log, need to place all the remaining elements in the queue
while (true) {
try {
if ((log = queue.poll()) == null) {
break;
}
writeLog(log);
logSize++;
} catch (Throwable e) {
LOGGER.warn("slow log error:", e);
}
}
scheduler.shutdown();
flushLog();
@@ -73,6 +84,14 @@ public class SlowQueryLogProcessor extends Thread {
}
}
public void initFlushLogTask() {
scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
}
public void cancelFlushLogTask() {
scheduledFuture.cancel(false);
}
private synchronized void writeLog(SlowQueryLogEntry log) throws IOException {
if (log == null)
return;
@@ -94,8 +113,8 @@ public class SlowQueryLogProcessor extends Thread {
try {
store.force(false);
} catch (IOException e) {
LOGGER.info("transaction log error:", e);
store.close();
LOGGER.warn("flush slow log error:", e);
GeneralProvider.beforeSlowLogClose();
}
}
@@ -104,6 +123,7 @@ public class SlowQueryLogProcessor extends Thread {
@Override
public void run() {
synchronized (this) {
GeneralProvider.runFlushLogTask();
flushLog();
}
}

View File

@@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod {
private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class);
public static void execute(ManagerConnection c, int time) {
if (time < 0) {
if (time <= 0) {
c.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct");
return;
}

View File

@@ -328,7 +328,7 @@ public class NonBlockingSession implements Session {
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
responseTime = System.nanoTime();
traceResult.setVeryEnd(responseTime);
if (isSuccess) {
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
SlowQueryLog.getInstance().putSlowQueryLog(this.source, (TraceResult) traceResult.clone());
}
}

View File

@@ -70,6 +70,10 @@ public final class SlowQueryLog {
public void setFlushPeriod(int flushPeriod) {
this.flushPeriod = flushPeriod;
if (this.enableSlowLog && processor != null) {
processor.cancelFlushLogTask();
processor.initFlushLogTask();
}
}
public int getFlushSize() {