rw split simple (#2076)

* rw split simple

* fix findbug

* Perfect rwSplit

* resolve conflict

* resolve conflict

* resolve conflict

* fix findbug

* fix transaction

* fix findbug

* change into log4j2 print instead

* support load data

* resolve checkstyle
This commit is contained in:
Collapsar
2020-09-15 10:12:47 +08:00
committed by GitHub
parent bfe64a71e3
commit a3810c0457
39 changed files with 1035 additions and 505 deletions

View File

@@ -6,27 +6,6 @@
<FindBugsFilter>
<!-- protocol:ignore -->
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$ColumnMeta"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$IndexMeta"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$TableMeta"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$ColumnMeta$Builder"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$IndexMeta$Builder"/>
</Match>
<Match>
<Class name="com.actiontech.dble.meta.protocol.StructureMeta$TableMeta$Builder"/>
</Match>
<Match>
<Package name="com.actiontech.dble.cluster.general.impl.ushard"/>
</Match>
@@ -38,15 +17,11 @@
</Match>
<!-- protocol:ignore -->
<Match>
<Package name="com.actiontech.dble.util.dataMigrator.dataIOImpl"/>
</Match>
<!--MALICIOUS_CODE ,need refactor? -->
<Match>
<Bug category="MALICIOUS_CODE"/>
</Match>
<!-- I18N new String(byte[]) UTF-8 OR EVENT SET? -->
<Match>
<Bug category="I18N"/>
@@ -93,17 +68,6 @@
<Class name="com.actiontech.dble.server.parser.ServerParseSelect"/>
</Match>
<!-- use enum? -->
<Match>
<Bug category="STYLE"/>
<Bug pattern="SF_SWITCH_NO_DEFAULT"/>
<Class name="com.actiontech.dble.backend.heartbeat.MySQLHeartbeat"/>
</Match>
<Match>
<Bug category="STYLE"/>
<Bug pattern="SF_SWITCH_NO_DEFAULT"/>
<Class name="com.actiontech.dble.server.response.SessionIsolation"/>
</Match>
<!-- always throw new exception-->
<Match>
<Bug category="STYLE"/>
@@ -164,12 +128,6 @@
<Class name="com.actiontech.dble.plan.common.typelib.TypeLib"/>
<Bug pattern="URF_UNREAD_FIELD"/>
</Match>
<!-- need refactor -->
<Match>
<Bug category="PERFORMANCE"/>
<Class name="com.actiontech.dble.route.sequence.handler.SequenceVal"/>
<Bug pattern="URF_UNREAD_FIELD"/>
</Match>
<!-- PERFORMANCE end -->
<!-- CORRECTNESS start -->
@@ -203,24 +161,11 @@
<Bug category="MT_CORRECTNESS"/>
<Class name="com.actiontech.dble.server.handler.ServerLoadDataInfileHandler"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<Match>
<Bug category="MT_CORRECTNESS"/>
<Class name="com.actiontech.dble.backend.mysql.nio.handler.NewConnectionRespHandler"/>
<Method name="getBackConn"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
<Match>
<Bug category="MT_CORRECTNESS"/>
<Class name="com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XACommitNodesHandler"/>
<Method name="waitUntilSendFinish"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
</Match>
<Match>
<Bug category="MT_CORRECTNESS"/>
<Class name="com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XARollbackNodesHandler"/>
<Method name="waitUntilSendFinish"/>
<Bug pattern="WA_AWAIT_NOT_IN_LOOP"/>
<Class name="com.actiontech.dble.services.rwsplit.RWSplitHandler"/>
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
<!-- MT_CORRECTNESS end -->
@@ -229,7 +174,7 @@
<Bug category="BAD_PRACTICE"/>
<Bug pattern="RR_NOT_CHECKED,RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
<Match>
<Match>
<Bug category="BAD_PRACTICE"/>
<Class name="com.actiontech.dble.server.handler.SetInnerHandler"/>
<Method name="isSwitchOn"/>

View File

@@ -0,0 +1,18 @@
package com.actiontech.dble.backend.datasource;
import java.util.List;
public abstract class AbstractLoadBalancer implements LoadBalancer {
@Override
public PhysicalDbInstance select(List<PhysicalDbInstance> okSources) {
if (okSources.size() == 1) {
return okSources.get(0);
} else {
return doSelect(okSources);
}
}
protected abstract PhysicalDbInstance doSelect(List<PhysicalDbInstance> okSources);
}

View File

@@ -0,0 +1,8 @@
package com.actiontech.dble.backend.datasource;
import java.util.List;
public interface LoadBalancer {
PhysicalDbInstance select(List<PhysicalDbInstance> okSources);
}

View File

@@ -6,11 +6,7 @@
package com.actiontech.dble.backend.datasource;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.mysql.nio.MySQLInstance;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.cluster.values.DbInstanceStatus;
import com.actiontech.dble.cluster.zkprocess.parse.JsonProcessBase;
import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables;
@@ -27,22 +23,21 @@ import com.google.gson.reflect.TypeToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PhysicalDbGroup {
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalDbGroup.class);
public static final String JSON_NAME = "dbGroup";
public static final String JSON_LIST = "dbInstance";
// rw split
public static final int RW_SPLIT_OFF = 0;
private static final int RW_SPLIT_ALL_SLAVES = 1;
private static final int RW_SPLIT_ALL = 2;
public static final int RW_SPLIT_ALL_SLAVES = 1;
public static final int RW_SPLIT_ALL = 2;
// weight
public static final int WEIGHT = 0;
private final List<PhysicalDbInstance> writeInstanceList;
private final String groupName;
private final DbGroupConfig dbGroupConfig;
@@ -51,8 +46,7 @@ public class PhysicalDbGroup {
private final int rwSplitMode;
protected String[] schemas;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private final LoadBalancer loadBalancer = new RandomLoadBalancer();
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();
public PhysicalDbGroup(String name, DbGroupConfig config, PhysicalDbInstance writeDbInstances, PhysicalDbInstance[] readDbInstances, int rwSplitMode) {
@@ -62,6 +56,7 @@ public class PhysicalDbGroup {
writeDbInstances.setDbGroup(this);
this.writeDbInstance = writeDbInstances;
this.writeInstanceList = Collections.singletonList(writeDbInstance);
allSourceMap.put(writeDbInstances.getName(), writeDbInstances);
for (PhysicalDbInstance readDbInstance : readDbInstances) {
@@ -74,14 +69,19 @@ public class PhysicalDbGroup {
this.groupName = org.groupName;
this.rwSplitMode = org.rwSplitMode;
this.dbGroupConfig = org.dbGroupConfig;
allSourceMap = new HashMap<>();
this.allSourceMap = new HashMap<>();
for (Map.Entry<String, PhysicalDbInstance> entry : org.allSourceMap.entrySet()) {
MySQLInstance newSource = new MySQLInstance((MySQLInstance) entry.getValue());
allSourceMap.put(entry.getKey(), newSource);
this.allSourceMap.put(entry.getKey(), newSource);
if (entry.getValue() == org.writeDbInstance) {
writeDbInstance = newSource;
this.writeDbInstance = newSource;
}
}
writeInstanceList = Collections.singletonList(writeDbInstance);
}
public String getGroupName() {
return groupName;
}
public String[] getSchemas() {
@@ -123,6 +123,11 @@ public class PhysicalDbGroup {
return rwSplitMode;
}
private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
}
public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}
@@ -173,107 +178,18 @@ public class PhysicalDbGroup {
}
}
public Collection<PhysicalDbInstance> getAllActiveDbInstances() {
if (this.dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF) {
return allSourceMap.values();
} else {
return Collections.singletonList(writeDbInstance);
public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
}
}
public Collection<PhysicalDbInstance> getAllDbInstances() {
return new LinkedList<>(allSourceMap.values());
return allSourceMap.values();
}
public Map<String, PhysicalDbInstance> getAllDbInstanceMap() {
return allSourceMap;
}
void getRWSplitCon(String schema, ResponseHandler handler, Object attachment) throws Exception {
PhysicalDbInstance theNode = getRWSplitNode();
if (theNode.isDisabled() || theNode.isFakeNode()) {
if (this.getAllActiveDbInstances().size() > 0) {
theNode = this.getAllActiveDbInstances().iterator().next();
} else {
if (theNode.isDisabled()) {
String errorMsg = "the dbGroup[" + theNode.getDbGroupConfig().getName() + "] is disabled, please check it";
throw new IOException(errorMsg);
} else {
String errorMsg = "the dbGroup[" + theNode.getDbGroupConfig().getName() + "] is a fake node, please check it";
throw new IOException(errorMsg);
}
}
}
if (!theNode.isAlive()) {
String heartbeatError = "dbInstance[" + theNode.getConfig().getUrl() + "] can't reach. Please check the dbInstance status";
if (dbGroupConfig.isShowSlaveSql()) {
heartbeatError += ",Tip:heartbeat[show slave status] need the SUPER or REPLICATION CLIENT privilege(s)";
}
LOGGER.warn(heartbeatError);
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", theNode.getDbGroupConfig().getName() + "-" + theNode.getConfig().getInstanceName());
AlertUtil.alert(AlarmCode.DB_INSTANCE_CAN_NOT_REACH, Alert.AlertLevel.WARN, heartbeatError, "mysql", theNode.getConfig().getId(), labels);
throw new IOException(heartbeatError);
}
theNode.getConnection(schema, handler, attachment, false);
}
PhysicalDbInstance getRWSplitNode() {
PhysicalDbInstance theNode;
ArrayList<PhysicalDbInstance> okSources;
switch (rwSplitMode) {
case RW_SPLIT_ALL: {
okSources = getAllActiveRWSources(true, checkSlaveSynStatus());
theNode = randomSelect(okSources, true);
break;
}
case RW_SPLIT_ALL_SLAVES: {
okSources = getAllActiveRWSources(false, checkSlaveSynStatus());
theNode = randomSelect(okSources, true);
break;
}
case RW_SPLIT_OFF:
default:
// return default primary dbInstance
theNode = this.writeDbInstance;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select read dbInstance " + theNode.getName() + " for dbGroup:" + this.getGroupName());
}
theNode.incrementReadCount();
return theNode;
}
PhysicalDbInstance getRandomAliveReadNode() {
if (rwSplitMode == RW_SPLIT_OFF) {
return null;
} else {
return randomSelect(getAllActiveRWSources(false, checkSlaveSynStatus()), false);
}
}
boolean getReadCon(String schema, ResponseHandler handler, Object attachment) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("!readSources.isEmpty() " + (allSourceMap.values().size() > 1));
}
if (allSourceMap.values().size() > 1) {
PhysicalDbInstance theNode = getRandomAliveReadNode();
if (theNode != null) {
theNode.incrementReadCount();
theNode.getConnection(schema, handler, attachment, false);
return true;
} else {
LOGGER.info("read host is not available.");
return false;
}
} else {
LOGGER.info("read host is empty, read dbInstance is empty.");
return false;
}
}
PhysicalDbInstance[] getReadSources() {
public PhysicalDbInstance[] getReadDbInstances() {
PhysicalDbInstance[] readSources = new PhysicalDbInstance[allSourceMap.size() - 1];
int i = 0;
for (PhysicalDbInstance source : allSourceMap.values()) {
@@ -285,16 +201,29 @@ public class PhysicalDbGroup {
return readSources;
}
private ArrayList<PhysicalDbInstance> getAllActiveRWSources(boolean includeWriteNode, boolean filterWithDelayThreshold) {
ArrayList<PhysicalDbInstance> okSources = new ArrayList<>(allSourceMap.values().size());
if (writeDbInstance.isAlive() && includeWriteNode) {
okSources.add(writeDbInstance);
public PhysicalDbInstance select(boolean master) {
if (rwSplitMode == RW_SPLIT_OFF || allSourceMap.size() == 1 || master) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select write {}", writeDbInstance);
}
return writeDbInstance;
}
PhysicalDbInstance selectInstance = loadBalancer.select(getRWDbInstances());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select {}", selectInstance);
}
return selectInstance;
}
private List<PhysicalDbInstance> getRWDbInstances() {
ArrayList<PhysicalDbInstance> okSources = new ArrayList<>(allSourceMap.values().size());
for (PhysicalDbInstance ds : allSourceMap.values()) {
if (ds == writeDbInstance) {
if (rwSplitMode == RW_SPLIT_ALL && ds == writeDbInstance && writeDbInstance.isAlive()) {
okSources.add(ds);
continue;
}
if (ds.isAlive() && (!filterWithDelayThreshold || ds.canSelectAsReadNode())) {
if (ds.isAlive() && (!checkSlaveSynStatus() || ds.canSelectAsReadNode())) {
okSources.add(ds);
}
}
@@ -302,7 +231,6 @@ public class PhysicalDbGroup {
return okSources;
}
public String disableHosts(String hostNames, boolean syncWriteConf) {
String[] nameList = hostNames == null ? Arrays.copyOf(allSourceMap.keySet().toArray(), allSourceMap.keySet().toArray().length, String[].class) : hostNames.split(",");
final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock();
@@ -329,7 +257,6 @@ public class PhysicalDbGroup {
return snapshot;
}
public String enableHosts(String hostNames, boolean syncWriteConf) {
String[] nameList = hostNames == null ? Arrays.copyOf(allSourceMap.keySet().toArray(), allSourceMap.keySet().toArray().length, String[].class) : hostNames.split(",");
final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock();
@@ -458,7 +385,7 @@ public class PhysicalDbGroup {
if (instanceName != null) {
for (String dn : instanceName.split(",")) {
boolean find = false;
for (PhysicalDbInstance pds : this.getAllDbInstances()) {
for (PhysicalDbInstance pds : this.allSourceMap.values()) {
if (pds.getName().equals(dn)) {
find = true;
break;
@@ -472,48 +399,6 @@ public class PhysicalDbGroup {
return true;
}
public String getGroupName() {
return groupName;
}
public PhysicalDbInstance randomSelect(ArrayList<PhysicalDbInstance> okSources, boolean useWriteWhenEmpty) {
if (okSources.isEmpty()) {
if (useWriteWhenEmpty) {
return writeDbInstance;
} else {
return null;
}
} else {
int length = okSources.size();
int totalWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
int readWeight = okSources.get(i).getConfig().getReadWeight();
totalWeight += readWeight;
if (sameWeight && i > 0 && readWeight != okSources.get(i - 1).getConfig().getReadWeight()) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// random by different weight
int offset = random.nextInt(totalWeight);
for (PhysicalDbInstance okSource : okSources) {
offset -= okSource.getConfig().getReadWeight();
if (offset < 0) {
return okSource;
}
}
}
return okSources.get(random.nextInt(length));
}
}
private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
}
boolean equalsBaseInfo(PhysicalDbGroup pool) {
return pool.getDbGroupConfig().getName().equals(this.dbGroupConfig.getName()) &&
pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) &&

View File

@@ -48,10 +48,10 @@ public class PhysicalDbGroupDiff {
//add or not change
PhysicalDbInstance newWriteHost = newDbGroup.getWriteDbInstance();
PhysicalDbInstance[] newReadHost = newDbGroup.getReadSources();
PhysicalDbInstance[] newReadHost = newDbGroup.getReadDbInstances();
PhysicalDbInstance oldHost = orgDbGroup.getWriteDbInstance();
PhysicalDbInstance[] oldRHost = orgDbGroup.getReadSources();
PhysicalDbInstance[] oldRHost = orgDbGroup.getReadDbInstances();
boolean sameFlag = false;
if (oldHost.equals(newWriteHost) && oldRHost.length == newReadHost.length) {

View File

@@ -171,7 +171,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
return con;
}
public BackendConnection getConnection(final String schema, final long hardTimeout) throws IOException {
private BackendConnection getConnection(final String schema, final long hardTimeout) throws IOException {
if (this.connectionPool == null) {
throw new IOException("connection pool isn't initialized");
}

View File

@@ -9,8 +9,7 @@ package com.actiontech.dble.backend.datasource;
* Created by szf on 2018/7/23.
*/
public class PhysicalDbInstanceDiff {
private String writeHostChangeType = null;
private String writeHostChangeType;
private PhysicalDbInstance selfInstance;
@@ -22,7 +21,6 @@ public class PhysicalDbInstanceDiff {
this.relatedInstance = relatedInstance;
}
public String getWriteHostChangeType() {
return writeHostChangeType;
}

View File

@@ -0,0 +1,38 @@
package com.actiontech.dble.backend.datasource;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
public class RandomLoadBalancer extends AbstractLoadBalancer {
private final ThreadLocalRandom random = ThreadLocalRandom.current();
public RandomLoadBalancer() {
}
@Override
protected PhysicalDbInstance doSelect(List<PhysicalDbInstance> okSources) {
int length = okSources.size();
int totalWeight = 0;
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
int readWeight = okSources.get(i).getConfig().getReadWeight();
totalWeight += readWeight;
if (sameWeight && i > 0 && readWeight != okSources.get(i - 1).getConfig().getReadWeight()) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// random by different weight
int offset = random.nextInt(totalWeight);
for (PhysicalDbInstance okSource : okSources) {
offset -= okSource.getConfig().getReadWeight();
if (offset < 0) {
return okSource;
}
}
}
return okSources.get(random.nextInt(length));
}
}

View File

@@ -5,9 +5,6 @@
*/
package com.actiontech.dble.backend.datasource;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.route.RouteResultsetNode;
@@ -16,7 +13,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
public class ShardingNode {
protected static final Logger LOGGER = LoggerFactory.getLogger(ShardingNode.class);
@@ -25,7 +21,7 @@ public class ShardingNode {
private final String dbGroupName;
protected String database;
protected volatile PhysicalDbGroup dbGroup;
private volatile boolean isSchemaExists = true;
private volatile boolean isSchemaExists = false;
public ShardingNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) {
this.dbGroupName = dbGroupName;
@@ -89,76 +85,36 @@ public class ShardingNode {
public void getConnection(String schema, boolean isMustWrite, boolean autoCommit, RouteResultsetNode rrs,
ResponseHandler handler, Object attachment) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-sharding-node");
try {
if (isMustWrite) {
getWriteNodeConnection(schema, handler, attachment);
return;
}
if (rrs.getRunOnSlave() == null) {
if (rrs.canRunINReadDB(autoCommit)) {
dbGroup.getRWSplitCon(schema, handler, attachment);
} else {
getWriteNodeConnection(schema, handler, attachment);
}
} else {
if (rrs.getRunOnSlave()) {
if (!dbGroup.getReadCon(schema, handler, attachment)) {
throw new IllegalArgumentException("no valid read dbInstance in dbGroup:" + dbGroup.getGroupName());
}
} else {
rrs.setCanRunInReadDB(false);
getWriteNodeConnection(schema, handler, attachment);
}
}
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnSlave((RouteResultsetNode) attachment, isMustWrite || autoCommit));
instance.getConnection(schema, handler, rrs, isMustWrite);
} finally {
TraceManager.finishSpan(traceObject);
}
}
public BackendConnection getConnection(String schema, Boolean runOnSlave, Object attachment) throws IOException {
if (runOnSlave == null) {
PhysicalDbInstance readSource = dbGroup.getRWSplitNode();
if (!readSource.isAlive()) {
String heartbeatError = "the dbInstance[" + readSource.getConfig().getUrl() + "] can't reach. Please check the dbInstance status";
if (dbGroup.getDbGroupConfig().isShowSlaveSql()) {
heartbeatError += ",Tip:heartbeat[show slave status] need the SUPER or REPLICATION CLIENT privilege(s)";
}
LOGGER.warn(heartbeatError);
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", readSource.getDbGroupConfig().getName() + "-" + readSource.getConfig().getInstanceName());
AlertUtil.alert(AlarmCode.DB_INSTANCE_CAN_NOT_REACH, Alert.AlertLevel.WARN, heartbeatError, "mysql", readSource.getConfig().getId(), labels);
throw new IOException(heartbeatError);
}
return readSource.getConnection(schema, attachment);
} else if (runOnSlave) {
PhysicalDbInstance source = dbGroup.getRandomAliveReadNode();
if (source == null) {
throw new IllegalArgumentException("no valid dbInstance in dbGroup:" + dbGroup.getGroupName());
}
return source.getConnection(schema, attachment);
} else {
checkRequest(schema);
PhysicalDbInstance writeSource = dbGroup.getWriteDbInstance();
if (writeSource.isReadOnly()) {
throw new IllegalArgumentException("The dbInstance[" + writeSource.getConfig().getUrl() + "] is running with the --read-only option so it cannot execute this statement");
}
writeSource.incrementWriteCount();
return writeSource.getConnection(schema, attachment);
}
public BackendConnection getConnection(String schema, boolean autocommit, Object attachment) throws IOException {
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnSlave((RouteResultsetNode) attachment, autocommit));
return instance.getConnection(schema, attachment);
}
private void getWriteNodeConnection(String schema, ResponseHandler handler, Object attachment) throws IOException {
checkRequest(schema);
PhysicalDbInstance writeSource = dbGroup.getWriteDbInstance();
if (writeSource.isDisabled()) {
throw new IllegalArgumentException("[" + writeSource.getDbGroupConfig().getName() + "." + writeSource.getConfig().getInstanceName() + "] is disabled");
} else if (writeSource.isFakeNode()) {
throw new IllegalArgumentException("[" + writeSource.getDbGroupConfig().getName() + "." + writeSource.getConfig().getInstanceName() + "] is fake node");
private boolean canRunOnSlave(RouteResultsetNode rrs, boolean autoCommit) {
boolean canRunInSlave = false;
if (rrs.getRunOnSlave() == null) {
if (rrs.canRunINReadDB(autoCommit)) {
canRunInSlave = true;
}
} else {
if (!rrs.getRunOnSlave()) {
rrs.setCanRunInReadDB(false);
} else {
canRunInSlave = true;
}
}
if (writeSource.isReadOnly()) {
throw new IllegalArgumentException("The dbInstance[" + writeSource.getConfig().getUrl() + "] is running with the --read-only option so it cannot execute this statement");
}
writeSource.incrementWriteCount();
writeSource.getConnection(schema, handler, attachment, true);
return canRunInSlave;
}
}

View File

@@ -18,6 +18,7 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.ShardingNodeConfig;
import com.actiontech.dble.config.model.sharding.table.ERTable;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.util.ConfigException;
@@ -98,7 +99,7 @@ public class ConfigInitializer implements ProblemReporter {
}
//Mark all dbInstance whether they are fake or not
for (PhysicalDbGroup dbGroup : this.dbGroups.values()) {
for (PhysicalDbInstance source : dbGroup.getAllDbInstances()) {
for (PhysicalDbInstance source : dbGroup.getDbInstances(true)) {
if (checkSourceFake(source)) {
source.setFakeNode(true);
} else if (!source.isDisabled()) {
@@ -160,7 +161,14 @@ public class ConfigInitializer implements ProblemReporter {
}
}
allUseShardingNode.clear();
//delete redundancy dbGroup
// include rwSplit dbGroup
for (UserConfig config : this.users.values()) {
if (config instanceof RwSplitUserConfig) {
allUseHost.add(((RwSplitUserConfig) config).getDbGroup());
}
}
// delete redundancy dbGroup
if (allUseHost.size() < this.dbGroups.size()) {
Iterator<String> dbGroup = this.dbGroups.keySet().iterator();
while (dbGroup.hasNext()) {
@@ -178,36 +186,45 @@ public class ConfigInitializer implements ProblemReporter {
public void testConnection() {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection");
try {
Map<String, List<Pair<String, String>>> hostSchemaMap = genHostSchemaMap();
Set<String> errNodeKeys = new HashSet<>();
Set<String> errSourceKeys = new HashSet<>();
BoolPtr isConnectivity = new BoolPtr(true);
BoolPtr isAllDbInstanceConnected = new BoolPtr(true);
for (Map.Entry<String, List<Pair<String, String>>> entry : hostSchemaMap.entrySet()) {
String hostName = entry.getKey();
List<Pair<String, String>> nodeList = entry.getValue();
PhysicalDbGroup pool = dbGroups.get(hostName);
Map<String, List<Pair<String, String>>> hostSchemaMap = genDbInstanceSchemaMap();
Set<String> errDbInstanceNames = new HashSet<>();
boolean isAllDbInstanceConnected = true;
// check whether dbInstance is connected
String dbGroupName;
PhysicalDbGroup dbGroup;
for (Map.Entry<String, PhysicalDbGroup> entry : this.dbGroups.entrySet()) {
dbGroup = entry.getValue();
dbGroupName = entry.getKey();
checkMaxCon(pool);
for (PhysicalDbInstance ds : pool.getAllDbInstances()) {
// sharding group
List<Pair<String, String>> schemaList = null;
if (hostSchemaMap.containsKey(dbGroupName)) {
schemaList = hostSchemaMap.get(entry.getKey());
checkMaxCon(dbGroup, schemaList.size());
}
for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
if (ds.getConfig().isDisabled()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + pool.getGroupName() + "," + ds.getName() + "] is disabled"));
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
ds.setTestConnSuccess(false);
continue;
} else if (ds.isFakeNode()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + pool.getGroupName() + "," + ds.getName() + "] is fake Node"));
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is fake Node"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
ds.setTestConnSuccess(false);
continue;
}
testDbInstance(errNodeKeys, errSourceKeys, isConnectivity, isAllDbInstanceConnected, nodeList, pool, ds);
if (!testDbInstance(dbGroupName, ds, schemaList)) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
}
}
if (!isAllDbInstanceConnected.get()) {
if (!isAllDbInstanceConnected) {
StringBuilder sb = new StringBuilder("SelfCheck### there are some dbInstance connection failed, pls check these dbInstance:");
for (String key : errSourceKeys) {
for (String key : errDbInstanceNames) {
sb.append("{");
sb.append(key);
sb.append("},");
@@ -215,29 +232,13 @@ public class ConfigInitializer implements ProblemReporter {
throw new ConfigException(sb.toString());
}
if (!isConnectivity.get()) {
StringBuilder sb = new StringBuilder("SelfCheck### there are some sharding node connection failed, pls check these dbInstance:");
for (String key : errNodeKeys) {
sb.append("{");
sb.append(key);
sb.append("},");
}
LOGGER.warn(sb.toString());
}
} finally {
TraceManager.finishSpan(traceObject);
}
}
private void checkMaxCon(PhysicalDbGroup pool) {
int schemasCount = 0;
for (ShardingNode dn : shardingNodes.values()) {
if (dn.getDbGroup() == pool) {
schemasCount++;
}
}
for (PhysicalDbInstance dbInstance : pool.getAllDbInstances()) {
private void checkMaxCon(PhysicalDbGroup pool, int schemasCount) {
for (PhysicalDbInstance dbInstance : pool.getDbInstances(true)) {
if (dbInstance.getConfig().getMaxCon() < Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())) {
errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + pool.getGroupName() + "." + dbInstance.getConfig().getInstanceName() + "] maxCon too little,would be change to " +
Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())));
@@ -250,10 +251,9 @@ public class ConfigInitializer implements ProblemReporter {
}
}
private void testDbInstance(Set<String> errNodeKeys, Set<String> errSourceKeys, BoolPtr isConnectivity,
BoolPtr isAllDbInstanceConnected, List<Pair<String, String>> nodeList, PhysicalDbGroup pool, PhysicalDbInstance ds) {
boolean isMaster = ds == pool.getWriteDbInstance();
String dbInstanceName = "dbInstance[" + ds.getDbGroupConfig().getName() + "." + ds.getName() + "]";
private boolean testDbInstance(String dbGroupName, PhysicalDbInstance ds, List<Pair<String, String>> schemaList) {
boolean isConnectivity = true;
String dbInstanceKey = "dbInstance[" + dbGroupName + "." + ds.getName() + "]";
try {
BoolPtr isDSConnectedPtr = new BoolPtr(false);
TestTask testDsTask = new TestTask(ds, isDSConnectedPtr);
@@ -262,56 +262,33 @@ public class ConfigInitializer implements ProblemReporter {
boolean isDbInstanceConnected = isDSConnectedPtr.get();
ds.setTestConnSuccess(isDbInstanceConnected);
if (!isDbInstanceConnected) {
isConnectivity.set(false);
isAllDbInstanceConnected.set(false);
errSourceKeys.add(dbInstanceName);
errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + ds.getDbGroupConfig().getName() + "," + ds.getName() + "]"));
markDbInstanceSchemaFail(errNodeKeys, nodeList, dbInstanceName);
} else {
BoolPtr isSchemaConnectedPtr = new BoolPtr(true);
TestSchemasTask testSchemaTask = new TestSchemasTask(ds, nodeList, errNodeKeys, isSchemaConnectedPtr, isMaster);
errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + dbInstanceKey + "]"));
LOGGER.warn("SelfCheck### can't connect to [" + dbInstanceKey + "]");
isConnectivity = false;
} else if (schemaList != null) {
TestSchemasTask testSchemaTask = new TestSchemasTask(ds, schemaList, !ds.isReadInstance());
testSchemaTask.start();
testSchemaTask.join(3000);
boolean isConnected = isSchemaConnectedPtr.get();
if (!isConnected) {
isConnectivity.set(false);
for (Map.Entry<String, String> entry : testSchemaTask.getNodes().entrySet()) {
shardingNodes.get(entry.getValue()).setSchemaExists(false);
}
}
} else {
LOGGER.warn("SelfCheck### connect to [" + dbInstanceKey + "] successfully.");
}
} catch (InterruptedException e) {
isConnectivity.set(false);
isAllDbInstanceConnected.set(false);
errSourceKeys.add(dbInstanceName);
markDbInstanceSchemaFail(errNodeKeys, nodeList, dbInstanceName);
errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + dbInstanceKey + "]"));
LOGGER.warn("SelfCheck### can't connect to [" + dbInstanceKey + "]");
isConnectivity = false;
}
return isConnectivity;
}
private void markDbInstanceSchemaFail(Set<String> errKeys, List<Pair<String, String>> nodeList, String dbInstanceName) {
for (Pair<String, String> node : nodeList) {
String key = dbInstanceName + ",sharding_node[" + node.getKey() + "],sharding[" + node.getValue() + "]";
errKeys.add(key);
shardingNodes.get(node.getKey()).setSchemaExists(false);
LOGGER.warn("SelfCheck### test " + key + " database connection failed ");
}
}
private Map<String, List<Pair<String, String>>> genHostSchemaMap() {
Map<String, List<Pair<String, String>>> hostSchemaMap = new HashMap<>();
if (this.shardingNodes != null && this.dbGroups != null) {
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
String hostName = entry.getKey();
PhysicalDbGroup pool = entry.getValue();
for (ShardingNode shardingNode : shardingNodes.values()) {
if (pool.equals(shardingNode.getDbGroup())) {
List<Pair<String, String>> nodes = hostSchemaMap.computeIfAbsent(hostName, k -> new ArrayList<>());
nodes.add(new Pair<>(shardingNode.getName(), shardingNode.getDatabase()));
}
}
private Map<String, List<Pair<String, String>>> genDbInstanceSchemaMap() {
Map<String, List<Pair<String, String>>> dbInstanceSchemaMap = new HashMap<>(16);
if (shardingNodes != null) {
for (ShardingNode shardingNode : shardingNodes.values()) {
List<Pair<String, String>> nodes = dbInstanceSchemaMap.computeIfAbsent(shardingNode.getDbGroupName(), k -> new ArrayList<>(8));
nodes.add(new Pair<>(shardingNode.getName(), shardingNode.getDatabase()));
}
}
return hostSchemaMap;
return dbInstanceSchemaMap;
}
@@ -353,7 +330,6 @@ public class ConfigInitializer implements ProblemReporter {
return nodes;
}
public boolean isFullyConfigured() {
return fullyConfigured;
}

View File

@@ -11,7 +11,7 @@ import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.alarm.ToResolveContainer;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.plan.common.ptr.BoolPtr;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.sqlengine.MultiRowSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.OneTimeConnJob;
@@ -23,30 +23,24 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TestSchemasTask extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(TestSchemasTask.class);
private PhysicalDbInstance ds;
private BoolPtr boolPtr;
private Set<String> errKeys;
private Map<String, String> nodes = new HashMap<>();
private boolean needAlert;
private ReentrantLock lock = new ReentrantLock();
private volatile boolean isFinish = false;
private Condition finishCond = lock.newCondition();
public TestSchemasTask(PhysicalDbInstance ds, List<Pair<String, String>> nodeList, Set<String> errKeys, BoolPtr boolPtr, boolean needAlert) {
public TestSchemasTask(PhysicalDbInstance ds, List<Pair<String, String>> nodeList, boolean needAlert) {
this.ds = ds;
this.errKeys = errKeys;
this.boolPtr = boolPtr;
this.needAlert = needAlert;
for (Pair<String, String> node : nodeList) {
nodes.put(node.getValue(), node.getKey()); // sharding->node
}
}
public Map<String, String> getNodes() {
@@ -80,6 +74,7 @@ public class TestSchemasTask extends Thread {
@Override
public void onResult(SQLQueryResult<List<Map<String, String>>> result) {
Map<String, ShardingNode> shardingNodes = DbleServer.getInstance().getConfig().getShardingNodes();
if (result.isSuccess()) {
List<Map<String, String>> rows = result.getResult();
for (Map<String, String> row : rows) {
@@ -89,6 +84,7 @@ public class TestSchemasTask extends Thread {
}
String nodeName = nodes.remove(schema);
if (nodeName != null) {
shardingNodes.get(nodeName).setSchemaExists(true);
String key = "dbGroup[" + ds.getDbGroupConfig().getName() + "." + ds.getConfig().getInstanceName() + "],shardingNode[" + nodeName + "],schema[" + schema + "]";
LOGGER.info("SelfCheck### test " + key + " database connection success ");
}
@@ -100,11 +96,9 @@ public class TestSchemasTask extends Thread {
private void reportSchemaNotFound() {
for (Map.Entry<String, String> node : nodes.entrySet()) {
boolPtr.set(false);
String nodeName = node.getValue();
String key = "dbInstance[" + ds.getDbGroupConfig().getName() + "." + ds.getConfig().getInstanceName() + "],sharding_node[" + nodeName + "],schema[" + node.getKey() + "]";
errKeys.add(key);
LOGGER.warn("test conn " + key + " error");
LOGGER.warn("SelfCheck### test " + key + " database connection fail ");
if (needAlert) {
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", ds.getDbGroupConfig().getName() + "-" + ds.getConfig().getInstanceName());
labels.put("sharding_node", nodeName);

View File

@@ -1,8 +1,8 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.config.util;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
@@ -253,7 +253,7 @@ public final class ConfigUtil {
String hostName = entry.getKey();
PhysicalDbGroup pool = entry.getValue();
for (PhysicalDbInstance ds : pool.getAllDbInstances()) {
for (PhysicalDbInstance ds : pool.getDbInstances(true)) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
@@ -265,7 +265,7 @@ public final class ConfigUtil {
while (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
if (i == 0) {
LOGGER.debug("wait get all datasouce's get key variable");
LOGGER.debug("wait to get all dbInstances's get key variable");
}
i++;
if (i == 100) { //log every 10 seconds

View File

@@ -2,6 +2,7 @@ package com.actiontech.dble.net.service;
import com.actiontech.dble.backend.mysql.ByteUtil;
import com.actiontech.dble.backend.mysql.proto.handler.Impl.MySQLProtoHandlerImpl;
import com.actiontech.dble.backend.mysql.proto.handler.ProtoHandler;
import com.actiontech.dble.backend.mysql.proto.handler.ProtoHandlerResult;
import com.actiontech.dble.net.connection.AbstractConnection;
@@ -20,23 +21,24 @@ import java.util.concurrent.atomic.AtomicInteger;
*/
public abstract class AbstractService implements Service {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractService.class);
protected final ConcurrentLinkedQueue<ServiceTask> taskQueue = new ConcurrentLinkedQueue<>();
protected ServiceTask currentTask = null;
protected volatile ProtoHandler proto;
protected AbstractConnection connection;
private AtomicInteger packetId;
private AtomicInteger packetId = new AtomicInteger(0);
protected volatile boolean isSupportCompress = false;
protected ServiceTask currentTask = null;
private volatile boolean isSupportCompress = false;
protected volatile ProtoHandler proto;
protected final ConcurrentLinkedQueue<ServiceTask> taskQueue;
public AbstractService(AbstractConnection connection) {
this.connection = connection;
this.proto = new MySQLProtoHandlerImpl();
this.taskQueue = new ConcurrentLinkedQueue<>();
this.packetId = new AtomicInteger(0);
}
@Override
public void handle(ByteBuffer dataBuffer) {
this.sessionStart();
boolean hasReming = true;
int offset = 0;
@@ -68,7 +70,7 @@ public abstract class AbstractService implements Service {
offset = result.getOffset();
continue;
default:
throw new RuntimeException("unknow error when read data");
throw new RuntimeException("unknown error when read data");
}
}
}

View File

@@ -63,9 +63,7 @@ public class HintMasterDBHandler implements HintHandler {
if (isRouteToMaster) {
rrs.setRunOnSlave(false);
}
if (!isRouteToMaster) {
} else {
rrs.setRunOnSlave(true);
}

View File

@@ -0,0 +1,77 @@
package com.actiontech.dble.rwsplit;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.services.rwsplit.Callback;
import com.actiontech.dble.services.rwsplit.RWSplitHandler;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
public class RWSplitNonBlockingSession {
public static final Logger LOGGER = LoggerFactory.getLogger(RWSplitNonBlockingSession.class);
private volatile BackendConnection conn;
private final RWSplitService rwSplitService;
private PhysicalDbGroup rwGroup;
public RWSplitNonBlockingSession(RWSplitService service) {
this.rwSplitService = service;
}
public void execute(boolean master, Callback callback) throws IOException {
execute(master, null, callback);
}
public void execute(boolean master, byte[] originPacket, Callback callback) throws IOException {
RWSplitHandler handler = new RWSplitHandler(rwSplitService, originPacket, callback);
if (conn != null && !conn.isClosed()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select bind conn[id={}]", conn.getId());
}
handler.execute(conn);
return;
}
PhysicalDbInstance instance = rwGroup.select(master);
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
}
public void setRwGroup(PhysicalDbGroup rwGroup) {
this.rwGroup = rwGroup;
}
public void bind(BackendConnection bindConn) {
if (conn != null) {
LOGGER.warn("last conn is remaining");
}
this.conn = bindConn;
}
public void unbindIfSafe() {
if (rwSplitService.isAutocommit() && !rwSplitService.isLocked() &&
!rwSplitService.isTxStart() &&
!rwSplitService.isInLoadData() &&
!rwSplitService.isInPrepare()) {
this.conn.release();
this.conn = null;
}
}
public void unbind() {
this.conn = null;
}
public void close(String reason) {
if (conn != null) {
conn.close(reason);
}
}
public RWSplitService getService() {
return rwSplitService;
}
}

View File

@@ -1,8 +1,8 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.handler;
import com.actiontech.dble.DbleServer;
@@ -19,20 +19,9 @@ public final class UseHandler {
}
public static void handle(String sql, ShardingService service, int offset) {
String schema = sql.substring(offset).trim();
int length = schema.length();
if (length > 0) {
if (schema.endsWith(";")) {
schema = schema.substring(0, schema.length() - 1);
}
schema = StringUtil.replaceChars(schema, "`", null);
length = schema.length();
if (schema.charAt(0) == '\'' && schema.charAt(length - 1) == '\'') {
schema = schema.substring(1, length - 1);
}
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
schema = schema.toLowerCase();
}
String schema = getSchemaName(sql, offset);
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
schema = schema.toLowerCase();
}
if (!DbleServer.getInstance().getConfig().getSchemas().containsKey(schema)) {
service.writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + schema + "'");
@@ -50,4 +39,15 @@ public final class UseHandler {
service.write(service.getSession2().getOKPacket());
}
public static String getSchemaName(String sql, int offset) {
String schema = sql.substring(offset).trim();
if (schema.length() == 0) {
return "";
}
if (schema.endsWith(";")) {
schema = schema.substring(0, schema.length() - 1);
}
return StringUtil.removeApostropheOrBackQuote(schema);
}
}

View File

@@ -1,17 +1,16 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.response;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.HeartbeatPacket;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,7 +24,7 @@ public final class Heartbeat {
private static final Logger LOGGER = LoggerFactory.getLogger(Heartbeat.class);
public static void response(ShardingService service, byte[] data) {
public static void response(AbstractConnection conn, byte[] data) {
HeartbeatPacket hp = new HeartbeatPacket();
hp.read(data);
if (DbleServer.getInstance().isOnline()) {
@@ -33,25 +32,25 @@ public final class Heartbeat {
ok.setPacketId(1);
ok.setAffectedRows(hp.getId());
ok.setServerStatus(2);
ok.write(service.getConnection());
ok.write(conn);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(responseMessage("OK", service, hp.getId()));
LOGGER.debug(responseMessage("OK", conn, hp.getId()));
}
} else {
ErrorPacket error = new ErrorPacket();
error.setPacketId(1);
error.setErrNo(ErrorCode.ER_SERVER_SHUTDOWN);
error.setMessage(String.valueOf(hp.getId()).getBytes());
error.write(service.getConnection());
error.write(conn);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(responseMessage("ERROR", service, hp.getId()));
LOGGER.info(responseMessage("ERROR", conn, hp.getId()));
}
}
}
private static String responseMessage(String action, ShardingService service, long id) {
private static String responseMessage(String action, AbstractConnection conn, long id) {
return "RESPONSE:" + action + ", id=" + id + ", host=" +
service.getConnection().getHost() + ", port=" + service.getConnection().getPort() + ", time=" +
conn.getHost() + ", port=" + conn.getPort() + ", time=" +
TimeUtil.currentTimeMillis();
}

View File

@@ -70,7 +70,7 @@ public class VarsExtractorHandler {
}
if (ds == null) {
for (PhysicalDbGroup dbGroup : dbGroups.values()) {
for (PhysicalDbInstance dsTest : dbGroup.getAllActiveDbInstances()) {
for (PhysicalDbInstance dsTest : dbGroup.getDbInstances(false)) {
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
break;

View File

@@ -65,7 +65,7 @@ public abstract class MySQLBasedService extends AbstractService {
this.setPacketId(data[3]);
}
if (isSupportCompress()) {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, new ConcurrentLinkedQueue<byte[]>());
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, new ConcurrentLinkedQueue<>());
for (byte[] pack : packs) {
if (pack.length != 0) {
handleInnerData(pack);
@@ -112,7 +112,6 @@ public abstract class MySQLBasedService extends AbstractService {
err.write(connection);
}
public String getStringOfSysVariables() {
StringBuilder sbSysVariables = new StringBuilder();
int cnt = 0;

View File

@@ -2,6 +2,7 @@ package com.actiontech.dble.services.factorys;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.user.ManagerUserConfig;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.ShardingUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.net.connection.AbstractConnection;
@@ -11,6 +12,7 @@ import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.services.mysqlsharding.MySQLCurrentResponseService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
/**
* Created by szf on 2020/6/28.
@@ -30,6 +32,10 @@ public final class BusinessServiceFactory {
ManagerService service = new ManagerService(connection);
service.initFromAuthInfo(info);
return service;
} else if (userConfig instanceof RwSplitUserConfig) {
RWSplitService service = new RWSplitService(connection);
service.initFromAuthInfo(info);
return service;
}
return null;
}

View File

@@ -83,7 +83,7 @@ public final class DbleBackendConnections extends ManagerBaseTable {
columnsType.put("conn_estab_time", Fields.FIELD_TYPE_LONG);
columns.put("borrowed_from_pool", new ColumnMeta("borrowed_from_pool", "int(11)", false));
columnsType.put("borrowed_from_pool", Fields.FIELD_TYPE_LONG);
columnsType.put("borrowed_from_pool", Fields.FIELD_TYPE_VAR_STRING);
columns.put("conn_recv_buffer", new ColumnMeta("conn_recv_buffer", "int(11)", false));
columnsType.put("conn_recv_buffer", Fields.FIELD_TYPE_LONG);

View File

@@ -183,7 +183,7 @@ public class DbleDbInstance extends ManagerBaseTable {
List<LinkedHashMap<String, String>> rowList = Lists.newLinkedList();
dbGroups.entrySet().forEach(dbGroupEntry -> {
PhysicalDbGroup dbGroup = dbGroupEntry.getValue();
dbGroup.getAllDbInstances().forEach(dbInstance -> {
dbGroup.getDbInstances(true).forEach(dbInstance -> {
if (nameSet.add(dbInstance.getName() + "-" + dbGroup.getGroupName())) {
LinkedHashMap<String, String> map = Maps.newLinkedHashMap();
DbInstanceConfig dbInstanceConfig = dbInstance.getConfig();

View File

@@ -42,12 +42,12 @@ public final class FreshBackendConn {
// single
try {
String[] nameList = instanceNames == null ? Arrays.copyOf(dh.getAllDbInstanceMap().keySet().toArray(), dh.getAllDbInstanceMap().keySet().toArray().length, String[].class) : instanceNames.split(",");
List<String> sourceNames = Arrays.asList(nameList).stream().distinct().collect(Collectors.toList());
List<String> sourceNames = Arrays.stream(nameList).distinct().collect(Collectors.toList());
if (dh.getRwSplitMode() == PhysicalDbGroup.RW_SPLIT_OFF && (!sourceNames.contains(dh.getWriteDbInstance().getName()))) {
if (dh.getRwSplitMode() == 0 && (!sourceNames.contains(dh.getWriteDbInstance().getName()))) {
warnMsg = "the rwSplitMode of this dbGroup is 0, so connection pool for slave dbInstance don't refresh";
} else {
if (dh.getRwSplitMode() == PhysicalDbGroup.RW_SPLIT_OFF && sourceNames.size() > 1 && sourceNames.contains(dh.getWriteDbInstance().getName())) {
if (dh.getRwSplitMode() == 0 && sourceNames.size() > 1 && sourceNames.contains(dh.getWriteDbInstance().getName())) {
warnMsg = "the rwSplitMode of this dbGroup is 0, so connection pool for slave dbInstance don't refresh";
}
dh.stop(sourceNames, "fresh backend conn", isForced);

View File

@@ -70,7 +70,7 @@ public final class ShowConnectionPoolProperty {
PoolConfig poolConfig;
for (PhysicalDbGroup group : DbleServer.getInstance().getConfig().getDbGroups().values()) {
for (PhysicalDbInstance instance : group.getAllDbInstances()) {
for (PhysicalDbInstance instance : group.getDbInstances(true)) {
poolConfig = instance.getConfig().getPoolConfig();
RowDataPacket row = getRow(group.getGroupName(), instance.getName(), "minCon", instance.getConfig().getMinCon() + "", service.getCharset().getClient());
row.setPacketId(++packetId);

View File

@@ -91,7 +91,7 @@ public final class ShowDbInstance {
if (null != name) {
ShardingNode dn = conf.getShardingNodes().get(name);
for (PhysicalDbInstance w : dn.getDbGroup().getAllDbInstances()) {
for (PhysicalDbInstance w : dn.getDbGroup().getDbInstances(true)) {
RowDataPacket row = getRow(w.getDbGroupConfig().getName(), w, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
@@ -102,7 +102,7 @@ public final class ShowDbInstance {
for (Map.Entry<String, PhysicalDbGroup> entry : conf.getDbGroups().entrySet()) {
PhysicalDbGroup dbGroup = entry.getValue();
String dbGroupName = entry.getKey();
for (PhysicalDbInstance source : dbGroup.getAllDbInstances()) {
for (PhysicalDbInstance source : dbGroup.getDbInstances(true)) {
RowDataPacket sRow = getRow(dbGroupName, source, service.getCharset().getResults());
sRow.setPacketId(++packetId);
buffer = sRow.write(buffer, service, true);

View File

@@ -1,8 +1,8 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.DbleServer;
@@ -122,7 +122,7 @@ public final class ShowDbInstanceSyn {
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
String dbGroupName = entry.getKey();
PhysicalDbGroup pool = entry.getValue();
for (PhysicalDbInstance ds : pool.getAllActiveDbInstances()) {
for (PhysicalDbInstance ds : pool.getDbInstances(false)) {
if (ds.isDisabled()) {
continue;
}

View File

@@ -1,8 +1,8 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.DbleServer;
@@ -13,8 +13,8 @@ import com.actiontech.dble.backend.mysql.PacketUtil;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.config.ServerConfig;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.route.parser.ManagerParseShow;
import com.actiontech.dble.services.manager.ManagerService;
import com.actiontech.dble.statistic.DbInstanceSyncRecorder;
import com.actiontech.dble.statistic.DbInstanceSyncRecorder.Record;
import com.actiontech.dble.util.LongUtil;
@@ -115,7 +115,7 @@ public final class ShowDbInstanceSynDetail {
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
String dbGroupName = entry.getKey();
PhysicalDbGroup pool = entry.getValue();
for (PhysicalDbInstance ds : pool.getAllActiveDbInstances()) {
for (PhysicalDbInstance ds : pool.getDbInstances(false)) {
if (ds.isDisabled()) {
continue;
}

View File

@@ -110,7 +110,7 @@ public final class ShowHeartbeat {
// host nodes
Map<String, PhysicalDbGroup> dbGroups = conf.getDbGroups();
for (PhysicalDbGroup pool : dbGroups.values()) {
for (PhysicalDbInstance ds : pool.getAllDbInstances()) {
for (PhysicalDbInstance ds : pool.getDbInstances(true)) {
MySQLHeartbeat hb = ds.getHeartbeat();
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(ds.getName().getBytes());

View File

@@ -110,7 +110,7 @@ public final class ShowHeartbeatDetail {
Map<String, PhysicalDbGroup> dbGroups = conf.getDbGroups();
for (PhysicalDbGroup pool : dbGroups.values()) {
for (PhysicalDbInstance ds : pool.getAllActiveDbInstances()) {
for (PhysicalDbInstance ds : pool.getDbInstances(false)) {
if (name.equals(ds.getName())) {
hb = ds.getHeartbeat();
ip = ds.getConfig().getIp();

View File

@@ -8,7 +8,10 @@ package com.actiontech.dble.services.mysqlauthenticate.util;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.config.Capabilities;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.user.*;
import com.actiontech.dble.config.model.user.ManagerUserConfig;
import com.actiontech.dble.config.model.user.ShardingUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.services.mysqlauthenticate.PluginName;
@@ -33,9 +36,6 @@ public final class AuthUtil {
if (userConfig == null) {
return "Access denied for user '" + user + "' with host '" + connection.getHost() + "'";
}
if (userConfig instanceof RwSplitUserConfig) {
return "this version does not support rwSplitUser";
}
FrontendConnection fcon = (FrontendConnection) connection;

View File

@@ -6,8 +6,6 @@ import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.mysql.EOFPacket;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.server.response.FieldList;
import com.actiontech.dble.server.response.Heartbeat;
import com.actiontech.dble.server.response.Ping;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,14 +70,6 @@ public class MySQLProtoLogicHandler {
service.query(sql);
}
public void ping() {
Ping.response(service.getConnection());
}
public void kill(byte[] data) {
service.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
}
public String stmtPrepare(byte[] data) {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
@@ -98,11 +88,6 @@ public class MySQLProtoLogicHandler {
return sql;
}
public void heartbeat(byte[] data) {
Heartbeat.response(service, data);
}
public void setOption(byte[] data) {
MySQLMessage mm = new MySQLMessage(data); //see sql\protocol_classic.cc parse_packet
if (mm.length() == 7) {

View File

@@ -20,6 +20,7 @@ import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.services.MySQLBasedService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.statistic.stat.ThreadWorkUsage;
import com.actiontech.dble.util.CompressUtil;
@@ -280,7 +281,6 @@ public class MySQLResponseService extends MySQLBasedService {
}
}
public void query(String query) {
query(query, this.autocommit);
}
@@ -289,10 +289,10 @@ public class MySQLResponseService extends MySQLBasedService {
RouteResultsetNode rrn = new RouteResultsetNode("default", ServerParse.SELECT, query);
StringBuilder synSQL = getSynSql(null, rrn, this.getConnection().getCharsetName(), this.txIsolation, isAutoCommit, usrVariables, sysVariables);
LOGGER.info("try to send command of " + rrn);
synAndDoExecute(synSQL, rrn, this.getConnection().getCharsetName());
synAndDoExecute(synSQL, rrn.getStatement(), this.getConnection().getCharsetName());
}
private void synAndDoExecute(StringBuilder synSQL, RouteResultsetNode rrn, CharsetNames clientCharset) {
private void synAndDoExecute(StringBuilder synSQL, String sql, CharsetNames clientCharset) {
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(this, "syn&do-execute-sql");
if (synSQL != null && traceObject != null) {
TraceManager.log(ImmutableMap.of("synSQL", synSQL), traceObject);
@@ -303,12 +303,12 @@ public class MySQLResponseService extends MySQLBasedService {
if (session != null) {
session.setBackendRequestTime(this.getConnection().getId());
}
sendQueryCmd(rrn.getStatement(), clientCharset);
sendQueryCmd(sql, clientCharset);
return;
}
// and our query sql to multi command at last
synSQL.append(rrn.getStatement()).append(";");
synSQL.append(sql).append(";");
// syn and execute others
if (session != null) {
session.setBackendRequestTime(this.getConnection().getId());
@@ -320,6 +320,38 @@ public class MySQLResponseService extends MySQLBasedService {
}
}
private StringBuilder getSynSql(CharsetNames clientCharset, int clientTxIsolation, boolean expectAutocommit) {
int schemaSyn = StringUtil.equals(connection.getSchema(), connection.getOldSchema()) ? 0 : 1;
int charsetSyn = (this.getConnection().getCharsetName().equals(clientCharset)) ? 0 : 1;
int txIsolationSyn = (this.txIsolation == clientTxIsolation) ? 0 : 1;
int autoCommitSyn = (this.autocommit == expectAutocommit) ? 0 : 1;
int synCount = schemaSyn + charsetSyn + txIsolationSyn + autoCommitSyn;
if (synCount == 0) {
return null;
}
StringBuilder sb = new StringBuilder();
if (schemaSyn == 1) {
getChangeSchemaCommand(sb, connection.getSchema());
}
if (charsetSyn == 1) {
getCharsetCommand(sb, clientCharset);
}
if (txIsolationSyn == 1) {
getTxIsolationCommand(sb, clientTxIsolation);
}
if (autoCommitSyn == 1) {
getAutocommitCommand(sb, expectAutocommit);
}
metaDataSynced = false;
statusSync = new StatusSync(connection.getSchema(),
clientCharset, clientTxIsolation, expectAutocommit,
synCount, Collections.emptyMap(), Collections.emptyMap(), Collections.emptySet());
return sb;
}
private StringBuilder getSynSql(String xaTxID, RouteResultsetNode rrn,
CharsetNames clientCharset, int clientTxIsolation,
boolean expectAutocommit, Map<String, String> usrVariables, Map<String, String> sysVariables) {
@@ -564,8 +596,7 @@ public class MySQLResponseService extends MySQLBasedService {
}
}
public void execute(RouteResultsetNode rrn, ShardingService service,
boolean isAutoCommit) {
public void execute(RouteResultsetNode rrn, ShardingService service, boolean isAutoCommit) {
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(this, "execute-route-result");
TraceManager.log(ImmutableMap.of("route-result-set", rrn, "service-detail", this.compactInfo()), traceObject);
try {
@@ -574,12 +605,26 @@ public class MySQLResponseService extends MySQLBasedService {
service.setTxStarted(true);
}
StringBuilder synSQL = getSynSql(xaTxId, rrn, service.getCharset(), service.getTxIsolation(), isAutoCommit, service.getUsrVariables(), service.getSysVariables());
synAndDoExecute(synSQL, rrn, service.getCharset());
synAndDoExecute(synSQL, rrn.getStatement(), service.getCharset());
} finally {
TraceManager.finishSpan(this, traceObject);
}
}
public void execute(RWSplitService service) {
StringBuilder synSQL = getSynSql(service.getCharset(), service.getTxIsolation(), service.isAutocommit());
synAndDoExecute(synSQL, service.getExecuteSql(), service.getCharset());
}
public void execute(RWSplitService service, byte[] originPacket) {
if (service != null) {
StringBuilder synSQL = getSynSql(service.getCharset(), service.getTxIsolation(), service.isAutocommit());
if (synSQL != null) {
sendQueryCmd(synSQL.toString(), service.getCharset());
}
}
writeDirectly(originPacket);
}
private void synAndDoExecuteMultiNode(StringBuilder synSQL, RouteResultsetNode rrn, CharsetNames clientCharset) {
if (LOGGER.isDebugEnabled()) {
@@ -792,7 +837,6 @@ public class MySQLResponseService extends MySQLBasedService {
StatusSync(String schema,
CharsetNames clientCharset, Integer txtIsolation, Boolean autocommit,
int synCount, Map<String, String> usrVariables, Map<String, String> sysVariables, Set<String> toResetSys) {
super();
this.schema = schema;
this.clientCharset = clientCharset;
this.txtIsolation = txtIsolation;

View File

@@ -29,7 +29,6 @@ public class MysqlBackendLogicHandler {
this.service = service;
}
protected void handleInnerData(byte[] data) {
if (service.getConnection().isClosed()) {
return;

View File

@@ -29,7 +29,9 @@ import com.actiontech.dble.server.handler.ServerPrepareHandler;
import com.actiontech.dble.server.handler.SetHandler;
import com.actiontech.dble.server.handler.SetInnerHandler;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.response.Heartbeat;
import com.actiontech.dble.server.response.InformationSchemaProfiling;
import com.actiontech.dble.server.response.Ping;
import com.actiontech.dble.server.util.SchemaUtil;
import com.actiontech.dble.services.MySQLBasedService;
import com.actiontech.dble.services.mysqlsharding.handler.LoadDataProtoHandlerImpl;
@@ -174,16 +176,16 @@ public class ShardingService extends MySQLBasedService implements FrontEndServic
break;
case MySQLPacket.COM_PING:
commands.doPing();
protoLogicHandler.ping();
Ping.response(connection);
break;
case MySQLPacket.COM_HEARTBEAT:
commands.doHeartbeat();
Heartbeat.response(connection, data);
break;
case MySQLPacket.COM_QUIT:
commands.doQuit();
connection.close("quit cmd");
break;
case MySQLPacket.COM_PROCESS_KILL:
commands.doKill();
protoLogicHandler.kill(data);
break;
case MySQLPacket.COM_STMT_PREPARE:
commands.doStmtPrepare();
String prepareSql = protoLogicHandler.stmtPrepare(data);
@@ -210,10 +212,6 @@ public class ShardingService extends MySQLBasedService implements FrontEndServic
commands.doStmtExecute();
this.stmtExecute(data, blobDataQueue);
break;
case MySQLPacket.COM_HEARTBEAT:
commands.doHeartbeat();
protoLogicHandler.heartbeat(data);
break;
case MySQLPacket.COM_SET_OPTION:
commands.doOther();
protoLogicHandler.setOption(data);
@@ -231,6 +229,10 @@ public class ShardingService extends MySQLBasedService implements FrontEndServic
commands.doOther();
protoLogicHandler.fieldList(data);
break;
case MySQLPacket.COM_PROCESS_KILL:
commands.doKill();
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
break;
default:
commands.doOther();
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command");
@@ -275,11 +277,11 @@ public class ShardingService extends MySQLBasedService implements FrontEndServic
}
public void stmtExecute(byte[] data, Queue<byte[]> dataqueue) {
byte[] sendData = dataqueue.poll();
public void stmtExecute(byte[] data, Queue<byte[]> dataQueue) {
byte[] sendData = dataQueue.poll();
while (sendData != null) {
this.stmtSendLongData(sendData);
sendData = dataqueue.poll();
sendData = dataQueue.poll();
}
if (prepareHandler != null) {
prepareHandler.execute(data);

View File

@@ -0,0 +1,7 @@
package com.actiontech.dble.services.rwsplit;
public interface Callback {
void callback(RWSplitService rwSplitService);
}

View File

@@ -0,0 +1,171 @@
package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.backend.mysql.nio.handler.LoadDataResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.util.StringUtil;
import java.nio.ByteBuffer;
import java.util.List;
public class RWSplitHandler implements ResponseHandler, LoadDataResponseHandler {
private final RWSplitService rwSplitService;
private final byte[] originPacket;
private final AbstractConnection frontedConnection;
protected volatile ByteBuffer buffer;
private boolean write2Client = false;
private final Callback callback;
public RWSplitHandler(RWSplitService service, byte[] originPacket, Callback callback) {
this.rwSplitService = service;
this.originPacket = originPacket;
this.frontedConnection = service.getConnection();
this.callback = callback;
}
public void execute(final BackendConnection conn) {
MySQLResponseService mysqlService = conn.getBackendService();
mysqlService.setResponseHandler(this);
if (originPacket != null) {
mysqlService.execute(rwSplitService, originPacket);
} else {
mysqlService.execute(rwSplitService);
}
}
@Override
public void connectionAcquired(final BackendConnection conn) {
rwSplitService.getSession().bind(conn);
execute(conn);
}
@Override
public void connectionError(Throwable e, Object attachment) {
writeErrorMsg(rwSplitService.nextPacketId(), "can't connect to dbGroup[" + ((RwSplitUserConfig) rwSplitService.getUserConfig()).getDbGroup());
}
@Override
public void errorResponse(byte[] data, AbstractService service) {
MySQLResponseService mysqlService = (MySQLResponseService) service;
boolean syncFinished = mysqlService.syncAndExecute();
if (callback != null) {
callback.callback(rwSplitService);
}
if (!syncFinished) {
mysqlService.getConnection().businessClose("unfinished sync");
rwSplitService.getSession().unbind();
} else {
rwSplitService.getSession().unbindIfSafe();
}
synchronized (this) {
if (!write2Client) {
data[3] = (byte) rwSplitService.nextPacketId();
frontedConnection.write(data);
write2Client = true;
}
}
}
@Override
public void okResponse(byte[] data, AbstractService service) {
// TraceManager.TraceObject traceObject = TraceManager.serviceTrace(service, "get-ok-packet");
// TraceManager.finishSpan(service, traceObject);
MySQLResponseService mysqlService = (MySQLResponseService) service;
boolean executeResponse = mysqlService.syncAndExecute();
if (executeResponse) {
if (callback != null) {
callback.callback(rwSplitService);
}
rwSplitService.getSession().unbindIfSafe();
synchronized (this) {
if (!write2Client) {
data[3] = (byte) rwSplitService.nextPacketId();
frontedConnection.write(data);
write2Client = true;
}
}
}
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsNull, byte[] eof,
boolean isLeft, AbstractService service) {
buffer = frontedConnection.allocate();
synchronized (this) {
header[3] = (byte) rwSplitService.nextPacketId();
buffer = frontedConnection.writeToBuffer(header, buffer);
for (byte[] field : fields) {
field[3] = (byte) rwSplitService.nextPacketId();
buffer = frontedConnection.writeToBuffer(field, buffer);
}
eof[3] = (byte) rwSplitService.nextPacketId();
buffer = frontedConnection.writeToBuffer(eof, buffer);
}
}
@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, AbstractService service) {
synchronized (this) {
row[3] = (byte) rwSplitService.nextPacketId();
buffer = frontedConnection.writeToBuffer(row, buffer);
}
return false;
}
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, AbstractService service) {
synchronized (this) {
if (!write2Client) {
eof[3] = (byte) rwSplitService.nextPacketId();
rwSplitService.getSession().unbindIfSafe();
buffer = frontedConnection.writeToBuffer(eof, buffer);
frontedConnection.write(buffer);
write2Client = true;
}
}
}
@Override
public void requestDataResponse(byte[] requestFilePacket, MySQLResponseService service) {
synchronized (this) {
if (!write2Client) {
frontedConnection.write(requestFilePacket);
}
}
}
@Override
public void connectionClose(AbstractService service, String reason) {
((MySQLResponseService) service).setResponseHandler(null);
synchronized (this) {
if (!write2Client) {
rwSplitService.getSession().bind(null);
writeErrorMsg(rwSplitService.nextPacketId(), "connection close");
write2Client = true;
if (buffer != null) {
frontedConnection.recycle(buffer);
buffer = null;
}
}
}
}
private void writeErrorMsg(int pId, String reason) {
ErrorPacket errPacket = new ErrorPacket();
errPacket.setPacketId(pId);
errPacket.setErrNo(ErrorCode.ER_DB_INSTANCE_ABORTING_CONNECTION);
errPacket.setMessage(StringUtil.encode(reason, frontedConnection.getCharsetName().getClient()));
errPacket.write(frontedConnection);
}
}

View File

@@ -0,0 +1,143 @@
package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.handler.FrontendQueryHandler;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.ServerQueryHandler;
import com.actiontech.dble.server.handler.UseHandler;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.singleton.TraceManager;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLAssignItem;
import com.alibaba.druid.sql.ast.statement.SQLSetStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class RWSplitQueryHandler implements FrontendQueryHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryHandler.class);
private final RWSplitNonBlockingSession session;
//private Boolean readOnly = true;
//private boolean sessionReadOnly = true;
@Override
public void setReadOnly(Boolean readOnly) {
//this.readOnly = readOnly;
}
@Override
public void setSessionReadOnly(boolean sessionReadOnly) {
// this.sessionReadOnly = sessionReadOnly;
}
public RWSplitQueryHandler(RWSplitNonBlockingSession session) {
this.session = session;
}
@Override
public void query(String sql) {
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(session.getService(), "handle-query-sql");
TraceManager.log(ImmutableMap.of("sql", sql), traceObject);
try {
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
switch (sqlType) {
case ServerParse.USE:
String schema = UseHandler.getSchemaName(sql, rs >>> 8);
session.execute(true, rwSplitService -> rwSplitService.setSchema(schema));
break;
case ServerParse.SHOW:
case ServerParse.SELECT:
session.execute(false, null);
break;
case ServerParse.SET:
parseSet(sql);
break;
case ServerParse.LOCK:
session.execute(true, rwSplitService -> rwSplitService.setLocked(true));
break;
case ServerParse.UNLOCK:
session.execute(true, rwSplitService -> rwSplitService.setLocked(false));
break;
case ServerParse.START:
case ServerParse.BEGIN:
session.execute(true, rwSplitService -> rwSplitService.setTxStart(true));
break;
case ServerParse.COMMIT:
case ServerParse.ROLLBACK:
session.execute(true, rwSplitService -> {
rwSplitService.getSession().unbindIfSafe();
rwSplitService.setTxStart(false);
});
break;
case ServerParse.LOAD_DATA_INFILE_SQL:
session.getService().setInLoadData(true);
session.execute(true, rwSplitService -> rwSplitService.setInLoadData(false));
break;
default:
// 1. DDL
// 2. DML
// 3. procedure
// 4. function
session.execute(true, null);
break;
}
} catch (Exception e) {
LOGGER.warn("execute error", e);
session.getService().writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
} finally {
TraceManager.finishSpan(traceObject);
}
}
// private boolean parseSelectQuery(String sql) {
// boolean canSelectSlave = true;
// SQLStatementParser parser = new MySqlStatementParser(sql);
// SQLStatement statement = parser.parseStatement(true);
// if (statement instanceof SQLSelectStatement) {
// if (!((SQLSelectStatement) statement).getSelect().getQueryBlock().isForUpdate()) {
// canSelectSlave = true;
// }
// } else {
// LOGGER.warn("unknown select");
// throw new UnsupportedOperationException("unknown");
// }
//
// return canSelectSlave;
// }
private void parseSet(String sql) throws IOException {
SQLStatementParser parser = new MySqlStatementParser(sql);
SQLStatement statement = parser.parseStatement(true);
if (statement instanceof SQLSetStatement) {
List<SQLAssignItem> assignItems = ((SQLSetStatement) statement).getItems();
if (assignItems.size() == 1) {
SQLAssignItem item = assignItems.get(0);
if (item.getTarget().toString().equalsIgnoreCase("autocommit")) {
if (session.getService().isAutocommit() && item.getValue().toString().equalsIgnoreCase("0")) {
session.getService().setAutocommit(false);
session.getService().writeDirectly(OkPacket.OK);
}
if (!session.getService().isAutocommit() && item.getValue().toString().equalsIgnoreCase("1")) {
session.execute(false, rwSplitService -> rwSplitService.setAutocommit(true));
}
}
session.execute(true, null);
} else {
// throw new UnsupportedOperationException("unknown");
session.execute(true, null);
}
} else {
session.execute(true, null);
}
}
}

View File

@@ -0,0 +1,287 @@
package com.actiontech.dble.services.rwsplit;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.config.Capabilities;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.mysql.AuthPacket;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.net.service.AuthResultInfo;
import com.actiontech.dble.net.service.FrontEndService;
import com.actiontech.dble.net.service.ServiceTask;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.response.Heartbeat;
import com.actiontech.dble.server.response.Ping;
import com.actiontech.dble.services.MySQLBasedService;
import com.actiontech.dble.singleton.FrontendUserManager;
import com.actiontech.dble.statistic.CommandCount;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
public class RWSplitService extends MySQLBasedService implements FrontEndService {
protected static final Logger LOGGER = LoggerFactory.getLogger(RWSplitService.class);
private volatile String schema;
private volatile int txIsolation;
private volatile boolean autocommit;
private volatile boolean isLocked;
private volatile boolean txStart;
private volatile boolean inLoadData;
private volatile boolean inPrepare;
private volatile String executeSql;
private UserName user;
private final CommandCount commands;
private final RWSplitQueryHandler queryHandler;
private final RWSplitNonBlockingSession session;
public RWSplitService(AbstractConnection connection) {
super(connection);
this.commands = connection.getProcessor().getCommands();
this.session = new RWSplitNonBlockingSession(this);
this.queryHandler = new RWSplitQueryHandler(session);
}
public void initFromAuthInfo(AuthResultInfo info) {
AuthPacket auth = info.getMysqlAuthPacket();
this.user = new UserName(auth.getUser(), auth.getTenant());
this.schema = info.getMysqlAuthPacket().getDatabase();
this.userConfig = info.getUserConfig();
this.session.setRwGroup(DbleServer.getInstance().getConfig().getDbGroups().get(((RwSplitUserConfig) userConfig).getDbGroup()));
this.txIsolation = SystemConfig.getInstance().getTxIsolation();
this.autocommit = SystemConfig.getInstance().getAutocommit() == 1;
this.connection.initCharsetIndex(info.getMysqlAuthPacket().getCharsetIndex());
boolean clientCompress = Capabilities.CLIENT_COMPRESS == (Capabilities.CLIENT_COMPRESS & auth.getClientFlags());
boolean usingCompress = SystemConfig.getInstance().getUseCompression() == 1;
if (clientCompress && usingCompress) {
this.setSupportCompress(true);
}
if (LOGGER.isDebugEnabled()) {
StringBuilder s = new StringBuilder();
s.append(this).append('\'').append(auth.getUser()).append("' login success");
byte[] extra = auth.getExtra();
if (extra != null && extra.length > 0) {
s.append(",extra:").append(new String(extra));
}
LOGGER.debug(s.toString());
}
}
@Override
protected void taskToTotalQueue(ServiceTask task) {
DbleServer.getInstance().getFrontHandlerQueue().offer(task);
}
@Override
protected void handleInnerData(byte[] data) {
// if the statement is load data, directly push down
if (inLoadData) {
try {
session.execute(true, data, null);
} catch (IOException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
}
return;
}
switch (data[4]) {
case MySQLPacket.COM_INIT_DB:
commands.doInitDB();
handleComInitDb(data);
break;
case MySQLPacket.COM_QUERY:
commands.doQuery();
handleComQuery(data);
break;
// prepared statement
case MySQLPacket.COM_STMT_PREPARE:
commands.doStmtPrepare();
handleComStmtPrepare(data);
break;
case MySQLPacket.COM_STMT_RESET:
commands.doStmtReset();
execute(data);
break;
case MySQLPacket.COM_STMT_EXECUTE:
commands.doStmtExecute();
execute(data);
break;
case MySQLPacket.COM_STMT_SEND_LONG_DATA:
commands.doStmtSendLongData();
execute(data);
break;
case MySQLPacket.COM_STMT_CLOSE:
commands.doStmtClose();
try {
session.execute(true, data, rwSplitService -> rwSplitService.setInPrepare(false));
} catch (IOException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
}
break;
// connection
case MySQLPacket.COM_QUIT:
commands.doQuit();
connection.close("quit cmd");
break;
case MySQLPacket.COM_HEARTBEAT:
commands.doHeartbeat();
Heartbeat.response(connection, data);
break;
case MySQLPacket.COM_PING:
commands.doPing();
Ping.response(connection);
break;
case MySQLPacket.COM_FIELD_LIST:
commands.doOther();
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "unsupport statement");
break;
default:
commands.doOther();
// other statement push down to master
execute(data);
break;
}
}
private void handleComInitDb(byte[] data) {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
String switchSchema;
try {
switchSchema = mm.readString(getCharset().getClient());
session.execute(true, data, rwSplitService -> rwSplitService.setSchema(switchSchema));
} catch (UnsupportedEncodingException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + getCharset().getClient() + "'");
} catch (IOException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_ERROR, e.getMessage());
}
}
private void handleComQuery(byte[] data) {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
try {
String sql = mm.readString(getCharset().getClient());
executeSql = sql;
queryHandler.query(sql);
} catch (UnsupportedEncodingException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
}
}
private void handleComStmtPrepare(byte[] data) {
MySQLMessage mm = new MySQLMessage(data);
mm.position(5);
try {
inPrepare = true;
String sql = mm.readString(getCharset().getClient());
int rs = ServerParse.parse(sql);
int sqlType = rs & 0xff;
switch (sqlType) {
case ServerParse.SELECT:
session.execute(false, data, null);
break;
default:
session.execute(true, data, null);
break;
}
} catch (IOException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
}
}
private void execute(byte[] data) {
try {
session.execute(true, data, null);
} catch (IOException e) {
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, e.getMessage());
}
}
@Override
public void userConnectionCount() {
FrontendUserManager.getInstance().countDown(user, false);
}
@Override
public UserName getUser() {
return user;
}
public String getSchema() {
return schema;
}
public RWSplitNonBlockingSession getSession() {
return session;
}
public void setSchema(String schema) {
this.schema = schema;
}
@Override
public String getExecuteSql() {
return executeSql;
}
public boolean isLocked() {
return isLocked;
}
public void setLocked(boolean locked) {
isLocked = locked;
}
public int getTxIsolation() {
return txIsolation;
}
public boolean isAutocommit() {
return autocommit;
}
public void setAutocommit(boolean autocommit) {
this.autocommit = autocommit;
}
public boolean isTxStart() {
return txStart;
}
public void setTxStart(boolean txStart) {
this.txStart = txStart;
}
public boolean isInLoadData() {
return inLoadData;
}
public void setInLoadData(boolean inLoadData) {
this.inLoadData = inLoadData;
}
public boolean isInPrepare() {
return inPrepare;
}
public void setInPrepare(boolean inPrepare) {
this.inPrepare = inPrepare;
}
@Override
public void killAndClose(String reason) {
session.close(reason);
connection.close(reason);
}
}

View File

@@ -17,7 +17,6 @@ import org.junit.Test;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Map;
import java.util.Set;
@@ -118,9 +117,6 @@ public class ConfigTest {
}
}
/**
* testReadHostWeight
*
@@ -133,11 +129,8 @@ public class ConfigTest {
Map<String, PhysicalDbGroup> dbGroups = dbLoader.getDbGroups();
PhysicalDbGroup pool = dbGroups.get("localhost2");
ArrayList<PhysicalDbInstance> okSources = new ArrayList<PhysicalDbInstance>();
okSources.addAll(pool.getAllActiveDbInstances());
PhysicalDbInstance source = pool.randomSelect(okSources, true);
Assert.assertTrue(source != null);
PhysicalDbInstance source = pool.select(true);
Assert.assertNotNull(source);
}
}