global sequence

This commit is contained in:
yanhuqing666
2017-03-29 16:05:32 +08:00
parent 433f185058
commit 8012dc8ecd
10 changed files with 211 additions and 692 deletions

View File

@@ -84,15 +84,19 @@ import io.mycat.net.NIOProcessor;
import io.mycat.net.NIOReactorPool;
import io.mycat.net.SocketAcceptor;
import io.mycat.net.SocketConnector;
import io.mycat.route.MyCATSequnceProcessor;
import io.mycat.route.RouteService;
import io.mycat.route.factory.RouteStrategyFactory;
import io.mycat.route.sequence.handler.DistributedSequenceHandler;
import io.mycat.route.sequence.handler.IncrSequenceMySQLHandler;
import io.mycat.route.sequence.handler.IncrSequencePropHandler;
import io.mycat.route.sequence.handler.IncrSequenceTimeHandler;
import io.mycat.route.sequence.handler.IncrSequenceZKHandler;
import io.mycat.route.sequence.handler.SequenceHandler;
import io.mycat.server.ServerConnectionFactory;
import io.mycat.server.interceptor.SQLInterceptor;
import io.mycat.server.interceptor.impl.GlobalTableUtil;
import io.mycat.sqlengine.OneRawSQLQueryResultHandler;
import io.mycat.sqlengine.SQLJob;
import io.mycat.statistic.SQLRecorder;
import io.mycat.statistic.stat.SqlResultSizeRecorder;
import io.mycat.statistic.stat.UserStat;
import io.mycat.statistic.stat.UserStatAnalyzer;
@@ -115,6 +119,8 @@ public class MycatServer {
private static final MycatServer INSTANCE = new MycatServer();
private static final Logger LOGGER = LoggerFactory.getLogger("MycatServer");
private static final Repository fileRepository = new FileSystemRepository();
//全局序列号
private final SequenceHandler sequenceHandler;
private final RouteService routerService;
private final CacheService cacheService;
private Properties dnIndexProperties;
@@ -125,8 +131,6 @@ public class MycatServer {
private AsynchronousChannelGroup[] asyncChannelGroups;
private volatile int channelIndex = 0;
//全局序列号
private final MyCATSequnceProcessor sequnceProcessor = new MyCATSequnceProcessor();
private final DynaClassLoader catletClassLoader;
private final SQLInterceptor sqlInterceptor;
private volatile int nextProcessor;
@@ -203,6 +207,11 @@ public class MycatServer {
dnindexLock = new InterProcessMutex(ZKUtils.getConnection(), path);
}
xaSessionCheck = new XASessionCheck();
sequenceHandler = initSequenceHandler(config.getSystem().getSequnceHandlerType());
}
public SequenceHandler getSequenceHandler() {
return sequenceHandler;
}
public long getTotalNetWorkBufferSize() {
@@ -221,10 +230,6 @@ public class MycatServer {
return catletClassLoader;
}
public MyCATSequnceProcessor getSequnceProcessor() {
return sequnceProcessor;
}
public SQLInterceptor getSqlInterceptor() {
return sqlInterceptor;
}
@@ -250,6 +255,22 @@ public class MycatServer {
}
}
private SequenceHandler initSequenceHandler(int seqHandlerType) {
switch (seqHandlerType) {
case SystemConfig.SEQUENCEHANDLER_MYSQLDB:
return IncrSequenceMySQLHandler.getInstance();
case SystemConfig.SEQUENCEHANDLER_LOCALFILE:
return IncrSequencePropHandler.getInstance();
case SystemConfig.SEQUENCEHANDLER_LOCAL_TIME:
return IncrSequenceTimeHandler.getInstance();
case SystemConfig.SEQUENCEHANDLER_ZK_DISTRIBUTED:
return DistributedSequenceHandler.getInstance(MycatServer.getInstance().getConfig().getSystem());
case SystemConfig.SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT:
return IncrSequenceZKHandler.getInstance();
default:
throw new java.lang.IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
}
}
public MyCatMemory getMyCatMemory() {
return myCatMemory;
}

View File

@@ -1,120 +0,0 @@
package io.mycat.route;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.mycat.MycatServer;
import io.mycat.config.ErrorCode;
import io.mycat.net.mysql.EOFPacket;
import io.mycat.net.mysql.FieldPacket;
import io.mycat.net.mysql.ResultSetHeaderPacket;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.route.parser.druid.DruidSequenceHandler;
import io.mycat.server.ServerConnection;
import io.mycat.util.StringUtil;
public class MyCATSequnceProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCATSequnceProcessor.class);
private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();
private volatile boolean running=true;
public MyCATSequnceProcessor() {
new ExecuteThread().start();
}
public void addNewSql(SessionSQLPair pair) {
seqSQLQueue.add(pair);
}
private void outRawData(ServerConnection sc,String value) {
byte packetId = 0;
int fieldCount = 1;
ByteBuffer byteBuf = sc.allocate();
ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
headerPkg.fieldCount = fieldCount;
headerPkg.packetId = ++packetId;
byteBuf = headerPkg.write(byteBuf, sc, true);
FieldPacket fieldPkg = new FieldPacket();
fieldPkg.packetId = ++packetId;
fieldPkg.name = StringUtil.encode("SEQUNCE", sc.getCharset());
byteBuf = fieldPkg.write(byteBuf, sc, true);
EOFPacket eofPckg = new EOFPacket();
eofPckg.packetId = ++packetId;
byteBuf = eofPckg.write(byteBuf, sc, true);
RowDataPacket rowDataPkg = new RowDataPacket(fieldCount);
rowDataPkg.packetId = ++packetId;
rowDataPkg.add(StringUtil.encode(value, sc.getCharset()));
byteBuf = rowDataPkg.write(byteBuf, sc, true);
// write last eof
EOFPacket lastEof = new EOFPacket();
lastEof.packetId = ++packetId;
byteBuf = lastEof.write(byteBuf, sc, true);
// write buffer
sc.write(byteBuf);
}
private void executeSeq(SessionSQLPair pair) {
try {
/*// @micmiu 扩展NodeToString实现自定义全局序列号
NodeToString strHandler = new ExtNodeToString4SEQ(MycatServer
.getInstance().getConfig().getSystem()
.getSequnceHandlerType());
// 如果存在sequence 转化sequence为实际数值
String charset = pair.session.getSource().getCharset();
QueryTreeNode ast = SQLParserDelegate.parse(pair.sql,
charset == null ? "utf-8" : charset);
String sql = strHandler.toString(ast);
if (sql.toUpperCase().startsWith("SELECT")) {
String value=sql.substring("SELECT".length()).trim();
outRawData(pair.session.getSource(),value);
return;
}*/
//使用Druid解析器实现sequence处理 @兵临城下
DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer
.getInstance().getConfig().getSystem().getSequnceHandlerType());
String charset = pair.session.getSource().getCharset();
String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);
pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);
} catch (Exception e) {
LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);
pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);
return;
}
}
public void shutdown(){
running=false;
}
class ExecuteThread extends Thread {
public ExecuteThread() {
setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
setName("MyCATSequnceProcessor");
}
public void run() {
while (running) {
try {
SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);
if(pair!=null){
executeSeq(pair);
}
} catch (Exception e) {
LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);
}
}
}
}
}

