This commit is contained in:
yanhuqing
2019-09-26 14:24:45 +08:00
parent cbb2695b1c
commit 015b0c9020
3 changed files with 96 additions and 4 deletions
+1
View File
@@ -0,0 +1 @@
dn1:10;dn2:20;dn3:30;dn4:40
@@ -26,16 +26,14 @@ import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.statistic.stat.QueryResult;
import com.actiontech.dble.statistic.stat.QueryResultDispatcher;
import com.actiontech.dble.util.DebugPauseUtil;
import com.actiontech.dble.util.FormatUtil;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
/**
* @author mycat
@@ -60,6 +58,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
private volatile ByteBuffer byteBuffer;
private Set<BackendConnection> closedConnSet;
private final boolean modifiedSQL;
private Map<String, Integer> dataNodePauseInfo; // only for debug
public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
super(session);
@@ -75,6 +74,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
this.sessionAutocommit = session.getSource().isAutocommit();
this.modifiedSQL = rrs.getNodes()[0].isModifySQL();
initDebugInfo();
}
protected void reset(int initCount) {
@@ -136,6 +136,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
@Override
public void connectionClose(BackendConnection conn, String reason) {
pauseTime(conn);
if (checkClosedConn(conn)) {
return;
}
@@ -175,6 +176,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
@Override
public void errorResponse(byte[] data, BackendConnection conn) {
pauseTime(conn);
ErrorPacket errPacket = new ErrorPacket();
errPacket.read(data);
byte lastPacketId = packetId;
@@ -211,6 +213,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + conn);
}
if (executeResponse) {
pauseTime(conn);
this.resultSize += data.length;
session.setBackendResponseEndTime((MySQLConnection) conn);
ServerConnection source = session.getSource();
@@ -617,4 +620,51 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
}
}
private void initDebugInfo() {
if (LOGGER.isDebugEnabled()) {
String info = DebugPauseUtil.getPauseInfo("MultiNodeQueryHandler.txt");
if (info != null) {
LOGGER.debug("Pause info of MultiNodeQueryHandler is " + info);
String[] infos = info.split(";");
dataNodePauseInfo = new HashMap<>(infos.length);
boolean formatCorrect = true;
for (String nodeInfo : infos) {
try {
String[] infoDetail = nodeInfo.split(":");
dataNodePauseInfo.put(infoDetail[0], Integer.valueOf(infoDetail[1]));
} catch (Throwable e) {
formatCorrect = false;
break;
}
}
if (!formatCorrect) {
dataNodePauseInfo.clear();
}
} else {
dataNodePauseInfo = new HashMap<>(0);
}
} else {
dataNodePauseInfo = new HashMap<>(0);
}
}
private void pauseTime(BackendConnection conn) {
if (LOGGER.isDebugEnabled()) {
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
Integer millis = dataNodePauseInfo.get(rNode.getName());
if (millis == null) {
millis = 0;
}
LOGGER.debug("datanode[" + rNode.getName() + "], which conn threadid[" + ((MySQLConnection) conn).getThreadId() + "] will sleep for " + millis + " milliseconds");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
LOGGER.debug("datanode[" + rNode.getName() + "], which conn threadid[" + ((MySQLConnection) conn).getThreadId() + "] has slept for " + millis + " milliseconds");
}
}
}
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2016-2019 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.util;
import com.actiontech.dble.config.model.SystemConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
public final class DebugPauseUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(DebugPauseUtil.class);
private DebugPauseUtil() {
}
public static String getPauseInfo(String fileName) {
File pauseFile = new File(SystemConfig.getHomePath() + File.separator + "conf", fileName);
if (!pauseFile.exists()) {
LOGGER.debug(pauseFile.getAbsolutePath() + " is not exists");
return null;
}
String line = null;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(pauseFile), "UTF-8"));
line = in.readLine();
in.close();
} catch (Exception e) {
LOGGER.warn("getPauseInfo error", e);
}
return line;
}
}