fix service conversion error inner 1600 (#3125)

* fix service conversion error inner 1600

fix

* add connection
This commit is contained in:
ylinzhu
2022-02-11 16:43:29 +08:00
committed by GitHub
parent be6ceb9540
commit b976810f96
8 changed files with 69 additions and 20 deletions
@@ -14,6 +14,7 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.PauseShardingNodeManager;
@@ -59,8 +60,9 @@ public class PauseResumeClusterLogic extends AbstractClusterLogic {
boolean nextTurn = false;
for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
for (Map.Entry<Long, FrontendConnection> entry : processor.getFrontends().entrySet()) {
if (!entry.getValue().isManager()) {
ShardingService shardingService = (ShardingService) entry.getValue().getService();
AbstractService service = entry.getValue().getService();
if (service instanceof ShardingService) {
ShardingService shardingService = (ShardingService) service;
for (Map.Entry<RouteResultsetNode, BackendConnection> conEntry : shardingService.getSession2().getTargetMap().entrySet()) {
if (shardingNodeSet.contains(conEntry.getKey().getName())) {
nextTurn = true;
@@ -71,10 +71,10 @@ public final class KillHandler {
killConn = findFrontConn(id);
if (killConn == null) {
service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id:" + id);
service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id: " + id);
return;
} else if (!killConn.isManager() && !((ShardingService) killConn.getService()).getUser().equals(service.getUser())) {
service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection" + id);
} else if (!(killConn.getService() instanceof ShardingService) || !((ShardingService) killConn.getService()).getUser().equals(service.getUser())) {
service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection: " + id);
return;
}
@@ -11,9 +11,12 @@ import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.FrontendService;
import com.actiontech.dble.services.manager.information.ManagerBaseTable;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.google.common.collect.Maps;
import java.util.ArrayList;
@@ -26,7 +29,7 @@ public class DbleFlowControl extends ManagerBaseTable {
private static final String COLUMN_CONNECTION_TYPE = "connection_type";
private static final String COLUMN_CONNECTION_ID = "connection_id";
private static final String COLUMN_CONNECTION_INFO = "connection_info";
private static final String COLUMN_WRITING_QUEUE_BYTES = "writing_queue_bytes" ;
private static final String COLUMN_WRITING_QUEUE_BYTES = "writing_queue_bytes";
private static final String COLUMN_READING_QUEUE_BYTES = "reading_queue_bytes";
private static final String COLUMN_FLOW_CONTROLLED = "flow_controlled";
@@ -63,12 +66,13 @@ public class DbleFlowControl extends ManagerBaseTable {
for (IOProcessor p : processors) {
//find all front connection
for (FrontendConnection fc : p.getFrontends().values()) {
if (!fc.isManager()) {
AbstractService service = fc.getService();
if (service instanceof ShardingService || service instanceof RWSplitService) {
int size = fc.getWritingSize().get();
LinkedHashMap<String, String> row = Maps.newLinkedHashMap();
row.put(COLUMN_CONNECTION_TYPE, "ServerConnection");
row.put(COLUMN_CONNECTION_ID, Long.toString(fc.getId()));
row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((ShardingService) fc.getService()).getSchema() + " user = " + ((ShardingService) fc.getService()).getUser());
row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser());
row.put(COLUMN_WRITING_QUEUE_BYTES, Integer.toString(size));
row.put(COLUMN_READING_QUEUE_BYTES, null);
row.put(COLUMN_FLOW_CONTROLLED, fc.isFrontWriteFlowControlled() ? "true" : "false");
@@ -12,9 +12,12 @@ import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.FrontendService;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.singleton.FlowController;
import com.actiontech.dble.util.LongUtil;
import com.actiontech.dble.util.StringUtil;
@@ -124,12 +127,13 @@ public final class FlowControlList {
IOProcessor[] processors = DbleServer.getInstance().getFrontProcessors();
for (IOProcessor p : processors) {
for (FrontendConnection fc : p.getFrontends().values()) {
if (!fc.isManager()) {
AbstractService fcService = fc.getService();
if (fcService instanceof ShardingService || fcService instanceof RWSplitService) {
int size = fc.getWritingSize().get();
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode("ServerConnection", service.getCharset().getResults()));
row.add(LongUtil.toBytes(fc.getId()));
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((ShardingService) fc.getService()).getSchema() + " user = " + ((ShardingService) fc.getService()).getUser(), service.getCharset().getResults()));
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser(), service.getCharset().getResults()));
row.add(LongUtil.toBytes(size));
row.add(null); // not support
row.add(fc.isFrontWriteFlowControlled() ? "true".getBytes() : "false".getBytes());
@@ -10,6 +10,7 @@ import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.services.manager.ManagerService;
@@ -137,8 +138,9 @@ public final class PauseStart {
boolean nextTurn = false;
for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
for (Map.Entry<Long, FrontendConnection> entry : processor.getFrontends().entrySet()) {
if (!entry.getValue().isManager()) {
ShardingService shardingService = (ShardingService) entry.getValue().getService();
AbstractService service = entry.getValue().getService();
if ((service instanceof ShardingService)) {
ShardingService shardingService = (ShardingService) service;
for (Map.Entry<RouteResultsetNode, BackendConnection> conEntry : shardingService.getSession2().getTargetMap().entrySet()) {
if (shardingNodes.contains(conEntry.getKey().getName())) {
nextTurn = true;
@@ -28,6 +28,7 @@ import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.SQLJob;
import com.actiontech.dble.sqlengine.SQLQueryResult;
@@ -229,7 +230,7 @@ public final class ShowBinlogStatus {
List<NonBlockingSession> fcList = new ArrayList<>();
for (IOProcessor process : DbleServer.getInstance().getFrontProcessors()) {
for (FrontendConnection front : process.getFrontends().values()) {
if (front.isManager()) {
if (front.isManager() || front.getService() instanceof RWSplitService) {
continue;
}
NonBlockingSession session = ((ShardingService) front.getService()).getSession2();
@@ -12,9 +12,11 @@ import com.actiontech.dble.config.Fields;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.util.StringUtil;
import java.nio.ByteBuffer;
@@ -80,10 +82,15 @@ public final class ShowConnectionSQLStatus {
service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " doesn't exist");
return;
}
AbstractService frontService = target.getService();
if (target.isManager()) {
service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a manager connection");
return;
}
if (frontService instanceof RWSplitService) {
service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a RWSplit connection");
return;
}
ByteBuffer buffer = service.allocate();
// write header
@@ -100,7 +107,7 @@ public final class ShowConnectionSQLStatus {
// write rows
byte packetId = EOF.getPacketId();
List<String[]> results = ((ShardingService) target.getService()).getSession2().genRunningSQLStage();
List<String[]> results = ((ShardingService) frontService).getSession2().genRunningSQLStage();
if (results != null) {
for (String[] result : results) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
@@ -12,13 +12,17 @@ import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.util.StringUtil;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Objects;
/**
* show front session detail info
@@ -71,7 +75,7 @@ public final class ShowSession {
if (front.isManager()) {
continue;
}
RowDataPacket row = getRow((ShardingService) front.getService(), service.getCharset().getResults());
RowDataPacket row = getRow(front.getService(), service.getCharset().getResults());
if (row != null) {
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
@@ -87,6 +91,16 @@ public final class ShowSession {
lastEof.write(buffer, service);
}
private static RowDataPacket getRow(AbstractService sc, String charset) {
if (sc instanceof ShardingService) {
return getRow((ShardingService) sc, charset);
} else if (sc instanceof RWSplitService) {
return getRow((RWSplitService) sc, charset);
} else {
return null;
}
}
private static RowDataPacket getRow(ShardingService sc, String charset) {
StringBuilder sb = new StringBuilder();
NonBlockingSession session = sc.getSession2();
@@ -98,10 +112,25 @@ public final class ShowSession {
for (BackendConnection backCon : backConnections) {
sb.append(backCon).append("\r\n");
}
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(sc.getConnection().getId() + "", charset));
row.add(StringUtil.encode(cnCount + "", charset));
row.add(StringUtil.encode(sb.toString(), charset));
return createRowDataPacket(FIELD_COUNT, sc.getConnection().getId(), cnCount, sb.toString(), charset);
}
private static RowDataPacket getRow(RWSplitService sc, String charset) {
StringBuilder sb = new StringBuilder();
RWSplitNonBlockingSession session = sc.getSession();
BackendConnection backendConnection = session.getConn();
if (Objects.isNull(backendConnection)) {
return null;
}
sb.append(backendConnection);
return createRowDataPacket(FIELD_COUNT, sc.getConnection().getId(), 1, sb.toString(), charset);
}
private static RowDataPacket createRowDataPacket(int fieldCount, long id, int cnCount, String sb, String charset) {
RowDataPacket row = new RowDataPacket(fieldCount);
row.add(StringUtil.encode(String.valueOf(id), charset));
row.add(StringUtil.encode(String.valueOf(cnCount), charset));
row.add(StringUtil.encode(sb, charset));
return row;
}
}