mirror of
https://github.com/actiontech/dble.git
synced 2026-01-04 11:50:15 -06:00
fix: after the slow log queue is full, discard the subsequent records that need to be added
after abnormal disk flush, fileChannel will not close
This commit is contained in:
@@ -0,0 +1,14 @@
|
||||
package com.actiontech.dble.btrace.provider;
|
||||
|
||||
public final class GeneralProvider {
|
||||
private GeneralProvider() {
|
||||
}
|
||||
public static void beforeSlowLogClose() {
|
||||
}
|
||||
|
||||
public static void afterSlowLogClose() {
|
||||
}
|
||||
|
||||
public static void runFlushLogTask() {
|
||||
}
|
||||
}
|
||||
@@ -5,6 +5,8 @@
|
||||
|
||||
package com.actiontech.dble.log;
|
||||
|
||||
import com.actiontech.dble.btrace.provider.GeneralProvider;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
@@ -148,6 +150,7 @@ public class DailyRotateLogStore {
|
||||
force(false);
|
||||
close();
|
||||
|
||||
GeneralProvider.afterSlowLogClose();
|
||||
File file;
|
||||
file = new File(fileName);
|
||||
file.renameTo(target);
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
package com.actiontech.dble.log.slow;
|
||||
|
||||
import com.actiontech.dble.btrace.provider.GeneralProvider;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.log.DailyRotateLogStore;
|
||||
import com.actiontech.dble.server.status.SlowQueryLog;
|
||||
@@ -23,6 +24,7 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
private BlockingQueue<SlowQueryLogEntry> queue;
|
||||
private DailyRotateLogStore store;
|
||||
private ScheduledExecutorService scheduler;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
private long logSize = 0;
|
||||
private long lastLogSize = 0;
|
||||
private static final String FILE_HEADER = "/FAKE_PATH/mysqld, Version: FAKE_VERSION. started with:\n" +
|
||||
@@ -38,37 +40,44 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
SlowQueryLogEntry log;
|
||||
scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
|
||||
initFlushLogTask();
|
||||
try {
|
||||
store.open();
|
||||
while (SlowQueryLog.getInstance().isEnableSlowLog()) {
|
||||
try {
|
||||
log = queue.poll(100, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
continue;
|
||||
}
|
||||
if (log == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (writeLog(log)) {
|
||||
logSize++;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
|
||||
flushLog();
|
||||
if (log == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (writeLog(log)) {
|
||||
logSize++;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if ((logSize - lastLogSize) % SlowQueryLog.getInstance().getFlushSize() == 0) {
|
||||
flushLog();
|
||||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOGGER.warn("slow log error:", e);
|
||||
}
|
||||
}
|
||||
// disable slow_query_log, end task
|
||||
while ((log = queue.poll()) != null) {
|
||||
if (writeLog(log)) {
|
||||
logSize++;
|
||||
// disable slow_query_log, need to place all the remaining elements in the queue
|
||||
while (true) {
|
||||
try {
|
||||
if ((log = queue.poll()) == null) {
|
||||
break;
|
||||
}
|
||||
if (writeLog(log)) {
|
||||
logSize++;
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOGGER.warn("slow log error:", e);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOGGER.info("transaction log error:", e);
|
||||
LOGGER.warn("transaction log error:", e);
|
||||
store.close();
|
||||
} finally {
|
||||
scheduler.shutdown();
|
||||
@@ -77,6 +86,15 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
}
|
||||
}
|
||||
|
||||
public void initFlushLogTask() {
|
||||
scheduledFuture = scheduler.scheduleAtFixedRate(flushLogTask(), SlowQueryLog.getInstance().getFlushPeriod(), SlowQueryLog.getInstance().getFlushPeriod(), TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
public void cancelFlushLogTask() {
|
||||
scheduledFuture.cancel(false);
|
||||
}
|
||||
|
||||
|
||||
private synchronized boolean writeLog(SlowQueryLogEntry log) throws IOException {
|
||||
if (log == null) {
|
||||
return false;
|
||||
@@ -101,8 +119,8 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
try {
|
||||
store.force(false);
|
||||
} catch (IOException e) {
|
||||
LOGGER.info("transaction log error:", e);
|
||||
store.close();
|
||||
LOGGER.warn("flush slow log error:", e);
|
||||
GeneralProvider.beforeSlowLogClose();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,6 +129,7 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
@Override
|
||||
public void run() {
|
||||
synchronized (this) {
|
||||
GeneralProvider.runFlushLogTask();
|
||||
flushLog();
|
||||
}
|
||||
}
|
||||
@@ -120,13 +139,11 @@ public class SlowQueryLogProcessor extends Thread {
|
||||
public void putSlowQueryLog(ShardingService service, TraceResult log) {
|
||||
if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
|
||||
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
|
||||
try {
|
||||
final boolean enQueue = queue.offer(logEntry, 3, TimeUnit.SECONDS);
|
||||
if (!enQueue) {
|
||||
LOGGER.warn("slow log queue has so many item. Discard log entry: {} ", logEntry.toString());
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.info(" ", e);
|
||||
final boolean enQueue = queue.offer(logEntry);
|
||||
if (!enQueue) {
|
||||
//abort
|
||||
String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}";
|
||||
LOGGER.warn(errorMsg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -318,7 +318,7 @@ public class NonBlockingSession extends Session {
|
||||
if (traceEnable || SlowQueryLog.getInstance().isEnableSlowLog()) {
|
||||
responseTime = System.nanoTime();
|
||||
traceResult.setVeryEnd(responseTime);
|
||||
if (isSuccess) {
|
||||
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
|
||||
SlowQueryLog.getInstance().putSlowQueryLog(this.shardingService, (TraceResult) traceResult.clone());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ package com.actiontech.dble.server.status;
|
||||
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.log.slow.SlowQueryLogProcessor;
|
||||
|
||||
import com.actiontech.dble.server.trace.TraceResult;
|
||||
import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import org.slf4j.Logger;
|
||||
@@ -71,6 +70,10 @@ public final class SlowQueryLog {
|
||||
|
||||
public void setFlushPeriod(int flushPeriod) {
|
||||
this.flushPeriod = flushPeriod;
|
||||
if (this.enableSlowLog && processor != null) {
|
||||
processor.cancelFlushLogTask();
|
||||
processor.initFlushLogTask();
|
||||
}
|
||||
}
|
||||
|
||||
public int getFlushSize() {
|
||||
|
||||
@@ -6,10 +6,10 @@
|
||||
package com.actiontech.dble.services.manager.response;
|
||||
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.services.manager.ManagerService;
|
||||
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.server.status.SlowQueryLog;
|
||||
import com.actiontech.dble.services.manager.ManagerService;
|
||||
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -22,7 +22,7 @@ public final class ReloadSlowQueryFlushPeriod {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ReloadSlowQueryFlushPeriod.class);
|
||||
|
||||
public static void execute(ManagerService service, int time) {
|
||||
if (time < 0) {
|
||||
if (time <= 0) {
|
||||
service.writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, "the commend is not correct");
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user