[inner-2316&2326] performance tuning, code optimization, and time-consuming methods

This commit is contained in:
wenyh
2023-08-05 13:44:49 +08:00
committed by wenyh1
parent d893ab89b4
commit d5ff39e885
38 changed files with 509 additions and 335 deletions

View File

@@ -11,9 +11,9 @@ import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.util.ParseUtil;
import com.actiontech.dble.statistic.sql.entry.FrontendInfo;
import com.actiontech.dble.statistic.trace.AbstractTrackProbe;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@@ -22,6 +22,7 @@ public abstract class Session {
protected final AtomicBoolean isMultiStatement = new AtomicBoolean(false);
protected volatile String remainingSql = null;
protected AbstractTrackProbe trackProbe;
private volatile FrontendInfo traceFrontendInfo;
/**
* get frontend conn
@@ -29,7 +30,16 @@ public abstract class Session {
public abstract FrontendConnection getSource();
public void trace(Consumer<AbstractTrackProbe> consumer) {
Optional.ofNullable(trackProbe).ifPresent(consumer);
if (trackProbe != null) {
consumer.accept(trackProbe);
}
}
public FrontendInfo getTraceFrontendInfo() {
if (traceFrontendInfo == null) {
traceFrontendInfo = new FrontendInfo(this.getSource().getFrontEndService());
}
return traceFrontendInfo;
}
public void setHandlerStart(DMLResponseHandler handler) {
@@ -105,4 +115,8 @@ public abstract class Session {
public void setRemainingSql(String remainingSql) {
this.remainingSql = remainingSql;
}
public void setTrackProbe(AbstractTrackProbe trackProbe) {
this.trackProbe = trackProbe;
}
}

View File

@@ -19,6 +19,7 @@ import com.actiontech.dble.net.service.AuthService;
import com.actiontech.dble.services.mysqlauthenticate.MySQLBackAuthService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.FlowController;
import com.actiontech.dble.statistic.sql.entry.BackendInfo;
import com.actiontech.dble.util.TimeUtil;
import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ public class BackendConnection extends PooledConnection {
private volatile boolean backendWriteFlowControlled;
private volatile String bindFront;
private volatile BackendInfo traceBackendInfo;
public BackendConnection(NetworkChannel channel, SocketWR socketWR, ReadTimeStatusInstance instance, ResponseHandler handler, String schema) {
super(channel, socketWR);
@@ -242,6 +244,12 @@ public class BackendConnection extends PooledConnection {
return instance.isReadInstance();
}
public BackendInfo getTraceBackendInfo() {
if (traceBackendInfo == null)
traceBackendInfo = new BackendInfo(this);
return traceBackendInfo;
}
@Override
public String toString() { // show all
return "BackendConnection[id = " + id + " host = " + host + " port = " + port + " localPort = " + localPort + " mysqlId = " + threadId + " db config = " + instance + (bindFront != null ? ", currentBindFrontend = " + bindFront : "") + "]";

View File

@@ -67,7 +67,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler {
beforeError();
if (respHand != null) {
Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseEndTime(service)));
if (service.getOriginSession() != null)
service.getOriginSession().trace(t -> t.setBackendResponseEndTime(service));
IODelayProvider.beforeErrorResponse(service);
respHand.errorResponse(data, service);
} else {
@@ -127,7 +128,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler {
protected void closeNoHandler() {
if (!service.getConnection().isClosed()) {
LOGGER.info("no handler bind in this service " + service);
Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseEndTime(service)));
if (service.getOriginSession() != null)
service.getOriginSession().trace(t -> t.setBackendResponseEndTime(service));
service.getConnection().close("no handler");
}
}
@@ -146,7 +148,8 @@ public class DefaultResponseHandler implements ProtocolResponseHandler {
//LOGGER.info("get into rowing data " + data.length);
ResponseHandler respHand = service.getResponseHandler();
if (respHand != null) {
Optional.ofNullable(service.getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendSqlAddRows(service)));
if (service.getOriginSession() != null)
service.getOriginSession().trace(t -> t.setBackendSqlAddRows(service));
respHand.rowResponse(data, null, false, service);
} else {
closeNoHandler();

View File

@@ -23,6 +23,7 @@ import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.util.SchemaUtil;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor;
@@ -53,6 +54,9 @@ public class DefaultDruidParser implements DruidParser {
ctx = new DruidShardingParseInfo();
schema = visitorParse(schema, rrs, stmt, schemaStatVisitor, service, isExplain);
changeSql(schema, rrs, stmt);
if (!CollectionUtil.isEmpty(ctx.getTables())) {
service.getSession2().trace(t -> t.addTable(ctx.getTables()));
}
return schema;
}

View File

@@ -12,6 +12,7 @@ import com.actiontech.dble.config.privileges.ShardingPrivileges;
import com.actiontech.dble.config.privileges.ShardingPrivileges.CheckType;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.util.SchemaUtil;
import com.actiontech.dble.server.util.SchemaUtil.SchemaInfo;
@@ -28,6 +29,7 @@ import com.google.common.collect.Sets;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -95,6 +97,8 @@ public class DruidDeleteParser extends DruidModifyParser {
String msg = "The statement DML privilege check is not passed, sql:" + stmt.toString().replaceAll("[\\t\\n\\r]", " ");
throw new SQLNonTransientException(msg);
}
service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), schemaInfo.getTable()))));
SchemaConfig originSchema = schema;
schema = schemaInfo.getSchemaConfig();
BaseTableConfig tc = schema.getTables().get(schemaInfo.getTable());

View File

@@ -59,7 +59,8 @@ public class DruidInsertParser extends DruidInsertReplaceParser {
}
schema = schemaInfo.getSchemaConfig();
visitor.setCurrentTable(schemaInfo.getTable());
String tableName = schemaInfo.getTable();
service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName))));
if (insert.getQuery() != null) {
tryRouteInsertQuery(service, rrs, stmt, visitor, schemaInfo);
return schema;
@@ -70,7 +71,6 @@ public class DruidInsertParser extends DruidInsertReplaceParser {
throw new SQLNonTransientException(msg);
}
String tableName = schemaInfo.getTable();
if (parserNoSharding(service, schemaName, schemaInfo, rrs, insert)) {
return schema;
}

View File

@@ -71,6 +71,7 @@ abstract class DruidInsertReplaceParser extends DruidModifyParser {
for (String selectTable : visitor.getSelectTableList()) {
SchemaUtil.SchemaInfo schemaInfox = SchemaUtil.getSchemaInfo(service.getUser(), schema, selectTable);
tableSet.add(schemaInfox.getSchema() + "." + schemaInfox.getTable());
service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfox.getSchema(), schemaInfox.getTable()))));
if (!ShardingPrivileges.checkPrivilege(service.getUserConfig(), schemaInfox.getSchema(), schemaInfox.getTable(), ShardingPrivileges.CheckType.SELECT)) {
String msg = "The statement DML privilege check is not passed, sql:" + stmt.toString().replaceAll("[\\t\\n\\r]", " ");
throw new SQLNonTransientException(msg);

View File

@@ -63,6 +63,7 @@ public class DruidReplaceParser extends DruidInsertReplaceParser {
//No sharding table check
schema = schemaInfo.getSchemaConfig();
String tableName = schemaInfo.getTable();
service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName))));
if (parserNoSharding(service, schemaName, schemaInfo, rrs, replace)) {
return schema;
}

View File

@@ -124,6 +124,9 @@ public class DruidSelectParser extends DefaultDruidParser {
mysqlFrom instanceof SQLJoinTableSource ||
mysqlFrom instanceof SQLUnionQueryTableSource) {
super.visitorParse(schema, rrs, stmt, visitor, service, isExplain);
if (!CollectionUtil.isEmpty(ctx.getTables())) {
service.getSession2().trace(t -> t.addTable(ctx.getTables()));
}
return executeComplexSQL(schemaName, schema, rrs, selectStmt, service, visitor.getSelectTableList().size(), visitor.isContainsInnerFunction());
}
} else if (sqlSelectQuery instanceof SQLUnionQuery) {

View File

@@ -98,6 +98,7 @@ public class DruidUpdateParser extends DruidModifyParser {
super.visitorParse(originSchema, rrs, stmt, visitor, service, isExplain);
String tableName = schemaInfo.getTable();
service.getSession2().trace(t -> t.addTable(Collections.singletonList(new Pair<>(schemaInfo.getSchema(), tableName))));
BaseTableConfig tc = schema.getTables().get(tableName);
String noShardingNode = RouterUtil.isNoSharding(schema, tableName);

View File

@@ -201,7 +201,7 @@ public class RWSplitNonBlockingSession extends Session {
if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) {
long statementId = ByteUtil.readUB4(originPacket, 5);
PreparedStatementHolder holder = rwSplitService.getPrepareStatement(statementId);
trace(t -> t.setQuery(holder.getPrepareSql()));
trace(t -> t.setQuery(holder.getPrepareSql(), holder.getSqlType()));
if (holder.isMustMaster() && conn.getInstance().isReadInstance()) {
holder.setExecuteOrigin(originPacket);
PSHandler psHandler = new PSHandler(rwSplitService, holder);
@@ -225,6 +225,7 @@ public class RWSplitNonBlockingSession extends Session {
/**
* jdbc compatible pre-delivery statements
*
* @param master
* @param originPacket
* @param callback

View File

@@ -52,7 +52,7 @@ public class ServerQueryHandler implements FrontendQueryHandler {
sql = sql.substring(0, ParseUtil.findNextBreak(sql));
}
String finalSql = sql;
this.service.getSession2().trace(t -> t.setQuery(finalSql));
this.service.setExecuteSql(sql);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("{} query sql: {}", service.toString3(), (sql.length() > 1024 ? sql.substring(0, 1024) + "..." : sql));
@@ -61,6 +61,7 @@ public class ServerQueryHandler implements FrontendQueryHandler {
int rs = serverParse.parse(sql);
boolean isWithHint = serverParse.startWithHint(sql);
int sqlType = rs & 0xff;
this.service.getSession2().trace(t -> t.setQuery(finalSql, sqlType));
if (isWithHint) {
service.controlTx(TransactionOperate.QUERY);
if (sqlType == ServerParse.INSERT || sqlType == ServerParse.DELETE || sqlType == ServerParse.UPDATE ||

View File

@@ -139,7 +139,7 @@ public class FrontendByBackendByEntryByUser extends ManagerBaseTable {
map.put(COLUMN_FRONTEND_HOST, v.getValue().getFrontend().getHost());
map.put(COLUMN_BACKEND_HOST, v.getValue().getBackend().getHost());
map.put(COLUMN_BACKEND_PORT, String.valueOf(v.getValue().getBackend().getPort()));
map.put(COLUMN_SHARDING_NODE, v.getValue().getBackend().getNode());
map.put(COLUMN_SHARDING_NODE, v.getValue().getNode());
map.put(COLUMN_DB_INSTANCE, v.getValue().getBackend().getName());
map.put(COLUMN_TX_COUNT, String.valueOf(v.getValue().getTxCount()));

View File

@@ -44,7 +44,10 @@ import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
@@ -69,6 +72,8 @@ public class MySQLResponseService extends BackendService {
private static final CommandPacket COMMIT = new CommandPacket();
private static final CommandPacket ROLLBACK = new CommandPacket();
private volatile String traceRouteKey = null;
static {
COMMIT.setPacketId(0);
COMMIT.setCommand(MySQLPacket.COM_QUERY);
@@ -164,7 +169,8 @@ public class MySQLResponseService extends BackendService {
}
StringBuilder synSQL = getSynSql(service.isAutocommit(), service);
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
if (synSQL != null) {
sendQueryCmd(synSQL.toString(), service.getCharset());
}
@@ -245,7 +251,8 @@ public class MySQLResponseService extends BackendService {
if (synSQL == null) {
// not need syn connection
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
DbleServer.getInstance().getWriteToBackendQueue().add(Collections.singletonList(sendQueryCmdTask(rrn.getStatement(), clientCharset)));
return;
}
@@ -256,7 +263,8 @@ public class MySQLResponseService extends BackendService {
// and our query sql to multi command at last
synSQL.append(rrn.getStatement()).append(";");
// syn and execute others
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
// syn sharding
List<WriteToBackendTask> taskList = new ArrayList<>(1);
taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset));
@@ -272,7 +280,8 @@ public class MySQLResponseService extends BackendService {
try {
if (synSQL == null) {
// not need syn connection
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
sendQueryCmd(sql, clientCharset);
return;
}
@@ -283,7 +292,8 @@ public class MySQLResponseService extends BackendService {
// and our query sql to multi command at last
synSQL.append(sql).append(";");
// syn and execute others
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
this.sendQueryCmd(synSQL.toString(), clientCharset);
// waiting syn result...
} finally {
@@ -362,7 +372,8 @@ public class MySQLResponseService extends BackendService {
protected boolean innerRelease() {
if (isRowDataFlowing) {
if (logResponse.compareAndSet(false, true)) {
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
}
if (SystemConfig.getInstance().getEnableAsyncRelease() == 1) {
DbleServer.getInstance().getComplexQueryExecutor().execute(new BackEndRecycleRunnable(this));
@@ -434,7 +445,8 @@ public class MySQLResponseService extends BackendService {
}
if (synSQL == null) {
// not need syn connection
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
DbleServer.getInstance().getWriteToBackendQueue().add(Collections.singletonList(sendQueryCmdTask(rrn.getStatement(), clientCharset)));
waitSyncResult(rrn, clientCharset);
return;
@@ -444,7 +456,8 @@ public class MySQLResponseService extends BackendService {
// and our query sql to multi command at last
// syn and execute others
synSQL.append(rrn.getStatement()).append(";");
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendRequestTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendRequestTime(this));
taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset));
DbleServer.getInstance().getWriteToBackendQueue().add(taskList);
// waiting syn result...
@@ -479,13 +492,15 @@ public class MySQLResponseService extends BackendService {
}
public void rollback() {
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTxEnd(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendResponseTxEnd(this));
executeSql = "rollback";
ROLLBACK.write(this);
}
public void commit() {
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTxEnd(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendResponseTxEnd(this));
executeSql = "commit";
COMMIT.write(this);
}
@@ -511,7 +526,8 @@ public class MySQLResponseService extends BackendService {
if (task.getType() == ServiceTaskType.CLOSE) {
return;
}
Optional.ofNullable(getOriginSession()).ifPresent(p -> p.trace(t -> t.setBackendResponseTime(this)));
if (getOriginSession() != null)
getOriginSession().trace(t -> t.setBackendResponseTime(this));
}
@Override
@@ -629,6 +645,20 @@ public class MySQLResponseService extends BackendService {
return logResponse;
}
public String getTraceRouteKey() {
if (traceRouteKey == null) {
RouteResultsetNode node;
if (this.getAttachment() instanceof RouteResultsetNode) {
node = (RouteResultsetNode) this.getAttachment();
traceRouteKey = new StringBuilder().
append(this.getConnection().getId()).
append(":").append(node.getName()).
append(":").append(node.getStatementHash()).toString();
}
}
return traceRouteKey;
}
@Override
public String toString() {
return "MySQLResponseService[isExecuting = " + isExecuting + " attachment = " + attachment + " autocommitSynced = " + autocommitSynced + " isolationSynced = " + isolationSynced +

View File

@@ -39,10 +39,10 @@ public final class RWSplitMultiQueryHandler extends RWSplitQueryHandler {
}
String finalSql = sql.trim();
session.getService().setExecuteSql(finalSql);
session.trace(t -> t.setQuery(finalSql));
DbleHintParser.HintInfo hintInfo = DbleHintParser.parseRW(finalSql);
int rs = serverParse.parse(finalSql);
int sqlType = rs & 0xff;
session.trace(t -> t.setQuery(finalSql, sqlType));
Callback callback = null;
if (hintInfo != null) {
session.getService().controlTx(TransactionOperate.QUERY);

View File

@@ -40,10 +40,10 @@ public class RWSplitQueryHandler implements FrontendQueryHandler {
try {
session.getService().queryCount();
session.getService().setExecuteSql(sql);
session.trace(t -> t.setQuery(sql));
DbleHintParser.HintInfo hintInfo = DbleHintParser.parseRW(sql);
int rs = serverParse.parse(sql);
int sqlType = rs & 0xff;
session.trace(t -> t.setQuery(sql, sqlType));
if (hintInfo != null) {
session.executeHint(hintInfo, sqlType, sql, null);
session.getService().controlTx(TransactionOperate.QUERY);

View File

@@ -302,7 +302,7 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql, sqlType));
}
});
} else {
@@ -310,7 +310,7 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, false, finalSql, sqlType));
}
});
}
@@ -319,7 +319,7 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
if (isSuccess) {
long statementId = ByteUtil.readUB4(resp, 5);
int paramCount = ByteUtil.readUB2(resp, 11);
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql));
psHolder.put(statementId, new PreparedStatementHolder(data, paramCount, true, finalSql, sqlType));
}
});
}

View File

@@ -14,12 +14,14 @@ public class PreparedStatementHolder {
private byte[] fieldType;
private boolean needAddFieldType;
private String prepareSql;
private int sqlType;
public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql) {
public PreparedStatementHolder(byte[] prepareOrigin, int paramsCount, boolean mustMaster, String sql, int sqlType) {
this.prepareOrigin = prepareOrigin;
this.paramsCount = paramsCount;
this.mustMaster = mustMaster;
this.prepareSql = sql;
this.sqlType = sqlType;
}
public boolean isMustMaster() {
@@ -63,4 +65,8 @@ public class PreparedStatementHolder {
public String getPrepareSql() {
return prepareSql;
}
public int getSqlType() {
return sqlType;
}
}

View File

@@ -207,6 +207,10 @@ public final class StatisticManager {
}
}
public boolean isPureRecordSql() {
return !enable && (samplingRate > 0 || enableAnalysis);
}
public boolean isEnableAnalysis() {
return enableAnalysis;
}

View File

@@ -46,8 +46,8 @@ public class TableStat implements Comparable<TableStat> {
this.lastExecuteTime = 0;
}
public void update(int sqlType, long endTime, List<String> relationTables) {
switch (AbstractServerParse.getBusinessType(sqlType)) {
public void update(AbstractServerParse.BusinessType businessType, long endTime, List<String> relationTables) {
switch (businessType) {
case R:
this.rCount.incrementAndGet();
break;

View File

@@ -5,15 +5,10 @@
package com.actiontech.dble.statistic.sql.analyzer;
import com.actiontech.dble.server.parser.AbstractServerParse;
import com.actiontech.dble.services.manager.information.ManagerTableUtil;
import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.*;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlASTVisitorAdapter;
import com.alibaba.druid.sql.parser.SQLParserUtils;
import com.alibaba.druid.sql.parser.SQLStatementParser;
import com.alibaba.druid.sql.visitor.SQLASTVisitorAdapter;
import com.alibaba.druid.util.JdbcConstants;
import com.actiontech.dble.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,9 +28,6 @@ public final class TableStatAnalyzer implements AbstractAnalyzer {
private Map<String, TableStat> tableStatMap = new ConcurrentHashMap<>();
private ReentrantLock lock = new ReentrantLock();
//PARSER SQL TO GET NAME
private SQLParser sqlParser = new SQLParser();
private static final TableStatAnalyzer INSTANCE = new TableStatAnalyzer();
private TableStatAnalyzer() {
@@ -47,21 +39,27 @@ public final class TableStatAnalyzer implements AbstractAnalyzer {
@Override
public void toAnalyzing(StatisticFrontendSqlEntry fEntry) {
String masterTable = null;
List<String> relationTables = new ArrayList<>();
List<String> tables = sqlParser.parseTableNames(fEntry.getSql());
for (int i = 0; i < tables.size(); i++) {
String table = tables.get(i);
if (i == 0) {
masterTable = table;
} else {
relationTables.add(table);
}
}
if (masterTable != null) {
TableStat tableStat = getTableStat(masterTable);
tableStat.update(fEntry.getSqlType(), fEntry.getEndTimeMs(), relationTables);
AbstractServerParse.BusinessType businessType = AbstractServerParse.getBusinessType(fEntry.getSqlType());
switch (businessType) {
case R:
case W:
List<String> tableList = new ArrayList<>(fEntry.getTables());
if (CollectionUtil.isEmpty(tableList) ||
(tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) {
tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql());
}
String masterTable = null;
if (tableList.size() >= 1) {
masterTable = tableList.get(0);
}
if (masterTable != null) {
tableList.remove(0);
TableStat tableStat = getTableStat(masterTable);
tableStat.update(businessType, fEntry.getEndTimeMs(), tableList);
}
break;
default:
break;
}
}
@@ -97,81 +95,4 @@ public final class TableStatAnalyzer implements AbstractAnalyzer {
tableStatMap.clear();
}
/**
* PARSER table name
*/
private static class SQLParser {
private SQLStatement parseStmt(String sql) {
SQLStatementParser statParser = SQLParserUtils.createSQLStatementParser(sql, "mysql");
SQLStatement stmt = statParser.parseStatement();
return stmt;
}
/**
* fix SCHEMA,`
*
* @param tableName
* @return
*/
private String fixName(String tableName) {
if (tableName != null) {
tableName = tableName.replace("`", "");
int dotIdx = tableName.indexOf(".");
if (dotIdx > 0) {
tableName = tableName.substring(1 + dotIdx).trim();
}
}
return tableName;
}
/**
* PARSER SQL table name
*/
public List<String> parseTableNames(String sql) {
final List<String> tables = new ArrayList<>();
try {
SQLStatement stmt = parseStmt(sql);
if (stmt instanceof SQLReplaceStatement) {
String table = ((SQLReplaceStatement) stmt).getTableName().getSimpleName();
tables.add(fixName(table));
} else if (stmt instanceof SQLInsertStatement) {
String table = ((SQLInsertStatement) stmt).getTableName().getSimpleName();
tables.add(fixName(table));
} else if (stmt instanceof SQLUpdateStatement) {
addTableName(tables, stmt);
} else if (stmt instanceof SQLDeleteStatement) {
String table = ((SQLDeleteStatement) stmt).getTableName().getSimpleName();
tables.add(fixName(table));
} else if (stmt instanceof SQLSelectStatement) {
addTableName(tables, stmt);
}
} catch (Exception e) {
LOGGER.info("TableStatAnalyzer err:" + e.toString());
}
return tables;
}
private void addTableName(List<String> tables, SQLStatement stmt) {
String dbType = stmt.getDbType().name();
if (!StringUtil.isEmpty(dbType) && JdbcConstants.MYSQL.equals(dbType)) {
stmt.accept(new MySqlASTVisitorAdapter() {
public boolean visit(SQLExprTableSource x) {
tables.add(fixName(x.toString()));
return super.visit(x);
}
});
} else {
stmt.accept(new SQLASTVisitorAdapter() {
public boolean visit(SQLExprTableSource x) {
tables.add(fixName(x.toString()));
return super.visit(x);
}
});
}
}
}
}