View File

@@ -1,85 +0,0 @@
package io.mycat.route.parser.druid;
import io.mycat.MycatServer;
import io.mycat.config.model.SystemConfig;
import io.mycat.route.sequence.handler.*;
import java.io.UnsupportedEncodingException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 使用Druid解析器实现对Sequence处理
*
* @author 兵临城下
* @date 2015/03/13
*/
public class DruidSequenceHandler {
private final SequenceHandler sequenceHandler;
/**
* 获取MYCAT SEQ的匹配语句
*/
private final static String MATCHED_FEATURE = "NEXT VALUE FOR MYCATSEQ_";
private final static Pattern pattern = Pattern.compile("(?:(\\s*next\\s+value\\s+for\\s*MYCATSEQ_(\\w+))(,|\\)|\\s)*)+", Pattern.CASE_INSENSITIVE);
public DruidSequenceHandler(int seqHandlerType) {
switch (seqHandlerType) {
case SystemConfig.SEQUENCEHANDLER_MYSQLDB:
sequenceHandler = IncrSequenceMySQLHandler.getInstance();
break;
case SystemConfig.SEQUENCEHANDLER_LOCALFILE:
sequenceHandler = IncrSequencePropHandler.getInstance();
break;
case SystemConfig.SEQUENCEHANDLER_LOCAL_TIME:
sequenceHandler = IncrSequenceTimeHandler.getInstance();
break;
case SystemConfig.SEQUENCEHANDLER_ZK_DISTRIBUTED:
sequenceHandler = DistributedSequenceHandler.getInstance(MycatServer.getInstance().getConfig().getSystem());
break;
case SystemConfig.SEQUENCEHANDLER_ZK_GLOBAL_INCREMENT:
sequenceHandler = IncrSequenceZKHandler.getInstance();
break;
default:
throw new java.lang.IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
}
}
/**
* 根据原sql获取可执行的sql
*
* @param sql
* @return
* @throws UnsupportedEncodingException
*/
public String getExecuteSql(String sql, String charset) throws UnsupportedEncodingException {
String executeSql = null;
if (null != sql && !"".equals(sql)) {
//sql不能转大写因为sql可能是insert语句会把values也给转换了
// 获取表名。
Matcher matcher = pattern.matcher(sql);
if (matcher.find()) {
String tableName = matcher.group(2);
long value = sequenceHandler.nextId(tableName.toUpperCase());
// 将MATCHED_FEATURE+表名替换成序列号。
executeSql = sql.replace(matcher.group(1), " " + value);
}
}
return executeSql;
}
//just for test
public String getTableName(String sql) {
Matcher matcher = pattern.matcher(sql);
if (matcher.find()) {
return matcher.group(2);
}
return null;
}
}

View File

