#164 user privilege because of sql_cache

This commit is contained in:
yanhuqing666
2017-06-28 15:55:53 +08:00
parent cd726cebfa
commit 60f26ac2dc
27 changed files with 281 additions and 299 deletions

View File

@@ -60,10 +60,10 @@ public class MergeBuilder {
MycatSchemaStatVisitor visitor = new MycatSchemaStatVisitor();
DruidParser druidParser = new DruidSingleUnitSelectParser();
RouteResultset rrs = new RouteResultset(sql, ServerParse.SELECT, session);
RouteResultset rrs = new RouteResultset(sql, ServerParse.SELECT);
LayerCachePool pool = MycatServer.getInstance().getRouterservice().getTableId2DataNodeCache();
SchemaConfig schemaConfig = mycatConfig.getSchemas().get(node.getReferedTableNodes().get(0).getSchema());
return RouterUtil.routeFromParser(druidParser, schemaConfig, rrs, select, sql, pool, visitor);
return RouterUtil.routeFromParser(druidParser, schemaConfig, rrs, select, sql, pool, visitor, session.getSource());
}

View File

@@ -30,7 +30,6 @@ import java.util.Map;
import com.alibaba.druid.sql.ast.SQLStatement;
import io.mycat.config.model.SchemaConfig;
import io.mycat.server.NonBlockingSession;
import io.mycat.sqlengine.mpp.HavingCols;
import io.mycat.util.FormatUtil;
@@ -68,8 +67,6 @@ public final class RouteResultset implements Serializable {
// 是否完成了执行
private boolean isFinishedExecute = false;
//是否自动提交此属性主要用于记录ServerConnection上的autocommit状态
private boolean autocommit = true;
private boolean isLoadData=false;
@@ -80,12 +77,6 @@ public final class RouteResultset implements Serializable {
// 传给 RouteResultsetNode 来实现,但是 强制走 slave需要增加一个属性来实现:
private Boolean runOnSlave = null; // 默认null表示不施加影响
private NonBlockingSession session;
public NonBlockingSession getSession() {
return session;
}
public boolean isNeedOptimizer() {
return needOptimizer;
}
@@ -146,12 +137,11 @@ public final class RouteResultset implements Serializable {
this.globalTableFlag = globalTableFlag;
}
public RouteResultset(String stmt, int sqlType, NonBlockingSession session) {
public RouteResultset(String stmt, int sqlType) {
this.statement = stmt;
this.srcStatement = stmt;
this.limitSize = -1;
this.sqlType = sqlType;
this.session = session;
}
public void copyLimitToNodes() {
@@ -355,15 +345,6 @@ public final class RouteResultset implements Serializable {
}
}
}
public boolean isAutocommit() {
return autocommit;
}
public void setAutocommit(boolean autocommit) {
this.autocommit = autocommit;
}
public Boolean getCanRunInReadDB() {
return canRunInReadDB;
}

View File

@@ -75,7 +75,7 @@ public class RouteService {
* SELECT 类型的SQL, 检测,debug 模式下不缓存
*/
if (sqlType == ServerParse.SELECT && !LOGGER.isDebugEnabled()) {
cacheKey = (schema == null ? "NULL_" : schema.getName()) + stmt;
cacheKey = (schema == null ? "NULL" : schema.getName())+"_"+sc.getUser()+"_" + stmt;
rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
if (rrs != null) {
return rrs;

View File

@@ -31,7 +31,7 @@ public class HintDataNodeHandler implements HintHandler {
LOGGER.debug("route datanode sql hint from " + realSQL);
}
RouteResultset rrs = new RouteResultset(realSQL, sqlType, sc.getSession2());
RouteResultset rrs = new RouteResultset(realSQL, sqlType);
PhysicalDBNode dataNode = MycatServer.getInstance().getConfig().getDataNodes().get(hintSQLValue);
if (dataNode != null) {
rrs = RouterUtil.routeToSingleNode(rrs, dataNode.getName());

View File

@@ -19,7 +19,7 @@ public abstract class AbstractRouteStrategy implements RouteStrategy {
public RouteResultset route(SchemaConfig schema, int sqlType, String origSQL,
String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLException {
RouteResultset rrs = new RouteResultset(origSQL, sqlType, sc.getSession2());
RouteResultset rrs = new RouteResultset(origSQL, sqlType);
/*
* 优化debug loaddata输出cache的日志会极大降低性能
@@ -28,20 +28,12 @@ public abstract class AbstractRouteStrategy implements RouteStrategy {
rrs.setCacheAble(false);
}
/*
* rrs携带ServerConnection的autocommit状态用于在sql解析的时候遇到
* select ... for update的时候动态设定RouteResultsetNode的canRunInReadDB属性
*/
if (sc != null ) {
rrs.setAutocommit(sc.isAutocommit());
}
if (schema == null) {
rrs = routeNormalSqlWithAST(schema, origSQL, rrs, charset, cachePool);
rrs = routeNormalSqlWithAST(schema, origSQL, rrs, charset, cachePool, sc);
} else {
RouteResultset returnedSet = routeSystemInfo(schema, sqlType, origSQL, rrs);
if (returnedSet == null) {
rrs = routeNormalSqlWithAST(schema, origSQL, rrs, charset, cachePool);
rrs = routeNormalSqlWithAST(schema, origSQL, rrs, charset, cachePool, sc);
}
}
@@ -53,7 +45,7 @@ public abstract class AbstractRouteStrategy implements RouteStrategy {
* 通过解析AST语法树类来寻找路由
*/
public abstract RouteResultset routeNormalSqlWithAST(SchemaConfig schema, String stmt, RouteResultset rrs,
String charset, LayerCachePool cachePool) throws SQLException;
String charset, LayerCachePool cachePool, ServerConnection sc) throws SQLException;
/**
* 路由信息指令, 如 SHOW、SELECT@@、DESCRIBE

View File

@@ -12,6 +12,7 @@ import io.mycat.route.parser.druid.DruidParser;
import io.mycat.route.parser.druid.DruidParserFactory;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,8 +27,8 @@ public class DruidMycatRouteStrategy extends AbstractRouteStrategy {
@Override
public RouteResultset routeNormalSqlWithAST(SchemaConfig schema,
String originSql, RouteResultset rrs, String charset,
LayerCachePool cachePool) throws SQLException {
String originSql, RouteResultset rrs, String charset,
LayerCachePool cachePool, ServerConnection sc) throws SQLException {
SQLStatementParser parser = new MySqlStatementParser(originSql);
MycatSchemaStatVisitor visitor = null;
SQLStatement statement;
@@ -54,7 +55,7 @@ public class DruidMycatRouteStrategy extends AbstractRouteStrategy {
DruidParser druidParser = DruidParserFactory.create(statement);
return RouterUtil.routeFromParser(druidParser, schema, rrs, statement, originSql, cachePool, visitor);
return RouterUtil.routeFromParser(druidParser, schema, rrs, statement, originSql, cachePool, visitor, sc);
}

View File

@@ -4,6 +4,7 @@ import com.alibaba.druid.sql.ast.SQLStatement;
import io.mycat.cache.LayerCachePool;
import io.mycat.config.model.SchemaConfig;
import io.mycat.route.RouteResultset;
import io.mycat.server.ServerConnection;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
@@ -21,15 +22,17 @@ public interface DruidParser {
* 使用MycatSchemaStatVisitor解析,得到tables、tableAliasMap、conditions等
* @param schema
* @param stmt
* @param sc
*/
SchemaConfig parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql,LayerCachePool cachePool,MycatSchemaStatVisitor schemaStatVisitor) throws SQLException;
SchemaConfig parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql, LayerCachePool cachePool, MycatSchemaStatVisitor schemaStatVisitor, ServerConnection sc) throws SQLException;
/**
* 子类可覆盖如果该方法解析得不到表名、字段等信息的就覆盖该方法覆盖成空方法然后通过statementPparse去解析
* 通过visitor解析有些类型的Statement通过visitor解析得不到表名、
* @param stmt
* @param sc
*/
SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,MycatSchemaStatVisitor visitor) throws SQLException;
SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc) throws SQLException;
/**
* 改写sql加limit加group by、加order by如有些没有加limit的可以通过该方法增加

View File

@@ -7,6 +7,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.mycat.server.ServerConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +44,12 @@ public class DefaultDruidParser implements DruidParser {
* 使用MycatSchemaStatVisitor解析,得到tables、tableAliasMap、conditions等
* @param schema
* @param stmt
* @param sc
*/
public SchemaConfig parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql,LayerCachePool cachePool,MycatSchemaStatVisitor schemaStatVisitor) throws SQLException {
public SchemaConfig parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql, LayerCachePool cachePool, MycatSchemaStatVisitor schemaStatVisitor, ServerConnection sc) throws SQLException {
ctx = new DruidShardingParseInfo();
//通过visitor解析
schema = visitorParse(schema, rrs,stmt,schemaStatVisitor);
schema = visitorParse(schema, rrs,stmt,schemaStatVisitor, sc);
//改写sql如insert语句主键自增长的可以
changeSql(schema, rrs, stmt, cachePool);
@@ -69,9 +71,10 @@ public class DefaultDruidParser implements DruidParser {
* 子类可覆盖如果该方法解析得不到表名、字段等信息的就覆盖该方法覆盖成空方法然后通过statementPparse去解析
* 通过visitor解析有些类型的Statement通过visitor解析得不到表名、
* @param stmt
* @param sc
*/
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
stmt.accept(visitor);
if(visitor.getNotSupportMsg()!= null){

View File

@@ -17,6 +17,7 @@ import io.mycat.config.model.TableConfig;
import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -28,7 +29,7 @@ import io.mycat.server.util.SchemaUtil.SchemaInfo;
*/
public class DruidDeleteParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
String schemaName = schema == null ? null : schema.getName();
MySqlDeleteStatement delete = (MySqlDeleteStatement) stmt;
@@ -38,7 +39,7 @@ public class DruidDeleteParser extends DefaultDruidParser {
tableSource = fromSource;
}
if (tableSource instanceof SQLJoinTableSource) {
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(rrs.getSession().getSource(), schemaName, (SQLJoinTableSource) tableSource, stmt);
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(sc, schemaName, (SQLJoinTableSource) tableSource, stmt);
if (schemaInfo == null) {
String msg = "deleting from multiple tables is not supported, sql:" + stmt;
throw new SQLNonTransientException(msg);
@@ -49,12 +50,12 @@ public class DruidDeleteParser extends DefaultDruidParser {
}
} else {
SQLExprTableSource deleteTableSource = (SQLExprTableSource) tableSource;
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, deleteTableSource);
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, deleteTableSource);
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);
}
if(!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.DELETE)){
if(!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.DELETE)){
String msg = "The statement DML privilege check is not passed, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
@@ -64,7 +65,7 @@ public class DruidDeleteParser extends DefaultDruidParser {
RouterUtil.routeToSingleNode(rrs, schema.getDataNode());
return schema;
}
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
TableConfig tc = schema.getTables().get(schemaInfo.table);
if (tc != null && tc.isGlobalTable()) {
rrs.setGlobalTable(true);

View File

@@ -35,6 +35,7 @@ import io.mycat.route.RouteResultsetNode;
import io.mycat.route.function.AbstractPartitionAlgorithm;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.GlobalTableUtil;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -43,17 +44,17 @@ import io.mycat.util.StringUtil;
public class DruidInsertParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
MySqlInsertStatement insert = (MySqlInsertStatement) stmt;
String schemaName = schema == null ? null : schema.getName();
SQLExprTableSource tableSource = insert.getTableSource();
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, tableSource);
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, tableSource);
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);
}
if(!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.INSERT)){
if(!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.INSERT)){
String msg = "The statement DML privilege check is not passed, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
@@ -97,7 +98,7 @@ public class DruidInsertParser extends DefaultDruidParser {
}
// childTable的insert直接在解析过程中完成路由
if (tc.getParentTC()!= null) {
parserChildTable(schemaInfo, rrs, insert);
parserChildTable(schemaInfo, rrs, insert, sc);
return schema;
}
String partitionColumn = tc.getPartitionColumn();
@@ -135,7 +136,7 @@ public class DruidInsertParser extends DefaultDruidParser {
return (insertStmt.getValuesList() != null && insertStmt.getValuesList().size() > 1);
}
private RouteResultset parserChildTable(SchemaInfo schemaInfo, RouteResultset rrs, MySqlInsertStatement insertStmt) throws SQLNonTransientException {
private RouteResultset parserChildTable(SchemaInfo schemaInfo, RouteResultset rrs, MySqlInsertStatement insertStmt, ServerConnection sc) throws SQLNonTransientException {
SchemaConfig schema = schemaInfo.schemaConfig;
String tableName = schemaInfo.table;
TableConfig tc = schema.getTables().get(tableName);
@@ -161,7 +162,7 @@ public class DruidInsertParser extends DefaultDruidParser {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("find root parent's node sql " + findRootTBSql);
}
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler(findRootTBSql, rrs.getSession());
FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler(findRootTBSql, sc.getSession2());
String dn = fetchHandler.execute(schema.getName(), tc.getRootParent().getDataNodes());
if (dn == null) {
throw new SQLNonTransientException("can't find (root) parent sharding node for sql:" + sql);

View File

@@ -13,6 +13,7 @@ import io.mycat.config.model.TableConfig;
import io.mycat.route.RouteResultset;
import io.mycat.route.RouteResultsetNode;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
import io.mycat.util.SplitUtil;
@@ -22,7 +23,7 @@ import io.mycat.util.SplitUtil;
*/
public class DruidLockTableParser extends DefaultDruidParser{
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLNonTransientException {
// 对于lock tables table1 write, table2
// read类型的多表锁语句DruidParser只能解析出table1

View File

@@ -28,6 +28,7 @@ import io.mycat.route.RouteResultsetNode;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.RouteCalculateUnit;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.handler.MysqlInformationSchemaHandler;
import io.mycat.server.handler.MysqlProcHandler;
import io.mycat.server.response.InformationSchemaProfiling;
@@ -49,7 +50,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,
MycatSchemaStatVisitor visitor) throws SQLException {
MycatSchemaStatVisitor visitor, ServerConnection sc) throws SQLException {
SQLSelectStatement selectStmt = (SQLSelectStatement) stmt;
SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
String schemaName = schema == null ? null : schema.getName();
@@ -75,7 +76,7 @@ public class DruidSelectParser extends DefaultDruidParser {
SchemaInfo schemaInfo;
if (mysqlFrom instanceof SQLExprTableSource){
SQLExprTableSource fromSource = (SQLExprTableSource) mysqlFrom;
schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, fromSource);
schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, fromSource);
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000",ErrorCode.ER_NO_DB_ERROR);
@@ -83,7 +84,7 @@ public class DruidSelectParser extends DefaultDruidParser {
// 兼容PhpAdmin's, 支持对MySQL元数据的模拟返回
//TODO:refactor INFORMATION_SCHEMA,MYSQL 等系統表的去向???
if (SchemaUtil.INFORMATION_SCHEMA.equals(schemaInfo.schema)) {
MysqlInformationSchemaHandler.handle(schemaInfo, rrs.getSession().getSource());
MysqlInformationSchemaHandler.handle(schemaInfo, sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -91,7 +92,7 @@ public class DruidSelectParser extends DefaultDruidParser {
if (SchemaUtil.MYSQL_SCHEMA.equals(schemaInfo.schema)
&& SchemaUtil.TABLE_PROC.equals(schemaInfo.table)) {
// 兼容MySQLWorkbench
MysqlProcHandler.handle(rrs.getStatement(), rrs.getSession().getSource());
MysqlProcHandler.handle(rrs.getStatement(), sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -102,7 +103,7 @@ public class DruidSelectParser extends DefaultDruidParser {
if (SchemaUtil.INFORMATION_SCHEMA.equals(schemaInfo.schema)
&& SchemaUtil.TABLE_PROFILING.equals(schemaInfo.table)
&& rrs.getStatement().toUpperCase().contains("CONCAT(ROUND(SUM(DURATION)/*100,3)")) {
InformationSchemaProfiling.response(rrs.getSession().getSource());
InformationSchemaProfiling.response(sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -110,7 +111,7 @@ public class DruidSelectParser extends DefaultDruidParser {
String msg = "No Supported, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
if (!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.SELECT)) {
if (!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.SELECT)) {
String msg = "The statement DML privilege check is not passed, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
@@ -125,7 +126,7 @@ public class DruidSelectParser extends DefaultDruidParser {
String msg = "Table '"+schema.getName()+"."+schemaInfo.table+"' doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
if(visitor.isHasSubQuery()){
rrs.setSqlStatement(selectStmt);
rrs.setNeedOptimizer(true);
@@ -134,21 +135,21 @@ public class DruidSelectParser extends DefaultDruidParser {
parseOrderAggGroupMysql(schema, stmt, rrs, mysqlSelectQuery, tc);
// 更改canRunInReadDB属性
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode())
&& !rrs.isAutocommit()) {
&& !sc.isAutocommit()) {
rrs.setCanRunInReadDB(false);
}
} else if (mysqlFrom instanceof SQLSubqueryTableSource || mysqlFrom instanceof SQLJoinTableSource || mysqlFrom instanceof SQLUnionQueryTableSource) {
schema = executeComplexSQL(schemaName, schema, rrs, selectStmt);
schema = executeComplexSQL(schemaName, schema, rrs, selectStmt, sc);
if (rrs.isFinishedRoute()) {
return schema;
}
}
} else if (sqlSelectQuery instanceof MySqlUnionQuery ) {
schema = executeComplexSQL(schemaName, schema, rrs, selectStmt);
schema = executeComplexSQL(schemaName, schema, rrs, selectStmt, sc);
if (rrs.isFinishedRoute()) {
return schema;
}
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
}
return schema;
@@ -445,9 +446,9 @@ public class DruidSelectParser extends DefaultDruidParser {
// }
// return map;
// }
private SchemaConfig executeComplexSQL(String schemaName, SchemaConfig schema, RouteResultset rrs, SQLSelectStatement selectStmt)
private SchemaConfig executeComplexSQL(String schemaName, SchemaConfig schema, RouteResultset rrs, SQLSelectStatement selectStmt, ServerConnection sc)
throws SQLException {
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(rrs.getSession().getSource(), schemaName, selectStmt.getSelect().getQuery(), selectStmt);
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(sc, schemaName, selectStmt.getSelect().getQuery(), selectStmt);
if (schemaInfo == null) {
rrs.setSqlStatement(selectStmt);
rrs.setNeedOptimizer(true);

View File

@@ -22,6 +22,7 @@ import io.mycat.config.model.SchemaConfig;
import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.handler.MysqlInformationSchemaHandler;
import io.mycat.server.handler.MysqlProcHandler;
import io.mycat.server.response.InformationSchemaProfiling;
@@ -31,7 +32,7 @@ import io.mycat.server.util.SchemaUtil.SchemaInfo;
public class DruidSingleUnitSelectParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,
MycatSchemaStatVisitor visitor) throws SQLException {
MycatSchemaStatVisitor visitor, ServerConnection sc) throws SQLException {
SQLSelectStatement selectStmt = (SQLSelectStatement) stmt;
SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
String schemaName = schema == null ? null : schema.getName();
@@ -50,26 +51,26 @@ public class DruidSingleUnitSelectParser extends DefaultDruidParser {
return schema;
}
if (mysqlFrom instanceof SQLSubqueryTableSource || mysqlFrom instanceof SQLJoinTableSource || mysqlFrom instanceof SQLUnionQueryTableSource) {
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(rrs.getSession().getSource(), schemaName, selectStmt.getSelect().getQuery(), selectStmt);
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(sc, schemaName, selectStmt.getSelect().getQuery(), selectStmt);
if (schemaInfo != null) {
rrs.setStatement(RouterUtil.removeSchema(rrs.getStatement(), schemaInfo.schema));
RouterUtil.routeToSingleNode(rrs, schemaInfo.schemaConfig.getDataNode());
return schemaInfo.schemaConfig;
} else{
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
return schema;
}
}
SQLExprTableSource fromSource = (SQLExprTableSource) mysqlFrom;
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, fromSource);
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, fromSource);
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);
}
// 兼容PhpAdmin's, 支持对MySQL元数据的模拟返回
if (SchemaUtil.INFORMATION_SCHEMA.equals(schemaInfo.schema)) {
MysqlInformationSchemaHandler.handle(schemaInfo, rrs.getSession().getSource());
MysqlInformationSchemaHandler.handle(schemaInfo, sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -77,7 +78,7 @@ public class DruidSingleUnitSelectParser extends DefaultDruidParser {
if (SchemaUtil.MYSQL_SCHEMA.equals(schemaInfo.schema)
&& SchemaUtil.TABLE_PROC.equals(schemaInfo.table)) {
// 兼容MySQLWorkbench
MysqlProcHandler.handle(rrs.getStatement(), rrs.getSession().getSource());
MysqlProcHandler.handle(rrs.getStatement(), sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -88,7 +89,7 @@ public class DruidSingleUnitSelectParser extends DefaultDruidParser {
if (SchemaUtil.INFORMATION_SCHEMA.equals(schemaInfo.schema)
&& SchemaUtil.TABLE_PROFILING.equals(schemaInfo.table)
&& rrs.getStatement().toUpperCase().contains("CONCAT(ROUND(SUM(DURATION)/*100,3)")) {
InformationSchemaProfiling.response(rrs.getSession().getSource());
InformationSchemaProfiling.response(sc);
rrs.setFinishedExecute(true);
return schema;
}
@@ -96,26 +97,26 @@ public class DruidSingleUnitSelectParser extends DefaultDruidParser {
String msg = "No Supported, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
if (!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.SELECT)) {
if (!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.SELECT)) {
String msg = "The statement DML privilege check is not passed, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
rrs.setStatement(RouterUtil.removeSchema(rrs.getStatement(), schemaInfo.schema));
schema = schemaInfo.schemaConfig;
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
// 更改canRunInReadDB属性
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode())
&& !rrs.isAutocommit()) {
&& !sc.isAutocommit()) {
rrs.setCanRunInReadDB(false);
}
} else if (sqlSelectQuery instanceof MySqlUnionQuery) {
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(rrs.getSession().getSource(), schemaName, selectStmt.getSelect().getQuery(), selectStmt);
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(sc, schemaName, selectStmt.getSelect().getQuery(), selectStmt);
if (schemaInfo != null) {
rrs.setStatement(RouterUtil.removeSchema(rrs.getStatement(), schemaInfo.schema));
RouterUtil.routeToSingleNode(rrs, schemaInfo.schemaConfig.getDataNode());
return schemaInfo.schemaConfig;
} else{
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
}
}
return schema;

View File

@@ -37,6 +37,7 @@ import io.mycat.meta.protocol.MyCatMeta.TableMeta;
import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.GlobalTableUtil;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -47,13 +48,13 @@ import io.mycat.util.StringUtil;
*
*/
public class DruidUpdateParser extends DefaultDruidParser {
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
MySqlUpdateStatement update = (MySqlUpdateStatement) stmt;
SQLTableSource tableSource = update.getTableSource();
String schemaName = schema == null ? null : schema.getName();
if (tableSource instanceof SQLJoinTableSource) {
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(rrs.getSession().getSource(), schemaName, (SQLJoinTableSource) tableSource, stmt);
SchemaInfo schemaInfo = SchemaUtil.isNoSharding(sc, schemaName, (SQLJoinTableSource) tableSource, stmt);
if (schemaInfo == null) {
String msg = "updating multiple tables is not supported, sql:" + stmt;
throw new SQLNonTransientException(msg);
@@ -64,14 +65,14 @@ public class DruidUpdateParser extends DefaultDruidParser {
return schema;
}
} else {
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, (SQLExprTableSource) tableSource);
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, (SQLExprTableSource) tableSource);
//数据库校验
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);
}
//权限控制
if(!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.UPDATE)){
if(!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.UPDATE)){
String msg = "The statement DML privilege check is not passed, sql:" + stmt;
throw new SQLNonTransientException(msg);
}
@@ -84,7 +85,7 @@ public class DruidUpdateParser extends DefaultDruidParser {
return schema;
}
TableConfig tc = schema.getTables().get(tableName);
super.visitorParse(schema, rrs, stmt, visitor);
super.visitorParse(schema, rrs, stmt, visitor, sc);
if (tc!=null && tc.isGlobalTable()) {
if (GlobalTableUtil.useGlobleTableCheck()) {
String sql = convertUpdateSQL(schemaInfo, update, rrs.getStatement());

View File

@@ -35,6 +35,7 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.GlobalTableUtil;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -47,11 +48,11 @@ import io.mycat.util.StringUtil;
*/
public class DruidAlterTableParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
SQLAlterTableStatement alterTable = (SQLAlterTableStatement) stmt;
String schemaName = schema == null ? null : schema.getName();
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, alterTable.getTableSource());
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, alterTable.getTableSource());
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -14,18 +14,19 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
public class DruidCreateIndexParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,
MycatSchemaStatVisitor visitor) throws SQLException {
MycatSchemaStatVisitor visitor, ServerConnection sc) throws SQLException {
SQLCreateIndexStatement createStmt = (SQLCreateIndexStatement) stmt;
SQLTableSource tableSource = createStmt.getTable();
if (tableSource instanceof SQLExprTableSource) {
String schemaName = schema == null ? null : schema.getName();
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, (SQLExprTableSource) tableSource);
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, (SQLExprTableSource) tableSource);
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -19,6 +19,7 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.GlobalTableUtil;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -27,7 +28,7 @@ import io.mycat.util.StringUtil;
public class DruidCreateTableParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
MySqlCreateTableStatement createStmt = (MySqlCreateTableStatement)stmt;
//创建新表select from禁止
@@ -44,7 +45,7 @@ public class DruidCreateTableParser extends DefaultDruidParser {
}
String schemaName = schema == null ? null : schema.getName();
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, createStmt.getTableSource());
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, createStmt.getTableSource());
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -8,6 +8,7 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -20,11 +21,11 @@ import java.sql.SQLException;
*/
public class DruidDropIndexParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
String schemaName = schema == null ? null : schema.getName();
SQLDropIndexStatement dropStmt = (SQLDropIndexStatement) stmt;
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, dropStmt.getTableName());
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, dropStmt.getTableName());
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -8,6 +8,7 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -16,7 +17,7 @@ import java.sql.SQLNonTransientException;
public class DruidDropTableParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
SQLDropTableStatement dropTable = (SQLDropTableStatement) stmt;
if(dropTable.getTableSources().size()>1){
@@ -24,7 +25,7 @@ public class DruidDropTableParser extends DefaultDruidParser {
throw new SQLNonTransientException(msg);
}
String schemaName = schema == null ? null : schema.getName();
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, dropTable.getTableSources().get(0));
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, dropTable.getTableSources().get(0));
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -8,6 +8,7 @@ import io.mycat.route.RouteResultset;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.impl.DefaultDruidParser;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
@@ -15,11 +16,11 @@ import java.sql.SQLException;
public class DruidTruncateTableParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor)
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, MycatSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
String schemaName = schema == null ? null : schema.getName();
SQLTruncateStatement truncateTable = (SQLTruncateStatement) stmt;
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(rrs.getSession().getSource().getUser(), schemaName, truncateTable.getTableSources().get(0));
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, truncateTable.getTableSources().get(0));
if (schemaInfo == null) {
String msg = "No database selected";
throw new SQLException(msg,"3D000", ErrorCode.ER_NO_DB_ERROR);

View File

@@ -93,7 +93,7 @@ public class BatchInsertSequence implements Catlet {
throw new SQLException(msg,"3D000",ErrorCode.ER_NO_DB_ERROR);
}
rrs.setStatement(RouterUtil.removeSchema(rrs.getStatement(), schemaInfo.schema));
if(!MycatPrivileges.checkPrivilege(rrs.getSession().getSource(), schemaInfo.schema, schemaInfo.table, Checktype.INSERT)){
if(!MycatPrivileges.checkPrivilege(sc, schemaInfo.schema, schemaInfo.table, Checktype.INSERT)){
String msg = "The statement DML privilege check is not passed, sql:" + realSQL;
throw new SQLNonTransientException(msg);
}

View File

@@ -42,11 +42,10 @@ public final class RouteResultCopy {
}
public static RouteResultset RRCopy(RouteResultset rrs, int sqlType, String stmt) {
RouteResultset rr = new RouteResultset(stmt, sqlType, rrs.getSession());
RouteResultset rr = new RouteResultset(stmt, sqlType);
rr.setRunOnSlave(rrs.getRunOnSlave());
rr.setFinishedRoute(rrs.isFinishedRoute());
rr.setGlobalTable(rrs.isGlobalTable());
rr.setAutocommit(rrs.isAutocommit());
rr.setCanRunInReadDB(rrs.getCanRunInReadDB());
RouteResultsetNode[] ns = rrs.getNodes();

View File

@@ -7,7 +7,6 @@ import com.alibaba.druid.wall.spi.WallVisitorUtils;
import io.mycat.MycatServer;
import io.mycat.cache.LayerCachePool;
import io.mycat.config.ErrorCode;
import io.mycat.config.loader.console.ZookeeperPath;
import io.mycat.config.model.SchemaConfig;
import io.mycat.config.model.TableConfig;
import io.mycat.config.model.rule.RuleConfig;
@@ -18,48 +17,45 @@ import io.mycat.route.parser.druid.DruidParser;
import io.mycat.route.parser.druid.DruidShardingParseInfo;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.parser.druid.RouteCalculateUnit;
import io.mycat.server.ServerConnection;
import io.mycat.server.parser.ServerParse;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
import io.mycat.sqlengine.mpp.ColumnRoutePair;
import io.mycat.sqlengine.mpp.LoadData;
import io.mycat.util.StringUtil;
import io.mycat.util.ZKUtils;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import static io.mycat.config.loader.zkprocess.zookeeper.process.DDLInfo.DDLStatus;
/**
* 从ServerRouterUtil中抽取的一些公用方法路由解析工具类
* @author wang.dw
*
* @author wang.dw
*/
public class RouterUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(RouterUtil.class);
public static String removeSchema(String stmt, String schema) {
return removeSchema(stmt, schema, MycatServer.getInstance().getConfig().getSystem().isLowerCaseTableNames());
}
/**
* 移除执行语句中的数据库名
* @param stmt 执行语句
* @param schema 数据库名 ,如果需要,已经小写化过了
* @param isLowerCase 是否lowercase
* @return 执行语句
*
* @param stmt 执行语句
* @param schema 数据库名 ,如果需要,已经小写化过了
* @param isLowerCase 是否lowercase
* @return 执行语句
*/
public static String removeSchema(String stmt, String schema, boolean isLowerCase) {
final String forCmpStmt = isLowerCase ? stmt.toLowerCase() : stmt;
final String maySchema1 = schema + ".";
final String maySchema2 = "`"+schema + "`.";
final String maySchema2 = "`" + schema + "`.";
int indx1 = forCmpStmt.indexOf(maySchema1, 0);
int indx2 = forCmpStmt.indexOf(maySchema2, 0);
if (indx1 < 0 && indx2 < 0) {
@@ -82,18 +78,17 @@ public class RouterUtil {
} else {
flag = false;
}
if(flag){
if (flag) {
index = indx2;
result.append(stmt.substring(strtPos, index ));
result.append(stmt.substring(strtPos, index));
strtPos = index + maySchema2.length();
if (index > firstE && index < endE && countChar(stmt, index) % 2 == 1) {
result.append(stmt.substring(index, strtPos));
}
indx2 = forCmpStmt.indexOf(maySchema2, strtPos);
}
else{
} else {
index = indx1;
result.append(stmt.substring(strtPos, index ));
result.append(stmt.substring(strtPos, index));
strtPos = index + maySchema1.length();
if (index > firstE && index < endE && countChar(stmt, index) % 2 == 1) {
result.append(stmt.substring(index, strtPos));
@@ -104,33 +99,34 @@ public class RouterUtil {
result.append(stmt.substring(strtPos));
return result.toString();
}
private static int countChar(String sql,int end)
{
int count=0;
private static int countChar(String sql, int end) {
int count = 0;
boolean skipChar = false;
for (int i = 0; i < end; i++) {
if(sql.charAt(i)=='\'' && !skipChar) {
if (sql.charAt(i) == '\'' && !skipChar) {
count++;
skipChar = false;
}else if( sql.charAt(i)=='\\'){
} else if (sql.charAt(i) == '\\') {
skipChar = true;
}else{
} else {
skipChar = false;
}
}
return count;
}
public static RouteResultset routeFromParser(DruidParser druidParser, SchemaConfig schema, RouteResultset rrs, SQLStatement statement, String originSql,LayerCachePool cachePool,MycatSchemaStatVisitor visitor) throws SQLException {
schema = druidParser.parser(schema, rrs, statement, originSql,cachePool,visitor);
if(rrs.isFinishedExecute()){
public static RouteResultset routeFromParser(DruidParser druidParser, SchemaConfig schema, RouteResultset rrs, SQLStatement statement,
String originSql, LayerCachePool cachePool, MycatSchemaStatVisitor visitor, ServerConnection sc) throws SQLException {
schema = druidParser.parser(schema, rrs, statement, originSql, cachePool, visitor, sc);
if (rrs.isFinishedExecute()) {
return null;
}
// DruidParser 解析过程中已完成了路由的直接返回
if ( rrs.isFinishedRoute() ) {
if (rrs.isFinishedRoute()) {
return rrs;
}
/**
* 没有from的select语句或其他
*/
@@ -144,52 +140,53 @@ public class RouterUtil {
}
/* 多表*/
if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
if (druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit);
}
SortedSet<RouteResultsetNode> nodeSet = new TreeSet<RouteResultsetNode>();
for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) {
for (RouteCalculateUnit unit : druidParser.getCtx().getRouteCalculateUnits()) {
RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool);
if(rrsTmp != null) {
for(RouteResultsetNode node :rrsTmp.getNodes()) {
if (rrsTmp != null) {
for (RouteResultsetNode node : rrsTmp.getNodes()) {
nodeSet.add(node);
}
}
}
RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()];
int i = 0;
for (RouteResultsetNode aNodeSet : nodeSet) {
nodes[i] = aNodeSet;
i++;
}
rrs.setNodes(nodes);
}
rrs.setNodes(nodes);
return rrs;
}
/**
* SELECT 语句
*/
private static boolean isSelect(SQLStatement statement) {
if(statement instanceof SQLSelectStatement) {
private static boolean isSelect(SQLStatement statement) {
if (statement instanceof SQLSelectStatement) {
return true;
}
return false;
}
/**
* 获取第一个节点作为路由
*
* @param rrs 数据路由集合
* @param dataNode 数据库所在节点
* @return 数据路由集合
*
* @param rrs 数据路由集合
* @param dataNode 数据库所在节点
* @return 数据路由集合
* @author mycat
*/
public static RouteResultset routeToSingleNode(RouteResultset rrs,
String dataNode) {
String dataNode) {
if (dataNode == null) {
return rrs;
}
@@ -201,23 +198,23 @@ public class RouterUtil {
if (rrs.getCanRunInReadDB() != null) {
nodes[0].setCanRunInReadDB(rrs.getCanRunInReadDB());
}
if(rrs.getRunOnSlave() != null){
if (rrs.getRunOnSlave() != null) {
nodes[0].setRunOnSlave(rrs.getRunOnSlave());
}
return rrs;
}
public static void routeToDDLNode(SchemaInfo schemaInfo, RouteResultset rrs) throws SQLException {
String stmt = getFixedSql(removeSchema(rrs.getStatement(),schemaInfo.schema));
String stmt = getFixedSql(removeSchema(rrs.getStatement(), schemaInfo.schema));
List<String> dataNodes;
Map<String, TableConfig> tables = schemaInfo.schemaConfig.getTables();
TableConfig tc = tables.get(schemaInfo.table);
if (tables != null && (tc != null)) {
dataNodes = tc.getDataNodes();
} else {
String msg = "Table '"+schemaInfo.schema+"."+schemaInfo.table+"' doesn't exist";
String msg = "Table '" + schemaInfo.schema + "." + schemaInfo.table + "' doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
Iterator<String> iterator1 = dataNodes.iterator();
@@ -240,11 +237,11 @@ public class RouterUtil {
/**
* 处理SQL
*
* @param stmt 执行语句
* @return 处理后SQL
* @param stmt 执行语句
* @return 处理后SQL
* @author AStoneGod
*/
public static String getFixedSql(String stmt){
public static String getFixedSql(String stmt) {
stmt = stmt.replaceAll("\r\n", " "); //对于\r\n的字符 用 空格处理 rainbow
return stmt = stmt.trim();
}
@@ -252,8 +249,8 @@ public class RouterUtil {
/**
* 获取table名字
*
* @param stmt 执行语句
* @param repPos 开始位置和位数
* @param stmt 执行语句
* @param repPos 开始位置和位数
* @return 表名
* @author AStoneGod
*/
@@ -263,21 +260,21 @@ public class RouterUtil {
if (secInd < 0) {
secInd = stmt.length();
}
int thiInd = stmt.indexOf('(',secInd+1);
int thiInd = stmt.indexOf('(', secInd + 1);
if (thiInd < 0) {
thiInd = stmt.length();
}
repPos[1] = secInd;
String tableName = "";
if (stmt.toUpperCase().startsWith("DESC")||stmt.toUpperCase().startsWith("DESCRIBE")){
if (stmt.toUpperCase().startsWith("DESC") || stmt.toUpperCase().startsWith("DESCRIBE")) {
tableName = stmt.substring(startPos, thiInd).trim();
}else {
} else {
tableName = stmt.substring(secInd, thiInd).trim();
}
//ALTER TABLE
if (tableName.contains(" ")){
tableName = tableName.substring(0,tableName.indexOf(" "));
if (tableName.contains(" ")) {
tableName = tableName.substring(0, tableName.indexOf(" "));
}
int ind2 = tableName.indexOf('.');
if (ind2 > 0) {
@@ -290,8 +287,8 @@ public class RouterUtil {
/**
* 获取show语句table名字
*
* @param stmt 执行语句
* @param repPos 开始位置和位数
* @param stmt 执行语句
* @param repPos 开始位置和位数
* @return 表名
* @author AStoneGod
*/
@@ -312,19 +309,19 @@ public class RouterUtil {
return lowerCaseTable(tableName);
}
public static String lowerCaseTable (String tableName) {
if(MycatServer.getInstance().getConfig().getSystem().isLowerCaseTableNames()){
public static String lowerCaseTable(String tableName) {
if (MycatServer.getInstance().getConfig().getSystem().isLowerCaseTableNames()) {
return tableName.toLowerCase();
}
return tableName;
}
/**
* 获取语句中前关键字位置和占位个数表名位置
*
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
*
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @author mycat
*/
public static int[] getCreateTablePos(String upStmt, int start) {
@@ -334,19 +331,17 @@ public class RouterUtil {
int tabInd = upStmt.indexOf(token2, start);
// 既包含CREATE又包含TABLE且CREATE关键字在TABLE关键字之前
if (createInd >= 0 && tabInd > 0 && tabInd > createInd) {
return new int[] { tabInd, token2.length() };
return new int[]{tabInd, token2.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
/**
* 获取语句中前关键字位置和占位个数表名位置
*
* @param upStmt
* 执行语句
* @param start
* 开始位置
* @param upStmt 执行语句
* @param start 开始位置
* @return int[]关键字位置和占位个数
* @author aStoneGod
*/
@@ -359,17 +354,17 @@ public class RouterUtil {
int onInd = upStmt.indexOf(token3, start);
// 既包含CREATE又包含INDEX且CREATE关键字在INDEX关键字之前, 且包含ON...
if (createInd >= 0 && idxInd > 0 && idxInd > createInd && onInd > 0 && onInd > idxInd) {
return new int[] {onInd , token3.length() };
return new int[]{onInd, token3.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
/**
* 获取ALTER语句中前关键字位置和占位个数表名位置
*
* @param upStmt 执行语句
* @param start 开始位置
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @author aStoneGod
*/
@@ -380,42 +375,42 @@ public class RouterUtil {
int tabInd = upStmt.indexOf(token2, start);
// 既包含CREATE又包含TABLE且CREATE关键字在TABLE关键字之前
if (createInd >= 0 && tabInd > 0 && tabInd > createInd) {
return new int[] { tabInd, token2.length() };
return new int[]{tabInd, token2.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
/**
* 获取DROP语句中前关键字位置和占位个数表名位置
*
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @author aStoneGod
*/
public static int[] getDropTablePos(String upStmt, int start) {
//增加 if exists判断
if(upStmt.contains("EXISTS")){
if (upStmt.contains("EXISTS")) {
String token1 = "IF ";
String token2 = " EXISTS ";
int ifInd = upStmt.indexOf(token1, start);
int tabInd = upStmt.indexOf(token2, start);
if (ifInd >= 0 && tabInd > 0 && tabInd > ifInd) {
return new int[] { tabInd, token2.length() };
return new int[]{tabInd, token2.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}else {
} else {
String token1 = "DROP ";
String token2 = " TABLE ";
int createInd = upStmt.indexOf(token1, start);
int tabInd = upStmt.indexOf(token2, start);
if (createInd >= 0 && tabInd > 0 && tabInd > createInd) {
return new int[] { tabInd, token2.length() };
return new int[]{tabInd, token2.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
}
@@ -424,10 +419,8 @@ public class RouterUtil {
/**
* 获取DROP语句中前关键字位置和占位个数表名位置
*
* @param upStmt
* 执行语句
* @param start
* 开始位置
* @param upStmt 执行语句
* @param start 开始位置
* @return int[]关键字位置和占位个数
* @author aStoneGod
*/
@@ -441,18 +434,18 @@ public class RouterUtil {
int onInd = upStmt.indexOf(token3, start);
// 既包含CREATE又包含INDEX且CREATE关键字在INDEX关键字之前, 且包含ON...
if (createInd >= 0 && idxInd > 0 && idxInd > createInd && onInd > 0 && onInd > idxInd) {
return new int[] {onInd , token3.length() };
return new int[]{onInd, token3.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
/**
* 获取TRUNCATE语句中前关键字位置和占位个数表名位置
*
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @author aStoneGod
*/
public static int[] getTruncateTablePos(String upStmt, int start) {
@@ -462,17 +455,17 @@ public class RouterUtil {
int tabInd = upStmt.indexOf(token2, start);
// 既包含CREATE又包含TABLE且CREATE关键字在TABLE关键字之前
if (createInd >= 0 && tabInd > 0 && tabInd > createInd) {
return new int[] { tabInd, token2.length() };
return new int[]{tabInd, token2.length()};
} else {
return new int[] { -1, token2.length() };// 不满足条件时,只关注第一个返回值为-1第二个任意
return new int[]{-1, token2.length()};// 不满足条件时,只关注第一个返回值为-1第二个任意
}
}
/**
* 获取语句中前关键字位置和占位个数表名位置
*
* @param upStmt 执行语句
* @param start 开始位置
* @param upStmt 执行语句
* @param start 开始位置
* @return int[] 关键字位置和占位个数
* @author mycat
*/
@@ -483,20 +476,20 @@ public class RouterUtil {
int tabInd2 = upStmt.indexOf(token2, start);
if (tabInd1 > 0) {
if (tabInd2 < 0) {
return new int[] { tabInd1, token1.length() };
return new int[]{tabInd1, token1.length()};
}
return (tabInd1 < tabInd2) ? new int[] { tabInd1, token1.length() }
: new int[] { tabInd2, token2.length() };
return (tabInd1 < tabInd2) ? new int[]{tabInd1, token1.length()}
: new int[]{tabInd2, token2.length()};
} else {
return new int[] { tabInd2, token2.length() };
return new int[]{tabInd2, token2.length()};
}
}
/**
* 获取开始位置后的 LIKE、WHERE 位置 如果不含 LIKE、WHERE 则返回执行语句的长度
*
* @param upStmt 执行sql
* @param start 开始位置
* @param upStmt 执行sql
* @param start 开始位置
* @return int
* @author mycat
*/
@@ -511,7 +504,6 @@ public class RouterUtil {
return tabInd;
}
public static RouteResultset routeToMultiNode(boolean cache, RouteResultset rrs, Collection<String> dataNodes) {
RouteResultsetNode[] nodes = new RouteResultsetNode[dataNodes.size()];
@@ -523,7 +515,7 @@ public class RouterUtil {
if (rrs.getCanRunInReadDB() != null) {
node.setCanRunInReadDB(rrs.getCanRunInReadDB());
}
if(rrs.getRunOnSlave() != null){
if (rrs.getRunOnSlave() != null) {
nodes[0].setRunOnSlave(rrs.getRunOnSlave());
}
nodes[i++] = node;
@@ -540,35 +532,35 @@ public class RouterUtil {
}
public static void routeToRandomNode(RouteResultset rrs,
SchemaConfig schema, String tableName) throws SQLException {
String dataNode = getRandomDataNode(schema, tableName);
routeToSingleNode(rrs,dataNode);
SchemaConfig schema, String tableName) throws SQLException {
String dataNode = getRandomDataNode(schema, tableName);
routeToSingleNode(rrs, dataNode);
}
/**
* 根据标名随机获取一个节点
*
* @param schema 数据库名
* @param table 表名
* @return 数据节点
* @param schema 数据库名
* @param table 表名
* @return 数据节点
* @author mycat
*/
private static String getRandomDataNode(SchemaConfig schema,
String table) throws SQLException {
String table) throws SQLException {
String dataNode = null;
Map<String, TableConfig> tables = schema.getTables();
TableConfig tc;
if (tables != null && (tc = tables.get(table)) != null) {
dataNode = tc.getRandomDataNode();
} else {
String msg = "Table '"+schema.getName()+"."+table+"' doesn't exist";
String msg = "Table '" + schema.getName() + "." + table + "' doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
return dataNode;
}
public static Set<String> ruleByJoinValueCalculate(RouteResultset rrs, TableConfig tc,
Set<ColumnRoutePair> colRoutePairSet) throws SQLNonTransientException {
Set<ColumnRoutePair> colRoutePairSet) throws SQLNonTransientException {
Set<String> retNodeSet = new LinkedHashSet<String>();
if (tc.getDirectRouteTC() != null) {
Set<String> nodeSet = ruleCalculate(tc.getDirectRouteTC(), colRoutePairSet);
@@ -586,7 +578,7 @@ public class RouterUtil {
return retNodeSet;
}
public static Set<String> ruleCalculate(TableConfig tc, Set<ColumnRoutePair> colRoutePairSet) {
public static Set<String> ruleCalculate(TableConfig tc, Set<ColumnRoutePair> colRoutePairSet) {
Set<String> routeNodeSet = new LinkedHashSet<String>();
String col = tc.getRule().getColumn();
RuleConfig rule = tc.getRule();
@@ -628,18 +620,18 @@ public class RouterUtil {
* 多表路由
*/
public static RouteResultset tryRouteForTables(SchemaConfig schema, DruidShardingParseInfo ctx,
RouteCalculateUnit routeUnit, RouteResultset rrs, boolean isSelect, LayerCachePool cachePool)
RouteCalculateUnit routeUnit, RouteResultset rrs, boolean isSelect, LayerCachePool cachePool)
throws SQLException {
List<String> tables = ctx.getTables();
// no sharding table
if(isNoSharding(schema,tables.get(0))) {
if (isNoSharding(schema, tables.get(0))) {
return routeToSingleNode(rrs, schema.getDataNode());
}
//只有一个表的
if(tables.size() == 1) {
if (tables.size() == 1) {
return RouterUtil.tryRouteForOneTable(schema, ctx, routeUnit, tables.get(0), rrs, isSelect, cachePool);
}
@@ -647,26 +639,26 @@ public class RouterUtil {
* 多表 一定是ER关系的以及global* normal表, global* er表的join
*/
//每个表对应的路由映射 <table,datanodes>
Map<String,Set<String>> tablesRouteMap = new HashMap<String,Set<String>>();
Map<String, Set<String>> tablesRouteMap = new HashMap<String, Set<String>>();
//分库解析信息不为空
Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions = routeUnit.getTablesAndConditions();
if(tablesAndConditions != null && tablesAndConditions.size() > 0) {
if (tablesAndConditions != null && tablesAndConditions.size() > 0) {
//为分库表找路由
RouterUtil.findRouteWithcConditionsForTables(schema, rrs, tablesAndConditions, tablesRouteMap, cachePool, isSelect);
if(rrs.isFinishedRoute()) {
if (rrs.isFinishedRoute()) {
return rrs;
}
}
//为单库表找路由,全局表不改变结果集,全局表*任意表 无交集的已经退化为普通表join了
for(String tableName : tables) {
for (String tableName : tables) {
TableConfig tableConfig = schema.getTables().get(tableName);
if(tableConfig == null) {
String msg = "Table '"+schema.getName()+"."+tableName+"' doesn't exist";
if (tableConfig == null) {
String msg = "Table '" + schema.getName() + "." + tableName + "' doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
if (!tableConfig.isGlobalTable() && tablesRouteMap.get(tableName) == null) { // 余下的表都是单库表
tablesRouteMap.put(tableName, new HashSet<String>());
tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
@@ -675,16 +667,16 @@ public class RouterUtil {
Set<String> retNodesSet = new HashSet<String>();
boolean isFirstAdd = true;
for(Map.Entry<String, Set<String>> entry : tablesRouteMap.entrySet()) {
if(entry.getValue() == null || entry.getValue().size() == 0) {
for (Map.Entry<String, Set<String>> entry : tablesRouteMap.entrySet()) {
if (entry.getValue() == null || entry.getValue().size() == 0) {
throw new SQLNonTransientException("parent key can't find any valid datanode ");
} else {
if(isFirstAdd) {
if (isFirstAdd) {
retNodesSet.addAll(entry.getValue());
isFirstAdd = false;
} else {
retNodesSet.retainAll(entry.getValue());
if(retNodesSet.size() == 0) {//两个表的路由无交集
if (retNodesSet.size() == 0) {//两个表的路由无交集
String errMsg = "invalid route in sql, multi tables found but datanode has no intersection "
+ " sql:" + rrs.getStatement();
LOGGER.warn(errMsg);
@@ -698,28 +690,27 @@ public class RouterUtil {
return rrs;
}
/**
*
* 单表路由
*/
public static RouteResultset tryRouteForOneTable(SchemaConfig schema, DruidShardingParseInfo ctx,
RouteCalculateUnit routeUnit, String tableName, RouteResultset rrs, boolean isSelect,
LayerCachePool cachePool) throws SQLException {
RouteCalculateUnit routeUnit, String tableName, RouteResultset rrs, boolean isSelect,
LayerCachePool cachePool) throws SQLException {
TableConfig tc = schema.getTables().get(tableName);
if(tc == null) {
String msg = "Table '"+schema.getName()+"."+tableName+"' doesn't exist";
if (tc == null) {
String msg = "Table '" + schema.getName() + "." + tableName + "' doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
if(tc.isGlobalTable()) {//全局表
if(isSelect) {
if (tc.isGlobalTable()) {//全局表
if (isSelect) {
// global select ,not cache route result
rrs.setCacheAble(false);
return routeToSingleNode(rrs, tc.getRandomDataNode());
} else {//insert into 全局表的记录
return routeToMultiNode(false, rrs, tc.getDataNodes(),true);
return routeToMultiNode(false, rrs, tc.getDataNodes(), true);
}
} else {//单表或者分库表
if (!checkRuleRequired(schema, ctx, routeUnit, tc)) {
@@ -733,14 +724,14 @@ public class RouterUtil {
return routeToMultiNode(rrs.isCacheAble(), rrs, tc.getDataNodes());
} else {
//每个表对应的路由映射
Map<String,Set<String>> tablesRouteMap = new HashMap<String,Set<String>>();
if(routeUnit.getTablesAndConditions() != null && routeUnit.getTablesAndConditions().size() > 0) {
Map<String, Set<String>> tablesRouteMap = new HashMap<String, Set<String>>();
if (routeUnit.getTablesAndConditions() != null && routeUnit.getTablesAndConditions().size() > 0) {
RouterUtil.findRouteWithcConditionsForTables(schema, rrs, routeUnit.getTablesAndConditions(), tablesRouteMap, cachePool, isSelect);
if(rrs.isFinishedRoute()) {
if (rrs.isFinishedRoute()) {
return rrs;
}
}
if(tablesRouteMap.get(tableName) == null) {
if (tablesRouteMap.get(tableName) == null) {
return routeToMultiNode(rrs.isCacheAble(), rrs, tc.getDataNodes());
} else {
return routeToMultiNode(rrs.isCacheAble(), rrs, tablesRouteMap.get(tableName));
@@ -748,27 +739,27 @@ public class RouterUtil {
}
}
}
/**
* 处理分库表路由
*/
public static void findRouteWithcConditionsForTables(SchemaConfig schema, RouteResultset rrs,
Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions,
Map<String, Set<String>> tablesRouteMap, LayerCachePool cachePool, boolean isSelect)
Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions,
Map<String, Set<String>> tablesRouteMap, LayerCachePool cachePool, boolean isSelect)
throws SQLNonTransientException {
//为分库表找路由
for(Map.Entry<String, Map<String, Set<ColumnRoutePair>>> entry : tablesAndConditions.entrySet()) {
for (Map.Entry<String, Map<String, Set<ColumnRoutePair>>> entry : tablesAndConditions.entrySet()) {
String tableName = entry.getKey();
if (MycatServer.getInstance().getConfig().getSystem().isLowerCaseTableNames()) {
tableName = tableName.toLowerCase();
}
if(tableName.startsWith(schema.getName()+".")){
tableName = tableName.substring(schema.getName().length()+1);
if (tableName.startsWith(schema.getName() + ".")) {
tableName = tableName.substring(schema.getName().length() + 1);
}
TableConfig tableConfig = schema.getTables().get(tableName);
if(tableConfig == null) {
if (tableConfig == null) {
String msg = "can't find table ["
+ tableName + "[ define in schema "
+ ":" + schema.getName();
@@ -776,7 +767,7 @@ public class RouterUtil {
throw new SQLNonTransientException(msg);
}
//全局表或者不分库的表略过(全局表后面再计算)
if(tableConfig.isGlobalTable() || schema.getTables().get(tableName).getDataNodes().size() == 1) {
if (tableConfig.isGlobalTable() || schema.getTables().get(tableName).getDataNodes().size() == 1) {
continue;
} else {//非全局表分库表、childTable、其他
Map<String, Set<ColumnRoutePair>> columnsMap = entry.getValue();
@@ -784,12 +775,12 @@ public class RouterUtil {
String partionCol = tableConfig.getPartitionColumn();
String primaryKey = tableConfig.getPrimaryKey();
boolean isFoundPartitionValue = partionCol != null && columnsMap.get(partionCol) != null;
boolean isLoadData=false;
if (LOGGER.isDebugEnabled()
&& rrs.getStatement().startsWith(LoadData.loadDataHint)||rrs.isLoadData()) {
//由于load data一次会计算很多路由数据如果输出此日志会极大降低load data的性能
isLoadData=true;
}
boolean isLoadData = false;
if (LOGGER.isDebugEnabled()
&& rrs.getStatement().startsWith(LoadData.loadDataHint) || rrs.isLoadData()) {
//由于load data一次会计算很多路由数据如果输出此日志会极大降低load data的性能
isLoadData = true;
}
if (columnsMap.get(primaryKey) != null && columnsMap.size() == 1 && !isLoadData) {
//TODO: IS NEEDED?? 主键查找 try by primary key if found in cache
Set<ColumnRoutePair> primaryKeyPairs = columnsMap.get(primaryKey);
@@ -798,10 +789,10 @@ public class RouterUtil {
LOGGER.debug("try to find cache by primary key ");
}
String tableKey = StringUtil.getFullName(schema.getName(),tableName,'_');
String tableKey = StringUtil.getFullName(schema.getName(), tableName, '_');
boolean allFound = true;
for (ColumnRoutePair pair : primaryKeyPairs) {// 可能id
// in(1,2,3)多主键
// in(1,2,3)多主键
String cacheKey = pair.colValue;
String dataNode = (String) cachePool.get(tableKey, cacheKey);
if (dataNode == null) {
@@ -827,17 +818,17 @@ public class RouterUtil {
}
if (isFoundPartitionValue) {//分库表
Set<ColumnRoutePair> partitionValue = columnsMap.get(partionCol);
if(partitionValue.size() == 0) {
if(tablesRouteMap.get(tableName) == null) {
if (partitionValue.size() == 0) {
if (tablesRouteMap.get(tableName) == null) {
tablesRouteMap.put(tableName, new HashSet<String>());
}
tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
} else {
for(ColumnRoutePair pair : partitionValue) {
for (ColumnRoutePair pair : partitionValue) {
AbstractPartitionAlgorithm algorithm = tableConfig.getRule().getRuleAlgorithm();
if(pair.colValue != null) {
if (pair.colValue != null) {
Integer nodeIndex = algorithm.calculate(pair.colValue);
if(nodeIndex == null) {
if (nodeIndex == null) {
String msg = "can't find any valid datanode :" + tableConfig.getName()
+ " -> " + tableConfig.getPartitionColumn() + " -> " + pair.colValue;
LOGGER.warn(msg);
@@ -846,7 +837,7 @@ public class RouterUtil {
ArrayList<String> dataNodes = tableConfig.getDataNodes();
String node;
if (nodeIndex >=0 && nodeIndex < dataNodes.size()) {
if (nodeIndex >= 0 && nodeIndex < dataNodes.size()) {
node = dataNodes.get(nodeIndex);
} else {
@@ -857,19 +848,19 @@ public class RouterUtil {
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if(node != null) {
if(tablesRouteMap.get(tableName) == null) {
if (node != null) {
if (tablesRouteMap.get(tableName) == null) {
tablesRouteMap.put(tableName, new HashSet<String>());
}
tablesRouteMap.get(tableName).add(node);
}
}
if(pair.rangeValue != null) {
if (pair.rangeValue != null) {
Integer[] nodeIndexs = algorithm
.calculateRange(pair.rangeValue.beginValue.toString(), pair.rangeValue.endValue.toString());
ArrayList<String> dataNodes = tableConfig.getDataNodes();
String node;
for(Integer idx : nodeIndexs) {
for (Integer idx : nodeIndexs) {
if (idx >= 0 && idx < dataNodes.size()) {
node = dataNodes.get(idx);
} else {
@@ -878,8 +869,8 @@ public class RouterUtil {
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if(node != null) {
if(tablesRouteMap.get(tableName) == null) {
if (node != null) {
if (tablesRouteMap.get(tableName) == null) {
tablesRouteMap.put(tableName, new HashSet<String>());
}
tablesRouteMap.get(tableName).add(node);
@@ -889,9 +880,9 @@ public class RouterUtil {
}
}
}
} else if(joinKey != null && columnsMap.get(joinKey) != null && columnsMap.get(joinKey).size() != 0) {//childTable (如果是select 语句的父子表join)之前要找到root table,将childTable移除,只留下root table
} else if (joinKey != null && columnsMap.get(joinKey) != null && columnsMap.get(joinKey).size() != 0) {//childTable (如果是select 语句的父子表join)之前要找到root table,将childTable移除,只留下root table
Set<ColumnRoutePair> joinKeyValue = columnsMap.get(joinKey);
Set<String> dataNodeSet = ruleByJoinValueCalculate(rrs, tableConfig, joinKeyValue);
if (dataNodeSet.isEmpty()) {
@@ -914,7 +905,7 @@ public class RouterUtil {
} else {
//没找到拆分字段,该表的所有节点都路由
if(tablesRouteMap.get(tableName) == null) {
if (tablesRouteMap.get(tableName) == null) {
tablesRouteMap.put(tableName, new HashSet<String>());
}
tablesRouteMap.get(tableName).addAll(tableConfig.getDataNodes());
@@ -924,26 +915,25 @@ public class RouterUtil {
}
/**
*
* @param schema
* @param ctx
* @param tc
* @return true表示校验通过false表示检验不通过
*/
public static boolean checkRuleRequired(SchemaConfig schema, DruidShardingParseInfo ctx, RouteCalculateUnit routeUnit, TableConfig tc) {
if(!tc.isRuleRequired()) {
if (!tc.isRuleRequired()) {
return true;
}
boolean hasRequiredValue = false;
String tableName = tc.getName();
if(routeUnit.getTablesAndConditions().get(tableName) == null || routeUnit.getTablesAndConditions().get(tableName).size() == 0) {
if (routeUnit.getTablesAndConditions().get(tableName) == null || routeUnit.getTablesAndConditions().get(tableName).size() == 0) {
hasRequiredValue = false;
} else {
for(Map.Entry<String, Set<ColumnRoutePair>> condition : routeUnit.getTablesAndConditions().get(tableName).entrySet()) {
for (Map.Entry<String, Set<ColumnRoutePair>> condition : routeUnit.getTablesAndConditions().get(tableName).entrySet()) {
String colName = RouterUtil.getFixedSql(RouterUtil.removeSchema(condition.getKey(), schema.getName()));
//条件字段是拆分字段
if(colName.equals(tc.getPartitionColumn())) {
if (colName.equals(tc.getPartitionColumn())) {
hasRequiredValue = true;
break;
}
@@ -955,6 +945,7 @@ public class RouterUtil {
/**
* 增加判断支持未配置分片的表走默认的dataNode
*
* @param schemaConfig
* @param tableName
* @return
@@ -972,12 +963,13 @@ public class RouterUtil {
/**
* 判断条件是否永真
*
* @param expr
* @return
*/
public static boolean isConditionAlwaysTrue(SQLExpr expr) {
Object o = WallVisitorUtils.getValue(expr);
if(Boolean.TRUE.equals(o)) {
if (Boolean.TRUE.equals(o)) {
return true;
}
return false;
@@ -985,12 +977,13 @@ public class RouterUtil {
/**
* 判断条件是否永假的
*
* @param expr
* @return
*/
public static boolean isConditionAlwaysFalse(SQLExpr expr) {
Object o = WallVisitorUtils.getValue(expr);
if(Boolean.FALSE.equals(o)) {
if (Boolean.FALSE.equals(o)) {
return true;
}
return false;

View File

@@ -74,7 +74,7 @@ public class Explain2Handler {
}
RouteResultsetNode node = new RouteResultsetNode(dataNode, ServerParse.SELECT, sql);
RouteResultset rrs = new RouteResultset(sql, ServerParse.SELECT, c.getSession2());
RouteResultset rrs = new RouteResultset(sql, ServerParse.SELECT);
node.setSource(rrs);
EMPTY_ARRAY[0] = node;
rrs.setNodes(EMPTY_ARRAY);

View File

@@ -36,7 +36,6 @@ import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -326,7 +325,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
private RouteResultset tryDirectRoute(String sql, String[] lineList)
{
RouteResultset rrs = new RouteResultset(sql, ServerParse.INSERT, serverConnection.getSession2());
RouteResultset rrs = new RouteResultset(sql, ServerParse.INSERT);
rrs.setLoadData(true);
if (tableConfig == null && schema.getDataNode() != null)
{
@@ -530,10 +529,9 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
statement.setFileName(fn);
//在这里使用替换方法替换掉SQL语句中的 IGNORE X LINES 防止每个物理节点都IGNORE X个元素
String srcStatement = this.ignoreLinesDelete(statement.toString());
RouteResultset rrs = new RouteResultset(srcStatement, ServerParse.LOAD_DATA_INFILE_SQL, serverConnection.getSession2());
RouteResultset rrs = new RouteResultset(srcStatement, ServerParse.LOAD_DATA_INFILE_SQL);
rrs.setLoadData(true);
rrs.setStatement(srcStatement);
rrs.setAutocommit(serverConnection.isAutocommit());
rrs.setFinishedRoute(true);
int size = routeMap.size();
RouteResultsetNode[] routeResultsetNodes = new RouteResultsetNode[size];

View File

@@ -92,7 +92,7 @@ public class DruidUpdateParserTest {
when((schemaConfig).getTables()).thenReturn(tables);
when(tables.get(tableName)).thenReturn(tableConfig);
when(tableConfig.getParentTC()).thenReturn(null);
RouteResultset routeResultset = new RouteResultset(sql, 11, null);
RouteResultset routeResultset = new RouteResultset(sql, 11);
Class c = DruidUpdateParser.class;
Method method = c.getDeclaredMethod("confirmShardColumnNotUpdated", new Class[]{SQLUpdateStatement.class, SchemaConfig.class, String.class, String.class, String.class, RouteResultset.class});
method.setAccessible(true);

View File

@@ -46,7 +46,7 @@ public class DQLRouteTest {
public void test() throws Exception {
String stmt = "select * from `offer` where id = 100";
SchemaConfig schema = schemaMap.get("mysqldb");
RouteResultset rrs = new RouteResultset(stmt, 7, null);
RouteResultset rrs = new RouteResultset(stmt, 7);
SQLStatementParser parser = null;
parser = new MySqlStatementParser(stmt);