View File

@@ -30,10 +30,10 @@ public final class UserStatAbstractAnalyzer implements AbstractAnalyzer {
@Override
public void toAnalyzing(final StatisticFrontendSqlEntry fEntry) {
UserStat newUserStat = new UserStat(fEntry.getFrontend());
UserStat userStat = userStatMap.putIfAbsent(fEntry.getFrontend().getUser(), newUserStat);
if (userStat == null) {
userStat = newUserStat;
UserStat userStat;
if ((userStat = userStatMap.get(fEntry.getFrontend().getUser())) == null) {
userStat = new UserStat(fEntry.getFrontend());
userStatMap.put(fEntry.getFrontend().getUser(), userStat);
}
userStat.update(fEntry);
}

View File

@@ -7,13 +7,11 @@ public class BackendInfo {
String name; // db_instance
String host;
int port;
String node; // sharding_node
public BackendInfo(BackendConnection bConn, String node) {
public BackendInfo(BackendConnection bConn) {
this.name = ((MySQLInstance) bConn.getInstance()).getName();
this.host = bConn.getHost();
this.port = bConn.getPort();
this.node = node;
}
public String getName() {
@@ -27,8 +25,4 @@ public class BackendInfo {
public int getPort() {
return port;
}
public String getNode() {
return node;
}
}

View File

@@ -8,13 +8,15 @@ package com.actiontech.dble.statistic.sql.entry;
public final class StatisticBackendSqlEntry extends StatisticEntry {
private BackendInfo backend;
private boolean isNeedToTx;
private String node; // sharding_node
public StatisticBackendSqlEntry(
FrontendInfo frontendInfo,
BackendInfo backendInfo, long startTime,
BackendInfo backendInfo, String node, long startTime,
String sql, int sqlType, long rows, long endTime) {
super(frontendInfo, startTime, sql, rows, endTime);
super(frontendInfo, startTime, sql, sqlType, rows, endTime);
this.backend = backendInfo;
this.node = node;
this.sqlType = sqlType;
}
@@ -22,6 +24,10 @@ public final class StatisticBackendSqlEntry extends StatisticEntry {
return backend;
}
public String getNode() {
return node;
}
public String getSql() {
return sql;
}
@@ -35,14 +41,14 @@ public final class StatisticBackendSqlEntry extends StatisticEntry {
}
public String getKey() {
StringBuffer key = new StringBuffer();
StringBuilder key = new StringBuilder();
key.append(getFrontend().getUserId());
key.append(":");
key.append(getFrontend().getUser());
key.append(":");
key.append(getFrontend().getHost());
key.append("|");
key.append(getBackend().getNode());
key.append(getNode());
key.append(":");
key.append(getBackend().getName());
key.append(":");

View File

@@ -11,13 +11,14 @@ public class StatisticEntry {
private FrontendInfo frontend;
protected long rows;
protected String sql;
protected int sqlType = -99;
protected int sqlType;
protected long duration;
public StatisticEntry(FrontendInfo frontendInfo, long startTime,
String sql, long rows, long endTime) {
String sql, int sqlType, long rows, long endTime) {
this.frontend = frontendInfo;
this.sql = sql.replaceAll("[\\t\\n\\r]", " ").trim();
this.sql = sql;
this.sqlType = sqlType;
this.rows = rows;
this.duration = endTime - startTime;
}
@@ -35,10 +36,7 @@ public class StatisticEntry {
}
public int getSqlType() {
if (null == sql) {
return sqlType;
}
if (sqlType == -99) {
if (sqlType == -99 && sql != null) {
this.sqlType = ServerParseFactory.getShardingParser().parse(sql) & 0xff;
}
return sqlType;

View File

@@ -5,6 +5,8 @@
package com.actiontech.dble.statistic.sql.entry;
import java.util.ArrayList;
public class StatisticFrontendSqlEntry extends StatisticEntry {
private String schema;
@@ -14,11 +16,12 @@ public class StatisticFrontendSqlEntry extends StatisticEntry {
private long resultSize;
private long startTimeMs;
private long endTimeMs;
private ArrayList<String> tableList;
public StatisticFrontendSqlEntry(FrontendInfo frontendInfo, long startTime, long startTimeMs,
String schema, String sql, long txId, long examinedRows, long rows,
long netOutBytes, long resultSize, long endTime, long endTimeMs) {
super(frontendInfo, startTime, sql, rows, endTime);
String schema, String sql, int sqlType, long txId, long examinedRows, long rows,
long netOutBytes, long resultSize, long endTime, long endTimeMs, ArrayList<String> tableList) {
super(frontendInfo, startTime, sql, sqlType, rows, endTime);
this.schema = schema;
this.txId = txId;
this.examinedRows = examinedRows;
@@ -26,6 +29,7 @@ public class StatisticFrontendSqlEntry extends StatisticEntry {
this.resultSize = resultSize;
this.startTimeMs = startTimeMs;
this.endTimeMs = endTimeMs;
this.tableList = tableList;
}
public long getStartTimeMs() {
@@ -59,4 +63,8 @@ public class StatisticFrontendSqlEntry extends StatisticEntry {
public long getResultSize() {
return resultSize;
}
public ArrayList<String> getTables() {
return tableList;
}
}

View File

@@ -35,7 +35,10 @@ public class AnalysisHandler implements StatisticDataHandler {
if (!StatisticManager.getInstance().isEnableAnalysis()) {
return;
}
StatisticEntry entry = statisticEvent.getEntry();
handle(statisticEvent.getEntry());
}
public void handle(StatisticEntry entry) {
if (entry instanceof StatisticFrontendSqlEntry) {
StatisticFrontendSqlEntry frontendSqlEntry = (StatisticFrontendSqlEntry) entry;
for (AbstractAnalyzer listener : listeners) {

View File

@@ -11,6 +11,7 @@ import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.sql.entry.FrontendInfo;
import com.actiontech.dble.statistic.sql.entry.StatisticEntry;
import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry;
import com.actiontech.dble.util.CollectionUtil;
import java.util.*;
@@ -43,7 +44,11 @@ public class AssociateTablesByEntryByUserCalcHandler implements StatisticDataHan
if (entry instanceof StatisticFrontendSqlEntry) {
StatisticFrontendSqlEntry fEntry = ((StatisticFrontendSqlEntry) entry);
if (fEntry.getSqlType() == 7) {
List<String> tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql());
List<String> tableList = new ArrayList<>(fEntry.getTables());
if (CollectionUtil.isEmpty(tableList) ||
(tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) {
tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql());
}
if (!tableList.isEmpty() && tableList.size() > 1) {
Collections.sort(tableList);
String tables = String.join(",", tableList);

View File

@@ -51,10 +51,11 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH
boolean isNew = currRecord == null;
if (isNew) {
checkEliminate();
currRecord = new Record(entry.getFrontend().getUserId(), entry.getFrontend(), backendSqlEntry.getBackend());
currRecord = new Record(entry.getFrontend().getUserId(), entry.getFrontend(), backendSqlEntry.getBackend(), backendSqlEntry.getNode());
}
if (backendSqlEntry.getSqlType() == 4 || backendSqlEntry.getSqlType() == 11 || backendSqlEntry.getSqlType() == 3 || backendSqlEntry.getSqlType() == 7) {
switch (backendSqlEntry.getSqlType()) {
int sqlType = backendSqlEntry.getSqlType();
if (sqlType == 4 || sqlType == 11 || sqlType == 3 || sqlType == 7) {
switch (sqlType) {
case ServerParse.INSERT:
currRecord.addInsert(backendSqlEntry.getRows(), backendSqlEntry.getDuration());
break;
@@ -102,6 +103,7 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH
int entry;
FrontendInfo frontend;
BackendInfo backend;
String node;
int txCount = 0;
long txRows = 0L;
@@ -125,10 +127,11 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH
long lastUpdateTime = 0L;
public Record(int entry, FrontendInfo frontend, BackendInfo backend) {
public Record(int entry, FrontendInfo frontend, BackendInfo backend, String node) {
this.entry = entry;
this.frontend = frontend;
this.backend = backend;
this.node = node;
}
public void incrementTx() {
@@ -185,6 +188,10 @@ public class FrontendByBackendByEntryByUserCalcHandler implements StatisticDataH
return backend;
}
public String getNode() {
return node;
}
public long getUpdateTime() {
return updateTime;
}

View File

@@ -16,11 +16,13 @@ import com.alibaba.druid.sql.visitor.ParameterizedOutputVisitorUtils;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class SqlStatisticHandler implements StatisticDataHandler {
private final ConcurrentSkipListMap<Long, TxRecord> txRecords = new ConcurrentSkipListMap<>();
private AtomicLong txRecordSize = new AtomicLong(0);
private volatile BitSet sampleDecisions;
public SqlStatisticHandler() {
@@ -32,16 +34,20 @@ public class SqlStatisticHandler implements StatisticDataHandler {
if (StatisticManager.getInstance().getSamplingRate() == 0) {
return;
}
handle(statisticEvent.getEntry());
}
StatisticEntry entry = statisticEvent.getEntry();
public void handle(StatisticEntry entry) {
if (entry instanceof StatisticFrontendSqlEntry) {
StatisticFrontendSqlEntry frontendSqlEntry = (StatisticFrontendSqlEntry) entry;
if (sampleDecisions.get((int) (frontendSqlEntry.getTxId() % 100))) {
if (null == txRecords.get(frontendSqlEntry.getTxId())) {
if (txRecords.size() >= StatisticManager.getInstance().getSqlLogSize()) {
if (txRecordSize.intValue() >= StatisticManager.getInstance().getSqlLogSize()) {
txRecords.pollFirstEntry();
txRecordSize.decrementAndGet();
}
txRecords.put(frontendSqlEntry.getTxId(), new TxRecord(frontendSqlEntry));
txRecordSize.incrementAndGet();
checkEliminate();
} else {
txRecords.get(frontendSqlEntry.getTxId()).getSqls().add(new SQLRecord(frontendSqlEntry));
@@ -52,9 +58,10 @@ public class SqlStatisticHandler implements StatisticDataHandler {
private void checkEliminate() {
int removeIndex;
if ((removeIndex = txRecords.size() - StatisticManager.getInstance().getSqlLogSize()) > 0) {
if ((removeIndex = txRecordSize.intValue() - StatisticManager.getInstance().getSqlLogSize()) > 0) {
while (removeIndex-- > 0) {
txRecords.pollFirstEntry();
txRecordSize.decrementAndGet();
}
}
}
@@ -68,6 +75,7 @@ public class SqlStatisticHandler implements StatisticDataHandler {
@Override
public void clear() {
txRecords.clear();
txRecordSize.set(0);
}
private BitSet randomBitSet(int cardinality, Random rnd) {
@@ -149,18 +157,11 @@ public class SqlStatisticHandler implements StatisticDataHandler {
private long duration;
private long startTime;
private AtomicBoolean init = new AtomicBoolean(false);
public SQLRecord(StatisticFrontendSqlEntry entry) {
this.sqlId = SQL_ID_GENERATOR.incrementAndGet();
this.stmt = entry.getSql();
if (stmt.equalsIgnoreCase("begin")) {
this.sqlDigest = "begin";
} else {
try {
this.sqlDigest = ParameterizedOutputVisitorUtils.parameterize(this.stmt, DbType.mysql).replaceAll("[\\t\\n\\r]", " ");
} catch (RuntimeException e) {
this.sqlDigest = "Other";
}
}
this.sqlType = entry.getSqlType();
this.txId = entry.getTxId();
@@ -228,6 +229,18 @@ public class SqlStatisticHandler implements StatisticDataHandler {
}
public String getSqlDigest() {
if (init.compareAndSet(false, true)) {
try {
if (stmt.equalsIgnoreCase("begin")) {
this.sqlDigest = "begin";
} else {
String tmpStmt = ParameterizedOutputVisitorUtils.parameterize(this.stmt, DbType.mysql);
this.sqlDigest = tmpStmt.replaceAll("[\\t\\n\\r]", " ");
}
} catch (RuntimeException e) {
this.sqlDigest = "Other";
}
}
return sqlDigest;
}

View File

@@ -12,6 +12,7 @@ import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.sql.entry.FrontendInfo;
import com.actiontech.dble.statistic.sql.entry.StatisticEntry;
import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry;
import com.actiontech.dble.util.CollectionUtil;
import java.util.*;
@@ -43,48 +44,60 @@ public class TableByUserByEntryCalcHandler implements StatisticDataHandler {
synchronized (records) {
if (entry instanceof StatisticFrontendSqlEntry) {
StatisticFrontendSqlEntry fEntry = ((StatisticFrontendSqlEntry) entry);
Set<String> tableSet = new HashSet<>(ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql()));
if (tableSet.isEmpty()) {
// dual, no table
toRecord("null", fEntry);
} else {
for (String t : tableSet) {
toRecord(t, fEntry);
int sqlType = fEntry.getSqlType();
if (sqlType == 4 || sqlType == 11 || sqlType == 3 || sqlType == 7) {
List<String> tableList = new ArrayList<>(fEntry.getTables());
if (CollectionUtil.isEmpty(tableList) ||
(tableList.size() > 1 && tableList.size() != tableList.stream().distinct().count())) {
tableList = ManagerTableUtil.getTables(fEntry.getSchema(), fEntry.getSql());
}
if (tableList.isEmpty()) {
// dual, no table
toRecord("null", fEntry, sqlType);
} else {
if (sqlType != 7 && tableList.size() > 1) {
toRecord(tableList.get(0), fEntry, sqlType);
} else {
Set<String> tableSet = new HashSet<>();
tableSet.addAll(tableList);
for (String t : tableSet) {
toRecord(t, fEntry, sqlType);
}
}
}
}
}
}
}
private void toRecord(String table, StatisticFrontendSqlEntry fEntry) {
if (fEntry.getSqlType() == 4 || fEntry.getSqlType() == 11 || fEntry.getSqlType() == 3 || fEntry.getSqlType() == 7) {
String key = fEntry.getFrontend().getUserId() + "-" + fEntry.getFrontend().getUser() + "-" + table;
Record currRecord = records.get(key);
boolean isNew = currRecord == null;
if (isNew) {
checkEliminate();
currRecord = new Record(fEntry.getFrontend().getUserId(), fEntry.getFrontend(), table);
}
switch (fEntry.getSqlType()) {
case ServerParse.INSERT:
currRecord.addInsert(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.UPDATE:
currRecord.addUpdate(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.DELETE:
currRecord.addDelete(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.SELECT:
currRecord.addSelect(fEntry.getExaminedRows(), fEntry.getRows(), fEntry.getDuration());
break;
default:
// ignore
break;
}
if (isNew) {
records.put(key, currRecord);
}
private void toRecord(String table, StatisticFrontendSqlEntry fEntry, int sqlType) {
String key = (new StringBuilder().append(fEntry.getFrontend().getUserId()).append("-").append(fEntry.getFrontend().getUser()).append("-").append(table)).toString();
Record currRecord = records.get(key);
boolean isNew = currRecord == null;
if (isNew) {
checkEliminate();
currRecord = new Record(fEntry.getFrontend().getUserId(), fEntry.getFrontend(), table);
}
switch (sqlType) {
case ServerParse.INSERT:
currRecord.addInsert(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.UPDATE:
currRecord.addUpdate(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.DELETE:
currRecord.addDelete(fEntry.getRows(), fEntry.getDuration());
break;
case ServerParse.SELECT:
currRecord.addSelect(fEntry.getExaminedRows(), fEntry.getRows(), fEntry.getDuration());
break;
default:
// ignore
break;
}
if (isNew) {
records.put(key, currRecord);
}
}

View File

@@ -20,8 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger;
public class QueryTimeCost implements Cloneable {
public static final Logger LOGGER = LoggerFactory.getLogger(QueryTimeCost.class);
private final CostTimeProvider provider;
private final ComplexQueryProvider xProvider;
private volatile CostTimeProvider provider;
private volatile ComplexQueryProvider xProvider;
private long connId = 0;
private long requestTime = 0;
@@ -38,11 +38,11 @@ public class QueryTimeCost implements Cloneable {
public QueryTimeCost(long connId) {
this.connId = connId;
this.xProvider = new ComplexQueryProvider();
this.provider = new CostTimeProvider();
}
public void setRequestTime(long requestTime) {
if (this.xProvider == null) this.xProvider = new ComplexQueryProvider();
if (this.provider == null) this.provider = new CostTimeProvider();
reset();
this.requestTime = requestTime;
provider.beginRequest(connId);

View File

@@ -6,12 +6,14 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.services.rwsplit.RWSplitService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.function.Consumer;
public abstract class AbstractTrackProbe {
@@ -23,7 +25,10 @@ public abstract class AbstractTrackProbe {
public void startProcess() {
}
public void setQuery(String sql) {
public void setQuery(String sql, int sqlType) {
}
public void addTable(List<Pair<String, String>> tables) {
}
public void endParse() {

View File

@@ -4,16 +4,14 @@ import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.sql.entry.BackendInfo;
import com.actiontech.dble.statistic.sql.entry.FrontendInfo;
import com.actiontech.dble.statistic.sql.entry.StatisticBackendSqlEntry;
import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
public class RwTraceResult implements Cloneable {
@@ -24,24 +22,26 @@ public class RwTraceResult implements Cloneable {
protected long requestEnd;
protected long requestEndMs;
protected String sql;
protected int sqlType = -1;
protected String schema;
protected long sqlRows;
protected volatile long examinedRows;
protected long netOutBytes;
protected long resultSize;
protected FrontendInfo frontendInfo;
final RWSplitNonBlockingSession currentSession;
protected List<ActualRoute> actualRouteList = Lists.newCopyOnWriteArrayList();
protected volatile long previousTxId = 0;
// only samplingRate=100
protected boolean pureRecordSql = true;
public RwTraceResult(RWSplitNonBlockingSession currentSession) {
this.currentSession = currentSession;
this.frontendInfo = new FrontendInfo(currentSession.getService());
}
public void setRequestTime(long time, long timeMs) {
reset();
this.pureRecordSql = StatisticManager.getInstance().isPureRecordSql();
this.requestStart = time;
this.requestStartMs = timeMs;
}
@@ -50,14 +50,20 @@ public class RwTraceResult implements Cloneable {
this.parseStart = time;
}
public void setQuery(String sql0) {
public void setQuery(String sql0, int sqlType0) {
this.schema = currentSession.getService().getSchema();
this.sql = sql0;
this.sqlType = sqlType0;
// multi-query
if (currentSession.getIsMultiStatement().get() && currentSession.getMultiQueryHandler() != null) {
Optional<ActualRoute> find = actualRouteList.stream().filter(f -> (f.handler == currentSession.getMultiQueryHandler())).findFirst();
if (find.isPresent()) {
ActualRoute ar = find.get();
ActualRoute ar = null;
for (ActualRoute a : actualRouteList) {
if (a.handler == currentSession.getMultiQueryHandler()) {
ar = a;
break;
}
}
if (ar != null) {
ar.setSql(sql0);
ar.setRow(0);
ar.setFinished(0);
@@ -70,9 +76,15 @@ public class RwTraceResult implements Cloneable {
public void setBackendRequestTime(MySQLResponseService service, long time) {
final ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null && sql != null) {
Optional<ActualRoute> find = actualRouteList.stream().filter(f -> (f.handler == responseHandler)).findFirst();
if (!find.isPresent()) {
ActualRoute ar = new ActualRoute(responseHandler, sql, time);
ActualRoute ar = null;
for (ActualRoute a : actualRouteList) {
if (a.handler == responseHandler) {
ar = a;
break;
}
}
if (ar == null) {
ar = new ActualRoute(responseHandler, sql, time);
actualRouteList.add(ar);
}
}
@@ -82,9 +94,14 @@ public class RwTraceResult implements Cloneable {
public void setBackendSqlAddRows(MySQLResponseService service, Long num) {
final ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null && sql != null) {
Optional<ActualRoute> find = actualRouteList.stream().filter(f -> (f.handler == responseHandler)).findFirst();
if (find.isPresent()) {
ActualRoute ar = find.get();
ActualRoute ar = null;
for (ActualRoute a : actualRouteList) {
if (a.handler == responseHandler) {
ar = a;
break;
}
}
if (ar != null) {
if (num == null) {
ar.addRow();
} else {
@@ -97,16 +114,21 @@ public class RwTraceResult implements Cloneable {
public void setBackendResponseEndTime(MySQLResponseService service, long time) {
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null && sql != null) {
Optional<ActualRoute> find = actualRouteList.stream().filter(f -> (f.handler == responseHandler && f.finished == 0)).findFirst();
if (find.isPresent()) {
ActualRoute ar = find.get();
ActualRoute ar = null;
for (ActualRoute a : actualRouteList) {
if (a.handler == responseHandler && a.finished == 0) {
ar = a;
break;
}
}
if (ar != null) {
ar.setFinished(time);
examinedRows += ar.getRow();
if (pureRecordSql) return;
StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry(
frontendInfo,
new BackendInfo(service.getConnection(), "-"),
ar.getRequestTime(), ar.getSql(), -99, ar.getRow(), ar.getFinished());
currentSession.getTraceFrontendInfo(),
service.getConnection().getTraceBackendInfo(), "-",
ar.getRequestTime(), ar.getSql(), sqlType, ar.getRow(), ar.getFinished());
bEntry.setNeedToTx(isNeedToTx());
StatisticManager.getInstance().push(bEntry);
}
@@ -114,10 +136,11 @@ public class RwTraceResult implements Cloneable {
}
public void setBackendResponseTxEnd(MySQLResponseService service, long time) {
if (pureRecordSql) return;
if (!isNeedToTx()) {
StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry(
new FrontendInfo(currentSession.getService()),
new BackendInfo(service.getConnection(), "-"),
currentSession.getTraceFrontendInfo(),
service.getConnection().getTraceBackendInfo(), "-",
time, "/** txEnd **/", 0, 0, time);
bEntry.setNeedToTx(true);
StatisticManager.getInstance().push(bEntry);
@@ -135,9 +158,9 @@ public class RwTraceResult implements Cloneable {
this.requestEnd = time;
this.requestEndMs = timeMs;
if (this.isCompletedV1() && isSuccess) {
StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(frontendInfo, requestStart, requestStartMs,
schema, sql, currentSession.getService().getTxId(), examinedRows, sqlRows,
netOutBytes, resultSize, requestEnd, requestEndMs);
StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(currentSession.getTraceFrontendInfo(), requestStart, requestStartMs,
schema, sql, sqlType, currentSession.getService().getTxId(), examinedRows, sqlRows,
netOutBytes, resultSize, requestEnd, requestEndMs, new ArrayList<>());
StatisticManager.getInstance().push(f);
}
}
@@ -145,7 +168,6 @@ public class RwTraceResult implements Cloneable {
public void setExit() {
reset();
frontendInfo = null;
}
public boolean isNeedToTx() {
@@ -170,6 +192,7 @@ public class RwTraceResult implements Cloneable {
requestEnd = 0;
requestEndMs = 0;
sql = null;
sqlType = -1;
schema = null;
sqlRows = 0;
examinedRows = 0;

View File

@@ -30,8 +30,8 @@ public class RwTrackProbe extends AbstractTrackProbe {
sqlTracking(t -> t.startProcess(System.nanoTime()));
}
public void setQuery(String sql) {
sqlTracking(t -> t.setQuery(sql));
public void setQuery(String sql, int sqlType) {
sqlTracking(t -> t.setQuery(sql, sqlType));
}
public void setBackendRequestTime(MySQLResponseService service) {

View File

@@ -11,13 +11,11 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.status.SlowQueryLog;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.statistic.sql.StatisticManager;
import com.actiontech.dble.statistic.sql.entry.BackendInfo;
import com.actiontech.dble.statistic.sql.entry.FrontendInfo;
import com.actiontech.dble.statistic.sql.entry.StatisticBackendSqlEntry;
import com.actiontech.dble.statistic.sql.entry.StatisticFrontendSqlEntry;
import com.google.common.collect.Lists;
@@ -25,9 +23,9 @@ import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
@@ -60,12 +58,13 @@ public class TraceResult implements Cloneable {
protected boolean subQuery = false;
protected String sql;
protected int sqlType = -1;
protected String schema;
protected ArrayList<String> tableList = new ArrayList<>();
protected long sqlRows = 0;
protected long netOutBytes;
protected long resultSize;
protected TraceResult previous = null;
protected FrontendInfo frontendInfo;
protected NonBlockingSession currentSession;
/*
* when 'set trace = 1' or 'enableSlowLog==true', need to record the time spent in each phase;
@@ -77,17 +76,18 @@ public class TraceResult implements Cloneable {
*
*/
protected boolean isDetailTrace = false;
// only samplingRate=100
protected boolean pureRecordSql = true;
public TraceResult(NonBlockingSession session0) {
this.currentSession = session0;
ShardingService shardingService = currentSession.getShardingService();
this.frontendInfo = new FrontendInfo(shardingService);
}
public void setRequestTime(long time, long timeMs) {
copyToPrevious();
reset();
this.isDetailTrace = currentSession.isTraceEnable() || SlowQueryLog.getInstance().isEnableSlowLog();
this.pureRecordSql = !isDetailTrace && StatisticManager.getInstance().isPureRecordSql();
this.requestStart = time;
this.requestStartMs = timeMs;
}
@@ -98,9 +98,16 @@ public class TraceResult implements Cloneable {
this.parseStart = time;
}
public void setQuery(String sql0) {
public void setQuery(String sql0, int sqlType0) {
this.schema = currentSession.getShardingService().getSchema();
this.sql = sql0;
this.sqlType = sqlType0;
}
public void addTable(List<Pair<String, String>> tables) { // schema.table
for (Pair<String, String> p : tables) {
tableList.add(p.getKey() + "." + p.getValue());
}
}
public void endParse(long time) {
@@ -130,10 +137,16 @@ public class TraceResult implements Cloneable {
final ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + +node.getStatementHash();
Optional<BackendRoute> find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key))).findFirst();
if (!find.isPresent()) {
BackendRoute ar = new BackendRoute(responseHandler, key, node.getName(), node.getStatement(), time);
String key = service.getTraceRouteKey();
BackendRoute ar = null;
for (BackendRoute b : backendRouteList) {
if (b.handler == responseHandler && b.routeKey.equals(key)) {
ar = b;
break;
}
}
if (ar == null) {
ar = new BackendRoute(responseHandler, key, node.getName(), node.getStatement(), time);
backendRouteList.add(ar);
}
}
@@ -142,11 +155,15 @@ public class TraceResult implements Cloneable {
public void setBackendResponseTime(MySQLResponseService service, long time) {
final ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash();
Optional<BackendRoute> find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime == 0)).findFirst();
if (find.isPresent()) {
BackendRoute ar = find.get();
String key = service.getTraceRouteKey();
BackendRoute ar = null;
for (BackendRoute b : backendRouteList) {
if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime == 0) {
ar = b;
break;
}
}
if (ar != null) {
ar.setFirstRevTime(time);
ar.setMysqlResponseService(service);
}
@@ -156,11 +173,15 @@ public class TraceResult implements Cloneable {
public void setBackendSqlAddRows(MySQLResponseService service, Long num) {
final ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash();
Optional<BackendRoute> find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime != 0)).findFirst();
if (find.isPresent()) {
BackendRoute ar = find.get();
String key = service.getTraceRouteKey();
BackendRoute ar = null;
for (BackendRoute b : backendRouteList) {
if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime != 0) {
ar = b;
break;
}
}
if (ar != null) {
if (num == null) {
ar.getRow().incrementAndGet();
} else {
@@ -174,15 +195,21 @@ public class TraceResult implements Cloneable {
ResponseHandler responseHandler = service.getResponseHandler();
if (responseHandler != null) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
String key = service.getConnection().getId() + ":" + node.getName() + ":" + node.getStatementHash();
Optional<BackendRoute> find = backendRouteList.stream().filter(f -> (f.handler == responseHandler && f.routeKey.equals(key) && f.firstRevTime != 0 && f.finished == 0)).findFirst();
if (find.isPresent()) {
BackendRoute ar = find.get();
String key = service.getTraceRouteKey();
BackendRoute ar = null;
for (BackendRoute b : backendRouteList) {
if (b.handler == responseHandler && b.routeKey.equals(key) && b.firstRevTime != 0 && b.finished == 0) {
ar = b;
break;
}
}
if (ar != null) {
ar.setFinished(time);
ar.setAutocommit(service.isAutocommit());
if (pureRecordSql) return;
StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry(
frontendInfo,
new BackendInfo(service.getConnection(), node.getName()),
currentSession.getTraceFrontendInfo(),
service.getConnection().getTraceBackendInfo(), node.getName(),
ar.getRequestTime(), ar.getSql(), node.getSqlType(), ar.getRow().get(), ar.getFinished());
bEntry.setNeedToTx(ar.isAutocommit());
StatisticManager.getInstance().push(bEntry);
@@ -192,18 +219,24 @@ public class TraceResult implements Cloneable {
public void setBackendTerminateByComplex(MultiNodeMergeHandler mergeHandler, long time) {
for (BaseDMLHandler handler : mergeHandler.getExeHandlers()) {
Optional<BackendRoute> find = backendRouteList.stream().filter(f -> (f.handler == handler && f.firstRevTime != 0 && f.finished == 0)).findFirst();
if (find.isPresent()) {
BackendRoute ar = find.get();
BackendRoute ar = null;
for (BackendRoute b : backendRouteList) {
if (b.handler == handler && b.firstRevTime != 0 && b.finished == 0) {
ar = b;
break;
}
}
if (ar != null) {
ar.setFinished(time);
if (pureRecordSql) return;
MySQLResponseService service;
if ((service = ar.getMysqlResponseService()) != null) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
if (node != null) {
ar.setAutocommit(service.isAutocommit());
StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry(
frontendInfo,
new BackendInfo(service.getConnection(), node.getName()),
currentSession.getTraceFrontendInfo(),
service.getConnection().getTraceBackendInfo(), node.getName(),
ar.getRequestTime(), ar.getSql(), node.getSqlType(), ar.getRow().get(), ar.getFinished());
bEntry.setNeedToTx(ar.isAutocommit());
StatisticManager.getInstance().push(bEntry);
@@ -215,11 +248,12 @@ public class TraceResult implements Cloneable {
// commit、rollback、quit
public void setBackendResponseTxEnd(MySQLResponseService service, long time) {
if (pureRecordSql) return;
if (!service.isAutocommit()) {
RouteResultsetNode node = (RouteResultsetNode) service.getAttachment();
StatisticBackendSqlEntry bEntry = new StatisticBackendSqlEntry(
new FrontendInfo(currentSession.getShardingService()),
new BackendInfo(service.getConnection(), node.getName()),
currentSession.getTraceFrontendInfo(),
service.getConnection().getTraceBackendInfo(), node.getName(),
time, "/** txEnd **/", 0, 0, time);
bEntry.setNeedToTx(true);
StatisticManager.getInstance().push(bEntry);
@@ -245,10 +279,10 @@ public class TraceResult implements Cloneable {
this.requestEnd = time;
this.requestEndMs = timeMs;
if (this.isCompletedV1() && isSuccess) {
long examinedRows = backendRouteList.stream().filter(f -> f.finished != 0).mapToLong(m -> m.getRow().get()).sum();
StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(frontendInfo, requestStart, requestStartMs,
schema, sql, currentSession.getShardingService().getTxId(), examinedRows, sqlRows,
netOutBytes, resultSize, requestEnd, requestEndMs);
long examinedRows = getExaminedRows();
StatisticFrontendSqlEntry f = new StatisticFrontendSqlEntry(currentSession.getTraceFrontendInfo(), requestStart, requestStartMs,
schema, sql, sqlType, currentSession.getShardingService().getTxId(), examinedRows, sqlRows,
netOutBytes, resultSize, requestEnd, requestEndMs, new ArrayList<String>(tableList));
StatisticManager.getInstance().push(f);
if (isDetailTrace) {
SlowQueryLog.getInstance().putSlowQueryLog(currentSession.getShardingService(), this.clone());
@@ -260,10 +294,19 @@ public class TraceResult implements Cloneable {
}
}
private long getExaminedRows() {
long examinedRows = 0;
for (BackendRoute backendRoute : backendRouteList) {
if (backendRoute.finished != 0) {
examinedRows += backendRoute.getRow().get();
}
}
return examinedRows;
}
public void setExit() {
reset();
previous = null;
frontendInfo = null;
}
public void setShardingNodes(RouteResultsetNode[] shardingNodes) {
@@ -324,7 +367,9 @@ public class TraceResult implements Cloneable {
adtCommitBegin = 0;
adtCommitEnd = 0;
sql = null;
sqlType = -1;
schema = null;
tableList.clear();
sqlRows = 0;
netOutBytes = 0;
resultSize = 0;

View File

@@ -6,6 +6,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.SessionStage;
import com.actiontech.dble.server.status.SlowQueryLog;
@@ -15,9 +16,8 @@ import com.actiontech.dble.statistic.stat.QueryTimeCost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
public class TrackProbe extends AbstractTrackProbe {
public static final Logger LOGGER = LoggerFactory.getLogger(TrackProbe.class);
@@ -41,142 +41,183 @@ public class TrackProbe extends AbstractTrackProbe {
sessionStage = SessionStage.Read_SQL;
long requestTime = System.nanoTime();
isTrace = currentSession.isTraceEnable() || SlowQueryLog.getInstance().isEnableSlowLog() || StatisticManager.getInstance().mainSwitch();
sqlTracking(t -> t.setRequestTime(requestTime, System.currentTimeMillis()));
if (isTrace)
traceResult.setRequestTime(requestTime, System.currentTimeMillis());
timeCost = (SystemConfig.getInstance().getUseCostTimeStat() != 0) && !(ThreadLocalRandom.current().nextInt(100) >= SystemConfig.getInstance().getCostSamplePercent());
sqlCosting(c -> c.setRequestTime(requestTime));
if (timeCost)
queryTimeCost.setRequestTime(requestTime);
}
public void startProcess() {
sessionStage = SessionStage.Parse_SQL;
long startProcess = System.nanoTime();
sqlTracking(t -> t.startProcess(startProcess));
sqlCosting(c -> c.startProcess());
if (isTrace)
traceResult.startProcess(startProcess);
if (timeCost)
queryTimeCost.startProcess();
}
public void setQuery(String sql) {
sqlTracking(t -> t.setQuery(sql));
public void setQuery(String sql, int sqlType) {
if (isTrace)
traceResult.setQuery(sql, sqlType);
}
public void addTable(List<Pair<String, String>> tables) {
if (isTrace)
traceResult.addTable(tables);
}
public void endParse() {
sessionStage = SessionStage.Route_Calculation;
sqlTracking(t -> t.endParse(System.nanoTime()));
sqlCosting(c -> c.endParse());
if (isTrace)
traceResult.endParse(System.nanoTime());
if (timeCost)
queryTimeCost.endParse();
}
public void endRoute(RouteResultset rrs) {
sessionStage = SessionStage.Prepare_to_Push;
sqlTracking(t -> t.endRoute(System.nanoTime()));
sqlCosting(c -> c.endRoute(rrs));
if (isTrace)
traceResult.endRoute(System.nanoTime());
if (timeCost)
queryTimeCost.endRoute(rrs);
}
public void endComplexRoute() {
sqlCosting(c -> c.endComplexRoute());
if (timeCost)
queryTimeCost.endComplexRoute();
}
public void endComplexExecute() {
sqlCosting(c -> c.endComplexExecute());
if (timeCost)
queryTimeCost.endComplexExecute();
}
public void readyToDeliver() {
sqlCosting(c -> c.readyToDeliver());
if (timeCost)
queryTimeCost.readyToDeliver();
}
public void setPreExecuteEnd(TraceResult.SqlTraceType type) {
sessionStage = SessionStage.Execute_SQL;
sqlTracking(t -> t.setPreExecuteEnd(type, System.nanoTime()));
if (isTrace)
traceResult.setPreExecuteEnd(type, System.nanoTime());
}
public void setSubQuery() {
sqlTracking(t -> t.setSubQuery());
if (isTrace)
traceResult.setSubQuery();
}
public void setBackendRequestTime(MySQLResponseService service) {
long requestTime0 = System.nanoTime();
sqlTracking(t -> t.setBackendRequestTime(service, requestTime0));
sqlCosting(c -> c.setBackendRequestTime(service.getConnection().getId(), requestTime0));
if (isTrace)
traceResult.setBackendRequestTime(service, requestTime0);
if (timeCost)
queryTimeCost.setBackendRequestTime(service.getConnection().getId(), requestTime0);
}
// receives the response package (before being pushed into BackendService.taskQueue)
public void setBackendResponseTime(MySQLResponseService service) {
sessionStage = SessionStage.Fetching_Result;
long responseTime = System.nanoTime();
sqlTracking(t -> t.setBackendResponseTime(service, responseTime));
sqlCosting(c -> c.setBackendResponseTime(service.getConnection().getId(), responseTime));
if (isTrace)
traceResult.setBackendResponseTime(service, responseTime);
if (timeCost)
queryTimeCost.setBackendResponseTime(service.getConnection().getId(), responseTime);
}
// start processing the response package (the first package taken out of the BackendService.taskQueue)
public void startExecuteBackend() {
sqlCosting(c -> c.startExecuteBackend());
if (timeCost)
queryTimeCost.startExecuteBackend();
}
// When multiple nodes are queried, all nodes return the point in time of the EOF package
public void allBackendConnReceive() {
sqlCosting(c -> c.allBackendConnReceive());
if (timeCost)
queryTimeCost.allBackendConnReceive();
}
public void setBackendSqlAddRows(MySQLResponseService service) {
sqlTracking(t -> t.setBackendSqlAddRows(service, null));
if (isTrace)
traceResult.setBackendSqlAddRows(service, null);
}
public void setBackendSqlSetRows(MySQLResponseService service, long rows) {
sqlTracking(t -> t.setBackendSqlAddRows(service, rows));
if (isTrace)
traceResult.setBackendSqlAddRows(service, rows);
}
// the final response package received,(include connection is accidentally closed or released)
public void setBackendResponseEndTime(MySQLResponseService service) {
sessionStage = SessionStage.First_Node_Fetched_Result;
sqlTracking(t -> t.setBackendResponseEndTime(service, System.nanoTime()));
sqlCosting(c -> c.setBackendResponseEndTime());
if (isTrace)
traceResult.setBackendResponseEndTime(service, System.nanoTime());
if (timeCost)
queryTimeCost.setBackendResponseEndTime();
}
public void setBackendTerminateByComplex(MultiNodeMergeHandler mergeHandler) {
sqlTracking(t -> t.setBackendTerminateByComplex(mergeHandler, System.nanoTime()));
if (isTrace)
traceResult.setBackendTerminateByComplex(mergeHandler, System.nanoTime());
}
public void setBackendResponseTxEnd(MySQLResponseService service) {
sqlTracking(t -> t.setBackendResponseTxEnd(service, System.nanoTime()));
if (isTrace)
traceResult.setBackendResponseTxEnd(service, System.nanoTime());
}
public void setBackendResponseClose(MySQLResponseService service) {
sqlTracking(t -> t.setBackendResponseTxEnd(service, System.nanoTime()));
if (isTrace)
traceResult.setBackendResponseTxEnd(service, System.nanoTime());
}
public void setFrontendAddRows() {
sqlTracking(t -> t.setFrontendAddRows());
if (isTrace)
traceResult.setFrontendAddRows();
}
public void setFrontendSetRows(long rows) {
sqlTracking(t -> t.setFrontendSetRows(rows));
if (isTrace)
traceResult.setFrontendSetRows(rows);
}
// get the rows、 netOutBytes、resultSize information in the last handler
public void doSqlStat(long sqlRows, long netOutBytes, long resultSize) {
sqlTracking(t -> t.setSqlStat(sqlRows, netOutBytes, resultSize));
if (isTrace)
traceResult.setSqlStat(sqlRows, netOutBytes, resultSize);
}
public void setResponseTime(boolean isSuccess) {
sessionStage = SessionStage.Finished;
long responseTime = System.nanoTime();
sqlTracking(t -> t.setResponseTime(isSuccess, responseTime, System.currentTimeMillis()));
sqlCosting(t -> t.setResponseTime(responseTime));
if (isTrace)
traceResult.setResponseTime(isSuccess, responseTime, System.currentTimeMillis());
if (timeCost)
queryTimeCost.setResponseTime(responseTime);
}
public void setExit() {
sqlTracking(t -> t.setExit());
if (isTrace)
traceResult.setExit();
}
public void setBeginCommitTime() {
sessionStage = SessionStage.Distributed_Transaction_Commit;
sqlTracking(t -> t.setAdtCommitBegin(System.nanoTime()));
if (isTrace)
traceResult.setAdtCommitBegin(System.nanoTime());
}
public void setFinishedCommitTime() {
sqlTracking(t -> t.setAdtCommitEnd(System.nanoTime()));
if (isTrace)
traceResult.setAdtCommitEnd(System.nanoTime());
}
// record the start time of each handler in the complex-query
public void setHandlerStart(DMLResponseHandler handler) {
sqlTracking(t -> t.addToRecordStartMap(handler, System.nanoTime()));
if (isTrace)
traceResult.addToRecordStartMap(handler, System.nanoTime());
}
// record the end time of each handler in the complex-query
@@ -185,38 +226,45 @@ public class TrackProbe extends AbstractTrackProbe {
DMLResponseHandler next = handler.getNextHandler();
sessionStage = SessionStage.changeFromHandlerType(next.type());
}
sqlTracking(t -> t.addToRecordEndMap(handler, System.nanoTime()));
if (isTrace)
traceResult.addToRecordEndMap(handler, System.nanoTime());
}
public void setTraceBuilder(BaseHandlerBuilder baseBuilder) {
sqlTracking(t -> t.setBuilder(baseBuilder));
if (isTrace)
traceResult.setBuilder(baseBuilder);
}
public void setTraceSimpleHandler(ResponseHandler simpleHandler) {
sqlTracking(t -> t.setSimpleHandler(simpleHandler));
if (isTrace)
traceResult.setSimpleHandler(simpleHandler);
}
private void sqlTracking(Consumer<TraceResult> consumer) {
/*private void sqlTracking(Consumer<TraceResult> consumer) {
try {
if (isTrace) {
Optional.ofNullable(traceResult).ifPresent(consumer);
if (traceResult != null) {
consumer.accept(traceResult);
}
}
} catch (Exception e) {
// Should not affect the main task
LOGGER.warn("sqlTracking occurred ", e);
}
}
}*/
private void sqlCosting(Consumer<QueryTimeCost> costConsumer) {
/*private void sqlCosting(Consumer<QueryTimeCost> costConsumer) {
try {
if (timeCost) {
Optional.ofNullable(queryTimeCost).ifPresent(costConsumer);
if (queryTimeCost != null) {
costConsumer.accept(queryTimeCost);
}
}
} catch (Exception e) {
// Should not affect the main task
LOGGER.warn("sqlCosting occurred ", e);
}
}
}*/
public SessionStage getSessionStage() {
return sessionStage;

View File

@@ -36,6 +36,10 @@ public final class SqlStringUtil {
String type;
switch (sqlType) {
case ServerParse.DDL:
case ServerParse.CREATE_DATABASE:
case ServerParse.CREATE_VIEW:
case ServerParse.DROP_VIEW:
case ServerParse.DROP_TABLE:
type = "DDL";
break;
case ServerParse.INSERT: