diff --git a/src/main/java/com/actiontech/dble/cluster/logic/PauseResumeClusterLogic.java b/src/main/java/com/actiontech/dble/cluster/logic/PauseResumeClusterLogic.java index 7b080d986..0dd1bfea0 100644 --- a/src/main/java/com/actiontech/dble/cluster/logic/PauseResumeClusterLogic.java +++ b/src/main/java/com/actiontech/dble/cluster/logic/PauseResumeClusterLogic.java @@ -14,6 +14,7 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; +import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.singleton.PauseShardingNodeManager; @@ -59,8 +60,9 @@ public class PauseResumeClusterLogic extends AbstractClusterLogic { boolean nextTurn = false; for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) { for (Map.Entry entry : processor.getFrontends().entrySet()) { - if (!entry.getValue().isManager()) { - ShardingService shardingService = (ShardingService) entry.getValue().getService(); + AbstractService service = entry.getValue().getService(); + if (service instanceof ShardingService) { + ShardingService shardingService = (ShardingService) service; for (Map.Entry conEntry : shardingService.getSession2().getTargetMap().entrySet()) { if (shardingNodeSet.contains(conEntry.getKey().getName())) { nextTurn = true; diff --git a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java index 08a0e0939..75e8aecfd 100644 --- a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java @@ -71,10 +71,10 @@ public final class KillHandler { killConn = findFrontConn(id); if (killConn == null) { - service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id:" + id); + service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id: " + id); return; - } else if (!killConn.isManager() && !((ShardingService) killConn.getService()).getUser().equals(service.getUser())) { - service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection" + id); + } else if (!(killConn.getService() instanceof ShardingService) || !((ShardingService) killConn.getService()).getUser().equals(service.getUser())) { + service.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection: " + id); return; } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java index de2953f54..fe20decca 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java @@ -11,9 +11,12 @@ import com.actiontech.dble.meta.ColumnMeta; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; +import com.actiontech.dble.net.service.AbstractService; +import com.actiontech.dble.services.FrontendService; import com.actiontech.dble.services.manager.information.ManagerBaseTable; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.google.common.collect.Maps; import java.util.ArrayList; @@ -26,7 +29,7 @@ public class DbleFlowControl extends ManagerBaseTable { private static final String COLUMN_CONNECTION_TYPE = "connection_type"; private static final String COLUMN_CONNECTION_ID = "connection_id"; private static final String COLUMN_CONNECTION_INFO = "connection_info"; - private static final String COLUMN_WRITING_QUEUE_BYTES = "writing_queue_bytes" ; + private static final String COLUMN_WRITING_QUEUE_BYTES = "writing_queue_bytes"; private static final String COLUMN_READING_QUEUE_BYTES = "reading_queue_bytes"; private static final String COLUMN_FLOW_CONTROLLED = "flow_controlled"; @@ -63,12 +66,13 @@ public class DbleFlowControl extends ManagerBaseTable { for (IOProcessor p : processors) { //find all front connection for (FrontendConnection fc : p.getFrontends().values()) { - if (!fc.isManager()) { + AbstractService service = fc.getService(); + if (service instanceof ShardingService || service instanceof RWSplitService) { int size = fc.getWritingSize().get(); LinkedHashMap row = Maps.newLinkedHashMap(); row.put(COLUMN_CONNECTION_TYPE, "ServerConnection"); row.put(COLUMN_CONNECTION_ID, Long.toString(fc.getId())); - row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((ShardingService) fc.getService()).getSchema() + " user = " + ((ShardingService) fc.getService()).getUser()); + row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser()); row.put(COLUMN_WRITING_QUEUE_BYTES, Integer.toString(size)); row.put(COLUMN_READING_QUEUE_BYTES, null); row.put(COLUMN_FLOW_CONTROLLED, fc.isFrontWriteFlowControlled() ? "true" : "false"); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java b/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java index e02e52d2b..ea58d56c1 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java @@ -12,9 +12,12 @@ import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; import com.actiontech.dble.net.mysql.*; +import com.actiontech.dble.net.service.AbstractService; +import com.actiontech.dble.services.FrontendService; import com.actiontech.dble.services.manager.ManagerService; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.singleton.FlowController; import com.actiontech.dble.util.LongUtil; import com.actiontech.dble.util.StringUtil; @@ -124,12 +127,13 @@ public final class FlowControlList { IOProcessor[] processors = DbleServer.getInstance().getFrontProcessors(); for (IOProcessor p : processors) { for (FrontendConnection fc : p.getFrontends().values()) { - if (!fc.isManager()) { + AbstractService fcService = fc.getService(); + if (fcService instanceof ShardingService || fcService instanceof RWSplitService) { int size = fc.getWritingSize().get(); RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(StringUtil.encode("ServerConnection", service.getCharset().getResults())); row.add(LongUtil.toBytes(fc.getId())); - row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((ShardingService) fc.getService()).getSchema() + " user = " + ((ShardingService) fc.getService()).getUser(), service.getCharset().getResults())); + row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser(), service.getCharset().getResults())); row.add(LongUtil.toBytes(size)); row.add(null); // not support row.add(fc.isFrontWriteFlowControlled() ? "true".getBytes() : "false".getBytes()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/PauseStart.java b/src/main/java/com/actiontech/dble/services/manager/response/PauseStart.java index 97bedf81a..e784f549d 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/PauseStart.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/PauseStart.java @@ -10,6 +10,7 @@ import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; import com.actiontech.dble.net.mysql.OkPacket; +import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.plan.common.exception.MySQLOutPutException; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.services.manager.ManagerService; @@ -137,8 +138,9 @@ public final class PauseStart { boolean nextTurn = false; for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) { for (Map.Entry entry : processor.getFrontends().entrySet()) { - if (!entry.getValue().isManager()) { - ShardingService shardingService = (ShardingService) entry.getValue().getService(); + AbstractService service = entry.getValue().getService(); + if ((service instanceof ShardingService)) { + ShardingService shardingService = (ShardingService) service; for (Map.Entry conEntry : shardingService.getSession2().getTargetMap().entrySet()) { if (shardingNodes.contains(conEntry.getKey().getName())) { nextTurn = true; diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowBinlogStatus.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowBinlogStatus.java index 1611808bd..38d58a2ac 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowBinlogStatus.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowBinlogStatus.java @@ -28,6 +28,7 @@ import com.actiontech.dble.net.mysql.*; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.services.manager.ManagerService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; import com.actiontech.dble.sqlengine.SQLJob; import com.actiontech.dble.sqlengine.SQLQueryResult; @@ -229,7 +230,7 @@ public final class ShowBinlogStatus { List fcList = new ArrayList<>(); for (IOProcessor process : DbleServer.getInstance().getFrontProcessors()) { for (FrontendConnection front : process.getFrontends().values()) { - if (front.isManager()) { + if (front.isManager() || front.getService() instanceof RWSplitService) { continue; } NonBlockingSession session = ((ShardingService) front.getService()).getSession2(); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQLStatus.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQLStatus.java index ce546babf..aafc70510 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQLStatus.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQLStatus.java @@ -12,9 +12,11 @@ import com.actiontech.dble.config.Fields; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.FrontendConnection; import com.actiontech.dble.net.mysql.*; +import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.server.status.SlowQueryLog; import com.actiontech.dble.services.manager.ManagerService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.util.StringUtil; import java.nio.ByteBuffer; @@ -80,10 +82,15 @@ public final class ShowConnectionSQLStatus { service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " doesn't exist"); return; } + AbstractService frontService = target.getService(); if (target.isManager()) { service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a manager connection"); return; } + if (frontService instanceof RWSplitService) { + service.writeErrMessage(ErrorCode.ER_YES, "The front_id " + id + " is a RWSplit connection"); + return; + } ByteBuffer buffer = service.allocate(); // write header @@ -100,7 +107,7 @@ public final class ShowConnectionSQLStatus { // write rows byte packetId = EOF.getPacketId(); - List results = ((ShardingService) target.getService()).getSession2().genRunningSQLStage(); + List results = ((ShardingService) frontService).getSession2().genRunningSQLStage(); if (results != null) { for (String[] result : results) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSession.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSession.java index 93837b9d6..61efff949 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSession.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSession.java @@ -12,13 +12,17 @@ import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; import com.actiontech.dble.net.mysql.*; -import com.actiontech.dble.services.manager.ManagerService; +import com.actiontech.dble.net.service.AbstractService; +import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.server.NonBlockingSession; +import com.actiontech.dble.services.manager.ManagerService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.util.StringUtil; import java.nio.ByteBuffer; import java.util.Collection; +import java.util.Objects; /** * show front session detail info @@ -71,7 +75,7 @@ public final class ShowSession { if (front.isManager()) { continue; } - RowDataPacket row = getRow((ShardingService) front.getService(), service.getCharset().getResults()); + RowDataPacket row = getRow(front.getService(), service.getCharset().getResults()); if (row != null) { row.setPacketId(++packetId); buffer = row.write(buffer, service, true); @@ -87,6 +91,16 @@ public final class ShowSession { lastEof.write(buffer, service); } + private static RowDataPacket getRow(AbstractService sc, String charset) { + if (sc instanceof ShardingService) { + return getRow((ShardingService) sc, charset); + } else if (sc instanceof RWSplitService) { + return getRow((RWSplitService) sc, charset); + } else { + return null; + } + } + private static RowDataPacket getRow(ShardingService sc, String charset) { StringBuilder sb = new StringBuilder(); NonBlockingSession session = sc.getSession2(); @@ -98,10 +112,25 @@ public final class ShowSession { for (BackendConnection backCon : backConnections) { sb.append(backCon).append("\r\n"); } - RowDataPacket row = new RowDataPacket(FIELD_COUNT); - row.add(StringUtil.encode(sc.getConnection().getId() + "", charset)); - row.add(StringUtil.encode(cnCount + "", charset)); - row.add(StringUtil.encode(sb.toString(), charset)); + return createRowDataPacket(FIELD_COUNT, sc.getConnection().getId(), cnCount, sb.toString(), charset); + } + + private static RowDataPacket getRow(RWSplitService sc, String charset) { + StringBuilder sb = new StringBuilder(); + RWSplitNonBlockingSession session = sc.getSession(); + BackendConnection backendConnection = session.getConn(); + if (Objects.isNull(backendConnection)) { + return null; + } + sb.append(backendConnection); + return createRowDataPacket(FIELD_COUNT, sc.getConnection().getId(), 1, sb.toString(), charset); + } + + private static RowDataPacket createRowDataPacket(int fieldCount, long id, int cnCount, String sb, String charset) { + RowDataPacket row = new RowDataPacket(fieldCount); + row.add(StringUtil.encode(String.valueOf(id), charset)); + row.add(StringUtil.encode(String.valueOf(cnCount), charset)); + row.add(StringUtil.encode(sb, charset)); return row; } }