@@ -1,12 +1,10 @@
package io.mycat.route.parser.druid.impl;
import java.sql.SQLNonTransientException;
import java.sql.SQLSyntaxErrorException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -19,6 +17,8 @@ import com.alibaba.druid.sql.ast.expr.SQLIntegerExpr;
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement.ValuesClause;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import io.mycat.MycatServer;
import io.mycat.backend.mysql.nio.handler.FetchStoreNodeOfChildTableHandler;
@@ -26,16 +26,14 @@ import io.mycat.config.MycatPrivileges;
import io.mycat.config.MycatPrivileges.Checktype;
import io.mycat.config.model.SchemaConfig;
import io.mycat.config.model.TableConfig;
import io.mycat.meta.protocol.MyCatMeta.IndexMeta;
import io.mycat.meta.protocol.MyCatMeta.TableMeta;
import io.mycat.route.RouteResultset;
import io.mycat.route.RouteResultsetNode;
import io.mycat.route.SessionSQLPair;
import io.mycat.route.function.AbstractPartitionAlgorithm;
import io.mycat.route.parser.druid.MycatSchemaStatVisitor;
import io.mycat.route.util.RouterUtil;
import io.mycat.server.ServerConnection;
import io.mycat.server.interceptor.impl.GlobalTableUtil;
import io.mycat.server.parser.ServerParse;
import io.mycat.server.util.SchemaUtil;
import io.mycat.server.util.SchemaUtil.SchemaInfo;
import io.mycat.sqlengine.mpp.ColumnRoutePair;
@@ -59,14 +57,7 @@ public class DruidInsertParser extends DefaultDruidParser {
}
schema = schemaInfo.schemaConfig;
String tableName = schemaInfo.table;
if (processWithMycatSeq(schema, ServerParse.INSERT, rrs.getStatement(), rrs.getSession().getSource())) {
rrs.setFinishedExecute(true);
return schema;
}
if (processInsert(schema, tableName, rrs.getStatement(), rrs.getSession().getSource())) {
rrs.setFinishedExecute(true);
return schema;
}
if (parserNoSharding(schemaName, schemaInfo, rrs, insert)) {
return schema;
}
@@ -76,10 +67,16 @@ public class DruidInsertParser extends DefaultDruidParser {
String msg = "can't find table [" + tableName + "] define in schema:" + schema.getName();
throw new SQLNonTransientException(msg);
}
if (insert.getQuery() != null) {
// insert into .... select ....
String msg = "TODO:insert into .... select .... not supported!";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if (tc.isGlobalTable()) {
String sql = rrs.getStatement();
if (GlobalTableUtil.useGlobleTableCheck()) {
sql = convertInsertSQL(schemaInfo, insert, sql);
if (tc.isAutoIncrement() || GlobalTableUtil.useGlobleTableCheck()) {
sql = convertInsertSQL(schemaInfo, insert, sql, tc, GlobalTableUtil.useGlobleTableCheck());
}else{
sql = RouterUtil.removeSchema(sql, schemaInfo.schema);
}
@@ -88,6 +85,14 @@ public class DruidInsertParser extends DefaultDruidParser {
rrs.setFinishedRoute(true);
return schema;
}
if (tc.isAutoIncrement()) {
String sql = convertInsertSQL(schemaInfo, insert, rrs.getStatement(), tc, false);
rrs.setStatement(sql);
SQLStatementParser parser = new MySqlStatementParser(sql);
stmt = parser.parseStatement();
insert = (MySqlInsertStatement) stmt;
}
// childTable的insert直接在解析过程中完成路由
if (tc.getParentTC()!= null) {
parserChildTable(schemaInfo, rrs, insert);
@@ -107,159 +112,7 @@ public class DruidInsertParser extends DefaultDruidParser {
}
return schema;
}
private boolean processInsert(SchemaConfig schema, String tableName, String origSQL, ServerConnection sc)
throws SQLNonTransientException {
TableConfig tableConfig = schema.getTables().get(tableName);
boolean processedInsert = false;
// 判断是有自增字段
if (null != tableConfig && tableConfig.isAutoIncrement()) {
String primaryKey = tableConfig.getPrimaryKey();
processedInsert = processInsert(sc, schema, origSQL, tableName, primaryKey);
}
return processedInsert;
}
private boolean processInsert(ServerConnection sc,SchemaConfig schema,
String origSQL,String tableName,String primaryKey) throws SQLNonTransientException {
int firstLeftBracketIndex = origSQL.indexOf("(");
int firstRightBracketIndex = origSQL.indexOf(")");
String upperSql = origSQL.toUpperCase();
int valuesIndex = upperSql.indexOf("VALUES");
int selectIndex = upperSql.indexOf("SELECT");
int fromIndex = upperSql.indexOf("FROM");
//屏蔽insert into table1 select * from table2语句
if(firstLeftBracketIndex < 0) {
String msg = "invalid sql:" + origSQL;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
//屏蔽批量插入
if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) {
String msg = "multi insert not provided" ;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
//插入语句必须提供列结构因为MyCat默认对于表结构无感知
if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) {
throw new SQLSyntaxErrorException("insert must provide ColumnList");
}
//如果主键不在插入语句的fields中则需要进一步处理
boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex);
if(processedInsert){
List<String> insertSQLs = handleBatchInsert(origSQL, valuesIndex);
for(String insertSQL:insertSQLs) {
processInsert(sc, schema, insertSQL, tableName, primaryKey, firstLeftBracketIndex + 1, insertSQL.indexOf('(', firstRightBracketIndex) + 1);
}
}
return processedInsert;
}
private boolean isPKInFields(String origSQL, String primaryKey, int firstLeftBracketIndex,
int firstRightBracketIndex) {
if (primaryKey == null) {
throw new RuntimeException("please make sure the primaryKey's config is not null in schemal.xml");
}
boolean isPrimaryKeyInFields = false;
String upperSQL = origSQL.substring(firstLeftBracketIndex, firstRightBracketIndex + 1).toUpperCase();
for (int pkOffset = 0, primaryKeyLength = primaryKey.length(), pkStart = 0;;) {
pkStart = upperSQL.indexOf(primaryKey, pkOffset);
if (pkStart >= 0 && pkStart < firstRightBracketIndex) {
char pkSide = upperSQL.charAt(pkStart - 1);
if (pkSide <= ' ' || pkSide == '`' || pkSide == ',' || pkSide == '(') {
pkSide = upperSQL.charAt(pkStart + primaryKey.length());
isPrimaryKeyInFields = pkSide <= ' ' || pkSide == '`' || pkSide == ',' || pkSide == ')';
}
if (isPrimaryKeyInFields) {
break;
}
pkOffset = pkStart + primaryKeyLength;
} else {
break;
}
}
return isPrimaryKeyInFields;
}
private List<String> handleBatchInsert(String origSQL, int valuesIndex){
List<String> handledSQLs = new LinkedList<>();
String prefix = origSQL.substring(0,valuesIndex + "VALUES".length());
String values = origSQL.substring(valuesIndex + "VALUES".length());
int flag = 0;
StringBuilder currentValue = new StringBuilder();
currentValue.append(prefix);
for (int i = 0; i < values.length(); i++) {
char j = values.charAt(i);
if(j=='(' && flag == 0){
flag = 1;
currentValue.append(j);
}else if(j=='\"' && flag == 1){
flag = 2;
currentValue.append(j);
} else if(j=='\'' && flag == 1){
flag = 2;
currentValue.append(j);
} else if(j=='\\' && flag == 2){
flag = 3;
currentValue.append(j);
} else if (flag == 3){
flag = 2;
currentValue.append(j);
}else if(j=='\"' && flag == 2){
flag = 1;
currentValue.append(j);
} else if(j=='\'' && flag == 2){
flag = 1;
currentValue.append(j);
} else if (j==')' && flag == 1){
flag = 0;
currentValue.append(j);
handledSQLs.add(currentValue.toString());
currentValue = new StringBuilder();
currentValue.append(prefix);
} else if(j == ',' && flag == 0){
continue;
} else {
currentValue.append(j);
}
}
return handledSQLs;
}
private void processInsert(ServerConnection sc, SchemaConfig schema, String origSQL,
String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) {
/**
* 对于主键不在插入语句的fields中的SQL需要改写。比如hotnews主键为id插入语句为
* insert into hotnews(title) values('aaa');
* 需要改写成:
* insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,'aaa');
*/
int primaryKeyLength = primaryKey.length();
int insertSegOffset = afterFirstLeftBracketIndex;
String mycatSeqPrefix = "next value for MYCATSEQ_";
int mycatSeqPrefixLength = mycatSeqPrefix.length();
int tableNameLength = tableName.length();
char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2];
origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0);
primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset);
insertSegOffset += primaryKeyLength;
newSQLBuf[insertSegOffset] = ',';
insertSegOffset++;
origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset);
insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex;
mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset);
insertSegOffset += mycatSeqPrefixLength;
tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset);
insertSegOffset += tableNameLength;
newSQLBuf[insertSegOffset] = ',';
insertSegOffset++;
origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset);
String newSQL = RouterUtil.removeSchema(new String(newSQLBuf), schema.getName());
processSQL(sc, schema, newSQL, ServerParse.INSERT);
}
private boolean parserNoSharding(String schemaName, SchemaInfo schemaInfo, RouteResultset rrs, MySqlInsertStatement insert) {
if (RouterUtil.isNoSharding(schemaInfo.schemaConfig, schemaInfo.table)) {
if (insert.getQuery() != null) {
@@ -278,7 +131,7 @@ public class DruidInsertParser extends DefaultDruidParser {
* @return
*/
private boolean isMultiInsert(MySqlInsertStatement insertStmt) {
return (insertStmt.getValuesList() != null && insertStmt.getValuesList().size() > 1) || insertStmt.getQuery() != null;
return (insertStmt.getValuesList() != null && insertStmt.getValuesList().size() > 1);
}
private RouteResultset parserChildTable(SchemaInfo schemaInfo, RouteResultset rrs, MySqlInsertStatement insertStmt) throws SQLNonTransientException {
@@ -295,7 +148,6 @@ public class DruidInsertParser extends DefaultDruidParser {
String joinKeyVal = insertStmt.getValues().getValues().get(joinKeyIndex).toString();
String realVal = StringUtil.removeApostrophe(joinKeyVal);
String sql = RouterUtil.removeSchema(insertStmt.toString(), schemaInfo.schema);
rrs.setStatement(sql);
// try to route by ER parent partion key
RouteResultset theRrs = routeByERParentKey(rrs, tc, realVal);
@@ -401,68 +253,93 @@ public class DruidInsertParser extends DefaultDruidParser {
private void parserBatchInsert(SchemaInfo schemaInfo, RouteResultset rrs, String partitionColumn,
MySqlInsertStatement insertStmt) throws SQLNonTransientException {
// insert into table() values (),(),....
if (insertStmt.getValuesList().size() > 1) {
SchemaConfig schema = schemaInfo.schemaConfig;
String tableName = schemaInfo.table;
// 字段列数
int columnNum = getTableColumns(schemaInfo, insertStmt);
int shardingColIndex = getShardingColIndex(schemaInfo, insertStmt, partitionColumn);
List<ValuesClause> valueClauseList = insertStmt.getValuesList();
Map<Integer, List<ValuesClause>> nodeValuesMap = new HashMap<Integer, List<ValuesClause>>();
TableConfig tableConfig = schema.getTables().get(tableName);
AbstractPartitionAlgorithm algorithm = tableConfig.getRule().getRuleAlgorithm();
for (ValuesClause valueClause : valueClauseList) {
if (valueClause.getValues().size() != columnNum) {
String msg = "bad insert sql columnSize != valueSize:" + columnNum + " != "
+ valueClause.getValues().size() + "values:" + valueClause;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
SQLExpr expr = valueClause.getValues().get(shardingColIndex);
String shardingValue = null;
if (expr instanceof SQLIntegerExpr) {
SQLIntegerExpr intExpr = (SQLIntegerExpr) expr;
shardingValue = intExpr.getNumber() + "";
} else if (expr instanceof SQLCharExpr) {
SQLCharExpr charExpr = (SQLCharExpr) expr;
shardingValue = charExpr.getText();
}
Integer nodeIndex = algorithm.calculate(shardingValue);
// 没找到插入的分片
if (nodeIndex == null) {
String msg = "can't find any valid datanode :" + tableName + " -> " + partitionColumn + " -> "
+ shardingValue;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if (nodeValuesMap.get(nodeIndex) == null) {
nodeValuesMap.put(nodeIndex, new ArrayList<ValuesClause>());
}
nodeValuesMap.get(nodeIndex).add(valueClause);
SchemaConfig schema = schemaInfo.schemaConfig;
String tableName = schemaInfo.table;
// 字段列数
int columnNum = getTableColumns(schemaInfo, insertStmt);
int shardingColIndex = getShardingColIndex(schemaInfo, insertStmt, partitionColumn);
List<ValuesClause> valueClauseList = insertStmt.getValuesList();
Map<Integer, List<ValuesClause>> nodeValuesMap = new HashMap<Integer, List<ValuesClause>>();
TableConfig tableConfig = schema.getTables().get(tableName);
AbstractPartitionAlgorithm algorithm = tableConfig.getRule().getRuleAlgorithm();
for (ValuesClause valueClause : valueClauseList) {
if (valueClause.getValues().size() != columnNum) {
String msg = "bad insert sql columnSize != valueSize:" + columnNum + " != "
+ valueClause.getValues().size() + "values:" + valueClause;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
SQLExpr expr = valueClause.getValues().get(shardingColIndex);
String shardingValue = null;
if (expr instanceof SQLIntegerExpr) {
SQLIntegerExpr intExpr = (SQLIntegerExpr) expr;
shardingValue = intExpr.getNumber() + "";
} else if (expr instanceof SQLCharExpr) {
SQLCharExpr charExpr = (SQLCharExpr) expr;
shardingValue = charExpr.getText();
}
RouteResultsetNode[] nodes = new RouteResultsetNode[nodeValuesMap.size()];
int count = 0;
for (Map.Entry<Integer, List<ValuesClause>> node : nodeValuesMap.entrySet()) {
Integer nodeIndex = node.getKey();
List<ValuesClause> valuesList = node.getValue();
insertStmt.setValuesList(valuesList);
nodes[count] = new RouteResultsetNode(tableConfig.getDataNodes().get(nodeIndex), rrs.getSqlType(),
RouterUtil.removeSchema(insertStmt.toString(), schemaInfo.schema));
nodes[count++].setSource(rrs);
Integer nodeIndex = algorithm.calculate(shardingValue);
// 没找到插入的分片
if (nodeIndex == null) {
String msg = "can't find any valid datanode :" + tableName + " -> " + partitionColumn + " -> "
+ shardingValue;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
rrs.setNodes(nodes);
rrs.setFinishedRoute(true);
} else if (insertStmt.getQuery() != null) {
// insert into .... select ....
String msg = "TODO:insert into .... select .... not supported!";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
if (nodeValuesMap.get(nodeIndex) == null) {
nodeValuesMap.put(nodeIndex, new ArrayList<ValuesClause>());
}
nodeValuesMap.get(nodeIndex).add(valueClause);
}
RouteResultsetNode[] nodes = new RouteResultsetNode[nodeValuesMap.size()];
int count = 0;
for (Map.Entry<Integer, List<ValuesClause>> node : nodeValuesMap.entrySet()) {
Integer nodeIndex = node.getKey();
List<ValuesClause> valuesList = node.getValue();
insertStmt.setValuesList(valuesList);
nodes[count] = new RouteResultsetNode(tableConfig.getDataNodes().get(nodeIndex), rrs.getSqlType(),
RouterUtil.removeSchema(insertStmt.toString(), schemaInfo.schema));
nodes[count++].setSource(rrs);
}
rrs.setNodes(nodes);
rrs.setFinishedRoute(true);
}
private int getPrimaryKeyIndex(SchemaInfo schemaInfo, String primaryKeyColumn) throws SQLNonTransientException {
if (primaryKeyColumn == null) {
throw new SQLNonTransientException("please make sure the primaryKey's config is not null in schemal.xml");
}
int primaryKeyIndex = -1;
TableMeta tbMeta = MycatServer.getInstance().getTmManager().getSyncTableMeta(schemaInfo.schema,
schemaInfo.table);
if (tbMeta != null) {
boolean hasPrimaryKey = false;
IndexMeta primaryKey = tbMeta.getPrimary();
if (primaryKey != null) {
for (int i = 0; i < tbMeta.getPrimary().getColumnsCount(); i++) {
if (primaryKeyColumn.equalsIgnoreCase(tbMeta.getPrimary().getColumns(i))) {
hasPrimaryKey = true;
break;
}
}
}
if (!hasPrimaryKey) {
String msg = "please make sure your table structure has primaryKey";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
for (int i = 0; i < tbMeta.getColumnsCount(); i++) {
if (primaryKeyColumn.equalsIgnoreCase(tbMeta.getColumns(i).getName())) {
return i;
}
}
}
return primaryKeyIndex;
}
/**
* 寻找拆分字段在 columnList中的索引
*
@@ -523,22 +400,21 @@ public class DruidInsertParser extends DefaultDruidParser {
private int getJoinKeyIndex(SchemaInfo schemaInfo, MySqlInsertStatement insertStmt, String joinKey) throws SQLNonTransientException {
return getShardingColIndex(schemaInfo, insertStmt, joinKey);
}
private String convertInsertSQL(SchemaInfo schemaInfo, MySqlInsertStatement insert, String originSql) throws SQLNonTransientException {
private String convertInsertSQL(SchemaInfo schemaInfo, MySqlInsertStatement insert, String originSql, TableConfig tc , boolean isGlobalCheck) throws SQLNonTransientException {
TableMeta orgTbMeta = MycatServer.getInstance().getTmManager().getSyncTableMeta(schemaInfo.schema,
schemaInfo.table);
if (orgTbMeta == null)
return originSql;
boolean isAutoIncrement = tc.isAutoIncrement();
String tableName = schemaInfo.table;
if (!GlobalTableUtil.isInnerColExist(schemaInfo, orgTbMeta))
return originSql;
// insert into .... select ....
if (insert.getQuery() != null) {
String msg = "TODO:insert into .... select .... not supported!";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
if (isGlobalCheck && !GlobalTableUtil.isInnerColExist(schemaInfo, orgTbMeta)) {
if (!isAutoIncrement) {
return originSql;
} else {
isGlobalCheck = false;
}
}
StringBuilder sb = new StringBuilder(200) // 指定初始容量可以提高性能
@@ -546,19 +422,25 @@ public class DruidInsertParser extends DefaultDruidParser {
List<SQLExpr> columns = insert.getColumns();
int idx = -1;
int autoIncrement = -1;
int idxGlobal = -1;
int colSize = -1;
// insert 没有带列名insert into t values(xxx,xxx)
if (columns == null || columns.size() <= 0) {
if (isAutoIncrement) {
autoIncrement = getPrimaryKeyIndex(schemaInfo, tc.getPrimaryKey());
}
colSize = orgTbMeta.getColumnsList().size();
sb.append("(");
for (int i = 0; i < colSize; i++) {
String column = orgTbMeta.getColumnsList().get(i).getName();
if (i > 0)
if (i > 0){
sb.append(",");
if (column.equalsIgnoreCase(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN))
idx = i; // 找到 内部列的索引位置
}
sb.append(column);
if (isGlobalCheck && column.equalsIgnoreCase(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN)) {
idxGlobal = i; // 找到 内部列的索引位置
}
}
sb.append(")");
} else { // insert 语句带有 列名
@@ -569,27 +451,44 @@ public class DruidInsertParser extends DefaultDruidParser {
else
sb.append(columns.get(i).toString());
String column = StringUtil.removeBackQuote(insert.getColumns().get(i).toString());
if (column.equalsIgnoreCase(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN))
idx = i;
if (isGlobalCheck && column.equalsIgnoreCase(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN)) {
String msg = "In insert Syntax, you can't set value for Global check column!";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if (isAutoIncrement && column.equalsIgnoreCase(tc.getPrimaryKey())) {
String msg = "In insert Syntax, you can't set value for Autoincrement column!";
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
}
if (idx <= -1)
sb.append(",").append(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN);
sb.append(")");
colSize = columns.size();
if (isAutoIncrement) {
autoIncrement = columns.size();
sb.append(",").append(tc.getPrimaryKey());
colSize++;
}
if (isGlobalCheck && idxGlobal <= -1){
idxGlobal = isAutoIncrement ? columns.size() + 1 : columns.size();
sb.append(",").append(GlobalTableUtil.GLOBAL_TABLE_MYCAT_COLUMN);
colSize++;
}
sb.append(")");
}
sb.append(" values");
String tableKey = schemaInfo.schema + "_" + schemaInfo.table;
List<ValuesClause> vcl = insert.getValuesList();
if (vcl != null && vcl.size() > 1) { // 批量insert
for (int j = 0; j < vcl.size(); j++) {
if (j != vcl.size() - 1)
appendValues(vcl.get(j).getValues(), sb, idx, colSize).append(",");
appendValues(tableKey, vcl.get(j).getValues(), sb, autoIncrement, idxGlobal, colSize).append(",");
else
appendValues(vcl.get(j).getValues(), sb, idx, colSize);
appendValues(tableKey, vcl.get(j).getValues(), sb, autoIncrement, idxGlobal, colSize);
}
} else { // 非批量 insert
List<SQLExpr> valuse = insert.getValues().getValues();
appendValues(valuse, sb, idx, colSize);
appendValues(tableKey,valuse, sb ,autoIncrement, idxGlobal, colSize);
}
List<SQLExpr> dku = insert.getDuplicateKeyUpdate();
@@ -629,63 +528,33 @@ public class DruidInsertParser extends DefaultDruidParser {
sb.append(")");
}
private static StringBuilder appendValues(List<SQLExpr> valuse, StringBuilder sb, int idx, int colSize) {
String operationTimestamp = String.valueOf(new Date().getTime());
private static StringBuilder appendValues(String tableKey, List<SQLExpr> valuse, StringBuilder sb, int autoIncrement, int idxGlobal, int colSize) throws SQLNonTransientException {
int size = valuse.size();
if (size < colSize)
size = colSize;
int checkSize = colSize - (autoIncrement < 0 ? 0 : 1) - (idxGlobal < 0 ? 0 : 1);
if (checkSize != size){
String msg = "In insert Syntax, you can't set value for Autoincrement column!";
if (autoIncrement < 0) {
msg = "In insert Syntax, you can't set value for Global check column!";
}
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
sb.append("(");
for (int i = 0; i < size; i++) {
if (i < size - 1) {
if (i != idx)
sb.append(valuse.get(i).toString()).append(",");
else
sb.append(operationTimestamp).append(",");
int iValue =0;
for (int i = 0; i < colSize; i++) {
if (i == idxGlobal) {
sb.append(String.valueOf(new Date().getTime()));
} else if (i == autoIncrement) {
long id = MycatServer.getInstance().getSequenceHandler().nextId(tableKey);
sb.append(id);
} else {
if (i != idx) {
sb.append(valuse.get(i).toString());
} else {
sb.append(operationTimestamp);
}
sb.append(valuse.get(iValue++).toString());
}
if (i < colSize-1) {
sb.append(",");
}
}
if (idx <= -1)
sb.append(",").append(operationTimestamp);
return sb.append(")");
}
public static boolean processWithMycatSeq(SchemaConfig schema, int sqlType,
String origSQL, ServerConnection sc) {
// check if origSQL is with global sequence
// @micmiu it is just a simple judgement
// 对应本地文件配置方式insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,test);
if (origSQL.indexOf(" MYCATSEQ_") != -1) {
origSQL = RouterUtil.removeSchema(origSQL, schema.getName());
processSQL(sc, schema, origSQL, sqlType);
return true;
}
return false;
}
public static void processSQL(ServerConnection sc, SchemaConfig schema, String sql, int sqlType) {
// int sequenceHandlerType = MycatServer.getInstance().getConfig().getSystem().getSequnceHandlerType();
SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
// if(sequenceHandlerType == 3 || sequenceHandlerType == 4){
// DruidSequenceHandler sequenceHandler = new
// DruidSequenceHandler(MycatServer
// .getInstance().getConfig().getSystem().getSequnceHandlerType());
// String charset = sessionSQLPair.session.getSource().getCharset();
// String executeSql = null;
// try {
// executeSql = sequenceHandler.getExecuteSql(sessionSQLPair.sql,charset
// == null ? "utf-8":charset);
// } catch (UnsupportedEncodingException e) {
// LOGGER.error("UnsupportedEncodingException!");
// }
// sessionSQLPair.session.getSource().routeEndExecuteSQL(executeSql,
// sessionSQLPair.type,sessionSQLPair.schema);
// } else {
MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
// }
}
}

View File

@@ -1,11 +1,14 @@
package io.mycat.route.sequence.handler;
import io.mycat.config.loader.console.ZookeeperPath;
import io.mycat.config.loader.zkprocess.comm.ZkConfig;
import io.mycat.config.loader.zkprocess.comm.ZkParamCfg;
import io.mycat.config.model.SystemConfig;
import io.mycat.route.util.PropertiesUtil;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.CancelLeadershipException;
@@ -18,13 +21,11 @@ import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import io.mycat.config.loader.console.ZookeeperPath;
import io.mycat.config.loader.zkprocess.comm.ZkConfig;
import io.mycat.config.loader.zkprocess.comm.ZkParamCfg;
import io.mycat.config.model.SystemConfig;
import io.mycat.route.util.PropertiesUtil;
/**
* 基于ZK与本地配置的分布式ID生成器(可以通过ZK获取集群机房唯一InstanceID也可以通过配置文件配置InstanceID)

View File

@@ -1,66 +0,0 @@
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.route.sequence.handler;
import java.util.Map;
/**
* BDB 数据库实现递增序列号
*
* @author <a href="http://www.micmiu.com">Michael</a>
* @time Create on 2013-12-29 下午11:05:44
* @version 1.0
*/
public class IncrSequenceBDBHandler extends IncrSequenceHandler {
private static class IncrSequenceBDBHandlerHolder {
private static final IncrSequenceBDBHandler instance = new IncrSequenceBDBHandler();
}
public static IncrSequenceBDBHandler getInstance() {
return IncrSequenceBDBHandlerHolder.instance;
}
private IncrSequenceBDBHandler() {
}
@Override
public Map<String, String> getParaValMap(String prefixName) {
return null;
}
@Override
public Boolean fetchNextPeriod(String prefixName) {
return null;
}
@Override
public Boolean updateCURIDVal(String prefixName, Long val) {
return null;
}
}

View File

@@ -54,7 +54,7 @@ public abstract class IncrSequenceHandler implements SequenceHandler {
public abstract Boolean fetchNextPeriod(String prefixName);
@Override
public long nextId(String prefixName) {
public synchronized long nextId(String prefixName) {
Map<String, String> paraMap = this.getParaValMap(prefixName);
if (null == paraMap) {
throw new RuntimeException("fetch Param Values error.");

View File

@@ -84,7 +84,7 @@ public class IncrSequenceMySQLHandler implements SequenceHandler {
private ConcurrentHashMap<String, SequenceVal> seqValueMap = new ConcurrentHashMap<String, SequenceVal>();
@Override
public long nextId(String seqName) {
public synchronized long nextId(String seqName) {
SequenceVal seqVal = seqValueMap.get(seqName);
if (seqVal == null) {
throw new ConfigException("can't find definition for sequence :"

View File

@@ -1,42 +0,0 @@
/*
* Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software;Designed and Developed mainly by many Chinese
* opensource volunteers. you can redistribute it and/or modify it under the
* terms of the GNU General Public License version 2 only, as published by the
* Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Any questions about this component can be directed to it's project Web address
* https://code.google.com/p/opencloudb/.
*
*/
package io.mycat.route.sequence.handler;
/**
* 通过Thirft客户端获取集群中心分配的全局ID
*
* @author <a href="http://www.micmiu.com">Michael</a>
* @time Create on 2013-12-25 上午12:15:48
* @version 1.0
*/
public class ThirftClientSequenceHandler implements SequenceHandler {
@Override
public long nextId(String prefixName) {
return 0;
}
}

View File

@@ -1,59 +0,0 @@
package io.mycat.parser.druid;
import static junit.framework.Assert.assertEquals;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.junit.Test;
import io.mycat.config.model.SystemConfig;
import io.mycat.route.parser.druid.DruidSequenceHandler;
/**
* 获取MYCAT SEQ 表名。
*/
public class DruidSequenceHandlerTest {
@Test
public void test() {
DruidSequenceHandler handler = new DruidSequenceHandler(SystemConfig.SEQUENCEHANDLER_LOCALFILE);
String sql = "select next value for mycatseq_xxxx".toUpperCase();
String tableName = handler.getTableName(sql);
assertEquals(tableName, "XXXX");
sql = " insert into test(id,sid)values(next value for MYCATSEQ_TEST,1)".toUpperCase();
tableName = handler.getTableName(sql);
assertEquals(tableName, "TEST");
sql = " insert into test(id,sid)values(next value for MYCATSEQ_TEST ,1)".toUpperCase();
tableName = handler.getTableName(sql);
assertEquals(tableName, "TEST");
sql = " insert into test(id)values(next value for MYCATSEQ_TEST )".toUpperCase();
tableName = handler.getTableName(sql);
assertEquals(tableName, "TEST");
}
@Test
public void test2() {
DruidSequenceHandler handler = new DruidSequenceHandler(SystemConfig.SEQUENCEHANDLER_LOCALFILE);
String sql = "/* APPLICATIONNAME=DBEAVER 3.3.2 - MAIN CONNECTION */ SELECT NEXT VALUE FOR MYCATSEQ_XXXX".toUpperCase();
String tableName = handler.getTableName(sql);
assertEquals(tableName, "XXXX");
}
public static void main(String[] args)
{
String patten="(?:(\\s*next\\s+value\\s+for\\s*MYCATSEQ_(\\w+))(,|\\)|\\s)*)+";
Pattern pattern = Pattern.compile(patten,Pattern.CASE_INSENSITIVE);
String sql="insert into test(id,sid)values( next value for MYCATSEQ_TEST ,1)";
Matcher matcher = pattern.matcher(sql);
System.out.println(matcher.find());
System.out.println(matcher.group(1));
System.out.println(matcher.group(2));
}
}