Merge pull request #3809 from actiontech/inner-2112_3

[inner-2112] implement interrupt thread through interrupt()/shutdownNow()
This commit is contained in:
wenyh
2023-09-07 17:39:55 +08:00
committed by GitHub
29 changed files with 610 additions and 122 deletions

View File

@@ -45,8 +45,8 @@ import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import com.actiontech.dble.singleton.ThreadChecker;
import com.actiontech.dble.util.ExecutorUtil;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -77,6 +77,7 @@ public final class DbleServer {
public static final String WRITE_TO_BACKEND_WORKER_NAME = "writeToBackendWorker";
public static final String COMPLEX_QUERY_EXECUTOR_NAME = "complexQueryWorker";
public static final String TIMER_WORKER_NAME = "Timer";
public static final String TIMER_SCHEDULER_WORKER_NAME = "TimerScheduler";
public static final String NIO_FRONT_RW = "NIOFrontRW";
public static final String NIO_BACKEND_RW = "NIOBackendRW";
public static final String AIO_EXECUTOR_NAME = "AIO";
@@ -113,7 +114,8 @@ public final class DbleServer {
private ExecutorService backendExecutor;
private ExecutorService writeToBackendExecutor;
private ExecutorService complexQueryExecutor;
private ExecutorService timerExecutor;
private volatile ExecutorService timerExecutor;
private volatile NameableScheduledThreadPoolExecutor timerSchedulerExecutor;
private Map<String, ThreadWorkUsage> threadUsedMap = new ConcurrentHashMap<>();
private Deque<ServiceTask> frontHandlerQueue;
@@ -295,7 +297,7 @@ public final class DbleServer {
LOGGER.info("=====================================Server started success=======================================");
ThreadCheckerScheduler.getInstance().init();
Scheduler.getInstance().init(timerExecutor);
Scheduler.getInstance().init();
LOGGER.info("=======================================Scheduler started==========================================");
XaCheckHandler.initXaIdCheckPeriod();
@@ -340,6 +342,7 @@ public final class DbleServer {
writeToBackendExecutor = ExecutorUtil.createFixed(WRITE_TO_BACKEND_WORKER_NAME, SystemConfig.getInstance().getWriteToBackendWorker(), runnableMap);
complexQueryExecutor = ExecutorUtil.createCached(COMPLEX_QUERY_EXECUTOR_NAME, SystemConfig.getInstance().getComplexQueryWorker(), null);
timerExecutor = ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance());
timerSchedulerExecutor = ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance());
nioFrontExecutor = ExecutorUtil.createFixed(NIO_FRONT_RW, frontProcessorCount, runnableMap);
nioBackendExecutor = ExecutorUtil.createFixed(NIO_BACKEND_RW, backendProcessorCount, runnableMap);
}
@@ -488,6 +491,10 @@ public final class DbleServer {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "processorCheck()");
} catch (Throwable e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("scheduled task processorCheck() happen exception: ", e);
}
}
}
};
@@ -621,6 +628,17 @@ public final class DbleServer {
return timerExecutor;
}
public NameableScheduledThreadPoolExecutor getTimerSchedulerExecutor() {
return timerSchedulerExecutor;
}
public void setTimerExecutor(ExecutorService timerExecutor) {
this.timerExecutor = timerExecutor;
}
public void setTimerSchedulerExecutor(NameableScheduledThreadPoolExecutor timerSchedulerExecutor) {
this.timerSchedulerExecutor = timerSchedulerExecutor;
}
public ExecutorService getComplexQueryExecutor() {
return complexQueryExecutor;

View File

@@ -5,6 +5,7 @@
package com.actiontech.dble.backend.delyDetection;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
@@ -13,7 +14,6 @@ import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.singleton.Scheduler;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
@@ -105,7 +105,7 @@ public class DelayDetection {
//avoid concurrency with the master
initialDelay = initialDelay >> 1;
}
this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> execute(),
this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> execute(),
initialDelay, this.delayPeriodMillis, TimeUnit.MILLISECONDS);
}

View File

