fix: capture Throwable to ensure that the thread will not exit when an error or exception occurs (#2696)

This commit is contained in:
LUA
2021-05-26 18:07:12 +08:00
committed by GitHub
parent d49f7b20a4
commit f4b9341f09
4 changed files with 48 additions and 25 deletions
@@ -1,6 +1,8 @@
package com.actiontech.dble.net.executor;
import com.actiontech.dble.net.service.ServiceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
@@ -8,7 +10,7 @@ import java.util.Queue;
* Created by szf on 2020/7/9.
*/
public class BackendCurrentRunnable implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(BackendCurrentRunnable.class);
private final Queue<ServiceTask> concurrentBackQueue;
public BackendCurrentRunnable(Queue<ServiceTask> concurrentBackQueue) {
@@ -19,8 +21,12 @@ public class BackendCurrentRunnable implements Runnable {
public void run() {
ServiceTask task;
while (true) {
while ((task = concurrentBackQueue.poll()) != null) {
task.getService().consumerInternalData(task);
try {
while ((task = concurrentBackQueue.poll()) != null) {
task.getService().consumerInternalData(task);
}
} catch (Throwable t) {
LOGGER.warn("Unknown error:", t);
}
}
}
@@ -4,6 +4,8 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.service.ServiceTask;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
@@ -12,6 +14,7 @@ import java.util.Queue;
*/
public class FrontendCurrentRunnable implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(FrontendCurrentRunnable.class);
private final Queue<ServiceTask> frontNormalTasks;
private final Queue<ServiceTask> frontPriorityTasks;
@@ -30,27 +33,31 @@ public class FrontendCurrentRunnable implements Runnable {
DbleServer.getInstance().getThreadUsedMap().put(threadName, workUsage);
}
while (true) {
task = frontPriorityTasks.poll();
if (task == null) {
task = frontNormalTasks.poll();
}
//threadUsageStat start
long workStart = 0;
if (workUsage != null) {
workStart = System.nanoTime();
}
if (task != null) {
//handler data
if (task.getService() == null) {
continue;
try {
task = frontPriorityTasks.poll();
if (task == null) {
task = frontNormalTasks.poll();
}
task.getService().execute(task);
}
//threadUsageStat end
if (workUsage != null) {
workUsage.setCurrentSecondUsed(workUsage.getCurrentSecondUsed() + System.nanoTime() - workStart);
//threadUsageStat start
long workStart = 0;
if (workUsage != null) {
workStart = System.nanoTime();
}
if (task != null) {
//handler data
if (task.getService() == null) {
continue;
}
task.getService().execute(task);
}
//threadUsageStat end
if (workUsage != null) {
workUsage.setCurrentSecondUsed(workUsage.getCurrentSecondUsed() + System.nanoTime() - workStart);
}
} catch (Throwable t) {
LOGGER.warn("Unknown error:", t);
}
}
}
@@ -9,11 +9,14 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.mysql.WriteToBackendTask;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.BlockingQueue;
public class WriteToBackendRunnable implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(WriteToBackendRunnable.class);
private final BlockingQueue<List<WriteToBackendTask>> writeToBackendQueue;
public WriteToBackendRunnable(BlockingQueue<List<WriteToBackendTask>> writeToBackendQueue) {
@@ -48,6 +51,8 @@ public class WriteToBackendRunnable implements Runnable {
}
} catch (InterruptedException e) {
throw new RuntimeException("FrontendCommandHandler error.", e);
} catch (Throwable t) {
LOGGER.warn("Unknown error:", t);
}
}
@@ -305,9 +305,14 @@ public abstract class AbstractService implements Service {
}
} catch (Throwable e) {
LOGGER.error("process task error", e);
writeErrMessage(ErrorCode.ER_YES, "process task error, exception is " + e);
connection.close("process task error");
String msg = e.getMessage();
if (StringUtil.isEmpty(msg)) {
LOGGER.warn("Maybe occur a bug, please check it.", e);
msg = e.toString();
} else {
LOGGER.warn("There is an error you may need know.", e);
}
writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, msg);
} finally {
synchronized (this) {
currentTask = null;