@@ -11,7 +11,6 @@ import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.singleton.Scheduler;
import com.actiontech.dble.statistic.DbInstanceSyncRecorder;
import com.actiontech.dble.statistic.HeartbeatRecorder;
import com.actiontech.dble.util.TimeUtil;
@@ -94,7 +93,7 @@ public class MySQLHeartbeat {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("start heartbeat :{}", this.toString());
}
this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> {
this.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleAtFixedRate(() -> {
if (DbleServer.getInstance().getConfig().isFullyConfigured()) {
if (TimeUtil.currentTimeMillis() < heartbeatRecoveryTime) {
return;

View File

@@ -233,6 +233,9 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (discardQueue.take().isNullItem() || isFail()) {
break;
}
@@ -245,15 +248,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
}
}
Iterator<Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
MySQLResponseService service = null;
while (iterator.hasNext()) {
Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
service = entry.getKey();
entry.getValue().clear();
session.releaseConnectionIfSafe(entry.getKey(), false);
iterator.remove();
}
MySQLResponseService service = clearQueueAndReleaseConnection();
session.trace(t -> t.doSqlStat(selectRows, netOutBytes.intValue(), resultSize.intValue()));
assert service != null;
if (!isFail()) {
@@ -269,4 +264,17 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
session.onQueryError(msg.getBytes());
}
}
private MySQLResponseService clearQueueAndReleaseConnection() {
Iterator<Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
MySQLResponseService service = null;
while (iterator.hasNext()) {
Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
service = entry.getKey();
entry.getValue().clear();
session.releaseConnectionIfSafe(entry.getKey(), false);
iterator.remove();
}
return service;
}
}

View File

@@ -195,6 +195,9 @@ public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler {
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (discardQueue.take().isNullItem() || terminate.get()) {
break;
}
@@ -238,6 +241,10 @@ public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler {
Entry<MySQLResponseService, BlockingQueue<HeapItem>> entry = iterator.next();
// fair lock queue,poll for clear
while (true) {
if (Thread.currentThread().isInterrupted()) {
LOGGER.info("manual interrupted");
break;
}
if (entry.getValue().poll() == null) {
break;
}

View File

@@ -99,6 +99,9 @@ public class OrderByHandler extends OwnThreadDMLHandler {
recordElapsedTime("order writeDirectly start :");
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get()) {
return;
}
@@ -110,13 +113,16 @@ public class OrderByHandler extends OwnThreadDMLHandler {
}
localResult.add(row);
} catch (InterruptedException e) {
//ignore error
throw e;
}
}
recordElapsedTime("order writeDirectly end :");
localResult.done();
recordElapsedTime("order read start :");
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get()) {
return;
}

View File

@@ -155,6 +155,9 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler {
try {
int eofCount = 0;
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
RowDataPacket row = outQueue.take();
if (row.getFieldCount() == 0) {
eofCount++;

View File

@@ -187,6 +187,9 @@ public class JoinHandler extends OwnThreadDMLHandler {
leftLocal = takeFirst(leftQueue);
rightLocal = takeFirst(rightQueue);
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
if (terminate.get())
return;
RowDataPacket leftRow = leftLocal.getLastRow();
@@ -228,15 +231,7 @@ public class JoinHandler extends OwnThreadDMLHandler {
if (!nestLoopDependOn) {
HandlerTool.terminateHandlerTree(this);
}
// for trace, when join end before all rows return ,the handler should mark as finished
for (DMLResponseHandler mergeHandler : this.getMerges()) {
DMLResponseHandler handler = mergeHandler;
while (handler != null && handler != this) {
session.setHandlerEnd(handler);
handler = handler.getNextHandler();
}
}
session.setHandlerEnd(this);
makeFinishedOfHandler();
nextHandler.rowEofResponse(null, isLeft, service);
} catch (MySQLOutPutException e) {
String msg = e.getLocalizedMessage();
@@ -254,6 +249,18 @@ public class JoinHandler extends OwnThreadDMLHandler {
}
}
// for trace, when join end before all rows return ,the handler should mark as finished
private void makeFinishedOfHandler() {
for (DMLResponseHandler mergeHandler : this.getMerges()) {
DMLResponseHandler handler = mergeHandler;
while (handler != null && handler != this) {
session.setHandlerEnd(handler);
handler = handler.getNextHandler();
}
}
session.setHandlerEnd(this);
}
private LocalResult takeFirst(FairLinkedBlockingDeque<LocalResult> deque) throws InterruptedException {
/**
* it must be in single thread

View File

@@ -138,6 +138,9 @@ public class NotInHandler extends OwnThreadDMLHandler {
leftLocal = takeFirst(leftQueue);
rightLocal = takeFirst(rightQueue);
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("manual interrupted");
}
RowDataPacket leftRow = leftLocal.getLastRow();
RowDataPacket rightRow = rightLocal.getLastRow();
if (leftRow.getFieldCount() == 0) {

View File

@@ -15,7 +15,6 @@ import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.singleton.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,10 +167,10 @@ public final class XaCheckHandler {
}
}
private static void startXaIdCheckPeriod() {
public static void startXaIdCheckPeriod() {
synchronized (INSTANCE) {
if (INSTANCE.xaIdCheckPeriod > 0) {
INSTANCE.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleWithFixedDelay(() -> {
INSTANCE.scheduledFuture = DbleServer.getInstance().getTimerSchedulerExecutor().scheduleWithFixedDelay(() -> {
(new XAAnalysisHandler()).checkResidualTask();
}, 0, INSTANCE.xaIdCheckPeriod, TimeUnit.SECONDS);
LOGGER.info("====================================Start XaIdCheckPeriod[{}]=========================================", INSTANCE.xaIdCheckPeriod);
@@ -179,7 +178,7 @@ public final class XaCheckHandler {
}
}
private static void stopXaIdCheckPeriod() {
public static void stopXaIdCheckPeriod() {
synchronized (INSTANCE) {
ScheduledFuture future = INSTANCE.scheduledFuture;
if (future != null) {

View File

@@ -103,6 +103,10 @@ public class AbstractClusterLogic extends GeneralClusterLogic {
Map<String, OnlineType> expectedMap = ClusterHelper.getOnlineMap();
StringBuilder errorMsg = new StringBuilder();
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
errorMsg.append("manual interrupted");
break;
}
errorMsg.setLength(0);
if (checkResponseForOneTime(path, expectedMap, errorMsg)) {
break;

View File

@@ -217,6 +217,9 @@ public class ProxyMetaManager {
TraceManager.log(ImmutableMap.of("schema", schema, "table", tbName), traceObject);
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new SQLNonTransientException("manual interrupted");
}
int oldVersion = version.get();
if (metaCount.get() == 0) {
TableMeta meta = getTableMeta(schema, tbName);
@@ -248,6 +251,9 @@ public class ProxyMetaManager {
TraceManager.log(ImmutableMap.of("schema", schema, "table", vName), traceObject);
try {
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new SQLNonTransientException("manual interrupted");
}
int oldVersion = version.get();
if (catalogs.get(schema) == null) {
return null;

View File

@@ -56,6 +56,7 @@ public final class ManagerParse {
public static final int CLUSTER = 38;
public static final int SPLIT_LOAD_DATA = 39;
public static final int KILL_CLUSTER_RENEW_THREAD = 40;
public static final int THREAD = 41;
public static int parse(String stmt) {
for (int i = 0; i < stmt.length(); i++) {
@@ -109,10 +110,26 @@ public final class ManagerParse {
return OTHER;
}
// truncate table
// t
private static int tCheck(String stmt, int offset) {
if (stmt.length() > offset + 8) {
char c1 = stmt.charAt(++offset);
if (stmt.length() > offset++) {
switch (stmt.charAt(offset)) {
case 'H':
case 'h':
return thCheck(stmt, offset);
case 'R':
case 'r':
return trCheck(stmt, offset);
default:
return OTHER;
}
}
return OTHER;
}
// truncate table
private static int trCheck(String stmt, int offset) {
if (stmt.length() > offset + 7) {
char c2 = stmt.charAt(++offset);
char c3 = stmt.charAt(++offset);
char c4 = stmt.charAt(++offset);
@@ -120,8 +137,7 @@ public final class ManagerParse {
char c6 = stmt.charAt(++offset);
char c7 = stmt.charAt(++offset);
char c8 = stmt.charAt(++offset);
if ((c1 == 'R' || c1 == 'r') &&
(c2 == 'U' || c2 == 'u') &&
if ((c2 == 'U' || c2 == 'u') &&
(c3 == 'N' || c3 == 'n') &&
(c4 == 'C' || c4 == 'c') &&
(c5 == 'A' || c5 == 'a') &&
@@ -134,6 +150,25 @@ public final class ManagerParse {
return OTHER;
}
// thread
private static int thCheck(String stmt, int offset) {
if (stmt.length() > offset + 5) {
char c1 = stmt.charAt(++offset);
char c2 = stmt.charAt(++offset);
char c3 = stmt.charAt(++offset);
char c4 = stmt.charAt(++offset);
char c5 = stmt.charAt(++offset);
if ((c1 == 'R' || c1 == 'r') &&
(c2 == 'E' || c2 == 'e') &&
(c3 == 'A' || c3 == 'a') &&
(c4 == 'D' || c4 == 'd') &&
ParseUtil.isSpace(c5)) {
return offset << 8 | THREAD;
}
}
return OTHER;
}
private static int dCheck(String stmt, int offset) {
if (stmt.length() > ++offset) {
switch (stmt.charAt(offset)) {

View File

@@ -86,6 +86,9 @@ public class DistributedSequenceHandler implements Closeable, SequenceHandler {
private void loadInstanceIdByZK() {
int execCount = 1;
while (true) {
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("instanceId allocate error when using zk, reason: manual interrupted");
}
if (execCount > this.retryCount) {
throw new RuntimeException("instanceId allocate error when using zk, reason: no available instanceId found");
}

View File

@@ -321,6 +321,14 @@ public abstract class FrontendService<T extends UserConfig> extends AbstractServ
write(ok);
}
public void writeOkPacket(String msg) {
OkPacket ok = OkPacket.getDefault();
byte packet = (byte) this.packetId.incrementAndGet();
ok.setPacketId(packet);
ok.setMessage(StringUtil.encode(msg, charsetName.getResults()));
write(ok);
}
public void writeErrMessage(String code, String msg, int vendorCode) {
writeErrMessage((byte) this.nextPacketId(), vendorCode, code, msg);
}

View File

@@ -171,6 +171,9 @@ public class ManagerQueryHandler {
case ManagerParse.CLUSTER:
ClusterManageHandler.handle(sql, service, rs >>> SHIFT);
break;
case ManagerParse.THREAD:
ThreadHandler.handle(sql, service, rs >>> SHIFT);
break;
default:
service.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
}

View File

@@ -52,6 +52,10 @@ public final class DumpFileHandler implements Runnable {
public void run() {
while (true) {
if (Thread.currentThread().isInterrupted()) {
LOGGER.info("dump file handler was manual interrupted.");
break;
}
try {
String stmts = handleQueue.take();
SplitFileProvider.getHandleQueueSizeOfTake(handleQueue.size());

View File

@@ -13,13 +13,17 @@ import com.actiontech.dble.config.Fields;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.executor.*;
import com.actiontech.dble.net.executor.BackendCurrentRunnable;
import com.actiontech.dble.net.executor.FrontendBlockRunnable;
import com.actiontech.dble.net.executor.FrontendCurrentRunnable;
import com.actiontech.dble.net.executor.WriteToBackendRunnable;
import com.actiontech.dble.net.impl.nio.RW;
import com.actiontech.dble.net.service.ServiceTask;
import com.actiontech.dble.services.manager.handler.WriteDynamicBootstrap;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.util.IntegerUtil;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -75,6 +79,7 @@ public final class DbleThreadPool extends ManagerWritableTable {
DbleServer server = DbleServer.getInstance();
List<LinkedHashMap<String, String>> lst = new ArrayList<>(5);
lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getTimerExecutor())));
lst.add(getRow(new ThreadPoolInfo((server.getTimerSchedulerExecutor()))));
lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getFrontExecutor())));
lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getManagerFrontExecutor())));
lst.add(getRow(new ThreadPoolInfo((NameableExecutor) server.getBackendExecutor())));
@@ -372,6 +377,14 @@ public final class DbleThreadPool extends ManagerWritableTable {
this.queueSize = nameableExecutor.getQueue().size();
}
ThreadPoolInfo(NameableScheduledThreadPoolExecutor executor) {
this.name = executor.getName();
this.poolSize = executor.getPoolSize();
this.corePoolSize = executor.getCorePoolSize();
this.activeCount = executor.getActiveCount();
this.queueSize = executor.getQueue().size();
}
ThreadPoolInfo(String name, int poolSize, int corePoolSize, int activeCount, int queueSize) {
this.name = name;
this.poolSize = poolSize;

View File

@@ -22,6 +22,7 @@ import com.actiontech.dble.services.BackendService;
import com.actiontech.dble.services.FrontendService;
import com.actiontech.dble.services.manager.information.ManagerBaseTable;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,8 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import static com.actiontech.dble.DbleServer.TIMER_SCHEDULER_WORKER_NAME;
public final class DbleThreadPoolTask extends ManagerBaseTable {
private static final Logger LOGGER = LoggerFactory.getLogger(DbleThreadPoolTask.class);
private static final String COLUMN_NAME = "name";
@@ -74,6 +77,7 @@ public final class DbleThreadPoolTask extends ManagerBaseTable {
DbleServer server = DbleServer.getInstance();
List<LinkedHashMap<String, String>> lst = new ArrayList<>(5);
lst.add(getRow((NameableExecutor) server.getTimerExecutor()));
lst.add(getRow(server.getTimerSchedulerExecutor()));
lst.add(getRow((NameableExecutor) server.getFrontExecutor()));
lst.add(getRow((NameableExecutor) server.getManagerFrontExecutor()));
lst.add(getRow((NameableExecutor) server.getBackendExecutor()));
@@ -94,6 +98,20 @@ public final class DbleThreadPoolTask extends ManagerBaseTable {
return row;
}
private LinkedHashMap<String, String> getRow(NameableScheduledThreadPoolExecutor exec) {
LinkedHashMap<String, String> row = new LinkedHashMap<>();
if (exec.getName().equals(TIMER_SCHEDULER_WORKER_NAME)) {
row.put(COLUMN_NAME, exec.getName());
row.put(COLUMN_POOL_SIZE, exec.getPoolSize() + "");
row.put(COLUMN_ACTIVE_TASK_COUNT, exec.getActiveCount() + "");
row.put(COLUMN_TASK_QUEUE_SIZE, exec.getQueue().size() + "");
row.put(COLUMN_COMPLETED_TASK_COUNT, exec.getCompletedTaskCount() + "");
row.put(COLUMN_TOTAL_TASK_COUNT, exec.getTaskCount() + "");
return row;
}
return null;
}
public static synchronized Row calculateRow(NameableExecutor exec) {
long activeCount, completedTaskCount, queueSize, totalCount;
final Map<Thread, Runnable> threadRunnableMap = DbleServer.getInstance().getRunnableMap().get(exec.getName());

View File

@@ -251,6 +251,9 @@ public final class ShowHelp {
HELPS.put("enable @@memory_buffer_monitor", "Turn on memory buffer monitor");
HELPS.put("disable @@memory_buffer_monitor", "Turn off memory buffer monitor");
HELPS.put("thread @@kill [name|poolname] =''", "Gracefully interrupt a single thread or thread pool");
HELPS.put("thread @@recover [name|poolname] =''", "Restoring a single thread or thread pool");
// list sort
KEYS.addAll(HELPS.keySet());
}

View File

@@ -10,15 +10,10 @@ import com.actiontech.dble.backend.mysql.PacketUtil;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.util.IntegerUtil;
import com.actiontech.dble.util.LongUtil;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.*;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* ShowThreadPool status
@@ -80,13 +75,20 @@ public final class ShowThreadPool {
// write rows
byte packetId = EOF.getPacketId();
List<ExecutorService> executors = getExecutors();
for (ExecutorService exec : executors) {
if (exec != null) {
RowDataPacket row = getRow((NameableExecutor) exec, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}
DbleServer server = DbleServer.getInstance();
LinkedList<RowDataPacket> rows = new LinkedList<>();
rows.add(getRow((NameableExecutor) server.getTimerExecutor(), service.getCharset().getResults()));
rows.add(getRow(server.getTimerSchedulerExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getFrontExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getManagerFrontExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getBackendExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getComplexQueryExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getWriteToBackendExecutor(), service.getCharset().getResults()));
for (RowDataPacket row : rows) {
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}
// write last eof
@@ -108,18 +110,14 @@ public final class ShowThreadPool {
return row;
}
private static List<ExecutorService> getExecutors() {
List<ExecutorService> list = new LinkedList<>();
DbleServer server = DbleServer.getInstance();
list.add(server.getTimerExecutor());
list.add(server.getFrontExecutor());
list.add(server.getManagerFrontExecutor());
list.add(server.getBackendExecutor());
list.add(server.getComplexQueryExecutor());
list.add(server.getWriteToBackendExecutor());
// for (NIOProcessor pros : server.getProcessors()) {
// list.add(pros.getExecutor());
// }
return list;
private static RowDataPacket getRow(NameableScheduledThreadPoolExecutor exec, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(exec.getName(), charset));
row.add(IntegerUtil.toBytes(exec.getPoolSize()));
row.add(IntegerUtil.toBytes(exec.getActiveCount()));
row.add(IntegerUtil.toBytes(exec.getQueue().size()));
row.add(LongUtil.toBytes(exec.getCompletedTaskCount()));
row.add(LongUtil.toBytes(exec.getTaskCount()));
return row;
}
}

View File

@@ -12,14 +12,10 @@ import com.actiontech.dble.config.Fields;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.manager.information.tables.DbleThreadPoolTask;
import com.actiontech.dble.util.LongUtil;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.*;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* ShowThreadPool status
@@ -82,13 +78,19 @@ public final class ShowThreadPoolTask {
// write rows
byte packetId = EOF.getPacketId();
List<ExecutorService> executors = getExecutors();
for (ExecutorService exec : executors) {
if (exec != null) {
RowDataPacket row = getRow((NameableExecutor) exec, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}
DbleServer server = DbleServer.getInstance();
LinkedList<RowDataPacket> rows = new LinkedList<>();
rows.add(getRow((NameableExecutor) server.getTimerExecutor(), service.getCharset().getResults()));
rows.add(getRow(server.getTimerSchedulerExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getFrontExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getManagerFrontExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getBackendExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getComplexQueryExecutor(), service.getCharset().getResults()));
rows.add(getRow((NameableExecutor) server.getWriteToBackendExecutor(), service.getCharset().getResults()));
for (RowDataPacket row : rows) {
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}
// write last eof
@@ -111,17 +113,14 @@ public final class ShowThreadPoolTask {
return row;
}
private static List<ExecutorService> getExecutors() {
List<ExecutorService> list = new LinkedList<>();
DbleServer server = DbleServer.getInstance();
list.add(server.getTimerExecutor());
list.add(server.getFrontExecutor());
list.add(server.getManagerFrontExecutor());
list.add(server.getBackendExecutor());
list.add(server.getComplexQueryExecutor());
list.add(server.getWriteToBackendExecutor());
return list;
private static RowDataPacket getRow(NameableScheduledThreadPoolExecutor exec, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(exec.getName(), charset));
row.add(IntegerUtil.toBytes(exec.getPoolSize()));
row.add(IntegerUtil.toBytes(exec.getActiveCount()));
row.add(IntegerUtil.toBytes(exec.getQueue().size()));
row.add(LongUtil.toBytes(exec.getCompletedTaskCount()));
row.add(LongUtil.toBytes(exec.getTaskCount()));
return row;
}
}

View File

@@ -0,0 +1,59 @@
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.singleton.ThreadManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public final class ThreadHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadHandler.class);
private static final Pattern THREAD_KILL = Pattern.compile("^\\s*@@kill\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE);
private static final Pattern THREAD_RECOVER = Pattern.compile("^\\s*@@recover\\s*(name|poolname)\\s*=\\s*'([a-zA-Z_0-9\\-]+)'?$", Pattern.CASE_INSENSITIVE);
private ThreadHandler() {
}
public static void handle(String stmt, ManagerService service, int offset) {
String sql = stmt.substring(offset).trim();
Matcher kill = THREAD_KILL.matcher(sql);
Matcher recover = THREAD_RECOVER.matcher(sql);
try {
if (kill.matches()) {
String type = kill.group(1);
String name = kill.group(2);
kill(service, type, name);
} else if (recover.matches()) {
String type = recover.group(1);
String name = recover.group(2);
recover(service, type, name);
} else {
service.writeErrMessage(ErrorCode.ER_YES, "Syntax Error, Please check the help to use the thread command");
}
} catch (Exception e) {
LOGGER.info("thread command happen exception:", e);
service.writeErrMessage(ErrorCode.ER_YES, e.getMessage());
}
}
public static void kill(ManagerService service, String type, String name) throws Exception {
if (type.equalsIgnoreCase("name")) {
ThreadManager.interruptSingleThread(name);
} else if (type.equalsIgnoreCase("poolname")) {
ThreadManager.shutDownThreadPool(name);
}
service.writeOkPacket("Please see logs in logs/thread.log");
}
public static void recover(ManagerService service, String type, String name) throws Exception {
if (type.equalsIgnoreCase("name")) {
ThreadManager.recoverSingleThread(name);
} else if (type.equalsIgnoreCase("poolname")) {
ThreadManager.recoverThreadPool(name);
}
service.writeOkPacket("Please see logs in logs/thread.log");
}
}

View File

@@ -14,11 +14,12 @@ import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.statistic.stat.FrontActiveRatioStat;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
import static com.actiontech.dble.server.NonBlockingSession.LOGGER;
@@ -31,17 +32,9 @@ public final class Scheduler {
private static final long TIME_UPDATE_PERIOD = 20L;
private static final long DDL_EXECUTE_CHECK_PERIOD = 60L;
private static final long DEFAULT_OLD_CONNECTION_CLEAR_PERIOD = 5 * 1000L;
private static final long DEFAULT_SQL_STAT_RECYCLE_PERIOD = 5 * 1000L;
private static final int DEFAULT_CHECK_XAID = 5;
private ExecutorService timerExecutor;
private ScheduledExecutorService scheduledExecutor;
private Scheduler() {
this.scheduledExecutor = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("TimerScheduler-%d").build());
}
public void init(ExecutorService executor) {
this.timerExecutor = executor;
public void init() {
NameableScheduledThreadPoolExecutor scheduledExecutor = DbleServer.getInstance().getTimerSchedulerExecutor();
scheduledExecutor.scheduleAtFixedRate(updateTime(), 0L, TIME_UPDATE_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
@@ -60,7 +53,11 @@ public final class Scheduler {
return new Runnable() {
@Override
public void run() {
DDLTraceHelper.printDDLOutOfLimit();
try {
DDLTraceHelper.printDDLOutOfLimit();
} catch (Throwable e) {
LOGGER.warn("scheduled task printLongTimeDDL() happen exception:{} ", e.getMessage());
}
}
};
}
@@ -82,7 +79,7 @@ public final class Scheduler {
@Override
public void run() {
try {
timerExecutor.execute(new Runnable() {
DbleServer.getInstance().getTimerExecutor().execute(new Runnable() {
@Override
public void run() {
@@ -102,6 +99,8 @@ public final class Scheduler {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "dbInstanceOldConsClear()");
} catch (Throwable e) {
LOGGER.warn("scheduled task dbInstanceOldConsClear() happen exception:{} ", e.getMessage());
}
}
};
@@ -115,7 +114,7 @@ public final class Scheduler {
@Override
public void run() {
try {
timerExecutor.execute(() -> {
DbleServer.getInstance().getTimerExecutor().execute(() -> {
Iterator<PhysicalDbGroup> iterator = IOProcessor.BACKENDS_OLD_GROUP.iterator();
while (iterator.hasNext()) {
PhysicalDbGroup dbGroup = iterator.next();
@@ -128,6 +127,8 @@ public final class Scheduler {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "oldDbGroupClear()");
} catch (Throwable e) {
LOGGER.warn("scheduled task oldDbGroupClear() happen exception:{} ", e.getMessage());
}
}
};
@@ -141,7 +142,7 @@ public final class Scheduler {
@Override
public void run() {
try {
timerExecutor.execute(() -> {
DbleServer.getInstance().getTimerExecutor().execute(() -> {
Iterator<PhysicalDbInstance> iterator = IOProcessor.BACKENDS_OLD_INSTANCE.iterator();
while (iterator.hasNext()) {
PhysicalDbInstance dbInstance = iterator.next();
@@ -155,19 +156,20 @@ public final class Scheduler {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "oldDbInstanceClear()");
} catch (Throwable e) {
LOGGER.warn("scheduled task oldDbInstanceClear() happen exception:{} ", e.getMessage());
}
}
};
}
// XA session check job
private Runnable xaSessionCheck() {
return new Runnable() {
@Override
public void run() {
try {
timerExecutor.execute(new Runnable() {
DbleServer.getInstance().getTimerExecutor().execute(new Runnable() {
@Override
public void run() {
XASessionCheck.getInstance().checkSessions();
@@ -175,6 +177,8 @@ public final class Scheduler {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "xaSessionCheck()");
} catch (Throwable e) {
LOGGER.warn("scheduled task xaSessionCheck() happen exception:{} ", e.getMessage());
}
}
};
@@ -185,7 +189,7 @@ public final class Scheduler {
@Override
public void run() {
try {
timerExecutor.execute(new Runnable() {
DbleServer.getInstance().getTimerExecutor().execute(new Runnable() {
@Override
public void run() {
XAStateLog.cleanCompleteRecoveryLog();
@@ -193,6 +197,8 @@ public final class Scheduler {
});
} catch (RejectedExecutionException e) {
ThreadChecker.getInstance().timerExecuteError(e, "xaLogClean()");
} catch (Throwable e) {
LOGGER.warn("scheduled task xaLogClean() happen exception:{} ", e.getMessage());
}
}
};
@@ -202,8 +208,12 @@ public final class Scheduler {
return new Runnable() {
@Override
public void run() {
for (ThreadWorkUsage obj : DbleServer.getInstance().getThreadUsedMap().values()) {
obj.switchToNew();
try {
for (ThreadWorkUsage obj : DbleServer.getInstance().getThreadUsedMap().values()) {
obj.switchToNew();
}
} catch (Throwable e) {
LOGGER.warn("scheduled task threadStatRenew() happen exception:{} ", e.getMessage());
}
}
};
@@ -213,19 +223,15 @@ public final class Scheduler {
return new Runnable() {
@Override
public void run() {
FrontActiveRatioStat.getInstance().compress();
try {
FrontActiveRatioStat.getInstance().compress();
} catch (Throwable e) {
LOGGER.warn("scheduled task compressionsActiveStat() happen exception:{} ", e.getMessage());
}
}
};
}
public ExecutorService getTimerExecutor() {
return timerExecutor;
}
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}
public static Scheduler getInstance() {
return INSTANCE;
}

View File

@@ -6,6 +6,7 @@ import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.alarm.ToResolveContainer;
import com.actiontech.dble.util.NameableExecutor;
import com.actiontech.dble.util.NameableScheduledThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,6 +144,9 @@ public class ThreadChecker {
case TIMER_WORKER_NAME:
NameableExecutor exec = (NameableExecutor) DbleServer.getInstance().getTimerExecutor();
return new LastRecordInfo(name, th.getState(), lastExecTime, lastFinishTime, exec.getActiveCount(), exec.getQueue().size(), exec.getCompletedTaskCount());
case DbleServer.TIMER_SCHEDULER_WORKER_NAME:
NameableScheduledThreadPoolExecutor exec1 = DbleServer.getInstance().getTimerSchedulerExecutor();
return new LastRecordInfo(name, th.getState(), lastExecTime, lastFinishTime, exec1.getActiveCount(), exec1.getQueue().size(), exec1.getCompletedTaskCount());
default:
return null;
}

View File

@@ -1,21 +1,19 @@
package com.actiontech.dble.singleton;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.actiontech.dble.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadCheckerScheduler {
public static final Logger LOGGER = LoggerFactory.getLogger(ThreadCheckerScheduler.class);
public static final Logger LOGGER = LoggerFactory.getLogger("ThreadChecker");
private static final ThreadCheckerScheduler INSTANCE = new ThreadCheckerScheduler();
private ScheduledExecutorService scheduledExecutor;
public ThreadCheckerScheduler() {
this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ThreadChecker-%d").
setUncaughtExceptionHandler((Thread threads, Throwable e) -> LOGGER.warn("unknown exception ", e)).build());
this.scheduledExecutor = ExecutorUtil.createFixedScheduled("ThreadChecker", 1, null);
}
public void init() {
@@ -26,7 +24,11 @@ public class ThreadCheckerScheduler {
return new Runnable() {
@Override
public void run() {
ThreadChecker.getInstance().doSelfCheck();
try {
ThreadChecker.getInstance().doSelfCheck();
} catch (Throwable e) {
LOGGER.warn("doSelfCheck() happen fail, exception :", e);
}
}
};
}

View File

@@ -0,0 +1,221 @@
package com.actiontech.dble.singleton;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.mysql.xa.XaCheckHandler;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.executor.BackendCurrentRunnable;
import com.actiontech.dble.net.executor.FrontendBlockRunnable;
import com.actiontech.dble.net.executor.FrontendCurrentRunnable;
import com.actiontech.dble.net.executor.WriteToBackendRunnable;
import com.actiontech.dble.net.impl.nio.RW;
import com.actiontech.dble.net.service.ServiceTask;
import com.actiontech.dble.util.ExecutorUtil;
import com.actiontech.dble.util.NameableExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.BlockingDeque;
import static com.actiontech.dble.DbleServer.*;
public final class ThreadManager {
private static final Logger LOGGER = LoggerFactory.getLogger("ThreadChecker");
private ThreadManager() {
}
// single thread
public static void interruptSingleThread(String threadName) throws Exception {
Thread[] threads = getAllThread();
Thread find = null;
for (Thread thread : threads) {
if (thread.getName().equals(threadName)) {
find = thread;
break;
}
}
if (find == null)
throw new Exception("Thread[" + threadName + "] does not exist");
find.interrupt();
LOGGER.info("exec interrupt Thread[{}]", find.getName());
}
public static void recoverSingleThread(String threadName) throws Exception {
String[] array = threadName.split("-");
if (array.length == 2) {
DbleServer server = DbleServer.getInstance();
switch (array[1]) {
case FRONT_WORKER_NAME:
NameableExecutor nameableExecutor0 = (NameableExecutor) server.getFrontExecutor();
if (nameableExecutor0.getPoolSize() > nameableExecutor0.getActiveCount()) {
if (SystemConfig.getInstance().getUsePerformanceMode() == 1) {
nameableExecutor0.execute(new FrontendCurrentRunnable(server.getFrontHandlerQueue()));
} else {
nameableExecutor0.execute(new FrontendBlockRunnable((BlockingDeque<ServiceTask>) server.getFrontHandlerQueue()));
}
} else {
throw new Exception("threadPool[{" + FRONT_WORKER_NAME + "}] does not need to be recover");
}
break;
case FRONT_MANAGER_WORKER_NAME:
NameableExecutor nameableExecutor1 = (NameableExecutor) server.getFrontExecutor();
if (nameableExecutor1.getPoolSize() > nameableExecutor1.getActiveCount()) {
nameableExecutor1.execute(new FrontendBlockRunnable((BlockingDeque<ServiceTask>) server.getManagerFrontHandlerQueue()));
} else {
throw new Exception("threadPool[{" + FRONT_MANAGER_WORKER_NAME + "}] does not need to be recover");
}
break;
case BACKEND_WORKER_NAME:
NameableExecutor nameableExecutor2 = (NameableExecutor) server.getBackendExecutor();
if (nameableExecutor2.getPoolSize() > nameableExecutor2.getActiveCount()) {
if (SystemConfig.getInstance().getUsePerformanceMode() == 1) {
nameableExecutor2.execute(new BackendCurrentRunnable(server.getConcurrentBackHandlerQueue()));
}
} else {
throw new Exception("threadPool[{" + BACKEND_WORKER_NAME + "}] does not need to be recover");
}
break;
case WRITE_TO_BACKEND_WORKER_NAME:
NameableExecutor nameableExecutor3 = (NameableExecutor) server.getWriteToBackendExecutor();
if (nameableExecutor3.getPoolSize() > nameableExecutor3.getActiveCount()) {
nameableExecutor3.execute(new WriteToBackendRunnable(server.getWriteToBackendQueue()));
} else {
throw new Exception("threadPool[{" + WRITE_TO_BACKEND_WORKER_NAME + "}] does not need to be recover");
}
break;
case NIO_FRONT_RW:
if (SystemConfig.getInstance().getUsingAIO() != 1) {
try {
NameableExecutor nameableExecutor4 = (NameableExecutor) server.getNioFrontExecutor();
if (nameableExecutor4.getPoolSize() > nameableExecutor4.getActiveCount()) {
nameableExecutor4.execute(new RW(server.getFrontRegisterQueue()));
} else {
throw new Exception("threadPool[{" + NIO_FRONT_RW + "}] does not need to be recover");
}
} catch (IOException e) {
throw new Exception("recover threadPool[{" + NIO_FRONT_RW + "}] fail", e);
}
}
break;
case NIO_BACKEND_RW:
if (SystemConfig.getInstance().getUsingAIO() != 1) {
try {
NameableExecutor nameableExecutor5 = (NameableExecutor) server.getNioBackendExecutor();
if (nameableExecutor5.getPoolSize() > nameableExecutor5.getActiveCount()) {
nameableExecutor5.execute(new RW(server.getBackendRegisterQueue()));
} else {
throw new Exception("threadPool[{" + NIO_BACKEND_RW + "}] does not need to be recover");
}
} catch (IOException e) {
throw new Exception("recover threadPool[{" + NIO_BACKEND_RW + "}] fail", e);
}
}
break;
default:
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
}
} else {
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
}
}
// thread poolTIMER_WORKER_NAME、TIMER_SCHEDULER_WORKER_NAME
public static void shutDownThreadPool(String threadPoolName) throws Exception {
switch (threadPoolName) {
case TIMER_WORKER_NAME:
if (DbleServer.getInstance().getTimerExecutor().isShutdown()) {
throw new Exception("threadPool[" + TIMER_WORKER_NAME + "] already shutdown");
}
LOGGER.info("manual shutdown threadPool[{}] ... start ...", TIMER_WORKER_NAME);
DbleServer.getInstance().getTimerExecutor().shutdownNow();
LOGGER.info("manual shutdown threadPool[{}] ... end ...", TIMER_WORKER_NAME);
break;
case TIMER_SCHEDULER_WORKER_NAME:
if (DbleServer.getInstance().getTimerSchedulerExecutor().isShutdown()) {
throw new Exception("threadPool[" + TIMER_SCHEDULER_WORKER_NAME + "] already shutdown");
}
/*
0、shutdown
1、stopHeartbeat
2、stopDelayDetection
3、stopXaIdCheckPeriod
*/
LOGGER.info("manual shutdown threadPool[{}] ... start ...", TIMER_SCHEDULER_WORKER_NAME);
DbleServer.getInstance().getTimerSchedulerExecutor().shutdownNow();
Iterator<PhysicalDbGroup> iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator();
while (iterator.hasNext()) {
PhysicalDbGroup dbGroup = iterator.next();
LOGGER.info("dbGroup[{}] stopHeartbeat...", dbGroup.getGroupName());
dbGroup.stopHeartbeat("manual shutdown thread pool TimerScheduler");
LOGGER.info("dbGroup[{}] stopDelayDetection...", dbGroup.getGroupName());
dbGroup.stopDelayDetection("manual shutdown thread pool TimerScheduler");
}
LOGGER.info("stopXaIdCheckPeriod...");
XaCheckHandler.stopXaIdCheckPeriod();
LOGGER.info("manual shutdown threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME);
break;
default:
throw new Exception("The shutdown operation of thread[" + TIMER_SCHEDULER_WORKER_NAME + "] is not supported");
}
}
public static void recoverThreadPool(String threadName) throws Exception {
switch (threadName) {
case TIMER_WORKER_NAME:
if (!DbleServer.getInstance().getTimerExecutor().isShutdown()) {
throw new Exception("threadPool[" + TIMER_WORKER_NAME + "] is not shutdown, no need to recover");
}
LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_WORKER_NAME);
DbleServer.getInstance().setTimerExecutor(
ExecutorUtil.createCached(TIMER_WORKER_NAME, 1, 2, ThreadChecker.getInstance()));
LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_WORKER_NAME);
break;
case TIMER_SCHEDULER_WORKER_NAME:
if (!DbleServer.getInstance().getTimerSchedulerExecutor().isShutdown()) {
throw new Exception("threadPool[" + TIMER_SCHEDULER_WORKER_NAME + "] is not shutdown, no need to recover");
}
/*
0、new TimerSchedulerExecutor AND init
1、startHeartbeat
2、startDelayDetection
3、startXaIdCheckPeriod
*/
LOGGER.info("manual recover threadPool[{}] ... start ...", TIMER_SCHEDULER_WORKER_NAME);
DbleServer.getInstance().setTimerSchedulerExecutor(
ExecutorUtil.createFixedScheduled(TIMER_SCHEDULER_WORKER_NAME, 2, ThreadChecker.getInstance()));
Scheduler.getInstance().init();
Iterator<PhysicalDbGroup> iterator = DbleServer.getInstance().getConfig().getDbGroups().values().iterator();
while (iterator.hasNext()) {
PhysicalDbGroup dbGroup = iterator.next();
LOGGER.info("dbGroup[{}] startHeartbeat...", dbGroup.getGroupName());
dbGroup.startHeartbeat();
LOGGER.info("dbGroup[{}] startDelayDetection...", dbGroup.getGroupName());
dbGroup.startDelayDetection();
}
LOGGER.info("startXaIdCheckPeriod...");
XaCheckHandler.startXaIdCheckPeriod();
LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME);
break;
default:
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
}
}
private static Thread[] getAllThread() {
ThreadGroup group = Thread.currentThread().getThreadGroup();
ThreadGroup topGroup = group;
while (group != null) {
topGroup = group;
group = group.getParent();
}
int slackSize = topGroup.activeCount() * 2;
Thread[] slackThreads = new Thread[slackSize];
int actualSize = topGroup.enumerate(slackThreads);
Thread[] atualThreads = new Thread[actualSize];
System.arraycopy(slackThreads, 0, atualThreads, 0, actualSize);
return atualThreads;
}
}

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.util;
import com.actiontech.dble.singleton.ThreadChecker;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
@@ -65,6 +66,11 @@ public final class ExecutorUtil {
*/
public static NameableExecutor createCached(String name, int size, int maxSize, ThreadChecker checker) {
NameableThreadFactory factory = new NameableThreadFactory(name, true);
return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(256), factory, null, checker);
return new NameableExecutor(name, size, maxSize, 60, new LinkedBlockingQueue<>(1024), factory, null, checker);
}
public static NameableScheduledThreadPoolExecutor createFixedScheduled(String name, int size, ThreadChecker checker) {
NameableThreadFactory factory = new NameableThreadFactory(name, true);
return new NameableScheduledThreadPoolExecutor(name, size, factory, checker);
}
}

View File

@@ -0,0 +1,46 @@
package com.actiontech.dble.util;
import com.actiontech.dble.singleton.ThreadChecker;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
public class NameableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
protected String name;
private ThreadChecker threadChecker = null;
public NameableScheduledThreadPoolExecutor(String name, int corePoolSize, ThreadFactory threadFactory, ThreadChecker threadChecker) {
super(corePoolSize, threadFactory);
this.name = name;
this.threadChecker = threadChecker;
}
public String getName() {
return name;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
if (threadChecker != null) {
threadChecker.startExec(t);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (threadChecker != null) {
threadChecker.endExec();
}
}
@Override
protected void terminated() {
super.terminated();
if (threadChecker != null) {
threadChecker.terminated();
}
}
}